Go Client Library
driver-go
is the official Go language connector for TDengine, implementing the interface of Go's database/sql package. Go developers can use it to develop applications that access data from TDengine clusters.
Compatibility
Supports Go version 1.14 and above; the latest Go version is recommended.
Supported Platforms
The native connection supports the same platforms as the TDengine client driver.
REST connections support all platforms that can run Go.
Version Support
Please refer to the version support list.
Handling Exceptions
If it is a TDengine error, you can obtain the error code and message as follows:
// import "github.com/taosdata/driver-go/v3/errors"
if err != nil {
tError, is := err.(*errors.TaosError)
if is {
fmt.Println("errorCode:", int(tError.Code))
fmt.Println("errorMessage:", tError.ErrStr)
} else {
fmt.Println(err.Error())
}
}
For errors from other TDengine functional modules, please refer to the error codes.
Data Type Mapping
TDengine DataType | Go Type |
---|---|
TIMESTAMP | time.Time |
TINYINT | int8 |
SMALLINT | int16 |
INT | int32 |
BIGINT | int64 |
TINYINT UNSIGNED | uint8 |
SMALLINT UNSIGNED | uint16 |
INT UNSIGNED | uint32 |
BIGINT UNSIGNED | uint64 |
FLOAT | float32 |
DOUBLE | float64 |
BOOL | bool |
BINARY | string |
NCHAR | string |
JSON | []byte |
GEOMETRY | []byte |
VARBINARY | []byte |
Note: The JSON type is only supported in tags.
The GEOMETRY type is binary data in little-endian byte order, compliant with the WKB standard. For more details, please refer to the data types.
The WKB standard can be referenced at Well-Known Binary (WKB).
Example Programs Summary
Source code for example programs can be found here: Example Programs.
Common Issues
-
Crashes related to stmt (parameter binding) interfaces in
database/sql
REST does not support parameter binding-related interfaces; it is recommended to use
db.Exec
anddb.Query
. -
Error
[0x217] Database not specified or available
after using theuse db
statementIn the REST interface, the execution of SQL statements has no contextual association, so using the
use db
statement will not take effect. For solutions, refer to the usage restrictions section above. -
Error
[0x217] Database not specified or available
when usingtaosSql
but not withtaosRestful
Because the REST interface is stateless, using the
use db
statement will not take effect. For solutions, refer to the usage restrictions section above. -
No significant effect after increasing the
readBufferSize
parameterIncreasing
readBufferSize
reducessyscall
calls when obtaining results. If the amount of data in the query result is small, modifying this parameter will not yield significant improvement. If this parameter is set too large, the bottleneck may be in parsing JSON data. To optimize query speed, adjust this value according to actual conditions for the best effect. -
Query efficiency decreases when the
disableCompression
parameter is set tofalse
When the
disableCompression
parameter is set tofalse
, the query results are transmitted after being compressed withgzip
, and the data must be decompressed after retrieval. -
go get
command fails to retrieve the package, or the retrieval times outSet the Go proxy using
go env -w GOPROXY=https://goproxy.cn,direct
.
API Reference
database/sql Driver
driver-go
implements the Go database/sql/driver
interface, allowing direct use of the Go database/sql
package. It provides three drivers: github.com/taosdata/driver-go/v3/taosSql
, github.com/taosdata/driver-go/v3/taosRestful
, and github.com/taosdata/driver-go/v3/taosWS
, corresponding to native connection
, REST connection
, and WebSocket connection
, respectively.
DSN Specification
The Data Source Name (DSN) has a general format similar to PEAR DB, but without a type prefix (square brackets indicate optional):
[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...¶mN=valueN]
The full form of the DSN is:
username:password@protocol(address)/dbname?param=value
Native Connection
Import the driver:
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
Use taosSql
as the driverName
and a valid DSN as the dataSourceName
like this:
var taosUri = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosUri)
Supported DSN parameters:
cfg
: Specifies the directory fortaos.cfg
cgoThread
: Specifies the number of concurrent cgo executions, defaulting to the system core countcgoAsyncHandlerPoolSize
: Specifies the size of the async function handler, defaulting to 10,000
REST Connection
Import the driver:
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
Use taosRestful
as the driverName
and a valid DSN as the dataSourceName
like this:
var taosUri = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosUri)
Supported DSN parameters:
disableCompression
: Whether to accept compressed data, defaulting totrue
(do not accept compressed data). Set tofalse
if using gzip compression for data transmission.readBufferSize
: The size of the read buffer, defaulting to 4K (4096). This value can be increased if the amount of data in the query result is large.token
: The token used when connecting to cloud services.skipVerify
: Whether to skip certificate verification, defaulting tofalse
(do not skip). Set totrue
if connecting to an insecure service.
WebSocket Connection
Import the driver:
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosWS"
)
Use taosWS
as the driverName
and a valid DSN as the dataSourceName
like this:
var taosUri = "root:taosdata@ws(localhost:6041)/"
taos, err := sql.Open("taosWS", taosUri)
Supported DSN parameters:
enableCompression
: Whether to send compressed data, defaulting tofalse
(do not send). Set totrue
if using compression for data transmission.readTimeout
: The read timeout for data, defaulting to 5 minutes.writeTimeout
: The write timeout for data, defaulting to 10 seconds.
- Unlike the native connection method, the REST interface is stateless. When using REST connections, it is necessary to specify the database name of the table or supertable in the SQL.
- If a
dbname
is specified in the DSN, the REST connection will default to using/rest/sql/dbname
as the URL for the RESTful request, and thedbname
does not need to be specified in the SQL.
Connection Features
The Go driver supports creating connections that return objects conforming to the sql/driver
standard Connector
interface. It also provides the af
package, which extends some schemaless write interfaces.
Standard Interfaces
The database/sql
package provides the following interface for creating connections:
func Open(driverName, dataSourceName string) (*DB, error)
- Interface Description: Connects to the database.
- Parameters:
driverName
: The name of the driver.dataSourceName
: The connection parameters DSN.
- Returns: A connection object and an error.
Extended Interfaces
The af
package provides the following interface for creating connections:
func Open(host, user, pass, db string, port int) (*Connector, error)
- Interface Description: Connects to the database.
- Parameters:
host
: The host address.user
: The username.pass
: The password.db
: The database name.port
: The port number.
- Returns: A connection object and an error.
Schemaless Writing
The af
package provides interfaces for schemaless writing using native connections:
-
func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error
- Interface Description: Schemaless insert of InfluxDB formatted data.
- Parameters:
lines
: The data to write.precision
: The time precision.
- Returns: An error.
-
func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error
- Interface Description: Schemaless insert of OpenTSDB JSON formatted data.
- Parameters:
payload
: The data to write.
- Returns: An error.
-
func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error
- Interface Description: Schemaless insert of OpenTSDB Telnet formatted data.
- Parameters:
lines
: The data to write.
- Returns: An error.
The ws/schemaless
package provides interfaces for schemaless writing using WebSocket:
func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error
- Interface Description: Schemaless insert of data.
- Parameters:
lines
: The data to write.protocol
: The protocol supported for writing (e.g.,InfluxDBLineProtocol = 1
,OpenTSDBTelnetLineProtocol = 2
,OpenTSDBJsonFormatProtocol = 3
).precision
: The time precision.ttl
: The data expiration time; 0 means no expiration.reqID
: The request ID.
- Returns: An error.
Execute SQL
The Go driver provides interfaces conforming to the database/sql
standard, supporting the following functionalities:
- Execute SQL Statements: Execute static SQL statements and return their resulting objects.
- Query Execution: Execute queries that return datasets (e.g.,
SELECT
statements). - Update Execution: Execute SQL statements that affect rows, such as
INSERT
,UPDATE
,DELETE
, etc. - Get Results: Retrieve result sets returned after executing queries and iterate through the data.
- Get Update Count: For non-query SQL statements, retrieve the number of rows affected after execution.
- Close Resources: Release database resources.
Standard Interfaces
-
func (db *DB) Close() error
- Interface Description: Closes the connection.
- Returns: An error.
-
func (db *DB) Exec(query string, args ...any) (Result, error)
- Interface Description: Executes a query but does not return any rows.
- Parameters:
query
: The command to execute.args
: The command parameters.
- Returns: A Result object (only affecting row count) and an error.
-
func (db *DB) Query(query string, args ...any) (*Rows, error)
- Interface Description: Executes a query and returns the result rows.
- Parameters:
query
: The command to execute.args
: The command parameters.
- Returns: A Rows object and an error.
-
func (db *DB) QueryRow(query string, args ...any) *Row
- Interface Description: Executes a query and returns a single row result.
- Parameters:
query
: The command to execute.args
: The command parameters.
- Returns: A Row object.
Extended Interfaces
-
func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error)
- Interface Description: Executes a query but does not return any rows.
- Parameters:
ctx
: The context, using Value to pass the request ID for tracing, where the key istaos_req_id
and the value is an int64.query
: The command to execute.args
: The command parameters.
- Returns: A Result object (only the affected row count) and an error.
-
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)
- Interface Description: Executes a query and returns the result rows.
- Parameters:
ctx
: The context, using Value to pass the request ID for tracing, where the key istaos_req_id
and the value is an int64.query
: The command to execute.args
: The command parameters.
- Returns: A Rows object and an error.
-
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row
- Interface Description: Executes a query and returns a single row result; error information is delayed until scanning the Row.
- Parameters:
ctx
: The context, using Value to pass the request ID for tracing, where the key istaos_req_id
and the value is an int64.query
: The command to execute.args
: The command parameters.
- Returns: A Row object.
Result Retrieval
The Go driver supports retrieving query result sets and corresponding result metadata, providing methods for reading metadata and data within result sets.
Result Set
The Rows
object provides the following methods to access the query result set:
-
func (rs *Rows) Next() bool
- Interface Description: Prepares the next row of data.
- Returns: Whether there is a next row of data.
-
func (rs *Rows) Columns() ([]string, error)
- Interface Description: Returns the column names.
- Returns: Column names and an error.
-
func (rs *Rows) Scan(dest ...any) error
- Interface Description: Copies the column values of the current row into the values pointed to by
dest
. - Parameters:
dest
: The target values.
- Returns: An error.
- Interface Description: Copies the column values of the current row into the values pointed to by
-
func (rs *Rows) Close() error
- Interface Description: Closes the rows.
- Returns: An error.
-
func (r *Row) Scan(dest ...any) error
- Interface Description: Copies the column values of the current row into the values pointed to by
dest
. - Parameters:
dest
: The target values.
- Returns: An error.
- Interface Description: Copies the column values of the current row into the values pointed to by
To retrieve update results, the Result
object provides the following method:
func (dr driverResult) RowsAffected() (int64, error)
- Interface Description: Returns the number of rows affected.
- Returns: The number of affected rows and an error.
Result Set Metadata
The Rows
object provides methods to retrieve metadata about the query result set:
-
func (rs *Rows) ColumnTypes() ([]*ColumnType, error)
- Interface Description: Returns the column types.
- Returns: Column types and an error.
-
func (ci *ColumnType) Name() string
- Interface Description: Returns the column name.
- Returns: The column name.
-
func (ci *ColumnType) Length() (length int64, ok bool)
- Interface Description: Returns the length of the column.
- Returns: The column length and whether a length exists.
-
func (ci *ColumnType) ScanType() reflect.Type
- Interface Description: Returns the Go type corresponding to the column type.
- Returns: The column type.
-
func (ci *ColumnType) DatabaseTypeName() string
- Interface Description: Returns the name of the column type in the database.
- Returns: The column type name.
Parameter Binding
Prepare allows the use of precompiled SQL statements, which can improve performance and provide the ability for parameterized queries, thereby increasing security.
Standard Interfaces
Using the Prepare
method from the Conn
interface in sql/driver
, prepare a statement bound to this connection, returning a Stmt
object for use.
-
Prepare(query string) (Stmt, error)
- Interface Description: Prepares and returns a statement (stmt) bound to this connection.
- Parameters:
query
: The statement for parameter binding.
- Returns: A
Stmt
object and an error.
-
func (s *Stmt) Exec(args ...any) (Result, error)
- Interface Description: Executes the prepared statement using the given parameters and returns a result summarizing the effect of the statement (only column values can be bound; binding table names and tags is not supported).
- Parameters:
args
: Command parameters. Go primitive types will be automatically converted to database types; type mismatches may cause loss of precision. It is recommended to use types that match the database, and for time types, useint64
or strings formatted inRFC3339Nano
.
- Returns: A
Result
object (only the affected row count) and an error.
-
func (s *Stmt) Query(args ...any) (*Rows, error)
- Interface Description: Executes the prepared statement using the given parameters and returns the result rows.
- Parameters:
args
: Command parameters. Go primitive types will be automatically converted to database types; type mismatches may cause loss of precision. It is recommended to use types that match the database, and for time types, useint64
or strings formatted inRFC3339Nano
.
- Returns: A
Rows
object and an error.
-
func (s *Stmt) Close() error
- Interface Description: Closes the statement.
- Returns: An error.
Extended Interfaces
The af
package provides additional interfaces for parameter binding using native connections.
-
func (conn *Connector) Stmt() *Stmt
- Interface Description: Returns a
Stmt
object bound to this connection. - Returns: A
Stmt
object.
- Interface Description: Returns a
-
func (s *Stmt) Prepare(sql string) error
- Interface Description: Prepares an SQL statement.
- Parameters:
sql
: The SQL statement for parameter binding.
- Returns: An error.
-
func (s *Stmt) NumParams() (int, error)
- Interface Description: Returns the number of parameters.
- Returns: The number of parameters and an error.
-
func (s *Stmt) SetTableNameWithTags(tableName string, tags *param.Param)
- Interface Description: Sets the table name and tags.
- Parameters:
tableName
: The table name.tags
: The tags.
- Returns: An error.
-
func (s *Stmt) SetTableName(tableName string) error
- Interface Description: Sets the table name.
- Parameters:
tableName
: The table name.
- Returns: An error.
-
func (s *Stmt) BindRow(row *param.Param) error
- Interface Description: Binds a row.
- Parameters:
row
: The row data.
- Returns: An error.
-
func (s *Stmt) GetAffectedRows() int
- Interface Description: Gets the number of affected rows.
- Returns: The number of affected rows.
-
func (s *Stmt) AddBatch() error
- Interface Description: Adds to the batch.
- Returns: An error.
-
func (s *Stmt) Execute() error
- Interface Description: Executes the batch.
- Returns: An error.
-
func (s *Stmt) UseResult() (driver.Rows, error)
- Interface Description: Uses the result.
- Returns: A
Rows
object and an error.
-
func (s *Stmt) Close() error
- Interface Description: Closes the statement.
- Returns: An error.
The ws/stmt
package provides interfaces for parameter binding over WebSocket.
-
func (c *Connector) Init() (*Stmt, error)
- Interface Description: Initializes.
- Returns: A
Stmt
object and an error.
-
func (s *Stmt) Prepare(sql string) error
- Interface Description: Prepares an SQL statement.
- Parameters:
sql
: The SQL statement for parameter binding.
- Returns: An error.
-
func (s *Stmt) SetTableName(name string) error
- Interface Description: Sets the table name.
- Parameters:
name
: The table name.
- Returns: An error.
-
func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType)
- Interface Description: Sets the tags.
- Parameters:
tags
: The tags.bindType
: Type information.
- Returns: An error.
-
func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error
- Interface Description: Binds parameters.
- Parameters:
params
: The parameters.bindType
: Type information.
- Returns: An error.
-
func (s *Stmt) AddBatch() error
- Interface Description: Adds to the batch.
- Returns: An error.
-
func (s *Stmt) Exec() error
- Interface Description: Executes the batch.
- Returns: An error.
-
func (s *Stmt) GetAffectedRows() int
- Interface Description: Gets the number of affected rows.
- Returns: The number of affected rows.
-
func (s *Stmt) UseResult() (*Rows, error)
- Interface Description: Uses the result.
- Returns: A
Rows
object and an error.
The Rows
interface for row results references the sql/driver
package and provides the following interfaces:
-
func (rs *Rows) Columns() []string
- Interface Description: Returns the column names.
- Returns: The column names.
-
func (rs *Rows) ColumnTypeDatabaseTypeName(i int) string
- Interface Description: Returns the database name of the column type.
- Parameters:
i
: Column index.
- Returns: The column type name.
-
func (rs *Rows) ColumnTypeLength(i int) (length int64, ok bool)
- Interface Description: Returns the column length.
- Parameters:
i
: Column index.
- Returns: The column length and whether it has a length.
-
func (rs *Rows) ColumnTypeScanType(i int) reflect.Type
- Interface Description: Returns the Go type corresponding to the column type.
- Parameters:
i
: Column index.
- Returns: The column type.
-
func (rs *Rows) Next(dest []driver.Value) error
- Interface Description: Prepares the next row of data and assigns it to the destination.
- Parameters:
dest
: The target values.
- Returns: An error.
-
func (rs *Rows) Close() error
- Interface Description: Closes the rows.
- Returns: An error.
Parameter Binding
The common/param
package provides data structures for parameter binding.
Parameter Offset Setting Interfaces
-
func NewParam(size int) *Param
- Interface Description: Creates a parameter binding data structure.
- Parameters:
size
: The number of parameters.
- Returns: A
Param
object.
-
func (p *Param) SetBool(offset int, value bool)
- Interface Description: Sets a boolean value.
- Parameters:
offset
: The offset (column or tag).value
: The boolean value.
-
func (p *Param) SetNull(offset int)
- Interface Description: Sets a null value.
- Parameters:
offset
: The offset (column or tag).
-
func (p *Param) SetTinyint(offset int, value int)
- Interface Description: Sets a Tinyint value.
- Parameters:
offset
: The offset (column or tag).value
: The Tinyint value.
-
func (p *Param) SetSmallint(offset int, value int)
- Interface Description: Sets a Smallint value.
- Parameters:
offset
: The offset (column or tag).value
: The Smallint value.
-
func (p *Param) SetInt(offset int, value int)
- Interface Description: Sets an Int value.
- Parameters:
offset
: The offset (column or tag).value
: The Int value.
-
func (p *Param) SetBigint(offset int, value int)
- Interface Description: Sets a Bigint value.
- Parameters:
offset
: The offset (column or tag).value
: The Bigint value.
-
func (p *Param) SetUTinyint(offset int, value uint)
- Interface Description: Sets a UTinyint value.
- Parameters:
offset
: The offset (column or tag).value
: The UTinyint value.
-
func (p *Param) SetUSmallint(offset int, value uint)
- Interface Description: Sets a USmallint value.
- Parameters:
offset
: The offset (column or tag).value
: The USmallint value.
-
func (p *Param) SetUInt(offset int, value uint)
- Interface Description: Sets a UInt value.
- Parameters:
offset
: The offset (column or tag).value
: The UInt value.
-
func (p *Param) SetUBigint(offset int, value uint)
- Interface Description: Sets a UBigint value.
- Parameters:
offset
: The offset (column or tag).value
: The UBigint value.
-
func (p *Param) SetFloat(offset int, value float32)
- Interface Description: Sets a Float value.
- Parameters:
offset
: The offset (column or tag).value
: The Float value.
-
func (p *Param) SetDouble(offset int, value float64)
- Interface Description: Sets a Double value.
- Parameters:
offset
: The offset (column or tag).value
: The Double value.
-
func (p *Param) SetBinary(offset int, value []byte)
- Interface Description: Sets a Binary value.
- Parameters:
offset
: The offset (column or tag).value
: The Binary value.
-
func (p *Param) SetVarBinary(offset int, value []byte)
- Interface Description: Sets a VarBinary value.
- Parameters:
offset
: The offset (column or tag).value
: The VarBinary value.
-
func (p *Param) SetNchar(offset int, value string)
- Interface Description: Sets an Nchar value.
- Parameters:
offset
: The offset (column or tag).value
: The Nchar value.
-
func (p *Param) SetTimestamp(offset int, value time.Time, precision int)
- Interface Description: Sets a Timestamp value.
- Parameters:
offset
: The offset (column or tag).value
: The Timestamp value.precision
: The time precision.
-
func (p *Param) SetJson(offset int, value []byte)
- Interface Description: Sets a Json value.
- Parameters:
offset
: The offset (column or tag).value
: The Json value.
-
func (p *Param) SetGeometry(offset int, value []byte)
- Interface Description: Sets a Geometry value.
- Parameters:
offset
: The offset (column or tag).value
: The Geometry value.
Chained Call Parameter Setting Interfaces
func (p *Param) AddBool(value bool) *Param
- Interface Description: Adds a boolean value.
- Parameters:
value
: The boolean value.
- Returns: A
Param
object.
Other types are similar to boolean values, and the specific interfaces are as follows:
- AddNull
- AddTinyint
- AddSmallint
- AddInt
- AddBigint
- AddUTinyint
- AddUSmallint
- AddUInt
- AddUBigint
- AddFloat
- AddDouble
- AddBinary
- AddVarBinary
- AddNchar
- AddTimestamp
- AddJson
- AddGeometry
Below are the interfaces for setting column type information:
-
func NewColumnType(size int) *ColumnType
- Interface Description: Creates a data structure for column type information.
- Parameter Description:
size
: Number of columns.
- Return Value: ColumnType object.
-
func (c *ColumnType) AddBool() *ColumnType
- Interface Description: Adds a boolean type.
- Return Value: ColumnType object.
Other types are similar to the boolean type, and the specific interfaces are as follows:
- AddTinyint
- AddSmallint
- AddInt
- AddBigint
- AddUTinyint
- AddUSmallint
- AddUInt
- AddUBigint
- AddFloat
- AddDouble
- AddBinary
- AddVarBinary
- AddNchar
- AddTimestamp
- AddJson
- AddGeometry
Data Subscription
The Go driver supports data subscription functionality and provides data subscription interfaces based on native connections and WebSocket connections. The native implementation is in the af/tmq
package, and the WebSocket implementation is in the ws/tmq
package.
Consumer
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
- Interface Description: Creates a consumer.
- Parameter Description:
conf
: Configuration information.
- Return Value: Consumer object, error information.
The configuration information is defined as:
type ConfigValue interface{}
type ConfigMap map[string]ConfigValue
The list of supported properties for creating a consumer:
ws.url
: WebSocket connection address.ws.message.channelLen
: WebSocket message channel cache length, default is 0.ws.message.timeout
: WebSocket message timeout, default is 5m.ws.message.writeWait
: WebSocket message write timeout, default is 10s.ws.message.enableCompression
: Whether WebSocket compression is enabled, default is false.ws.autoReconnect
: Whether WebSocket automatically reconnects, default is false.ws.reconnectIntervalMs
: WebSocket reconnection interval in milliseconds, default is 2000.ws.reconnectRetryCount
: WebSocket reconnection retry count, default is 3.
For other parameters, please refer to the Consumer parameter list. Note that the default value of auto.offset.reset
in message subscriptions has changed since TDengine server version 3.2.0.0.
-
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
- Interface Description: Subscribes to a topic.
- Parameter Description:
topic
: Topic.rebalanceCb
: Rebalance callback (not used).
- Return Value: Error information.
-
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
- Interface Description: Subscribes to a list of topics.
- Parameter Description:
topics
: List of topics.rebalanceCb
: Rebalance callback (not used).
- Return Value: Error information.
-
func (c *Consumer) Unsubscribe() error
- Interface Description: Unsubscribes.
- Return Value: Error information.
-
func (c *Consumer) Poll(timeoutMs int) tmq.Event
- Interface Description: Polls for events.
- Parameter Description:
timeoutMs
: Timeout period.
- Return Value: Event.
-
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
- Interface Description: Commits offsets.
- Return Value: List of TopicPartition, error information.
-
func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)
- Interface Description: Gets assignment information.
- Return Value: List of TopicPartition, error information.
-
func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error
- Interface Description: Jumps to an offset.
- Parameter Description:
partition
: Partition and offset information.ignoredTimeoutMs
: Timeout period (not used).
- Return Value: Error information.
-
func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error)
- Interface Description: Gets committed offsets.
- Parameter Description:
partitions
: List of partitions.timeoutMs
: Timeout period.
- Return Value: List of TopicPartition, error information.
-
func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error)
- Interface Description: Commits offsets.
- Parameter Description:
offsets
: List of offsets.
- Return Value: List of TopicPartition, error information.
-
func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error)
- Interface Description: Gets current offsets.
- Parameter Description:
partitions
: List of partitions.
- Return Value: List of TopicPartition, error information.
-
func (c *Consumer) Close() error
- Interface Description: Closes the consumer.
- Return Value: Error information.
Consumption Records
When Poll
returns a tmq.Event
event, you can determine the type of tmq.Event
to obtain consumption records or error information. When the type is *tmq.DataMessage
, you can access the consumption records.
-
func (m *DataMessage) Topic() string
- Interface Description: Get the topic.
- Return Value: The topic.
-
func (m *DataMessage) DBName() string
- Interface Description: Get the database name.
- Return Value: The database name.
-
func (m *DataMessage) Offset() Offset
- Interface Description: Get the offset.
- Return Value: The offset.
-
func (m *DataMessage) Value() interface{}
- Interface Description: Get the value, specifically the value is
[]*tmq.data
. - Return Value: The consumed value.
- Interface Description: Get the value, specifically the value is
The structure of tmq.data
is as follows:
type Data struct {
TableName string
Data [][]driver.Value
}
TableName
is the name of the table.Data
is the data, where each element is a row of data, and each row of data is an array with the elements being column values.
When the Poll returns a type of tmq.Error
, you can use func (e Error) Error() string
to obtain the error information.
Partition Information
When the consumed data type is *tmq.DataMessage
, you can obtain partition information from the TopicPartition
attribute.
type TopicPartition struct {
Topic *string
Partition int32
Offset Offset
Metadata *string
Error error
}
Topic
: The topic.Partition
: The partition.Offset
: The offset.Metadata
: Metadata (not used).Error
: Error information.
You can use func (p TopicPartition) String() string
to obtain partition information.
Offset Metadata
The offset information obtained from TopicPartition
can be accessed via the Offset
attribute. An offset of -2147467247
indicates that the offset has not been set.
Deserialization
When the consumed data type is *tmq.DataMessage
, you can use func (m *DataMessage) Value() interface{}
to obtain the data, which is of type []*tmq.data
.