Go Client Library
driver-go
is the official Go language connector for TDengine, implementing the interface of the Go language database/sql package. Go developers can use it to develop applications that access data in the TDengine cluster.
Compatibility
Supports a minimum Go version of 1.14, but the latest version of Go is recommended.
Supported Platforms
Native connections support the same platforms as the TDengine client driver. REST connections support all platforms that can run Go.
Version Support
Please refer to the version support list.
Handling Exceptions
If it is a TDengine error, you can obtain the error code and error message as follows.
// import "github.com/taosdata/driver-go/v3/errors"
if err != nil {
tError, is := err.(*errors.TaosError)
if is {
fmt.Println("errorCode:", int(tError.Code))
fmt.Println("errorMessage:", tError.ErrStr)
} else {
fmt.Println(err.Error())
}
}
For errors in other TDengine modules, please refer to Error Codes.
Data Type Mapping
TDengine DataType | Go Type |
---|---|
TIMESTAMP | time.Time |
TINYINT | int8 |
SMALLINT | int16 |
INT | int32 |
BIGINT | int64 |
TINYINT UNSIGNED | uint8 |
SMALLINT UNSIGNED | uint16 |
INT UNSIGNED | uint32 |
BIGINT UNSIGNED | uint64 |
FLOAT | float32 |
DOUBLE | float64 |
BOOL | bool |
BINARY | string |
NCHAR | string |
JSON | []byte |
GEOMETRY | []byte |
VARBINARY | []byte |
Note: The JSON type is only supported in tags. The GEOMETRY type is binary data in little endian byte order, conforming to the WKB standard. For more details, please refer to Data Types For the WKB standard, please refer to Well-Known Binary (WKB).
Example Programs Summary
For the source code of the example programs, please refer to: Example Programs.
Frequently Asked Questions
-
Crashes related to stmt (parameter binding) interfaces in database/sql
REST does not support interfaces related to parameter binding, it is recommended to use
db.Exec
anddb.Query
. -
Error
[0x217] Database not specified or available
occurs after executing other statements following theuse db
statementIn the REST interface, the execution of SQL statements has no context association, and the
use db
statement will not take effect. See the usage restrictions section above for solutions. -
No error with taosSql but error
[0x217] Database not specified or available
with taosRestfulSince the REST interface is stateless, the
use db
statement will not take effect. See the usage restrictions section above for solutions. -
No significant effect after increasing the
readBufferSize
parameterIncreasing
readBufferSize
will reduce the number ofsyscall
calls when fetching results. If the data volume of the query results is not large, modifying this parameter will not bring significant improvement. If this parameter is increased too much, the bottleneck will be in parsing JSON data. To optimize query speed, adjust this value according to the actual situation to achieve the best query effect. -
Query efficiency decreases when
disableCompression
parameter is set tofalse
When the
disableCompression
parameter is set tofalse
, the query results will be transmitted after being compressed withgzip
, and the data must be decompressed withgzip
after being received. -
go get
command cannot fetch packages, or fetching packages times outSet the Go proxy
go env -w GOPROXY=https://goproxy.cn,direct
.
API Reference
database/sql Driver
driver-go
implements the Go database/sql/driver
interface, allowing direct use of the Go database/sql
package. It provides three drivers: github.com/taosdata/driver-go/v3/taosSql
, github.com/taosdata/driver-go/v3/taosRestful
, and github.com/taosdata/driver-go/v3/taosWS
corresponding to native connection
, REST connection
, and WebSocket connection
.
DSN Specification
The Data Source Name has a generic format, similar to PEAR DB, but without the type prefix (brackets indicate optional):
[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...¶mN=valueN]
Full form of DSN:
username:password@protocol(address)/dbname?param=value
Native Connection
Import the driver:
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
Use taosSql
as driverName
and a correct DSN as dataSourceName
as follows:
var taosUri = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosUri)
Supported DSN parameters:
cfg
specifies the taos.cfg directorycgoThread
specifies the number of cgo executions at the same time, default is the number of system corescgoAsyncHandlerPoolSize
specifies the size of the async function handle, default is 10000
Rest Connection
Import the driver:
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
Use taosRestful
as driverName
and a correct DSN as dataSourceName
as follows:
var taosUri = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosUri)
Supported DSN parameters:
disableCompression
whether to accept compressed data, default is true which means not accepting compressed data, set to false if transferring data using gzip compression.readBufferSize
the size of the buffer for reading data defaults to 4K (4096), this value can be increased appropriately when the query result data volume is large.token
the token used when connecting to cloud services.skipVerify
whether to skip certificate verification, default is false which means not skipping certificate verification, set to true if connecting to an insecure service.
WebSocket Connection
Import the driver:
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosWS"
)
Use taosWS
as driverName
and use a correct DSN as dataSourceName
as follows:
var taosUri = "root:taosdata@ws(localhost:6041)/"
taos, err := sql.Open("taosWS", taosUri)
Supported DSN parameters:
enableCompression
whether to send compressed data, default is false which means not sending compressed data, set to true if transferring data using compression.readTimeout
the timeout for reading data, default is 5m.writeTimeout
the timeout for writing data, default is 10s.
- Unlike the native connection method, the REST interface is stateless. When using REST connections, you need to specify the database name of tables and supertables in SQL.
- If
dbname
is specified in the DSN, then, the REST connection will default to using /rest/sql/dbname as the restful request URL, and it is not necessary to specify dbname in SQL.
Connection Features
The Go driver supports creating connections, returning objects that support the sql/driver
standard Connector
interface, and also provides the af
package, which expands some schema-less writing interfaces.
Standard Interface
Interface for creating connections in the database/sql
package
func Open(driverName, dataSourceName string) (*DB, error)
- Interface Description: Connect to the database (
database/sql
). - Parameter Description:
driverName
: Driver name.dataSourceName
: Connection parameters DSN.
- Return Value: Connection object, error information.
- Interface Description: Connect to the database (
Extended Interface
Interface for creating connections in the af
package
func Open(host, user, pass, db string, port int) (*Connector, error)
- Interface Description: Connect to the database.
- Parameter Description:
host
: Host address.user
: Username.pass
: Password.db
: Database name.port
: Port number.
- Return Value: Connection object, error information.
Schema-less Writing
Interfaces for schema-less writing using native connections in the af
package.
-
func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error
- Interface Description: Schema-less writing of InfluxDB format data.
- Parameter Description:
lines
: Data to write.precision
: Time precision.
- Return Value: Error information.
-
func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error
- Interface Description: Schema-less writing of OpenTSDB JSON format data.
- Parameter Description:
payload
: Data to write.
- Return Value: Error information.
-
func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error
- Interface Description: Schema-less writing of OpenTSDB Telnet format data.
- Parameter Description:
lines
: Data to write.
- Return Value: Error information.
ws/schemaless
package uses WebSocket schemaless write interface
func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error
- Interface Description: Schemaless data insertion.
- Parameter Description:
lines
: Data to be written.protocol
: Supported protocols for data writingInfluxDBLineProtocol = 1
OpenTSDBTelnetLineProtocol = 2
OpenTSDBJsonFormatProtocol = 3
.precision
: Time precision.ttl
: Data expiration time, 0 means never expires.reqID
: Request ID.
- Return Value: Error information.
Execute SQL
The Go driver provides interfaces compliant with database/sql
standards, supporting the following features:
- Execute SQL Statements: Execute static SQL statements and return the resulting object.
- Query Execution: Can execute queries that return data sets (
SELECT
statements). - Update Execution: Can execute SQL statements that affect rows, such as
INSERT
,UPDATE
,DELETE
, etc. - Get Results: Can obtain and traverse the result set returned after query execution.
- Get Update Count: For non-query SQL statements, can obtain the number of rows affected after execution.
- Close Resources: Release database resources.
Standard Interfaces
-
func (db *DB) Close() error
- Interface Description: Close the connection.
- Return Value: Error information.
-
func (db *DB) Exec(query string, args ...any) (Result, error)
- Interface Description: Execute a query without returning any rows.
- Parameter Description:
query
: Command to execute.args
: Command arguments.
- Return Value: Result object (only affected rows), error information.
-
func (db *DB) Query(query string, args ...any) (*Rows, error)
- Interface Description: Execute a query and return the results as rows.
- Parameter Description:
query
: Command to execute.args
: Command arguments.
- Return Value: Rows object, error information.
-
func (db *DB) QueryRow(query string, args ...any) *Row
- Interface Description: Execute a query and return a single row result.
- Parameter Description:
query
: Command to execute.args
: Command arguments.
- Return Value: Row object.
Extended Interfaces
-
func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error)
- Interface Description: Execute a query without returning any rows.
- Parameter Description:
ctx
: Context, use Value to pass request ID for link tracing, key istaos_req_id
value is an int64 type.query
: Command to execute.args
: Command arguments.
- Return Value: Result object (only affected rows), error information.
-
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)
- Interface Description: Execute a query and return the results as rows.
- Parameter Description:
ctx
: Context, use Value to pass request ID for link tracing, key istaos_req_id
value is an int64 type.query
: Command to execute.args
: Command arguments.
- Return Value: Rows object, error information.
-
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row
- Interface Description: Executes a query and returns a single row result. Errors are deferred until the row is scanned.
- Parameter Description:
ctx
: Context, uses Value to pass the request ID for link tracing, key istaos_req_id
and value is an int64 type.query
: Command to execute.args
: Command parameters.
- Return Value: Single row result Row object.
Result Retrieval
The Go driver supports retrieving query result sets and corresponding metadata, providing methods to read metadata and data from the result set.
Result Set
Retrieve the query result set through the Rows
object, which provides the following methods:
-
func (rs *Rows) Next() bool
- Interface Description: Prepares the next row of data.
- Return Value: Whether there is another row of data.
-
func (rs *Rows) Columns() ([]string, error)
- Interface Description: Returns column names.
- Return Value: Column names, error information.
-
func (rs *Rows) Scan(dest ...any) error
- Interface Description: Copies the current row's column values into the values pointed to by dest.
- Parameter Description:
dest
: Target values.
- Return Value: Error information.
-
func (rs *Rows) Close() error
- Interface Description: Closes the rows.
- Return Value: Error information.
-
func (r *Row) Scan(dest ...any) error
- Interface Description: Copies the current row's column values into the values pointed to by dest.
- Parameter Description:
dest
: Target values.
- Return Value: Error information.
Retrieve the update result set through the Result
object, which provides the following method:
func (dr driverResult) RowsAffected() (int64, error)
- Interface Description: Returns the number of rows affected.
- Return Value: Number of rows affected, error information.
Result Set Metadata
Retrieve query result set metadata through the Rows
object, providing the following methods:
-
func (rs *Rows) ColumnTypes() ([]*ColumnType, error)
- Interface Description: Returns column types.
- Return Value: Column types, error information.
-
func (ci *ColumnType) Name() string
- Interface Description: Returns the column name.
- Return Value: Column name.
-
func (ci *ColumnType) Length() (length int64, ok bool)
- Interface Description: Returns the column length.
- Return Value: Column length, whether there is a length.
-
func (ci *ColumnType) ScanType() reflect.Type
- Interface Description: Returns the Go type corresponding to the column type.
- Return Value: Column type.
-
func (ci *ColumnType) DatabaseTypeName() string
- Interface Description: Returns the database name of the column type.
- Return Value: Column type name.
Parameter Binding
Prepare allows the use of precompiled SQL statements, which can improve performance and provide the ability for parameterized queries, thereby increasing security.
Standard Interface
Use the Prepare
method in the Conn
interface of sql/driver
to prepare a statement bound to this connection, returning a Stmt
object for use.
-
Prepare(query string) (Stmt, error)
- Interface Description: Prepares and returns a statement bound to this connection.
- Parameter Description:
query
: Statement for parameter binding.
- Return Value: Stmt object, error information.
-
func (s *Stmt) Exec(args ...any) (Result, error)
- Interface Description: Executes the prepared statement with the given parameters and returns a result summarizing the effect of the statement (only column values can be bound, not table names or tags).
- Parameter Description:
args
: Command parameters, Go native types are automatically converted to database types, type mismatches may lose precision, it is recommended to use the same type as the database, time types use int64 orRFC3339Nano
formatted strings.
- Return Value: Result object (only affected rows), error information.
-
func (s *Stmt) Query(args ...any) (*Rows, error)
- Interface Description: Executes the prepared statement with the given arguments and returns the result rows.
- Parameter Description:
args
: Command arguments, Go native types will automatically convert to database types, type mismatches may lose precision, it is recommended to use the same type as the database, time types use int64 orRFC3339Nano
formatted strings.
- Return Value: Result set Rows object, error information.
-
func (s *Stmt) Close() error
- Interface Description: Closes the statement.
- Return Value: Error information.
Extended Interfaces
The af
package provides more interfaces using native connections for parameter binding
-
func (conn *Connector) Stmt() *Stmt
- Interface Description: Returns a Stmt object bound to this connection.
- Return Value: Stmt object.
-
func (s *Stmt) Prepare(sql string) error
- Interface Description: Prepares an SQL.
- Parameter Description:
sql
: The statement for parameter binding.
- Return Value: Error information.
-
func (s *Stmt) NumParams() (int, error)
- Interface Description: Returns the number of parameters.
- Return Value: Number of parameters, error information.
-
func (s *Stmt) SetTableNameWithTags(tableName string, tags *param.Param) error
- Interface Description: Sets the table name and tags.
- Parameter Description:
tableName
: Table name.tags
: Tags.
- Return Value: Error information.
-
func (s *Stmt) SetTableName(tableName string) error
- Interface Description: Sets the table name.
- Parameter Description:
tableName
: Table name.
- Return Value: Error information.
-
func (s *Stmt) BindRow(row *param.Param) error
- Interface Description: Binds a row.
- Parameter Description:
row
: Row data.
- Return Value: Error information.
-
func (s *Stmt) GetAffectedRows() int
- Interface Description: Gets the number of affected rows.
- Return Value: Number of affected rows.
-
func (s *Stmt) AddBatch() error
- Interface Description: Adds a batch.
- Return Value: Error information.
-
func (s *Stmt) Execute() error
- Interface Description: Executes the batch.
- Return Value: Error information.
-
func (s *Stmt) UseResult() (driver.Rows, error)
- Interface Description: Uses the result.
- Return Value: Result set Rows object, error information.
-
func (s *Stmt) Close() error
- Interface Description: Closes the statement.
- Return Value: Error information.
The ws/stmt
package provides interfaces for parameter binding via WebSocket
-
func (c *Connector) Init() (*Stmt, error)
- Interface Description: Initialization.
- Return Value: Stmt object, error information.
-
func (s *Stmt) Prepare(sql string) error
- Interface Description: Prepares an SQL.
- Parameter Description:
sql
: The statement for parameter binding.
- Return Value: Error information.
-
func (s *Stmt) SetTableName(name string) error
- Interface Description: Sets the table name.
- Parameter Description:
name
: Table name.
- Return Value: Error information.
-
func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType)
- Interface Description: Set tags.
- Parameter Description:
tags
: tag.bindType
: Type information.
- Return Value: Error information.
-
func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error
- Interface Description: Bind parameters.
- Parameter Description:
params
: Parameters.bindType
: Type information.
- Return Value: Error information.
-
func (s *Stmt) AddBatch() error
- Interface Description: Add batch processing.
- Return Value: Error information.
-
func (s *Stmt) Exec() error
- Interface Description: Execute batch processing.
- Return Value: Error information.
-
func (s *Stmt) GetAffectedRows() int
- Interface Description: Get the number of affected rows.
- Return Value: Number of affected rows.
-
func (s *Stmt) UseResult() (*Rows, error)
- Interface Description: Use result.
- Return Value: Rows object, error information.
-
func (s *Stmt) Close() error
- Interface Description: Close statement.
- Return Value: Error information.
Rows row results refer to the Rows
interface in the sql/driver
package, providing the following interfaces:
-
func (rs *Rows) Columns() []string
- Interface Description: Return column names.
- Return Value: Column names.
-
func (rs *Rows) ColumnTypeDatabaseTypeName(i int) string
- Interface Description: Return the database name of the column type.
- Parameter Description:
i
: Column index.
- Return Value: Column type name.
-
func (rs *Rows) ColumnTypeLength(i int) (length int64, ok bool)
- Interface Description: Return column length.
- Parameter Description:
i
: Column index.
- Return Value: Column length, whether there is a length.
-
func (rs *Rows) ColumnTypeScanType(i int) reflect.Type
- Interface Description: Return the Go type corresponding to the column type.
- Parameter Description:
i
: Column index.
- Return Value: Column type.
-
func (rs *Rows) Next(dest []driver.Value) error
- Interface Description: Prepare the next row of data and assign it to the target.
- Parameter Description:
dest
: Target values.
- Return Value: Error information.
-
func (rs *Rows) Close() error
- Interface Description: Close rows.
- Return Value: Error information.
The common/param
package provides a parameter binding data structure.
Below are the interfaces for setting parameters by offset:
-
func NewParam(size int) *Param
- Interface Description: Create a parameter binding data structure.
- Parameter Description:
size
: Number of parameters.
- Return Value: Param object.
-
func (p *Param) SetBool(offset int, value bool)
- Interface Description: Set a boolean value.
- Parameter Description:
offset
: Offset (column or tag).value
: Boolean value.
-
func (p *Param) SetNull(offset int)
- Interface Description: Set a null value.
- Parameter Description:
offset
: Offset (column or tag).
-
func (p *Param) SetTinyint(offset int, value int)
- Interface Description: Set Tinyint value.
- Parameter Description:
offset
: Offset (column or tag).value
: Tinyint value.
-
func (p *Param) SetSmallint(offset int, value int)
- Interface Description: Set Smallint value.
- Parameter Description:
offset
: Offset (column or tag).value
: Smallint value.
-
func (p *Param) SetInt(offset int, value int)
- Interface Description: Set Int value.
- Parameter Description:
offset
: Offset (column or tag).value
: Int value.
-
func (p *Param) SetBigint(offset int, value int)
- Interface Description: Set Bigint value.
- Parameter Description:
offset
: Offset (column or tag).value
: Bigint value.
-
func (p *Param) SetUTinyint(offset int, value uint)
- Interface Description: Set UTinyint value.
- Parameter Description:
offset
: Offset (column or tag).value
: UTinyint value.
-
func (p *Param) SetUSmallint(offset int, value uint)
- Interface Description: Set USmallint value.
- Parameter Description:
offset
: Offset (column or tag).value
: USmallint value.
-
func (p *Param) SetUInt(offset int, value uint)
- Interface Description: Set UInt value.
- Parameter Description:
offset
: Offset (column or tag).value
: UInt value.
-
func (p *Param) SetUBigint(offset int, value uint)
- Interface Description: Set UBigint value.
- Parameter Description:
offset
: Offset (column or tag).value
: UBigint value.
-
func (p *Param) SetFloat(offset int, value float32)
- Interface Description: Set Float value.
- Parameter Description:
offset
: Offset (column or tag).value
: Float value.
-
func (p *Param) SetDouble(offset int, value float64)
- Interface Description: Set Double value.
- Parameter Description:
offset
: Offset (column or tag).value
: Double value.
-
func (p *Param) SetBinary(offset int, value []byte)
- Interface Description: Set Binary value.
- Parameter Description:
offset
: Offset (column or tag).value
: Binary value.
-
func (p *Param) SetVarBinary(offset int, value []byte)
- Interface Description: Set VarBinary value.
- Parameter Description:
offset
: Offset (column or tag).value
: VarBinary value.
-
func (p *Param) SetNchar(offset int, value string)
- Interface Description: Set Nchar value.
- Parameter Description:
offset
: Offset (column or tag).value
: Nchar value.
-
func (p *Param) SetTimestamp(offset int, value time.Time, precision int)
- Interface Description: Sets the Timestamp value.
- Parameter Description:
offset
: Offset (column or tag).value
: Timestamp value.precision
: Time precision.
-
func (p *Param) SetJson(offset int, value []byte)
- Interface Description: Sets the Json value.
- Parameter Description:
offset
: Offset (column or tag).value
: Json value.
-
func (p *Param) SetGeometry(offset int, value []byte)
- Interface Description: Sets the Geometry value.
- Parameter Description:
offset
: Offset (column or tag).value
: Geometry value.
Below are the interfaces for setting parameters via chain calls:
func (p *Param) AddBool(value bool) *Param
- Interface Description: Adds a boolean value.
- Parameter Description:
value
: Boolean value.
- Return Value: Param object.
Other types similar to boolean are as follows:
- AddNull
- AddTinyint
- AddSmallint
- AddInt
- AddBigint
- AddUTinyint
- AddUSmallint
- AddUInt
- AddUBigint
- AddFloat
- AddDouble
- AddBinary
- AddVarBinary
- AddNchar
- AddTimestamp
- AddJson
- AddGeometry
Below are the interfaces for setting column type information:
-
func NewColumnType(size int) *ColumnType
- Interface Description: Creates a column type information data structure.
- Parameter Description:
size
: Number of columns.
- Return Value: ColumnType object.
-
func (c *ColumnType) AddBool() *ColumnType
- Interface Description: Adds a boolean type.
- Return Value: ColumnType object.
Other types similar to boolean are as follows:
- AddTinyint
- AddSmallint
- AddInt
- AddBigint
- AddUTinyint
- AddUSmallint
- AddUInt
- AddUBigint
- AddFloat
- AddDouble
- AddBinary
- AddVarBinary
- AddNchar
- AddTimestamp
- AddJson
- AddGeometry
Data Subscription
The Go driver supports data subscription features, providing interfaces for data subscription via native connections and WebSocket connections. Native implementation is in the af/tmq
package, WebSocket implementation is in the ws/tmq
package.
Consumer
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
- Interface Description: Creates a consumer.
- Parameter Description:
conf
: Configuration information.
- Return Value: Consumer object, error information.
Configuration information is defined as:
type ConfigValue interface{}
type ConfigMap map[string]ConfigValue
Creating a consumer supports the following properties:
ws.url
: WebSocket connection URL.ws.message.channelLen
: WebSocket message channel buffer length, default 0.ws.message.timeout
: WebSocket message timeout, default 5m.ws.message.writeWait
: WebSocket message write timeout, default 10s.ws.message.enableCompression
: Whether WebSocket compression is enabled, default false.ws.autoReconnect
: Whether WebSocket automatically reconnects, default false.ws.reconnectIntervalMs
: WebSocket reconnect interval in milliseconds, default 2000.ws.reconnectRetryCount
: WebSocket reconnect retry count, default 3.
For other parameters, please refer to: Consumer Parameter List, note that starting from version 3.2.0.0 of the TDengine server, the default value of auto.offset.reset
in message subscriptions has changed.
-
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
- Interface Description: Subscribe to a topic.
- Parameter Description:
topic
: Topic.rebalanceCb
: Rebalance callback (unused).
- Return Value: Error information.
-
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
- Interface Description: Subscribe to a list of topics.
- Parameter Description:
topics
: List of topics.rebalanceCb
: Rebalance callback (unused).
- Return Value: Error information.
-
func (c *Consumer) Unsubscribe() error
- Interface Description: Unsubscribe.
- Return Value: Error information.
-
func (c *Consumer) Poll(timeoutMs int) tmq.Event
- Interface Description: Poll for events.
- Parameter Description:
timeoutMs
: Timeout in milliseconds.
- Return Value: Event.
-
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
- Interface Description: Commit offsets.
- Return Value: List of TopicPartition, error information.
-
func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)
- Interface Description: Get assignment information.
- Return Value: List of TopicPartition, error information.
-
func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error
- Interface Description: Seek to an offset.
- Parameter Description:
partition
: Partition and offset information.ignoredTimeoutMs
: Timeout in milliseconds (unused).
- Return Value: Error information.
-
func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error)
- Interface Description: Get committed offsets.
- Parameter Description:
partitions
: List of partitions.timeoutMs
: Timeout in milliseconds.
- Return Value: List of TopicPartition, error information.
-
func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error)
- Interface Description: Commit offsets.
- Parameter Description:
offsets
: List of offsets.
- Return Value: List of TopicPartition, error information.
-
func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error)
- Interface Description: Get current offsets.
- Parameter Description:
partitions
: List of partitions.
- Return Value: List of TopicPartition, error information.
-
func (c *Consumer) Close() error
- Interface Description: Close the consumer.
- Return Value: Error information.
Consumption Records
When Poll
returns a tmq.Event
event, you can obtain the consumption record or error information by determining the type of tmq.Event
. When the type is *tmq.DataMessage
, you can get the consumption record.
-
func (m *DataMessage) Topic() string
- Interface Description: Get the topic.
- Return Value: Topic.
-
func (m *DataMessage) DBName() string
- Interface Description: Get the database name.
- Return Value: Database name.
-
func (m *DataMessage) Offset() Offset
- Interface Description: Get the offset.
- Return Value: Offset.
-
func (m *DataMessage) Value() interface{}
- Interface Description: Get the value, the specific value is
[]*tmq.data
. - Return Value: Consumed value.
- Interface Description: Get the value, the specific value is
Structure of tmq.data:
type Data struct {
TableName string
Data [][]driver.Value
}
- TableName is the table name.
- Data is the data, each element is a row of data, each row of data is an array, and the elements of the array are column values.
When Poll returns a type of tmq.Error
, you can use func (e Error) Error() string
to get the error information.
Partition Information
When the consumed data type is *tmq.DataMessage
, you can obtain partition information from the TopicPartition
attribute.
type TopicPartition struct {
Topic *string
Partition int32
Offset Offset
Metadata *string
Error error
}
Topic
: Topic.Partition
: Partition.Offset
: Offset.Metadata
: Metadata (unused).Error
: Error information.
You can use func (p TopicPartition) String() string
to get partition information.
Offset Metadata
The offset information obtained from TopicPartition
can be accessed through the Offset
attribute. When the offset is -2147467247
, it indicates that the offset is not set.
Deserialization
When the consumed data type is *tmq.DataMessage
, you can use func (m *DataMessage) Value() interface{}
to get the data, the data type is []*tmq.data
.