TDengine Go Client Library

driver-go is the official Go language client library for TDengine. It implements the driver interface of the database/sql package, Go's generic interface to SQL databases. Go developers can use it to build applications that access data in a TDengine cluster.

Connection types

driver-go provides 3 connection types.

  • Native connection, which connects to TDengine instances natively through the TDengine client driver (taosc) and supports data writing, querying, subscriptions, schemaless writing, and the parameter binding interface.
  • REST connection, which is implemented through taosAdapter. Some features, such as schemaless writing and subscriptions, are not supported.
  • WebSocket connection, which is implemented through taosAdapter. The set of features implemented by the WebSocket connection differs slightly from those implemented by the native connection.

For a detailed introduction of the connection types, please refer to: Establish Connection

Compatibility

The minimum supported Go version is 1.14. Using the latest Go version is recommended.

Supported platforms

Native connections are supported on the same platforms as the TDengine client driver. REST connections are supported on all platforms that can run Go.

Version support

Please refer to version support list

Handling exceptions

If an operation returns a TDengine error, you can retrieve the error code and error message as follows.

// import "github.com/taosdata/driver-go/v3/errors"
if err != nil {
tError, is := err.(*errors.TaosError)
if is {
fmt.Println("errorCode:", int(tError.Code))
fmt.Println("errorMessage:", tError.ErrStr)
} else {
fmt.Println(err.Error())
}
}

TDengine DataType vs. Go DataType

TDengine DataType    Go Type
TIMESTAMP            time.Time
TINYINT              int8
SMALLINT             int16
INT                  int32
BIGINT               int64
TINYINT UNSIGNED     uint8
SMALLINT UNSIGNED    uint16
INT UNSIGNED         uint32
BIGINT UNSIGNED      uint64
FLOAT                float32
DOUBLE               float64
BOOL                 bool
BINARY               string
NCHAR                string
JSON                 []byte

Note: The JSON type is supported only for tags.
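As an illustration of the mapping, the sketch below declares a Go struct for one row of a table with the columns ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT and the tags groupId INT, location BINARY(24). The struct name MeterRow and its field names are hypothetical; only the type pairs come from the table above.

```go
package main

import (
	"fmt"
	"time"
)

// MeterRow is a hypothetical struct whose field types follow the
// TDengine-to-Go mapping listed above.
type MeterRow struct {
	Ts       time.Time // TIMESTAMP
	Current  float32   // FLOAT
	Voltage  int32     // INT
	Phase    float32   // FLOAT
	GroupID  int32     // INT (tag)
	Location string    // BINARY(24) (tag)
}

func main() {
	row := MeterRow{
		Ts:       time.Unix(1626006833, 0).UTC(),
		Current:  10.3,
		Voltage:  219,
		Phase:    0.31,
		GroupID:  2,
		Location: "California.SanFrancisco",
	}
	fmt.Printf("%+v\n", row)
}
```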

Installation Steps

Pre-installation preparation

  • Install Go development environment (Go 1.14 and above, GCC 4.8.5 and above)
  • If you use the native connector, please install the TDengine client driver. Please refer to Install Client Driver for specific steps

Configure the environment variables and verify the installation with the following commands.

  • go env
  • gcc -v

Install the client library

  1. Initialize the project with the go mod command.

    go mod init taos-demo
  2. Import taosSql.

    import (
    "database/sql"
    _ "github.com/taosdata/driver-go/v3/taosSql"
    )
  3. Update the dependency packages with go mod tidy.

    go mod tidy
  4. Run the program with go run taos-demo or compile the binary with the go build command.

go run taos-demo
go build

Establishing a connection

Data source names follow a common format, similar to the one used by PEAR DB, but without a type prefix (square brackets indicate optional parts):

[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...&paramN=valueN]

The DSN in its full form:

username:password@protocol(address)/dbname?param=value

taosRestful implements Go's database/sql/driver interface through an HTTP client. To use it through the database/sql interface, import the driver.

Use taosRestful as the driverName and a valid DSN as the dataSourceName. The DSN supports the following parameters:

  • disableCompression: whether to refuse compressed data. The default is true (compressed data is not accepted); set it to false to transfer data with gzip compression.
  • readBufferSize: the size of the buffer used for reading data. The default is 4K (4096); increase it when query results are large.

For example:

package main

import (
"database/sql"
"fmt"

_ "github.com/taosdata/driver-go/v3/taosRestful"
)

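func main() {
var taosUri = "root:taosdata@http(localhost:6041)/"
// sql.Open does not necessarily connect; a connection is made on first use.
taos, err := sql.Open("taosRestful", taosUri)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
}
// Close the handle on exit; this also keeps the taos variable in use so the program compiles.
defer taos.Close()
}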

Specify the URL and Properties to get the connection

The Go client library does not support this feature

Priority of configuration parameters

The Go client library does not support this feature

Usage examples

Create database and tables

_, err = taos.Exec("CREATE DATABASE if not exists power")
if err != nil {
log.Fatalln("failed to create database, err:", err)
}
_, err = taos.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
log.Fatalln("failed to create stable, err:", err)
}

Insert data

result, err := taos.Exec("INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ")
if err != nil {
log.Fatalln("failed to insert data, err:", err)
}
// Exec returns a sql.Result; read the number of affected rows from it.
affected, err := result.RowsAffected()
if err != nil {
log.Fatalln("failed to get affected rows, err:", err)
}
log.Println("affected rows:", affected)

Querying data

rows, err := taos.Query("SELECT * FROM power.meters")
if err != nil {
log.Fatalln("failed to select from table, err:", err)
}

defer rows.Close()
for rows.Next() {
var (
ts time.Time
current float32
voltage int
phase float32
groupId int
location string
)
err := rows.Scan(&ts, &current, &voltage, &phase, &groupId, &location)
if err != nil {
log.Fatalln("scan error:\n", err)
return
}
log.Println(ts, current, voltage, phase, groupId, location)
}

Execute SQL with reqId

The reqId is similar to a TraceID in distributed tracing systems. In a distributed system, a request may pass through multiple services or modules before it completes. The reqId identifies and associates all operations that belong to the request, which makes it possible to track and understand its complete execution path. The primary uses of reqId are:

  • Request Tracing: By associating the same reqId with all related operations of a request, we can trace the complete path of the request within the system.
  • Performance Analysis: By analyzing a request's reqId, we can understand the processing time of the request across various services or modules, thereby identifying performance bottlenecks.
  • Fault Diagnosis: When a request fails, we can identify the location of the issue by examining the reqId associated with that request.

If you do not set a reqId, the client library generates a random one internally. Setting it yourself is still recommended, because it lets you tie the database operations to your own request.

ctx := context.WithValue(context.Background(), common.ReqIDKey, common.GetReqID())
_, err = taos.ExecContext(ctx, "CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatalln("failed to create database, err:", err)
}

Writing data via parameter binding

package main

import (
"fmt"
"strconv"
"time"

"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
)

const (
NumOfSubTable = 10
NumOfRow = 10
)

func main() {
prepare()
db, err := af.Open("", "root", "taosdata", "power", 0)
if err != nil {
panic(err)
}
defer db.Close()
stmt := db.InsertStmt()
defer stmt.Close()
err = stmt.Prepare("INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)")
if err != nil {
panic(err)
}
for i := 1; i <= NumOfSubTable; i++ {
tags := param.NewParam(2).AddInt(i).AddBinary([]byte("location"))
err = stmt.SetTableNameWithTags("d_bind_"+strconv.Itoa(i), tags)
if err != nil {
panic(err)
}
now := time.Now()
params := make([]*param.Param, 4)
params[0] = param.NewParam(NumOfRow)
params[1] = param.NewParam(NumOfRow)
params[2] = param.NewParam(NumOfRow)
params[3] = param.NewParam(NumOfRow)
for i := 0; i < NumOfRow; i++ {
params[0].SetTimestamp(i, now.Add(time.Duration(i)*time.Second), common.PrecisionMilliSecond)
params[1].SetFloat(i, float32(i))
params[2].SetInt(i, i)
params[3].SetFloat(i, float32(i))
}
paramTypes := param.NewColumnType(4).AddTimestamp().AddFloat().AddInt().AddFloat()
err = stmt.BindParam(params, paramTypes)
if err != nil {
panic(err)
}
err = stmt.AddBatch()
if err != nil {
panic(err)
}
err = stmt.Execute()
if err != nil {
panic(err)
}
affected := stmt.GetAffectedRows()
fmt.Println("affected rows:", affected)
}
}

func prepare() {
db, err := af.Open("", "root", "taosdata", "", 0)
if err != nil {
panic(err)
}
defer db.Close()
_, err = db.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
panic(err)
}
_, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
panic(err)
}
}

Schemaless Writing

package main

import (
"fmt"

"github.com/taosdata/driver-go/v3/af"
)

const LineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000"

const TelnetDemo = "stb0_0 1707095283260 4 host=host0 interface=eth0"

const JsonDemo = "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"

func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
if err != nil {
fmt.Println("fail to connect, err:", err)
// Stop here: conn is not usable after a failed Open.
return
}
defer conn.Close()
_, err = conn.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
panic(err)
}
_, err = conn.Exec("use power")
if err != nil {
panic(err)
}
err = conn.InfluxDBInsertLines([]string{LineDemo}, "ns")
if err != nil {
panic(err)
}
err = conn.OpenTSDBInsertTelnetLines([]string{TelnetDemo})
if err != nil {
panic(err)
}
err = conn.OpenTSDBInsertJsonPayload(JsonDemo)
if err != nil {
panic(err)
}
}
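The LineDemo constant above follows the InfluxDB line protocol layout: measurement and tags, then fields, then a timestamp. The sketch below builds such a line from its parts. formatLine and the kv type are hypothetical helpers, not part of driver-go, and the sketch does not escape spaces or commas inside keys and values.

```go
package main

import (
	"fmt"
	"strings"
)

// kv is one key=value pair; a slice of kv keeps the pairs in a fixed order.
type kv struct{ Key, Value string }

// joinPairs renders pairs as key=value,key=value.
func joinPairs(pairs []kv) string {
	parts := make([]string, len(pairs))
	for i, p := range pairs {
		parts[i] = p.Key + "=" + p.Value
	}
	return strings.Join(parts, ",")
}

// formatLine is a hypothetical helper that renders
// "measurement,tags fields timestamp" without any escaping.
func formatLine(measurement string, tags, fields []kv, tsNanos int64) string {
	return fmt.Sprintf("%s,%s %s %d", measurement, joinPairs(tags), joinPairs(fields), tsNanos)
}

func main() {
	line := formatLine(
		"meters",
		[]kv{{"groupid", "2"}, {"location", "California.SanFrancisco"}},
		[]kv{{"current", "10.3000002f64"}, {"voltage", "219i32"}, {"phase", "0.31f64"}},
		1626006833639000000,
	)
	fmt.Println(line)
}
```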

Schemaless with reqId

func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error

You can obtain a unique ID by calling common.GetReqID().

Data Subscription

The TDengine Go client library supports data subscription through the following APIs.

Create a Topic

_, err = db.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
if err != nil {
panic(err)
}

Create a Consumer

consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)
}

Subscribe to consume data

go func() {
for {
// Use a goroutine-local err to avoid a data race with the outer err variable.
_, err := db.Exec("insert into power.d001 values (now, 1.1, 220, 0.1)")
if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 100)
}
}()

err = consumer.Subscribe("topic_meters", nil)
if err != nil {
panic(err)
}

for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e)
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}

Get the assignment and seek the subscription offset

partitions, err := consumer.Assignment()
if err != nil {
panic(err)
}
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
panic(err)
}
}
partitions, err = consumer.Assignment()
if err != nil {
panic(err)
}

Close subscriptions

err = consumer.Close()
if err != nil {
panic(err)
}

Full Sample Code

package main

import (
"fmt"
"os"
"time"

"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
)

func main() {
db, err := af.Open("", "root", "taosdata", "", 0)
if err != nil {
panic(err)
}
defer db.Close()
_, err = db.Exec("create database if not exists power WAL_RETENTION_PERIOD 86400")
if err != nil {
panic(err)
}
_, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
panic(err)
}
_, err = db.Exec("create table if not exists power.d001 using power.meters tags(1,'location')")
if err != nil {
panic(err)
}
// ANCHOR: create_topic
_, err = db.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
if err != nil {
panic(err)
}
// ANCHOR_END: create_topic
// ANCHOR: create_consumer
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)
}
// ANCHOR_END: create_consumer
// ANCHOR: poll_data
go func() {
for {
// Use a goroutine-local err to avoid a data race with the outer err variable.
_, err := db.Exec("insert into power.d001 values (now, 1.1, 220, 0.1)")
if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 100)
}
}()

err = consumer.Subscribe("topic_meters", nil)
if err != nil {
panic(err)
}

for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e)
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}
// ANCHOR_END: poll_data
// ANCHOR: consumer_seek
partitions, err := consumer.Assignment()
if err != nil {
panic(err)
}
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
panic(err)
}
}
partitions, err = consumer.Assignment()
if err != nil {
panic(err)
}
// ANCHOR_END: consumer_seek
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
}
// ANCHOR: consumer_close
err = consumer.Close()
if err != nil {
panic(err)
}
// ANCHOR_END: consumer_close
}

More sample programs

Frequently Asked Questions

  1. The bind interface in database/sql crashes

    The REST connection does not support the parameter binding interfaces. Use db.Exec and db.Query instead.

  2. Error [0x217] Database not specified or available occurs when executing other statements after a use db statement

    SQL commands executed through the REST interface do not share any context, so a use db statement has no effect on later statements. See the usage restrictions section above.

  3. taosSql works without error, but taosRestful returns error [0x217] Database not specified or available

    Because the REST interface is stateless, the use db statement does not take effect. See the usage restrictions section above.

  4. readBufferSize parameter has no significant effect after being increased

    Increasing readBufferSize reduces the number of system calls made when fetching results. If the query result is small, changing this parameter will not improve performance noticeably. If you increase the value too far, parsing the JSON data becomes the bottleneck. To optimize query speed, tune the value against your actual workload.

  5. Query efficiency decreases when the disableCompression parameter is set to false

    When disableCompression is set to false, the query result is compressed with gzip before it is transmitted, so the data must be decompressed with gzip after it is received.

  6. The go get command cannot fetch the package, or times out while fetching it

    Set a Go proxy: go env -w GOPROXY=https://goproxy.cn,direct

API Reference

For the full API, see the driver-go documentation.