Go Client Library

driver-go is the official Go language connector for TDengine, implementing the interface of the Go language database/sql package. Go developers can use it to develop applications that access data in the TDengine cluster.

Compatibility

Go 1.14 or later is required; the latest version of Go is recommended.

Supported Platforms

Native connections support the same platforms as the TDengine client driver. REST connections support all platforms that can run Go.

Version Support

Please refer to the version support list.

Handling Exceptions

If the error is a TDengine error, you can obtain the error code and error message with a type assertion, as follows.

// import "github.com/taosdata/driver-go/v3/errors"
if err != nil {
	// Check whether err is a TDengine error before reading its fields.
	tError, ok := err.(*errors.TaosError)
	if ok {
		fmt.Println("errorCode:", int(tError.Code))
		fmt.Println("errorMessage:", tError.ErrStr)
	} else {
		fmt.Println(err.Error())
	}
}

For errors in other TDengine modules, please refer to Error Codes.

Data Type Mapping

TDengine DataType    Go Type
TIMESTAMP            time.Time
TINYINT              int8
SMALLINT             int16
INT                  int32
BIGINT               int64
TINYINT UNSIGNED     uint8
SMALLINT UNSIGNED    uint16
INT UNSIGNED         uint32
BIGINT UNSIGNED      uint64
FLOAT                float32
DOUBLE               float64
BOOL                 bool
BINARY               string
NCHAR                string
JSON                 []byte
GEOMETRY             []byte
VARBINARY            []byte

Note: The JSON type is only supported in tags. The GEOMETRY type is binary data in little endian byte order, conforming to the WKB standard. For more details, please refer to Data Types. For the WKB standard, please refer to Well-Known Binary (WKB).
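
As an illustration of the little endian WKB layout mentioned in the note, the sketch below encodes a 2D point into the 21 bytes that a GEOMETRY value of that shape would occupy. The helper name wkbPoint is hypothetical and is not part of driver-go; it uses only the Go standard library.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// wkbPoint encodes a 2D point as little endian WKB:
// 1 byte order marker (1 = little endian), a uint32 geometry type
// (1 = Point), then X and Y as float64 values. The result is 21 bytes.
// Hypothetical helper for illustration only.
func wkbPoint(x, y float64) []byte {
	var buf bytes.Buffer
	buf.WriteByte(1) // byte order: little endian
	// Writing fixed-size values to a bytes.Buffer does not fail,
	// so the returned errors are ignored here.
	binary.Write(&buf, binary.LittleEndian, uint32(1)) // geometry type: Point
	binary.Write(&buf, binary.LittleEndian, x)
	binary.Write(&buf, binary.LittleEndian, y)
	return buf.Bytes()
}

func main() {
	b := wkbPoint(1.0, 2.0)
	fmt.Printf("%d bytes: % x\n", len(b), b)
}
```

A []byte built this way has the shape the table above gives for GEOMETRY; other geometry types follow the same header but carry different payloads.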

Example Programs Summary

For the source code of the example programs, please refer to: Example Programs.

Frequently Asked Questions

  1. Crashes related to stmt (parameter binding) interfaces in database/sql

    REST does not support the interfaces related to parameter binding. Use db.Exec and db.Query instead.

  2. Error [0x217] Database not specified or available occurs after executing other statements following the use db statement

    In the REST interface, SQL statements are executed without any shared context, so the use db statement does not take effect. See the note at the end of the DSN Specification section for solutions.

  3. No error with taosSql but error [0x217] Database not specified or available with taosRestful

    Since the REST interface is stateless, the use db statement does not take effect. See the note at the end of the DSN Specification section for solutions.

  4. No significant effect after increasing the readBufferSize parameter

    Increasing readBufferSize reduces the number of system calls made when fetching results. If the query result set is small, changing this parameter brings no significant improvement. If the value is set too high, the bottleneck shifts to parsing the JSON data. Tune this value against your actual workload to get the best query performance.

  5. Query efficiency decreases when disableCompression parameter is set to false

    When the disableCompression parameter is set to false, query results are compressed with gzip before transmission and must be decompressed with gzip after they are received. This extra compression and decompression work can slow queries down.

  6. go get command cannot fetch packages, or fetching packages times out

    Set the Go proxy with: go env -w GOPROXY=https://goproxy.cn,direct

API Reference

database/sql Driver

driver-go implements the Go database/sql/driver interface, allowing direct use of the Go database/sql package. It provides three drivers: github.com/taosdata/driver-go/v3/taosSql, github.com/taosdata/driver-go/v3/taosRestful, and github.com/taosdata/driver-go/v3/taosWS corresponding to native connection, REST connection, and WebSocket connection.

DSN Specification

The Data Source Name has a generic format, similar to PEAR DB, but without the type prefix (brackets indicate optional):

[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...&paramN=valueN]

Full form of DSN:

username:password@protocol(address)/dbname?param=value
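
The DSN format above can be assembled programmatically. The following is a minimal sketch using only the standard library; buildDSN is a hypothetical helper, not part of driver-go, the database name power is an example, and the sketch does not escape special characters in the username or password.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildDSN assembles username:password@protocol(address)/dbname?params.
// dbname and params may be empty. Hypothetical helper for illustration.
func buildDSN(user, pass, protocol, address, dbname string, params map[string]string) string {
	dsn := fmt.Sprintf("%s:%s@%s(%s)/%s", user, pass, protocol, address, dbname)
	if len(params) == 0 {
		return dsn
	}
	q := url.Values{}
	for k, v := range params {
		q.Set(k, v)
	}
	// Encode sorts by key, so the output is deterministic.
	return dsn + "?" + q.Encode()
}

func main() {
	// Native connection without a default database.
	fmt.Println(buildDSN("root", "taosdata", "tcp", "localhost:6030", "", nil))
	// REST connection with a database name and two parameters.
	fmt.Println(buildDSN("root", "taosdata", "http", "localhost:6041", "power",
		map[string]string{"readBufferSize": "8192", "disableCompression": "false"}))
}
```

The resulting string is what would be passed as dataSourceName to sql.Open.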

Native Connection

Import the driver:

import (
	"database/sql"
	_ "github.com/taosdata/driver-go/v3/taosSql"
)

Use taosSql as driverName and a correct DSN as dataSourceName as follows:

var taosUri = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosUri)

Supported DSN parameters:

  • cfg specifies the taos.cfg directory.
  • cgoThread specifies the maximum number of concurrent cgo calls; the default is the number of system cores.
  • cgoAsyncHandlerPoolSize specifies the size of the async function handler pool; the default is 10000.

REST Connection

Import the driver:

import (
	"database/sql"
	_ "github.com/taosdata/driver-go/v3/taosRestful"
)

Use taosRestful as driverName and a correct DSN as dataSourceName as follows:

var taosUri = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosUri)

Supported DSN parameters:

  • disableCompression: whether to refuse compressed data. The default is true, meaning compressed data is not accepted; set it to false to transfer data with gzip compression.
  • readBufferSize: the size of the buffer for reading data. The default is 4K (4096); increase it when query results are large.
  • token: the token used when connecting to cloud services.
  • skipVerify: whether to skip certificate verification. The default is false, meaning certificates are verified; set it to true when connecting to an insecure service.

WebSocket Connection

Import the driver:

import (
	"database/sql"
	_ "github.com/taosdata/driver-go/v3/taosWS"
)

Use taosWS as driverName and use a correct DSN as dataSourceName as follows:

var taosUri = "root:taosdata@ws(localhost:6041)/"
taos, err := sql.Open("taosWS", taosUri)

Supported DSN parameters:

  • enableCompression: whether to send compressed data. The default is false, meaning data is sent uncompressed; set it to true to transfer data with compression.
  • readTimeout: the timeout for reading data. The default is 5m.
  • writeTimeout: the timeout for writing data. The default is 10s.

Note:

  • Unlike the native connection method, the REST interface is stateless. When using REST connections, you need to qualify table and supertable names with the database name in SQL.
  • If dbname is specified in the DSN, the REST connection uses /rest/sql/dbname as the request URL by default, and it is not necessary to specify dbname in SQL.
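
To illustrate the first point of the note, the sketch below qualifies a table name with its database before querying, so the statement does not rely on an earlier use db statement. The helpers qualified and countRows and the names power and meters are hypothetical. No driver package is imported, so running this file as written only prints the "unknown driver" error from sql.Open; with the taosRestful import in place the same calls would reach the server.

```go
package main

import (
	"database/sql"
	"fmt"
)

// qualified returns "db.table" so a statement carries its own database name.
// Hypothetical helper for illustration.
func qualified(db, table string) string {
	return db + "." + table
}

// countRows counts rows in a fully qualified table, which also works
// over a stateless connection.
func countRows(conn *sql.DB, db, table string) (int64, error) {
	var n int64
	err := conn.QueryRow("SELECT COUNT(*) FROM " + qualified(db, table)).Scan(&n)
	return n, err
}

func main() {
	// The driver import is omitted in this sketch, so Open returns an error here.
	conn, err := sql.Open("taosRestful", "root:taosdata@http(localhost:6041)/")
	if err != nil {
		fmt.Println("open failed:", err)
		return
	}
	defer conn.Close()
	n, err := countRows(conn, "power", "meters")
	if err != nil {
		fmt.Println("query failed:", err)
		return
	}
	fmt.Println("rows:", n)
}
```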

Connection Features

The Go driver supports creating connections and returns objects that support the standard sql/driver Connector interface. It also provides the af package, which adds extended interfaces such as schema-less writing.

Standard Interface

Interface for creating connections in the database/sql package

  • func Open(driverName, dataSourceName string) (*DB, error)
    • Interface Description: Connect to the database (database/sql).
    • Parameter Description:
      • driverName: Driver name.
      • dataSourceName: Connection parameters DSN.
    • Return Value: Connection object, error information.

Extended Interface

Interface for creating connections in the af package

  • func Open(host, user, pass, db string, port int) (*Connector, error)
    • Interface Description: Connect to the database.
    • Parameter Description:
      • host: Host address.
      • user: Username.
      • pass: Password.
      • db: Database name.
      • port: Port number.
    • Return Value: Connection object, error information.

Schema-less Writing

Interfaces for schema-less writing using native connections in the af package.

  • func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error

    • Interface Description: Schema-less writing of InfluxDB format data.
    • Parameter Description:
      • lines: Data to write.
      • precision: Time precision.
    • Return Value: Error information.
  • func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error

    • Interface Description: Schema-less writing of OpenTSDB JSON format data.
    • Parameter Description:
      • payload: Data to write.
    • Return Value: Error information.
  • func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error

    • Interface Description: Schema-less writing of OpenTSDB Telnet format data.
    • Parameter Description:
      • lines: Data to write.
    • Return Value: Error information.

The ws/schemaless package provides a schemaless writing interface over WebSocket.

  • func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error
    • Interface Description: Schemaless data insertion.
    • Parameter Description:
      • lines: Data to be written.
      • protocol: Protocol of the data to write. Supported values: InfluxDBLineProtocol = 1, OpenTSDBTelnetLineProtocol = 2, OpenTSDBJsonFormatProtocol = 3.
      • precision: Time precision.
      • ttl: Data expiration time, 0 means never expires.
      • reqID: Request ID.
    • Return Value: Error information.
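
The lines argument of the schemaless interfaces is plain text in the chosen protocol. As a rough sketch, the code below builds one InfluxDB line protocol record and declares the protocol values listed above as local constants so that it compiles without driver-go. The helper influxLine and the sample tag and field names are hypothetical, and the sketch does not escape spaces or commas inside values.

```go
package main

import (
	"fmt"
	"strings"
)

// Protocol values as listed above, redeclared locally for this sketch.
const (
	InfluxDBLineProtocol       = 1
	OpenTSDBTelnetLineProtocol = 2
	OpenTSDBJsonFormatProtocol = 3
)

// influxLine joins pre-formatted "key=value" tag and field pairs into one line:
// measurement,tag1=v1,... field1=v1,... timestamp
// Hypothetical helper for illustration.
func influxLine(measurement string, tags, fields []string, ts int64) string {
	head := measurement
	if len(tags) > 0 {
		head += "," + strings.Join(tags, ",")
	}
	return fmt.Sprintf("%s %s %d", head, strings.Join(fields, ","), ts)
}

func main() {
	line := influxLine("meters",
		[]string{"groupid=2", "location=California.SanFrancisco"},
		[]string{"current=10.3", "voltage=219"},
		1626006833639)
	fmt.Println(line)
	fmt.Println("protocol:", InfluxDBLineProtocol)
}
```

A string built this way is the kind of value that would be passed as lines, together with the matching protocol value and a precision that agrees with the timestamp (milliseconds in this example).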

Execute SQL

The Go driver provides interfaces compliant with database/sql standards, supporting the following features:

  1. Execute SQL Statements: Execute static SQL statements and return the resulting object.
  2. Query Execution: Can execute queries that return data sets (SELECT statements).
  3. Update Execution: Can execute SQL statements that affect rows, such as INSERT, UPDATE, DELETE, etc.
  4. Get Results: Can obtain and traverse the result set returned after query execution.
  5. Get Update Count: For non-query SQL statements, can obtain the number of rows affected after execution.
  6. Close Resources: Release database resources.

Standard Interfaces

  • func (db *DB) Close() error

    • Interface Description: Close the connection.
    • Return Value: Error information.
  • func (db *DB) Exec(query string, args ...any) (Result, error)

    • Interface Description: Execute a query without returning any rows.
    • Parameter Description:
      • query: Command to execute.
      • args: Command arguments.
    • Return Value: Result object (only affected rows), error information.
  • func (db *DB) Query(query string, args ...any) (*Rows, error)

    • Interface Description: Execute a query and return the results as rows.
    • Parameter Description:
      • query: Command to execute.
      • args: Command arguments.
    • Return Value: Rows object, error information.
  • func (db *DB) QueryRow(query string, args ...any) *Row

    • Interface Description: Execute a query and return a single row result.
    • Parameter Description:
      • query: Command to execute.
      • args: Command arguments.
    • Return Value: Row object.

Extended Interfaces

  • func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error)

    • Interface Description: Execute a query without returning any rows.
    • Parameter Description:
      • ctx: Context. Use Value to pass a request ID for link tracing; the key is taos_req_id and the value is of type int64.
      • query: Command to execute.
      • args: Command arguments.
    • Return Value: Result object (only affected rows), error information.
  • func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)

    • Interface Description: Execute a query and return the results as rows.
    • Parameter Description:
      • ctx: Context. Use Value to pass a request ID for link tracing; the key is taos_req_id and the value is of type int64.
      • query: Command to execute.
      • args: Command arguments.
    • Return Value: Rows object, error information.
  • func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row

    • Interface Description: Executes a query and returns a single row result. Errors are deferred until the row is scanned.
    • Parameter Description:
      • ctx: Context. Use Value to pass a request ID for link tracing; the key is taos_req_id and the value is of type int64.
      • query: Command to execute.
      • args: Command parameters.
    • Return Value: Single row result Row object.
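
The request ID described for the ctx parameter can be attached with context.WithValue. The sketch below assumes the driver looks the key up as the plain string taos_req_id, as the parameter description suggests; the helper names are hypothetical. Note that the value must be an int64, since a plain int would not satisfy an int64 type assertion.

```go
package main

import (
	"context"
	"fmt"
)

// reqIDKey is the context key named in the parameter descriptions above.
// A plain string key is an assumption based on that description.
const reqIDKey = "taos_req_id"

// withReqID returns a child context carrying the request ID as an int64.
func withReqID(ctx context.Context, id int64) context.Context {
	return context.WithValue(ctx, reqIDKey, id)
}

// reqIDFrom reads the request ID back, reporting whether one was set.
func reqIDFrom(ctx context.Context) (int64, bool) {
	id, ok := ctx.Value(reqIDKey).(int64)
	return id, ok
}

// newTracedContext builds a background context with the given request ID.
func newTracedContext(id int64) context.Context {
	return withReqID(context.Background(), id)
}

func main() {
	ctx := newTracedContext(0x1234)
	id, ok := reqIDFrom(ctx)
	fmt.Println(id, ok) // 4660 true
	// ctx can now be passed to db.ExecContext, db.QueryContext, or db.QueryRowContext.
}
```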

Result Retrieval

The Go driver supports retrieving query result sets and corresponding metadata, providing methods to read metadata and data from the result set.

Result Set

Retrieve the query result set through the Rows object, which provides the following methods:

  • func (rs *Rows) Next() bool

    • Interface Description: Prepares the next row of data.
    • Return Value: Whether there is another row of data.
  • func (rs *Rows) Columns() ([]string, error)

    • Interface Description: Returns column names.
    • Return Value: Column names, error information.
  • func (rs *Rows) Scan(dest ...any) error

    • Interface Description: Copies the current row's column values into the values pointed to by dest.
    • Parameter Description:
      • dest: Target values.
    • Return Value: Error information.
  • func (rs *Rows) Close() error

    • Interface Description: Closes the rows.
    • Return Value: Error information.
  • func (r *Row) Scan(dest ...any) error

    • Interface Description: Copies the current row's column values into the values pointed to by dest.
    • Parameter Description:
      • dest: Target values.
    • Return Value: Error information.

Retrieve the update result set through the Result object, which provides the following method:

  • func (dr driverResult) RowsAffected() (int64, error)
    • Interface Description: Returns the number of rows affected.
    • Return Value: Number of rows affected, error information.

Result Set Metadata

Retrieve query result set metadata through the Rows object, providing the following methods:

  • func (rs *Rows) ColumnTypes() ([]*ColumnType, error)

    • Interface Description: Returns column types.
    • Return Value: Column types, error information.
  • func (ci *ColumnType) Name() string

    • Interface Description: Returns the column name.
    • Return Value: Column name.
  • func (ci *ColumnType) Length() (length int64, ok bool)

    • Interface Description: Returns the column length.
    • Return Value: Column length, whether there is a length.
  • func (ci *ColumnType) ScanType() reflect.Type

    • Interface Description: Returns the Go type corresponding to the column type.
    • Return Value: Column type.
  • func (ci *ColumnType) DatabaseTypeName() string

    • Interface Description: Returns the database name of the column type.
    • Return Value: Column type name.

Parameter Binding

Prepare allows the use of precompiled SQL statements, which can improve performance and provide the ability for parameterized queries, thereby increasing security.

Standard Interface

Use the Prepare method in the Conn interface of sql/driver to prepare a statement bound to this connection, returning a Stmt object for use.

  • Prepare(query string) (Stmt, error)

    • Interface Description: Prepares and returns a statement bound to this connection.
    • Parameter Description:
      • query: Statement for parameter binding.
    • Return Value: Stmt object, error information.
  • func (s *Stmt) Exec(args ...any) (Result, error)

    • Interface Description: Executes the prepared statement with the given parameters and returns a result summarizing the effect of the statement (only column values can be bound, not table names or tags).
    • Parameter Description:
      • args: Command parameters. Go native types are automatically converted to database types, and a type mismatch may lose precision, so using the same type as the database is recommended. For time types, use int64 or an RFC3339Nano formatted string.
    • Return Value: Result object (only affected rows), error information.
  • func (s *Stmt) Query(args ...any) (*Rows, error)

    • Interface Description: Executes the prepared statement with the given arguments and returns the result rows.
    • Parameter Description:
      • args: Command arguments. Go native types are automatically converted to database types, and a type mismatch may lose precision, so using the same type as the database is recommended. For time types, use int64 or an RFC3339Nano formatted string.
    • Return Value: Result set Rows object, error information.
  • func (s *Stmt) Close() error

    • Interface Description: Closes the statement.
    • Return Value: Error information.
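
As a sketch of the advice on time types, the code below formats a millisecond timestamp as an RFC3339Nano string and passes it to a prepared statement through database/sql. The helper names, the table power.d1001, and its two-column layout are hypothetical. insertReading is shown for shape only and is not called in main, because no driver is imported here.

```go
package main

import (
	"database/sql"
	"fmt"
	"time"
)

// tsArg formats a Unix millisecond timestamp as an RFC3339Nano string in UTC.
func tsArg(ms int64) string {
	return time.Unix(0, ms*int64(time.Millisecond)).UTC().Format(time.RFC3339Nano)
}

// insertReading binds only column values; the table name stays in the SQL text.
// The table and its columns are assumptions for illustration.
func insertReading(db *sql.DB, ms int64, current float32) error {
	stmt, err := db.Prepare("INSERT INTO power.d1001 VALUES (?, ?)")
	if err != nil {
		return err
	}
	defer stmt.Close()
	_, err = stmt.Exec(tsArg(ms), current)
	return err
}

func main() {
	fmt.Println(tsArg(0))    // 1970-01-01T00:00:00Z
	fmt.Println(tsArg(1500)) // 1970-01-01T00:00:01.5Z
}
```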

Extended Interfaces

The af package provides additional interfaces for parameter binding over native connections.

  • func (conn *Connector) Stmt() *Stmt

    • Interface Description: Returns a Stmt object bound to this connection.
    • Return Value: Stmt object.
  • func (s *Stmt) Prepare(sql string) error

    • Interface Description: Prepares an SQL statement.
    • Parameter Description:
      • sql: The statement for parameter binding.
    • Return Value: Error information.
  • func (s *Stmt) NumParams() (int, error)

    • Interface Description: Returns the number of parameters.
    • Return Value: Number of parameters, error information.
  • func (s *Stmt) SetTableNameWithTags(tableName string, tags *param.Param) error

    • Interface Description: Sets the table name and tags.
    • Parameter Description:
      • tableName: Table name.
      • tags: Tags.
    • Return Value: Error information.
  • func (s *Stmt) SetTableName(tableName string) error

    • Interface Description: Sets the table name.
    • Parameter Description:
      • tableName: Table name.
    • Return Value: Error information.
  • func (s *Stmt) BindRow(row *param.Param) error

    • Interface Description: Binds a row.
    • Parameter Description:
      • row: Row data.
    • Return Value: Error information.
  • func (s *Stmt) GetAffectedRows() int

    • Interface Description: Gets the number of affected rows.
    • Return Value: Number of affected rows.
  • func (s *Stmt) AddBatch() error

    • Interface Description: Adds a batch.
    • Return Value: Error information.
  • func (s *Stmt) Execute() error

    • Interface Description: Executes the batch.
    • Return Value: Error information.
  • func (s *Stmt) UseResult() (driver.Rows, error)

    • Interface Description: Uses the result.
    • Return Value: Result set Rows object, error information.
  • func (s *Stmt) Close() error

    • Interface Description: Closes the statement.
    • Return Value: Error information.

The ws/stmt package provides interfaces for parameter binding via WebSocket.

  • func (c *Connector) Init() (*Stmt, error)

    • Interface Description: Initialization.
    • Return Value: Stmt object, error information.
  • func (s *Stmt) Prepare(sql string) error

    • Interface Description: Prepares an SQL statement.
    • Parameter Description:
      • sql: The statement for parameter binding.
    • Return Value: Error information.
  • func (s *Stmt) SetTableName(name string) error

    • Interface Description: Sets the table name.
    • Parameter Description:
      • name: Table name.
    • Return Value: Error information.
  • func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType) error

    • Interface Description: Set tags.
    • Parameter Description:
      • tags: Tags.
      • bindType: Type information.
    • Return Value: Error information.
  • func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error

    • Interface Description: Bind parameters.
    • Parameter Description:
      • params: Parameters.
      • bindType: Type information.
    • Return Value: Error information.
  • func (s *Stmt) AddBatch() error

    • Interface Description: Add batch processing.
    • Return Value: Error information.
  • func (s *Stmt) Exec() error

    • Interface Description: Execute batch processing.
    • Return Value: Error information.
  • func (s *Stmt) GetAffectedRows() int

    • Interface Description: Get the number of affected rows.
    • Return Value: Number of affected rows.
  • func (s *Stmt) UseResult() (*Rows, error)

    • Interface Description: Use result.
    • Return Value: Rows object, error information.
  • func (s *Stmt) Close() error

    • Interface Description: Close statement.
    • Return Value: Error information.

The Rows result type follows the Rows interface in the sql/driver package and provides the following interfaces:

  • func (rs *Rows) Columns() []string

    • Interface Description: Return column names.
    • Return Value: Column names.
  • func (rs *Rows) ColumnTypeDatabaseTypeName(i int) string

    • Interface Description: Return the database name of the column type.
    • Parameter Description:
      • i: Column index.
    • Return Value: Column type name.
  • func (rs *Rows) ColumnTypeLength(i int) (length int64, ok bool)

    • Interface Description: Return column length.
    • Parameter Description:
      • i: Column index.
    • Return Value: Column length, whether there is a length.
  • func (rs *Rows) ColumnTypeScanType(i int) reflect.Type

    • Interface Description: Return the Go type corresponding to the column type.
    • Parameter Description:
      • i: Column index.
    • Return Value: Column type.
  • func (rs *Rows) Next(dest []driver.Value) error

    • Interface Description: Prepare the next row of data and assign it to the target.
    • Parameter Description:
      • dest: Target values.
    • Return Value: Error information.
  • func (rs *Rows) Close() error

    • Interface Description: Close rows.
    • Return Value: Error information.

The common/param package provides a parameter binding data structure.

Below are the interfaces for setting parameters by offset:

  • func NewParam(size int) *Param

    • Interface Description: Create a parameter binding data structure.
    • Parameter Description:
      • size: Number of parameters.
    • Return Value: Param object.
  • func (p *Param) SetBool(offset int, value bool)

    • Interface Description: Set a boolean value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: Boolean value.
  • func (p *Param) SetNull(offset int)

    • Interface Description: Set a null value.
    • Parameter Description:
      • offset: Offset (column or tag).
  • func (p *Param) SetTinyint(offset int, value int)

    • Interface Description: Set Tinyint value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: Tinyint value.
  • func (p *Param) SetSmallint(offset int, value int)

    • Interface Description: Set Smallint value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: Smallint value.
  • func (p *Param) SetInt(offset int, value int)

    • Interface Description: Set Int value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: Int value.
  • func (p *Param) SetBigint(offset int, value int)

    • Interface Description: Set Bigint value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: Bigint value.
  • func (p *Param) SetUTinyint(offset int, value uint)

    • Interface Description: Set UTinyint value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: UTinyint value.
  • func (p *Param) SetUSmallint(offset int, value uint)

    • Interface Description: Set USmallint value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: USmallint value.
  • func (p *Param) SetUInt(offset int, value uint)

    • Interface Description: Set UInt value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: UInt value.
  • func (p *Param) SetUBigint(offset int, value uint)

    • Interface Description: Set UBigint value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: UBigint value.
  • func (p *Param) SetFloat(offset int, value float32)

    • Interface Description: Set Float value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: Float value.
  • func (p *Param) SetDouble(offset int, value float64)

    • Interface Description: Set Double value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: Double value.
  • func (p *Param) SetBinary(offset int, value []byte)

    • Interface Description: Set Binary value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: Binary value.
  • func (p *Param) SetVarBinary(offset int, value []byte)

    • Interface Description: Set VarBinary value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: VarBinary value.
  • func (p *Param) SetNchar(offset int, value string)

    • Interface Description: Set Nchar value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: Nchar value.
  • func (p *Param) SetTimestamp(offset int, value time.Time, precision int)

    • Interface Description: Sets the Timestamp value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: Timestamp value.
      • precision: Time precision.
  • func (p *Param) SetJson(offset int, value []byte)

    • Interface Description: Sets the Json value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: Json value.
  • func (p *Param) SetGeometry(offset int, value []byte)

    • Interface Description: Sets the Geometry value.
    • Parameter Description:
      • offset: Offset (column or tag).
      • value: Geometry value.

Below are the interfaces for setting parameters via chain calls:

  • func (p *Param) AddBool(value bool) *Param
    • Interface Description: Adds a boolean value.
    • Parameter Description:
      • value: Boolean value.
    • Return Value: Param object.

Other types similar to boolean are as follows:

  • AddNull
  • AddTinyint
  • AddSmallint
  • AddInt
  • AddBigint
  • AddUTinyint
  • AddUSmallint
  • AddUInt
  • AddUBigint
  • AddFloat
  • AddDouble
  • AddBinary
  • AddVarBinary
  • AddNchar
  • AddTimestamp
  • AddJson
  • AddGeometry

Below are the interfaces for setting column type information:

  • func NewColumnType(size int) *ColumnType

    • Interface Description: Creates a column type information data structure.
    • Parameter Description:
      • size: Number of columns.
    • Return Value: ColumnType object.
  • func (c *ColumnType) AddBool() *ColumnType

    • Interface Description: Adds a boolean type.
    • Return Value: ColumnType object.

Other types similar to boolean are as follows:

  • AddTinyint
  • AddSmallint
  • AddInt
  • AddBigint
  • AddUTinyint
  • AddUSmallint
  • AddUInt
  • AddUBigint
  • AddFloat
  • AddDouble
  • AddBinary
  • AddVarBinary
  • AddNchar
  • AddTimestamp
  • AddJson
  • AddGeometry

Data Subscription

The Go driver supports data subscription features, providing interfaces for data subscription via native connections and WebSocket connections. Native implementation is in the af/tmq package, WebSocket implementation is in the ws/tmq package.

Consumer

  • func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
    • Interface Description: Creates a consumer.
    • Parameter Description:
      • conf: Configuration information.
    • Return Value: Consumer object, error information.

Configuration information is defined as:

type ConfigValue interface{}
type ConfigMap map[string]ConfigValue

Creating a consumer supports the following properties:

  • ws.url: WebSocket connection URL.
  • ws.message.channelLen: WebSocket message channel buffer length, default 0.
  • ws.message.timeout: WebSocket message timeout, default 5m.
  • ws.message.writeWait: WebSocket message write timeout, default 10s.
  • ws.message.enableCompression: Whether WebSocket compression is enabled, default false.
  • ws.autoReconnect: Whether WebSocket automatically reconnects, default false.
  • ws.reconnectIntervalMs: WebSocket reconnect interval in milliseconds, default 2000.
  • ws.reconnectRetryCount: WebSocket reconnect retry count, default 3.

For other parameters, please refer to the Consumer Parameter List. Note that starting from version 3.2.0.0 of the TDengine server, the default value of auto.offset.reset in message subscriptions has changed.
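
A configuration for a WebSocket consumer might be assembled as in the sketch below. ConfigValue and ConfigMap are redeclared locally, copying the definitions shown above, so that the code compiles without driver-go. The helper name, the URL, and the Go types chosen for each value (string, bool, int) are assumptions; check them against the driver before relying on them.

```go
package main

import "fmt"

// Local copies of the definitions shown above, for this sketch only.
type ConfigValue interface{}
type ConfigMap map[string]ConfigValue

// newWSConsumerConfig fills in a few of the ws.* properties listed above.
// The value types used here are assumptions.
func newWSConsumerConfig(url string) ConfigMap {
	return ConfigMap{
		"ws.url":                 url,
		"ws.autoReconnect":       true,
		"ws.reconnectIntervalMs": 2000,
		"ws.reconnectRetryCount": 3,
	}
}

func main() {
	cfg := newWSConsumerConfig("ws://127.0.0.1:6041")
	fmt.Println(cfg["ws.url"], cfg["ws.autoReconnect"])
}
```

With the real package, a map of this shape would be passed by pointer to NewConsumer, along with the subscription parameters from the Consumer Parameter List.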

  • func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error

    • Interface Description: Subscribe to a topic.
    • Parameter Description:
      • topic: Topic.
      • rebalanceCb: Rebalance callback (unused).
    • Return Value: Error information.
  • func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error

    • Interface Description: Subscribe to a list of topics.
    • Parameter Description:
      • topics: List of topics.
      • rebalanceCb: Rebalance callback (unused).
    • Return Value: Error information.
  • func (c *Consumer) Unsubscribe() error

    • Interface Description: Unsubscribe.
    • Return Value: Error information.
  • func (c *Consumer) Poll(timeoutMs int) tmq.Event

    • Interface Description: Poll for events.
    • Parameter Description:
      • timeoutMs: Timeout in milliseconds.
    • Return Value: Event.
  • func (c *Consumer) Commit() ([]tmq.TopicPartition, error)

    • Interface Description: Commit offsets.
    • Return Value: List of TopicPartition, error information.
  • func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)

    • Interface Description: Get assignment information.
    • Return Value: List of TopicPartition, error information.
  • func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error

    • Interface Description: Seek to an offset.
    • Parameter Description:
      • partition: Partition and offset information.
      • ignoredTimeoutMs: Timeout in milliseconds (unused).
    • Return Value: Error information.
  • func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error)

    • Interface Description: Get committed offsets.
    • Parameter Description:
      • partitions: List of partitions.
      • timeoutMs: Timeout in milliseconds.
    • Return Value: List of TopicPartition, error information.
  • func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error)

    • Interface Description: Commit offsets.
    • Parameter Description:
      • offsets: List of offsets.
    • Return Value: List of TopicPartition, error information.
  • func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error)

    • Interface Description: Get current offsets.
    • Parameter Description:
      • partitions: List of partitions.
    • Return Value: List of TopicPartition, error information.
  • func (c *Consumer) Close() error

    • Interface Description: Close the consumer.
    • Return Value: Error information.

Consumption Records

When Poll returns a tmq.Event, check its concrete type to obtain either the consumption record or the error information. When the type is *tmq.DataMessage, you can get the consumption record.

  • func (m *DataMessage) Topic() string

    • Interface Description: Get the topic.
    • Return Value: Topic.
  • func (m *DataMessage) DBName() string

    • Interface Description: Get the database name.
    • Return Value: Database name.
  • func (m *DataMessage) Offset() Offset

    • Interface Description: Get the offset.
    • Return Value: Offset.
  • func (m *DataMessage) Value() interface{}

    • Interface Description: Get the value. The concrete type of the value is []*tmq.Data.
    • Return Value: Consumed value.

Structure of tmq.Data:

type Data struct {
	TableName string
	Data      [][]driver.Value
}

  • TableName is the table name.
  • Data holds the rows: each element is one row, and each row is a slice whose elements are the column values.

When Poll returns an event of type tmq.Error, you can use func (e Error) Error() string to get the error information.

Partition Information

When the consumed data type is *tmq.DataMessage, you can obtain partition information from the TopicPartition attribute.

type TopicPartition struct {
	Topic     *string
	Partition int32
	Offset    Offset
	Metadata  *string
	Error     error
}

  • Topic: Topic.
  • Partition: Partition.
  • Offset: Offset.
  • Metadata: Metadata (unused).
  • Error: Error information.

You can use func (p TopicPartition) String() string to get partition information.

Offset Metadata

The offset information obtained from TopicPartition can be accessed through the Offset attribute. When the offset is -2147467247, it indicates that the offset is not set.
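
A check for the "not set" sentinel could look like the sketch below. Offset is declared locally as an int64-based type, which is an assumption about the driver's own type, and offsetInvalid and offsetIsSet are hypothetical names; only the value -2147467247 comes from the text above.

```go
package main

import "fmt"

// Offset is a local stand-in for the driver's offset type (assumed int64-based).
type Offset int64

// offsetInvalid is the "not set" sentinel quoted above.
const offsetInvalid Offset = -2147467247

// offsetIsSet reports whether an offset carries a real position.
func offsetIsSet(o Offset) bool {
	return o != offsetInvalid
}

func main() {
	fmt.Println(offsetIsSet(100))         // true
	fmt.Println(offsetIsSet(-2147467247)) // false
}
```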

Deserialization

When the consumed data type is *tmq.DataMessage, you can use func (m *DataMessage) Value() interface{} to get the data. The concrete type of the data is []*tmq.Data.
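
Because Value returns interface{}, the caller needs a type assertion before reading rows. The sketch below shows that step against a local copy of the Data struct shown earlier, so it compiles without driver-go; in real code the assertion would target the slice type from the driver's tmq package, and a local type like this one would not match values the driver returns. The helper names and sample rows are hypothetical.

```go
package main

import (
	"database/sql/driver"
	"fmt"
)

// Data mirrors the struct shown earlier; a local stand-in for this sketch.
type Data struct {
	TableName string
	Data      [][]driver.Value
}

// rowsPerTable asserts v to []*Data and totals the rows for each table.
// It returns false when v has a different type.
func rowsPerTable(v interface{}) (map[string]int, bool) {
	blocks, ok := v.([]*Data)
	if !ok {
		return nil, false
	}
	counts := make(map[string]int)
	for _, b := range blocks {
		counts[b.TableName] += len(b.Data)
	}
	return counts, true
}

// sampleValue stands in for what DataMessage.Value() would return.
func sampleValue() interface{} {
	return []*Data{{
		TableName: "d1001",
		Data:      [][]driver.Value{{int64(1), 10.3}, {int64(2), 10.4}},
	}}
}

func main() {
	counts, ok := rowsPerTable(sampleValue())
	fmt.Println(counts, ok) // map[d1001:2] true
}
```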

Appendix