Skip to main content

Go Client Library

driver-go is the official Go language connector for TDengine, implementing the interface of Go's database/sql package. Go developers can use it to develop applications that access data from TDengine clusters.

Compatibility

Supports Go version 1.14 and above; the latest Go version is recommended.

Supported Platforms

The native connection supports the same platforms as the TDengine client driver.
REST connections support all platforms that can run Go.

Version Support

Please refer to the version support list.

Handling Exceptions

If it is a TDengine error, you can obtain the error code and message as follows:

// import "github.com/taosdata/driver-go/v3/errors"
if err != nil {
tError, is := err.(*errors.TaosError)
if is {
fmt.Println("errorCode:", int(tError.Code))
fmt.Println("errorMessage:", tError.ErrStr)
} else {
fmt.Println(err.Error())
}
}

For errors from other TDengine functional modules, please refer to the error codes.

Data Type Mapping

TDengine DataTypeGo Type
TIMESTAMPtime.Time
TINYINTint8
SMALLINTint16
INTint32
BIGINTint64
TINYINT UNSIGNEDuint8
SMALLINT UNSIGNEDuint16
INT UNSIGNEDuint32
BIGINT UNSIGNEDuint64
FLOATfloat32
DOUBLEfloat64
BOOLbool
BINARYstring
NCHARstring
JSON[]byte
GEOMETRY[]byte
VARBINARY[]byte

Note: The JSON type is only supported in tags.
The GEOMETRY type is binary data in little-endian byte order, compliant with the WKB standard. For more details, please refer to the data types.
The WKB standard can be referenced at Well-Known Binary (WKB).

Example Programs Summary

Source code for example programs can be found here: Example Programs.

Common Issues

  1. Crashes related to stmt (parameter binding) interfaces in database/sql

    REST does not support parameter binding-related interfaces; it is recommended to use db.Exec and db.Query.

  2. Error [0x217] Database not specified or available after using the use db statement

    In the REST interface, the execution of SQL statements has no contextual association, so using the use db statement will not take effect. For solutions, refer to the usage restrictions section above.

  3. Error [0x217] Database not specified or available when using taosSql but not with taosRestful

    Because the REST interface is stateless, using the use db statement will not take effect. For solutions, refer to the usage restrictions section above.

  4. No significant effect after increasing the readBufferSize parameter

    Increasing readBufferSize reduces syscall calls when obtaining results. If the amount of data in the query result is small, modifying this parameter will not yield significant improvement. If this parameter is set too large, the bottleneck may be in parsing JSON data. To optimize query speed, adjust this value according to actual conditions for the best effect.

  5. Query efficiency decreases when the disableCompression parameter is set to false

    When the disableCompression parameter is set to false, the query results are transmitted after being compressed with gzip, and the data must be decompressed after retrieval.

  6. go get command fails to retrieve the package, or the retrieval times out

    Set the Go proxy using go env -w GOPROXY=https://goproxy.cn,direct.

API Reference

database/sql Driver

driver-go implements the Go database/sql/driver interface, allowing direct use of the Go database/sql package. It provides three drivers: github.com/taosdata/driver-go/v3/taosSql, github.com/taosdata/driver-go/v3/taosRestful, and github.com/taosdata/driver-go/v3/taosWS, corresponding to native connection, REST connection, and WebSocket connection, respectively.

DSN Specification

The Data Source Name (DSN) has a general format similar to PEAR DB, but without a type prefix (square brackets indicate optional):

[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...&paramN=valueN]

The full form of the DSN is:

username:password@protocol(address)/dbname?param=value
Native Connection

Import the driver:

import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosSql"
)

Use taosSql as the driverName and a valid DSN as the dataSourceName like this:

var taosUri = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosUri)

Supported DSN parameters:

  • cfg: Specifies the directory for taos.cfg
  • cgoThread: Specifies the number of concurrent cgo executions, defaulting to the system core count
  • cgoAsyncHandlerPoolSize: Specifies the size of the async function handler, defaulting to 10,000
REST Connection

Import the driver:

import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)

Use taosRestful as the driverName and a valid DSN as the dataSourceName like this:

var taosUri = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosUri)

Supported DSN parameters:

  • disableCompression: Whether to accept compressed data, defaulting to true (do not accept compressed data). Set to false if using gzip compression for data transmission.
  • readBufferSize: The size of the read buffer, defaulting to 4K (4096). This value can be increased if the amount of data in the query result is large.
  • token: The token used when connecting to cloud services.
  • skipVerify: Whether to skip certificate verification, defaulting to false (do not skip). Set to true if connecting to an insecure service.
WebSocket Connection

Import the driver:

import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosWS"
)

Use taosWS as the driverName and a valid DSN as the dataSourceName like this:

var taosUri = "root:taosdata@ws(localhost:6041)/"
taos, err := sql.Open("taosWS", taosUri)

Supported DSN parameters:

  • enableCompression: Whether to send compressed data, defaulting to false (do not send). Set to true if using compression for data transmission.
  • readTimeout: The read timeout for data, defaulting to 5 minutes.
  • writeTimeout: The write timeout for data, defaulting to 10 seconds.
note
  • Unlike the native connection method, the REST interface is stateless. When using REST connections, it is necessary to specify the database name of the table or supertable in the SQL.
  • If a dbname is specified in the DSN, the REST connection will default to using /rest/sql/dbname as the URL for the RESTful request, and the dbname does not need to be specified in the SQL.

Connection Features

The Go driver supports creating connections that return objects conforming to the sql/driver standard Connector interface. It also provides the af package, which extends some schemaless write interfaces.

Standard Interfaces

The database/sql package provides the following interface for creating connections:

  • func Open(driverName, dataSourceName string) (*DB, error)
    • Interface Description: Connects to the database.
    • Parameters:
      • driverName: The name of the driver.
      • dataSourceName: The connection parameters DSN.
    • Returns: A connection object and an error.

Extended Interfaces

The af package provides the following interface for creating connections:

  • func Open(host, user, pass, db string, port int) (*Connector, error)
    • Interface Description: Connects to the database.
    • Parameters:
      • host: The host address.
      • user: The username.
      • pass: The password.
      • db: The database name.
      • port: The port number.
    • Returns: A connection object and an error.

Schemaless Writing

The af package provides interfaces for schemaless writing using native connections:

  • func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error

    • Interface Description: Schemaless insert of InfluxDB formatted data.
    • Parameters:
      • lines: The data to write.
      • precision: The time precision.
    • Returns: An error.
  • func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error

    • Interface Description: Schemaless insert of OpenTSDB JSON formatted data.
    • Parameters:
      • payload: The data to write.
    • Returns: An error.
  • func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error

    • Interface Description: Schemaless insert of OpenTSDB Telnet formatted data.
    • Parameters:
      • lines: The data to write.
    • Returns: An error.

The ws/schemaless package provides interfaces for schemaless writing using WebSocket:

  • func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error
    • Interface Description: Schemaless insert of data.
    • Parameters:
      • lines: The data to write.
      • protocol: The protocol supported for writing (e.g., InfluxDBLineProtocol = 1, OpenTSDBTelnetLineProtocol = 2, OpenTSDBJsonFormatProtocol = 3).
      • precision: The time precision.
      • ttl: The data expiration time; 0 means no expiration.
      • reqID: The request ID.
    • Returns: An error.

Execute SQL

The Go driver provides interfaces conforming to the database/sql standard, supporting the following functionalities:

  1. Execute SQL Statements: Execute static SQL statements and return their resulting objects.
  2. Query Execution: Execute queries that return datasets (e.g., SELECT statements).
  3. Update Execution: Execute SQL statements that affect rows, such as INSERT, UPDATE, DELETE, etc.
  4. Get Results: Retrieve result sets returned after executing queries and iterate through the data.
  5. Get Update Count: For non-query SQL statements, retrieve the number of rows affected after execution.
  6. Close Resources: Release database resources.

Standard Interfaces

  • func (db *DB) Close() error

    • Interface Description: Closes the connection.
    • Returns: An error.
  • func (db *DB) Exec(query string, args ...any) (Result, error)

    • Interface Description: Executes a query but does not return any rows.
    • Parameters:
      • query: The command to execute.
      • args: The command parameters.
    • Returns: A Result object (only affecting row count) and an error.
  • func (db *DB) Query(query string, args ...any) (*Rows, error)

    • Interface Description: Executes a query and returns the result rows.
    • Parameters:
      • query: The command to execute.
      • args: The command parameters.
    • Returns: A Rows object and an error.
  • func (db *DB) QueryRow(query string, args ...any) *Row

    • Interface Description: Executes a query and returns a single row result.
    • Parameters:
      • query: The command to execute.
      • args: The command parameters.
    • Returns: A Row object.

Extended Interfaces

  • func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error)

    • Interface Description: Executes a query but does not return any rows.
    • Parameters:
      • ctx: The context, using Value to pass the request ID for tracing, where the key is taos_req_id and the value is an int64.
      • query: The command to execute.
      • args: The command parameters.
    • Returns: A Result object (only the affected row count) and an error.
  • func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)

    • Interface Description: Executes a query and returns the result rows.
    • Parameters:
      • ctx: The context, using Value to pass the request ID for tracing, where the key is taos_req_id and the value is an int64.
      • query: The command to execute.
      • args: The command parameters.
    • Returns: A Rows object and an error.
  • func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row

    • Interface Description: Executes a query and returns a single row result; error information is delayed until scanning the Row.
    • Parameters:
      • ctx: The context, using Value to pass the request ID for tracing, where the key is taos_req_id and the value is an int64.
      • query: The command to execute.
      • args: The command parameters.
    • Returns: A Row object.

Result Retrieval

The Go driver supports retrieving query result sets and corresponding result metadata, providing methods for reading metadata and data within result sets.

Result Set

The Rows object provides the following methods to access the query result set:

  • func (rs *Rows) Next() bool

    • Interface Description: Prepares the next row of data.
    • Returns: Whether there is a next row of data.
  • func (rs *Rows) Columns() ([]string, error)

    • Interface Description: Returns the column names.
    • Returns: Column names and an error.
  • func (rs *Rows) Scan(dest ...any) error

    • Interface Description: Copies the column values of the current row into the values pointed to by dest.
    • Parameters:
      • dest: The target values.
    • Returns: An error.
  • func (rs *Rows) Close() error

    • Interface Description: Closes the rows.
    • Returns: An error.
  • func (r *Row) Scan(dest ...any) error

    • Interface Description: Copies the column values of the current row into the values pointed to by dest.
    • Parameters:
      • dest: The target values.
    • Returns: An error.

To retrieve update results, the Result object provides the following method:

  • func (dr driverResult) RowsAffected() (int64, error)
    • Interface Description: Returns the number of rows affected.
    • Returns: The number of affected rows and an error.

Result Set Metadata

The Rows object provides methods to retrieve metadata about the query result set:

  • func (rs *Rows) ColumnTypes() ([]*ColumnType, error)

    • Interface Description: Returns the column types.
    • Returns: Column types and an error.
  • func (ci *ColumnType) Name() string

    • Interface Description: Returns the column name.
    • Returns: The column name.
  • func (ci *ColumnType) Length() (length int64, ok bool)

    • Interface Description: Returns the length of the column.
    • Returns: The column length and whether a length exists.
  • func (ci *ColumnType) ScanType() reflect.Type

    • Interface Description: Returns the Go type corresponding to the column type.
    • Returns: The column type.
  • func (ci *ColumnType) DatabaseTypeName() string

    • Interface Description: Returns the name of the column type in the database.
    • Returns: The column type name.

Parameter Binding

Prepare allows the use of precompiled SQL statements, which can improve performance and provide the ability for parameterized queries, thereby increasing security.

Standard Interfaces

Using the Prepare method from the Conn interface in sql/driver, prepare a statement bound to this connection, returning a Stmt object for use.

  • Prepare(query string) (Stmt, error)

    • Interface Description: Prepares and returns a statement (stmt) bound to this connection.
    • Parameters:
      • query: The statement for parameter binding.
    • Returns: A Stmt object and an error.
  • func (s *Stmt) Exec(args ...any) (Result, error)

    • Interface Description: Executes the prepared statement using the given parameters and returns a result summarizing the effect of the statement (only column values can be bound; binding table names and tags is not supported).
    • Parameters:
      • args: Command parameters. Go primitive types will be automatically converted to database types; type mismatches may cause loss of precision. It is recommended to use types that match the database, and for time types, use int64 or strings formatted in RFC3339Nano.
    • Returns: A Result object (only the affected row count) and an error.
  • func (s *Stmt) Query(args ...any) (*Rows, error)

    • Interface Description: Executes the prepared statement using the given parameters and returns the result rows.
    • Parameters:
      • args: Command parameters. Go primitive types will be automatically converted to database types; type mismatches may cause loss of precision. It is recommended to use types that match the database, and for time types, use int64 or strings formatted in RFC3339Nano.
    • Returns: A Rows object and an error.
  • func (s *Stmt) Close() error

    • Interface Description: Closes the statement.
    • Returns: An error.

Extended Interfaces

The af package provides additional interfaces for parameter binding using native connections.

  • func (conn *Connector) Stmt() *Stmt

    • Interface Description: Returns a Stmt object bound to this connection.
    • Returns: A Stmt object.
  • func (s *Stmt) Prepare(sql string) error

    • Interface Description: Prepares an SQL statement.
    • Parameters:
      • sql: The SQL statement for parameter binding.
    • Returns: An error.
  • func (s *Stmt) NumParams() (int, error)

    • Interface Description: Returns the number of parameters.
    • Returns: The number of parameters and an error.
  • func (s *Stmt) SetTableNameWithTags(tableName string, tags *param.Param)

    • Interface Description: Sets the table name and tags.
    • Parameters:
      • tableName: The table name.
      • tags: The tags.
    • Returns: An error.
  • func (s *Stmt) SetTableName(tableName string) error

    • Interface Description: Sets the table name.
    • Parameters:
      • tableName: The table name.
    • Returns: An error.
  • func (s *Stmt) BindRow(row *param.Param) error

    • Interface Description: Binds a row.
    • Parameters:
      • row: The row data.
    • Returns: An error.
  • func (s *Stmt) GetAffectedRows() int

    • Interface Description: Gets the number of affected rows.
    • Returns: The number of affected rows.
  • func (s *Stmt) AddBatch() error

    • Interface Description: Adds to the batch.
    • Returns: An error.
  • func (s *Stmt) Execute() error

    • Interface Description: Executes the batch.
    • Returns: An error.
  • func (s *Stmt) UseResult() (driver.Rows, error)

    • Interface Description: Uses the result.
    • Returns: A Rows object and an error.
  • func (s *Stmt) Close() error

    • Interface Description: Closes the statement.
    • Returns: An error.

The ws/stmt package provides interfaces for parameter binding over WebSocket.

  • func (c *Connector) Init() (*Stmt, error)

    • Interface Description: Initializes.
    • Returns: A Stmt object and an error.
  • func (s *Stmt) Prepare(sql string) error

    • Interface Description: Prepares an SQL statement.
    • Parameters:
      • sql: The SQL statement for parameter binding.
    • Returns: An error.
  • func (s *Stmt) SetTableName(name string) error

    • Interface Description: Sets the table name.
    • Parameters:
      • name: The table name.
    • Returns: An error.
  • func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType)

    • Interface Description: Sets the tags.
    • Parameters:
      • tags: The tags.
      • bindType: Type information.
    • Returns: An error.
  • func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error

    • Interface Description: Binds parameters.
    • Parameters:
      • params: The parameters.
      • bindType: Type information.
    • Returns: An error.
  • func (s *Stmt) AddBatch() error

    • Interface Description: Adds to the batch.
    • Returns: An error.
  • func (s *Stmt) Exec() error

    • Interface Description: Executes the batch.
    • Returns: An error.
  • func (s *Stmt) GetAffectedRows() int

    • Interface Description: Gets the number of affected rows.
    • Returns: The number of affected rows.
  • func (s *Stmt) UseResult() (*Rows, error)

    • Interface Description: Uses the result.
    • Returns: A Rows object and an error.

The Rows interface for row results references the sql/driver package and provides the following interfaces:

  • func (rs *Rows) Columns() []string

    • Interface Description: Returns the column names.
    • Returns: The column names.
  • func (rs *Rows) ColumnTypeDatabaseTypeName(i int) string

    • Interface Description: Returns the database name of the column type.
    • Parameters:
      • i: Column index.
    • Returns: The column type name.
  • func (rs *Rows) ColumnTypeLength(i int) (length int64, ok bool)

    • Interface Description: Returns the column length.
    • Parameters:
      • i: Column index.
    • Returns: The column length and whether it has a length.
  • func (rs *Rows) ColumnTypeScanType(i int) reflect.Type

    • Interface Description: Returns the Go type corresponding to the column type.
    • Parameters:
      • i: Column index.
    • Returns: The column type.
  • func (rs *Rows) Next(dest []driver.Value) error

    • Interface Description: Prepares the next row of data and assigns it to the destination.
    • Parameters:
      • dest: The target values.
    • Returns: An error.
  • func (rs *Rows) Close() error

    • Interface Description: Closes the rows.
    • Returns: An error.

Parameter Binding

The common/param package provides data structures for parameter binding.

Parameter Offset Setting Interfaces

  • func NewParam(size int) *Param

    • Interface Description: Creates a parameter binding data structure.
    • Parameters:
      • size: The number of parameters.
    • Returns: A Param object.
  • func (p *Param) SetBool(offset int, value bool)

    • Interface Description: Sets a boolean value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The boolean value.
  • func (p *Param) SetNull(offset int)

    • Interface Description: Sets a null value.
    • Parameters:
      • offset: The offset (column or tag).
  • func (p *Param) SetTinyint(offset int, value int)

    • Interface Description: Sets a Tinyint value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The Tinyint value.
  • func (p *Param) SetSmallint(offset int, value int)

    • Interface Description: Sets a Smallint value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The Smallint value.
  • func (p *Param) SetInt(offset int, value int)

    • Interface Description: Sets an Int value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The Int value.
  • func (p *Param) SetBigint(offset int, value int)

    • Interface Description: Sets a Bigint value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The Bigint value.
  • func (p *Param) SetUTinyint(offset int, value uint)

    • Interface Description: Sets a UTinyint value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The UTinyint value.
  • func (p *Param) SetUSmallint(offset int, value uint)

    • Interface Description: Sets a USmallint value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The USmallint value.
  • func (p *Param) SetUInt(offset int, value uint)

    • Interface Description: Sets a UInt value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The UInt value.
  • func (p *Param) SetUBigint(offset int, value uint)

    • Interface Description: Sets a UBigint value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The UBigint value.
  • func (p *Param) SetFloat(offset int, value float32)

    • Interface Description: Sets a Float value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The Float value.
  • func (p *Param) SetDouble(offset int, value float64)

    • Interface Description: Sets a Double value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The Double value.
  • func (p *Param) SetBinary(offset int, value []byte)

    • Interface Description: Sets a Binary value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The Binary value.
  • func (p *Param) SetVarBinary(offset int, value []byte)

    • Interface Description: Sets a VarBinary value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The VarBinary value.
  • func (p *Param) SetNchar(offset int, value string)

    • Interface Description: Sets an Nchar value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The Nchar value.
  • func (p *Param) SetTimestamp(offset int, value time.Time, precision int)

    • Interface Description: Sets a Timestamp value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The Timestamp value.
      • precision: The time precision.
  • func (p *Param) SetJson(offset int, value []byte)

    • Interface Description: Sets a Json value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The Json value.
  • func (p *Param) SetGeometry(offset int, value []byte)

    • Interface Description: Sets a Geometry value.
    • Parameters:
      • offset: The offset (column or tag).
      • value: The Geometry value.

Chained Call Parameter Setting Interfaces

  • func (p *Param) AddBool(value bool) *Param
    • Interface Description: Adds a boolean value.
    • Parameters:
      • value: The boolean value.
    • Returns: A Param object.

Other types are similar to boolean values, and the specific interfaces are as follows:

  • AddNull
  • AddTinyint
  • AddSmallint
  • AddInt
  • AddBigint
  • AddUTinyint
  • AddUSmallint
  • AddUInt
  • AddUBigint
  • AddFloat
  • AddDouble
  • AddBinary
  • AddVarBinary
  • AddNchar
  • AddTimestamp
  • AddJson
  • AddGeometry

Below are the interfaces for setting column type information:

  • func NewColumnType(size int) *ColumnType

    • Interface Description: Creates a data structure for column type information.
    • Parameter Description:
      • size: Number of columns.
    • Return Value: ColumnType object.
  • func (c *ColumnType) AddBool() *ColumnType

    • Interface Description: Adds a boolean type.
    • Return Value: ColumnType object.

Other types are similar to the boolean type, and the specific interfaces are as follows:

  • AddTinyint
  • AddSmallint
  • AddInt
  • AddBigint
  • AddUTinyint
  • AddUSmallint
  • AddUInt
  • AddUBigint
  • AddFloat
  • AddDouble
  • AddBinary
  • AddVarBinary
  • AddNchar
  • AddTimestamp
  • AddJson
  • AddGeometry

Data Subscription

The Go driver supports data subscription functionality and provides data subscription interfaces based on native connections and WebSocket connections. The native implementation is in the af/tmq package, and the WebSocket implementation is in the ws/tmq package.

Consumer

  • func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
    • Interface Description: Creates a consumer.
    • Parameter Description:
      • conf: Configuration information.
    • Return Value: Consumer object, error information.

The configuration information is defined as:

type ConfigValue interface{}
type ConfigMap map[string]ConfigValue

The list of supported properties for creating a consumer:

  • ws.url: WebSocket connection address.
  • ws.message.channelLen: WebSocket message channel cache length, default is 0.
  • ws.message.timeout: WebSocket message timeout, default is 5m.
  • ws.message.writeWait: WebSocket message write timeout, default is 10s.
  • ws.message.enableCompression: Whether WebSocket compression is enabled, default is false.
  • ws.autoReconnect: Whether WebSocket automatically reconnects, default is false.
  • ws.reconnectIntervalMs: WebSocket reconnection interval in milliseconds, default is 2000.
  • ws.reconnectRetryCount: WebSocket reconnection retry count, default is 3.

For other parameters, please refer to the Consumer parameter list. Note that the default value of auto.offset.reset in message subscriptions has changed since TDengine server version 3.2.0.0.

  • func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error

    • Interface Description: Subscribes to a topic.
    • Parameter Description:
      • topic: Topic.
      • rebalanceCb: Rebalance callback (not used).
    • Return Value: Error information.
  • func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error

    • Interface Description: Subscribes to a list of topics.
    • Parameter Description:
      • topics: List of topics.
      • rebalanceCb: Rebalance callback (not used).
    • Return Value: Error information.
  • func (c *Consumer) Unsubscribe() error

    • Interface Description: Unsubscribes.
    • Return Value: Error information.
  • func (c *Consumer) Poll(timeoutMs int) tmq.Event

    • Interface Description: Polls for events.
    • Parameter Description:
      • timeoutMs: Timeout period.
    • Return Value: Event.
  • func (c *Consumer) Commit() ([]tmq.TopicPartition, error)

    • Interface Description: Commits offsets.
    • Return Value: List of TopicPartition, error information.
  • func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)

    • Interface Description: Gets assignment information.
    • Return Value: List of TopicPartition, error information.
  • func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error

    • Interface Description: Jumps to an offset.
    • Parameter Description:
      • partition: Partition and offset information.
      • ignoredTimeoutMs: Timeout period (not used).
    • Return Value: Error information.
  • func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error)

    • Interface Description: Gets committed offsets.
    • Parameter Description:
      • partitions: List of partitions.
      • timeoutMs: Timeout period.
    • Return Value: List of TopicPartition, error information.
  • func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error)

    • Interface Description: Commits offsets.
    • Parameter Description:
      • offsets: List of offsets.
    • Return Value: List of TopicPartition, error information.
  • func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error)

    • Interface Description: Gets current offsets.
    • Parameter Description:
      • partitions: List of partitions.
    • Return Value: List of TopicPartition, error information.
  • func (c *Consumer) Close() error

    • Interface Description: Closes the consumer.
    • Return Value: Error information.

Consumption Records

When Poll returns a tmq.Event event, you can determine the type of tmq.Event to obtain consumption records or error information. When the type is *tmq.DataMessage, you can access the consumption records.

  • func (m *DataMessage) Topic() string

    • Interface Description: Get the topic.
    • Return Value: The topic.
  • func (m *DataMessage) DBName() string

    • Interface Description: Get the database name.
    • Return Value: The database name.
  • func (m *DataMessage) Offset() Offset

    • Interface Description: Get the offset.
    • Return Value: The offset.
  • func (m *DataMessage) Value() interface{}

    • Interface Description: Get the value, specifically the value is []*tmq.data.
    • Return Value: The consumed value.

The structure of tmq.data is as follows:

type Data struct {
TableName string
Data [][]driver.Value
}
  • TableName is the name of the table.
  • Data is the data, where each element is a row of data, and each row of data is an array with the elements being column values.

When the Poll returns a type of tmq.Error, you can use func (e Error) Error() string to obtain the error information.

Partition Information

When the consumed data type is *tmq.DataMessage, you can obtain partition information from the TopicPartition attribute.

type TopicPartition struct {
Topic *string
Partition int32
Offset Offset
Metadata *string
Error error
}
  • Topic: The topic.
  • Partition: The partition.
  • Offset: The offset.
  • Metadata: Metadata (not used).
  • Error: Error information.

You can use func (p TopicPartition) String() string to obtain partition information.

Offset Metadata

The offset information obtained from TopicPartition can be accessed via the Offset attribute. An offset of -2147467247 indicates that the offset has not been set.

Deserialization

When the consumed data type is *tmq.DataMessage, you can use func (m *DataMessage) Value() interface{} to obtain the data, which is of type []*tmq.data.

Appendix