Managing Consumers
TDengine provides data subscription and consumption interfaces similar to those of message queue products. In many scenarios, adopting TDengine's time-series big data platform removes the need to integrate a separate message queue product, which simplifies application design and reduces maintenance costs. This chapter introduces the data subscription APIs and their usage for the connectors in various languages. For basic information on data subscription, refer to Data Subscription.
Creating Topics
Use the taos shell, or refer to the Execute SQL section, to run the SQL that creates the topic:
CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters
The SQL above creates a subscription named topic_meters. Each record in the messages obtained through this subscription consists of the columns selected by the query SELECT ts, current, voltage, phase, groupid, location FROM meters.
Note: In the implementation of the TDengine connectors, subscription queries have the following limitations.
- Query statement limitation: Subscription queries can only use SELECT statements. Other types of SQL are not supported, such as subscribing to databases or supertables (non-SELECT methods), INSERT, UPDATE, or DELETE.
- Raw data query: Subscription queries can only query raw data, not aggregated or calculated results.
- Time order limitation: Subscription queries can only query data in chronological order.
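The limitations above can be illustrated with a small Python sketch that assembles the CREATE TOPIC statement from plain column names only. The helper name build_create_topic_sql and its identifier check are assumptions made for illustration; they are not part of any TDengine connector. Because only bare identifiers are accepted, expressions such as count(*) are rejected before the SQL is sent, which mirrors the raw-data restriction.

```python
import re

# Hypothetical helper, not a TDengine API: accepts only bare identifiers.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_create_topic_sql(topic, columns, table):
    """Return a CREATE TOPIC statement for a plain column SELECT."""
    if not columns:
        raise ValueError("at least one column is required")
    for name in [topic, table, *columns]:
        # Rejects aggregates, expressions, and anything that is not a plain name.
        if not _IDENTIFIER.match(name):
            raise ValueError(f"not a plain identifier: {name!r}")
    return (
        f"CREATE TOPIC IF NOT EXISTS {topic} "
        f"AS SELECT {', '.join(columns)} FROM {table}"
    )


sql = build_create_topic_sql(
    "topic_meters",
    ["ts", "current", "voltage", "phase", "groupid", "location"],
    "meters",
)
print(sql)
```

The resulting string can then be executed through the taos shell or any connector's SQL interface.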
Creating Consumers
The concept of a TDengine consumer is similar to that of a Kafka consumer: consumers receive data streams by subscribing to topics. Consumers can be configured with various parameters, such as the connection method, server address, and automatic offset commit, to suit different data processing needs. Some language connectors' consumers also support advanced features such as automatic reconnection and data transmission compression to keep data reception efficient and stable.
Creation Parameters
Many parameters are available when creating a consumer. They cover connection types, offset commit methods, compression, reconnection, deserialization, and other features. The common basic configuration items that apply to all language connectors are shown in the following table:
Parameter Name | Type | Description | Remarks |
---|---|---|---|
td.connect.ip | string | Server IP address | |
td.connect.user | string | Username | |
td.connect.pass | string | Password | |
td.connect.port | integer | Server port number | |
group.id | string | Consumer group ID, the same consumer group shares consumption progress | Required. Maximum length: 192. Each topic can have up to 100 consumer groups |
client.id | string | Client ID | Maximum length: 192 |
auto.offset.reset | enum | Initial position of the consumer group subscription | earliest : default(version < 3.2.0.0); subscribe from the beginning; latest : default(version >= 3.2.0.0); only subscribe from the latest data; none : cannot subscribe without a committed offset |
enable.auto.commit | boolean | Whether to enable automatic consumption point submission, true: automatic submission, client application does not need to commit; false: client application needs to commit manually | Default is true |
auto.commit.interval.ms | integer | Time interval for automatically submitting consumption records, in milliseconds | Default is 5000 |
msg.with.table.name | boolean | Whether to allow parsing the table name from the message, not applicable to column subscription (column subscription can write tbname as a column in the subquery statement) (from version 3.2.0.0 this parameter is deprecated, always true) | Default is off |
enable.replay | boolean | Whether to enable data replay function | Default is off |
session.timeout.ms | integer | Timeout after consumer heartbeat is lost, after which rebalance logic is triggered, and upon success, that consumer will be removed (supported from version 3.3.3.0) | Default is 12000, range [6000, 1800000] |
max.poll.interval.ms | integer | The longest time interval for consumer poll data fetching, exceeding this time will be considered as the consumer being offline, triggering rebalance logic, and upon success, that consumer will be removed (supported from version 3.3.3.0) | Default is 300000, range [1000, INT32_MAX] |
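As a minimal sketch of how the constraints in the table might be checked on the client side before a consumer is created, the Python function below validates a configuration dictionary. The function name validate_consumer_config is hypothetical, the connectors do not necessarily perform these checks in this way, and the sketch assumes the 192 limit is counted in characters.

```python
INT32_MAX = 2**31 - 1
MAX_ID_LENGTH = 192  # limit for group.id and client.id, from the table above

# Inclusive ranges taken from the table above.
RANGES = {
    "session.timeout.ms": (6000, 1800000),
    "max.poll.interval.ms": (1000, INT32_MAX),
}
OFFSET_RESET_VALUES = ("earliest", "latest", "none")


def validate_consumer_config(conf):
    """Return a list of problems; an empty list means none were found."""
    problems = []

    group_id = conf.get("group.id")
    if not group_id:
        problems.append("group.id is required")
    elif len(group_id) > MAX_ID_LENGTH:
        problems.append("group.id is longer than 192")

    if len(conf.get("client.id", "")) > MAX_ID_LENGTH:
        problems.append("client.id is longer than 192")

    reset = conf.get("auto.offset.reset")
    if reset is not None and reset not in OFFSET_RESET_VALUES:
        problems.append("auto.offset.reset must be earliest, latest, or none")

    for key, (low, high) in RANGES.items():
        if key not in conf:
            continue
        try:
            value = int(conf[key])
        except (TypeError, ValueError):
            problems.append(f"{key} must be an integer")
            continue
        if not low <= value <= high:
            problems.append(f"{key} must be in [{low}, {high}]")
    return problems


good = {
    "group.id": "group1",
    "client.id": "client1",
    "auto.offset.reset": "latest",
    "session.timeout.ms": "12000",
}
bad = {"client.id": "client1", "session.timeout.ms": "100"}
print(validate_consumer_config(good))
print(validate_consumer_config(bad))
```

Returning a list of problems rather than raising on the first one lets a caller report every misconfigured item at once.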
Below are the connection parameters for connectors in various languages:
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
Java
The parameters for creating a consumer with the Java connector are passed as Properties. For the list of parameters you can set, refer to Consumer Parameters.
For other parameters, refer to the common basic configuration items above.
Python
The td.connect.websocket.scheme parameter indicates the protocol type. Other parameters are the same as the common basic configuration items.
Go
Supported properties for creating consumers:
- ws.url: WebSocket connection address.
- ws.message.channelLen: WebSocket message channel buffer length, default 0.
- ws.message.timeout: WebSocket message timeout, default 5m.
- ws.message.writeWait: WebSocket message write timeout, default 10s.
- ws.message.enableCompression: Whether to enable compression for WebSocket, default false.
- ws.autoReconnect: Whether WebSocket should automatically reconnect, default false.
- ws.reconnectIntervalMs: WebSocket reconnect interval in milliseconds, default 2000.
- ws.reconnectRetryCount: WebSocket reconnect retry count, default 3.
See the table above for other parameters.
Rust
The parameters for creating a consumer with the Rust connector are passed as a DSN. For the list of parameters you can set, refer to DSN.
For other parameters, refer to the common basic configuration items above.
Node.js
The WS_URL parameter indicates the server address to connect to. Other parameters are the same as the common basic configuration items.
C#
Supported properties for creating consumers:
- useSSL: Whether to use an SSL connection, default false.
- token: Token for connecting to TDengine cloud.
- ws.message.enableCompression: Whether to enable WebSocket compression, default false.
- ws.autoReconnect: Whether to automatically reconnect, default false.
- ws.reconnect.retry.count: Reconnect attempts, default 3.
- ws.reconnect.interval.ms: Reconnect interval in milliseconds, default 2000.
See the table above for other parameters.
C
- WebSocket connection: Because a DSN is used, the four configuration items td.connect.ip, td.connect.port, td.connect.user, and td.connect.pass are not needed. The rest are the same as the common configuration items.
- Native connection: Same as the common basic configuration items.
REST API
Not supported
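For connectors that take a DSN instead of the four td.connect.* items, the address and credentials are carried in a single string. The following Python sketch shows one way such a string might be assembled. The helper build_ws_dsn is hypothetical, and the sketch assumes the user:password@host:port layout and that credentials are percent-encoded; check the DSN reference of the connector you use before relying on either assumption.

```python
from urllib.parse import quote


def build_ws_dsn(host, port, user=None, password=None, scheme="ws"):
    """Assemble a WebSocket DSN such as ws://root:taosdata@localhost:6041."""
    if scheme not in ("ws", "wss"):
        raise ValueError("scheme must be ws or wss")
    credentials = ""
    if user is not None:
        # Percent-encode so characters like '@' or ':' cannot break the DSN
        # (assumption: the connector decodes them again).
        credentials = quote(user, safe="")
        if password is not None:
            credentials += ":" + quote(password, safe="")
        credentials += "@"
    return f"{scheme}://{credentials}{host}:{port}"


print(build_ws_dsn("localhost", 6041))
print(build_ws_dsn("localhost", 6041, "root", "taosdata"))
```

The first call yields the same address form used by the Rust and Node.js WebSocket examples in this chapter, which pass credentials separately.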
WebSocket Connection
This section shows how connectors in various languages create consumers over a WebSocket connection: specify the server address to connect to, enable auto-commit, start consuming from the latest message, and set group.id, client.id, and so on. Some language connectors also support deserialization parameters.
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.WsConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
import taosws

def create_consumer():
    try:
        consumer = taosws.Consumer(conf={
            "td.connect.websocket.scheme": tdConnWsScheme,
            "group.id": groupId,
            "client.id": clientId,
            "auto.offset.reset": autoOffsetReset,
            "td.connect.ip": host,
            "td.connect.port": port,
            "enable.auto.commit": autoCommitState,
            "auto.commit.interval.ms": autoCommitIntv,
        })
        print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}.")
        return consumer
    except Exception as err:
        print(f"Failed to create websocket consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
// create consumer
wsUrl := "ws://127.0.0.1:6041"
groupID = "group1"
clientID = "client1"
host = "127.0.0.1"
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": wsUrl,
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "latest",
"msg.with.table.name": "true",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"group.id": groupID,
"client.id": clientID,
})
if err != nil {
log.Fatalf(
"Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
host,
groupID,
clientID,
err.Error(),
)
}
log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID)
let dsn = "ws://localhost:6041".to_string();
println!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let group_id = "group1".to_string();
let client_id = "client1".to_string();
dsn.params
.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params
.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params
.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params
.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), group_id.clone());
dsn.params
.insert("client.id".to_string(), client_id.clone());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = match builder.build().await {
Ok(consumer) => {
println!(
"Create consumer successfully, dsn: {}, groupId: {}, clientId: {}.",
dsn, group_id, client_id
);
consumer
}
Err(err) => {
eprintln!("Failed to create websocket consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err);
return Err(err.into());
}
};
const db = 'power';
const stable = 'meters';
const url = 'ws://localhost:6041';
const topic = 'topic_meters'
const topics = [topic];
const groupId = "group1";
const clientId = "client1";
async function createConsumer() {
let groupId = "group1";
let clientId = "client1";
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, groupId],
[taos.TMQConstants.CLIENT_ID, clientId],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.WS_URL, url],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
try {
const conn = await taos.tmqConnect(configMap);
console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`)
return conn;
} catch (err) {
console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
// consumer config
_host = "127.0.0.1";
_groupId = "group1";
_clientId = "client1";
var cfg = new Dictionary<string, string>()
{
{ "td.connect.type", "WebSocket" },
{ "td.connect.port", "6041" },
{ "auto.offset.reset", "latest" },
{ "msg.with.table.name", "true" },
{ "enable.auto.commit", "true" },
{ "auto.commit.interval.ms", "1000" },
{ "group.id", _groupId },
{ "client.id", _clientId },
{ "td.connect.ip", _host },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
};
IConsumer<Dictionary<string, object>> consumer = null!;
try
{
// create consumer
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
Console.WriteLine(
$"Create consumer successfully, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to create websocket consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to create websocket consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
ws_tmq_t* build_consumer(const ConsumerConfig* config) {
ws_tmq_conf_res_t code;
ws_tmq_t* tmq = NULL;
// create a configuration object
ws_tmq_conf_t* conf = ws_tmq_conf_new();
// set the configuration parameters
code = ws_tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "group.id", config->group_id);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "client.id", config->client_id);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
// create a consumer object
tmq = ws_tmq_consumer_new(conf, "taos://localhost:6041", NULL, 0);
_end:
// destroy the configuration object
ws_tmq_conf_destroy(conf);
return tmq;
}
ws_tmq_t* tmq = build_consumer(&config);
if (NULL == tmq) {
fprintf(stderr, "Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
return -1;
} else {
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n", config.td_connect_host,
config.group_id, config.client_id);
}
Call the build_consumer function to try to obtain the consumer instance tmq. Print a success log if it succeeds, and a failure log if it fails.
Not supported
Native Connection
This section shows how connectors in various languages create consumers over a native connection: specify the server address for the connection, enable auto-commit, start consuming from the latest message, and set information such as group.id and client.id. Some language connectors also support deserialization parameters.
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.ConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create native consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
from taos.tmq import Consumer

def create_consumer():
    try:
        consumer = Consumer(
            {
                "group.id": groupId,
                "client.id": clientId,
                "td.connect.user": user,
                "td.connect.pass": password,
                "enable.auto.commit": autoCommitState,
                "auto.commit.interval.ms": autoCommitIntv,
                "auto.offset.reset": autoOffsetReset,
                "td.connect.ip": host,
                "td.connect.port": str(port),
            }
        )
        print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}")
        return consumer
    except Exception as err:
        print(f"Failed to create native consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
// create consumer
groupID = "group1"
clientID = "client1"
host = "127.0.0.1"
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "latest",
"msg.with.table.name": "true",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"group.id": groupID,
"client.id": clientID,
})
if err != nil {
log.Fatalf(
"Failed to create native consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
host,
groupID,
clientID,
err.Error(),
)
}
log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID)
let dsn = "taos://localhost:6030".to_string();
println!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let group_id = "group1".to_string();
let client_id = "client1".to_string();
dsn.params
.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params
.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params
.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params
.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), group_id.clone());
dsn.params
.insert("client.id".to_string(), client_id.clone());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = match builder.build().await {
Ok(consumer) => {
println!(
"Create consumer successfully, dsn: {}, groupId: {}, clientId: {}.",
dsn, group_id, client_id
);
consumer
}
Err(err) => {
eprintln!("Failed to create native consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err);
return Err(err.into());
}
};
Not supported
// consumer config
_host = "127.0.0.1";
_groupId = "group1";
_clientId = "client1";
var cfg = new Dictionary<string, string>()
{
{ "td.connect.port", "6030" },
{ "auto.offset.reset", "latest" },
{ "msg.with.table.name", "true" },
{ "enable.auto.commit", "true" },
{ "auto.commit.interval.ms", "1000" },
{ "group.id", _groupId },
{ "client.id", _clientId },
{ "td.connect.ip", _host },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
};
IConsumer<Dictionary<string, object>> consumer = null!;
try
{
// create consumer
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
Console.WriteLine(
$"Create consumer successfully, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
tmq_t* build_consumer(const ConsumerConfig* config) {
tmq_conf_res_t code;
tmq_t* tmq = NULL;
// create a configuration object
tmq_conf_t* conf = tmq_conf_new();
// set the configuration parameters
code = tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "group.id", config->group_id);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "client.id", config->client_id);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.ip", config->td_connect_host);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.port", config->td_connect_port);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.user", config->td_connect_user);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.pass", config->td_connect_pass);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
// set the callback function for auto commit
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
// create a consumer object
tmq = tmq_consumer_new(conf, NULL, 0);
_end:
// destroy the configuration object
tmq_conf_destroy(conf);
return tmq;
}
tmq_t* tmq = build_consumer(&config);
if (NULL == tmq) {
fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
return -1;
} else {
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
}
Call the build_consumer function to try to obtain the consumer instance tmq. Print a success log if it succeeds, and a failure log if it fails.
Not supported
Subscribe to Consume Data
After subscribing to a topic, consumers can start receiving and processing messages from these topics. The example code for subscribing to consume data is as follows:
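The subscribe-then-poll pattern shared by the connector examples in this section can be sketched independently of any connector. In the Python sketch below, InMemoryConsumer is a hypothetical stand-in that only mimics subscribe and poll so the loop can run without a server; it is not a TDengine class, and real connectors return their own message types.

```python
class InMemoryConsumer:
    """Hypothetical stand-in for a connector consumer (not a TDengine API)."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.topics = []

    def subscribe(self, topics):
        self.topics = list(topics)

    def poll(self, timeout):
        # Return the next batch of rows, or None when nothing is available.
        return self._batches.pop(0) if self._batches else None


def consume(consumer, topics, handle_row, max_polls=50, timeout=1.0):
    """Subscribe, poll a bounded number of times, and hand each row to handle_row."""
    consumer.subscribe(topics)
    handled = 0
    for _ in range(max_polls):
        records = consumer.poll(timeout)
        if not records:
            continue  # an empty poll is normal; simply try again
        for row in records:
            handle_row(row)
            handled += 1
    return handled


rows = []
consumer = InMemoryConsumer([
    [("2024-01-01 00:00:00.000", 10.3, 219, 0.31)],
    [],
    [("2024-01-01 00:00:01.000", 10.5, 220, 0.32),
     ("2024-01-01 00:00:02.000", 10.7, 221, 0.33)],
])
count = consume(consumer, ["topic_meters"], rows.append, max_polls=5)
print(count, len(rows))
```

Bounding the number of polls, as the examples in this section do with 50 iterations, keeps a demonstration from running forever; a production consumer would typically loop until it is asked to stop.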
WebSocket Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
- The subscribe method takes the list of topic names to subscribe to; subscribing to multiple topics at once is supported.
- Each call to poll fetches one message, which may contain multiple records.
- ResultBean is a custom internal class whose field names and data types correspond one-to-one with the column names and data types, so that objects of type ResultBean can be deserialized by the deserialization class specified in the value.deserializer property.
def subscribe(consumer):
    try:
        consumer.subscribe([topic])
        print("Subscribe topics successfully")
        for i in range(50):
            records = consumer.poll(timeout=1.0)
            if records:
                for block in records:
                    for row in block:
                        print(f"data: {row}")
    except Exception as err:
        print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
- The subscribe method takes the list of topic names to subscribe to; subscribing to multiple topics at once is supported.
- Each call to poll fetches one message, which may contain multiple records.
- records contains multiple blocks, each of which may contain multiple records.
topic = "topic_meters"
err = consumer.Subscribe(topic, nil)
if err != nil {
log.Fatalf(
"Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
log.Println("Subscribe topics successfully")
for i := 0; i < 50; i++ {
ev := consumer.Poll(100)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
// process your data here
fmt.Printf("data:%v\n", e)
// commit offset
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
if err != nil {
log.Fatalf(
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.TopicPartition,
err.Error(),
)
}
log.Println("Commit offset manually successfully.")
case tmqcommon.Error:
log.Fatalf(
"Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.Error(),
)
}
}
}
let topic = "topic_meters";
match consumer.subscribe([topic]).await {
Ok(_) => println!("Subscribe topics successfully."),
Err(err) => {
eprintln!(
"Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, err
);
return Err(err.into());
}
}
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async move {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await
.map_err(|e| {
eprintln!(
"Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, e
);
e
})?;
- Consumers can subscribe to one or more TOPICs; it is generally recommended that a consumer subscribe to only one TOPIC.
- The TMQ message queue is a futures::Stream type, so each message can be consumed with the corresponding stream API and marked as consumed through .commit.
- Record is a custom structure whose field names and data types correspond one-to-one with the column names and data types, so that objects of type Record can be deserialized using serde.
const { sleep } = require("@tdengine/websocket");
const taos = require("@tdengine/websocket");
const db = 'power';
const stable = 'meters';
const url = 'ws://localhost:6041';
const topic = 'topic_meters'
const topics = [topic];
const groupId = "group1";
const clientId = "client1";
async function createConsumer() {
let groupId = "group1";
let clientId = "client1";
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, groupId],
[taos.TMQConstants.CLIENT_ID, clientId],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.WS_URL, url],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
try {
const conn = await taos.tmqConnect(configMap);
console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`)
return conn;
} catch (err) {
console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
async function prepare() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`;
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(createDB);
await wsSql.exec(createStable);
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
await wsSql.close();
}
async function insert() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
let wsSql = await taos.sqlConnect(conf);
for (let i = 0; i < 1; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
}
await wsSql.close();
}
async function subscribe(consumer) {
try {
await consumer.subscribe(['topic_meters']);
let res = new Map();
while (res.size == 0) {
res = await consumer.poll(100);
await consumer.commit();
}
let assignment = await consumer.assignment();
await consumer.seekToBeginning(assignment);
console.log("Assignment seek to beginning successfully");
} catch (err) {
console.error(`Failed to seek offset, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
async function test() {
let consumer = null;
try {
await prepare();
consumer = await createConsumer();
const allPromises = [];
allPromises.push(subscribe(consumer));
allPromises.push(insert());
await Promise.all(allPromises);
await consumer.unsubscribe();
console.log("Consumer unsubscribed successfully.");
}
catch (err) {
console.error(`Failed to consume, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (consumer) {
await consumer.close();
console.log("Consumer closed successfully.");
}
taos.destroy();
}
}
test()
_topic = "topic_meters";
try
{
// subscribe
consumer.Subscribe(new List<string>() { _topic });
Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++)
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
// handle message
Console.WriteLine(
$"data: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine($"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// build a topic list used to subscribe
ws_tmq_list_t* build_topic_list() {
// create an empty topic list
ws_tmq_list_t* topicList = ws_tmq_list_new();
// append topic name to the list
int32_t code = ws_tmq_list_append(topicList, topic_name);
if (code) {
// if failed, destroy the list and return NULL
ws_tmq_list_destroy(topicList);
fprintf(stderr,
"Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(NULL));
return NULL;
}
// if success, return the list
return topicList;
}
void basic_consume_loop(ws_tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// Add your data processing logic here
totalRows += msg_process(tmqmsg);
// free the message
ws_free_result(tmqmsg);
}
if (msgCnt >= 50) {
// consume 50 messages and break
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
int32_t msg_process(WS_RES* msg) {
int32_t rows = 0;
const char* topicName = ws_tmq_get_topic_name(msg);
const char* dbName = ws_tmq_get_db_name(msg);
int32_t vgroupId = ws_tmq_get_vgroup_id(msg);
while (true) {
// get one row data from message
WS_ROW row = ws_fetch_row(msg);
if (row == NULL) break;
// Add your data processing logic here
rows++;
}
return rows;
}
ws_tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n", topic_name, config.group_id,
config.client_id);
return -1;
}
if ((code = ws_tmq_subscribe(tmq, topic_list))) {
fprintf(stderr,
"Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
}
ws_tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
Steps for subscribing and consuming data:
- Call the build_topic_list function to create a topic list topic_list.
- If topic_list is NULL, creation failed, and the function returns -1.
- Use the ws_tmq_subscribe function to subscribe the consumer tmq to the topic list. If the subscription fails, print an error message.
- Destroy the topic list topic_list to free resources.
- Call the basic_consume_loop function to start the basic consumption loop, processing the subscribed messages.
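The control flow of these steps (build a topic list, subscribe, then poll in a bounded loop while counting messages and rows) can be sketched independently of any connector. The following is a minimal sketch only: FakeConsumer is a hypothetical in-memory stand-in, not a TDengine API, and the max_idle_polls parameter is an assumption added so that the loop terminates when no more data arrives.

```python
from collections import deque


class FakeConsumer:
    """Hypothetical in-memory stand-in for a TMQ consumer (not a TDengine API)."""

    def __init__(self, messages_by_topic):
        self._source = messages_by_topic
        self._queue = deque()

    def subscribe(self, topics):
        # Queue every message of the subscribed topics; reject unknown topics.
        for topic in topics:
            if topic not in self._source:
                raise ValueError(f"unknown topic: {topic}")
            self._queue.extend(self._source[topic])

    def poll(self, timeout_ms):
        # Return the next message, or None when nothing is available.
        return self._queue.popleft() if self._queue else None


def basic_consume_loop(consumer, max_messages=50, max_idle_polls=3):
    """Poll until max_messages are consumed or the consumer stays idle."""
    msg_count = 0
    total_rows = 0
    idle = 0
    while msg_count < max_messages and idle < max_idle_polls:
        msg = consumer.poll(timeout_ms=100)
        if msg is None:
            idle += 1
            continue
        idle = 0
        msg_count += 1
        total_rows += len(msg)  # each message is modeled as a list of rows
    return msg_count, total_rows


# Made-up sample data: two messages holding three rows in total.
topic_list = ["topic_meters"]
consumer = FakeConsumer({"topic_meters": [[(1, 10.2)], [(2, 10.3), (3, 10.1)]]})
consumer.subscribe(topic_list)
msg_count, total_rows = basic_consume_loop(consumer)
print(f"{msg_count} msg consumed, include {total_rows} rows")
```

With a real connector, the subscribe and poll calls would be replaced by the functions shown in the examples above; the loop structure stays the same.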
Not supported
Native Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
- The parameter of the subscribe method is the list of topic names to subscribe to; multiple topics can be subscribed to simultaneously.
- poll is called each time to get a message, which may contain multiple records.
- ResultBean is a custom internal class whose field names and data types correspond one-to-one with the column names and data types, so that objects of type ResultBean can be deserialized by the deserialization class specified in the value.deserializer property.
def subscribe(consumer):
    try:
        # subscribe to the topics
        consumer.subscribe(["topic_meters"])
        print("Subscribe topics successfully")
        for i in range(50):
            records = consumer.poll(1)
            if records:
                err = records.error()
                if err is not None:
                    print(f"Poll data error, {err}")
                    raise err
                val = records.value()
                if val:
                    for block in val:
                        data = block.fetchall()
                        print(f"data: {data}")
    except Exception as err:
        print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
- The parameter of the subscribe method is the list of topic names to subscribe to; multiple topics can be subscribed to simultaneously.
- poll is called each time to get a message, which may contain multiple records.
- records contains multiple blocks, each of which may contain multiple records.
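The nesting described here (one polled message holds several blocks, and each block holds several rows) can be illustrated with plain Python lists. This is a minimal sketch under stated assumptions: rows_from_message is a hypothetical helper, and the sample values are made up for illustration rather than taken from a real subscription.

```python
def rows_from_message(blocks):
    """Flatten one polled message (a list of blocks) into a single list of rows."""
    rows = []
    for block in blocks:
        # Each block is itself a list of rows.
        rows.extend(block)
    return rows


# Made-up message: two blocks, holding two rows and one row (ts, current, voltage).
message = [
    [("2024-01-01 00:00:00.000", 10.3, 219), ("2024-01-01 00:00:01.000", 12.6, 218)],
    [("2024-01-01 00:00:02.000", 12.3, 221)],
]
rows = rows_from_message(message)
print(f"{len(message)} blocks, {len(rows)} rows")
```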
topic = "topic_meters"
err = consumer.Subscribe(topic, nil)
if err != nil {
log.Fatalf(
"Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
log.Println("Subscribe topics successfully")
for i := 0; i < 50; i++ {
ev := consumer.Poll(100)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
// process your data here
fmt.Printf("data:%v\n", e)
// commit offset
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
if err != nil {
log.Fatalf(
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.TopicPartition,
err.Error(),
)
}
log.Println("Commit offset manually successfully.")
case tmqcommon.Error:
log.Fatalf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, e.Error())
}
}
}
let topic = "topic_meters";
match consumer.subscribe([topic]).await {
Ok(_) => println!("Subscribe topics successfully."),
Err(err) => {
eprintln!(
"Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, err
);
return Err(err.into());
}
}
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async move {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await
.map_err(|e| {
eprintln!(
"Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, e
);
e
})?;
- Consumers can subscribe to one or more TOPICs; generally, it is recommended that a consumer subscribes to only one TOPIC.
- The TMQ message queue is a futures::Stream type, which can be used with the corresponding API to consume each message and mark it as consumed with .commit.
- Record is a custom structure whose field names and data types correspond one-to-one with the column names and data types, allowing objects of type Record to be deserialized through serde.
Not supported
_topic = "topic_meters";
try
{
// subscribe
consumer.Subscribe(new List<string>() { _topic });
Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++)
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
// handle message
Console.WriteLine(
$"data: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// build a topic list used to subscribe
tmq_list_t* build_topic_list() {
// create an empty topic list
tmq_list_t* topicList = tmq_list_new();
// append topic name to the list
int32_t code = tmq_list_append(topicList, topic_name);
if (code) {
// if failed, destroy the list and return NULL
tmq_list_destroy(topicList);
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
return NULL;
}
// if success, return the list
return topicList;
}
void basic_consume_loop(tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// Add your data processing logic here
totalRows += msg_process(tmqmsg);
// free the message
taos_free_result(tmqmsg);
}
if (msgCnt >= 50) {
// consume 50 messages and break
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
int32_t msg_process(TAOS_RES* msg) {
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
while (true) {
// get one row data from message
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
// Add your data processing logic here
rows++;
}
return rows;
}
tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n",
topic_name, config.group_id, config.client_id);
return -1;
}
if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
}
tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
Steps for subscribing and consuming data:
- Call the build_topic_list function to create a topic list topic_list.
- If topic_list is NULL, creation failed, and the function returns -1.
- Use the tmq_subscribe function to subscribe the consumer tmq to the topic list. If the subscription fails, print an error message.
- Destroy the topic list topic_list to free resources.
- Call the basic_consume_loop function to start the basic consumption loop, processing the subscribed messages.
Not supported
Specifying the Subscription Offset
Consumers can start reading messages from a specific Offset in a partition, which allows them to reread messages or skip messages that have already been processed. Below is how connectors in various languages specify the subscription Offset.
WebSocket Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
- Subscribe to the topic and obtain the current consumer's partition assignment information with the consumer.assignment method.
- Use the consumer.poll method to poll data until data is obtained.
- Use the consumer.seekToBeginning method to reset the offset of all assigned partitions to the starting position and print a message confirming the reset.
- After the reset, the next call to consumer.poll reads data again from the starting position.
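The effect of seeking to the beginning can be shown with a small in-memory model: after the read position is rewound, polling returns the same messages a second time. This is a minimal sketch, not connector code. PartitionLog and drain are hypothetical names, and the offsets starting at 100 are made up to show that the earliest offset is not assumed to be 0.

```python
class PartitionLog:
    """Hypothetical in-memory model of one partition (vgroup); not a TDengine API."""

    def __init__(self, messages, begin=0):
        self.messages = list(messages)
        self.begin = begin                     # earliest offset still available
        self.end = begin + len(self.messages)  # one past the newest offset
        self.position = begin                  # offset the next poll reads

    def poll(self):
        """Return the message at the current position, or None if caught up."""
        if self.position >= self.end:
            return None
        message = self.messages[self.position - self.begin]
        self.position += 1
        return message

    def seek(self, offset):
        """Move the read position; only offsets in [begin, end] are valid."""
        if not self.begin <= offset <= self.end:
            raise ValueError(f"offset {offset} outside [{self.begin}, {self.end}]")
        self.position = offset


def drain(log):
    """Poll until no message is left and return everything that was read."""
    out = []
    while (message := log.poll()) is not None:
        out.append(message)
    return out


log = PartitionLog(["m100", "m101", "m102"], begin=100)
first_pass = drain(log)
log.seek(log.begin)  # rewind, comparable in spirit to seekToBeginning
second_pass = drain(log)
print(first_pass == second_pass)
```

Rereading after a seek means the application sees the same messages twice, so processing that follows a seek should tolerate duplicates.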
try:
    assignments = consumer.assignment()
    for assignment in assignments:
        topic = assignment.topic()
        print(f"topic: {topic}")
        for assign in assignment.assignments():
            print(
                f"vg_id: {assign.vg_id()}, offset: {assign.offset()}, begin: {assign.begin()}, end: {assign.end()}")
            consumer.seek(topic, assign.vg_id(), assign.begin())
            print("Assignment seek to beginning successfully.")
except Exception as err:
    print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    raise err
// get assignment
partitions, err := consumer.Assignment()
if err != nil {
log.Fatalf(
"Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Now assignment:", partitions)
for i := 0; i < len(partitions); i++ {
// seek to the beginning
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
log.Fatalf(
"Failed to seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
topic,
groupID,
clientID,
partitions[i].Partition,
0,
err.Error(),
)
}
}
fmt.Println("Assignment seek to beginning successfully")
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("assignments: {:?}", assignments);
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
println!(
"topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}",
topic, vgroup_id, current, begin, end
);
match consumer.offset_seek(topic, vgroup_id, begin).await {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, begin, err);
return Err(err.into());
}
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
println!("topic assignment: {:?}", topic_assignment);
}
println!("Assignment seek to beginning successfully.");
// after seek offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("After seek offset assignments: {:?}", assignments);
- Call the consumer.assignments() method to obtain the consumer's current partition assignment information and print the initial assignment status.
- Traverse the assignment information of each partition and extract the topic, vgroup ID (vgroup_id), current offset (current), starting offset (begin), and ending offset (end). Print this information.
- Call the consumer.offset_seek method to set the offset to the starting position. If the operation fails, print the error information and return the error.
- After adjusting the offset for all partitions, obtain and print the consumer's partition assignment information again to confirm the status after the offset adjustment.
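The bookkeeping in these steps can be pictured as a mapping from each topic to a list of per-vgroup assignments, where seeking moves only the current offset. The sketch below is illustrative only: the Assignment dataclass and seek_all_to_begin are hypothetical names that mirror the fields printed in the example, and the offset values are made up.

```python
from dataclasses import dataclass


@dataclass
class Assignment:
    """Hypothetical record mirroring the fields printed in the example above."""
    vgroup_id: int
    current: int
    begin: int
    end: int


def seek_all_to_begin(assignments):
    """Return a copy of the assignments with every current offset set to begin."""
    rewound = {}
    for topic, per_vgroup in assignments.items():
        rewound[topic] = [
            Assignment(a.vgroup_id, a.begin, a.begin, a.end) for a in per_vgroup
        ]
    return rewound


# Made-up state: two vgroups, the second with a begin offset that is not 0.
before = {
    "topic_meters": [
        Assignment(vgroup_id=2, current=57, begin=0, end=57),
        Assignment(vgroup_id=3, current=140, begin=12, end=151),
    ]
}
after = seek_all_to_begin(before)
for topic, per_vgroup in after.items():
    for a in per_vgroup:
        print(f"topic: {topic}, vgroup_id: {a.vgroup_id}, current: {a.current}, "
              f"begin: {a.begin}, end: {a.end}")
```

The sketch seeks each vgroup to its own begin value rather than to a fixed number, which matches the examples that pass the begin offset to the seek call.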
async function subscribe(consumer) {
try {
await consumer.subscribe(['topic_meters']);
let res = new Map();
while (res.size == 0) {
res = await consumer.poll(100);
await consumer.commit();
}
let assignment = await consumer.assignment();
await consumer.seekToBeginning(assignment);
console.log("Assignment seek to beginning successfully");
} catch (err) {
console.error(`Failed to seek offset, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
try
{
// get assignment
var assignment = consumer.Assignment;
Console.WriteLine($"Now assignment: {assignment}");
// seek to the beginning
foreach (var topicPartition in assignment)
{
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
}
Console.WriteLine("Assignment seek to beginning successfully");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrMessage: {e.Message}");
throw;
}
void consume_repeatly(ws_tmq_t* tmq) {
int32_t numOfAssignment = 0;
ws_tmq_topic_assignment* pAssign = NULL;
// get the topic assignment
int32_t code = ws_tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
return;
}
// seek to the earliest offset
for (int32_t i = 0; i < numOfAssignment; ++i) {
ws_tmq_topic_assignment* p = &pAssign[i];
code = ws_tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr,
"Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, p->vgId, code, ws_tmq_errstr(tmq));
break;
}
}
if (code == 0) fprintf(stdout, "Assignment seek to beginning successfully.\n");
// free the assignment array
ws_tmq_free_assignment(pAssign, numOfAssignment);
// let's consume the messages again
basic_consume_loop(tmq);
}
- Use the ws_tmq_get_topic_assignment function to obtain the assignment information for a specific topic, including the number of assignments and the details of each assignment.
- If fetching the assignment information fails, print an error message and return.
- For each assignment, use the ws_tmq_offset_seek function to set the consumer's offset to the earliest offset.
- If setting the offset fails, print an error message.
- Release the assignment information array to free resources.
- Call the basic_consume_loop function to start a new consumption loop and process messages.
Not supported
Native Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
- Subscribe to the topic and obtain the current consumer's partition assignment information with the consumer.assignment method.
- Use the consumer.poll method to poll data until data is obtained.
- Use the consumer.seekToBeginning method to reset the offset of all assigned partitions to the beginning position and print a message confirming the reset.
- After the reset, the next call to consumer.poll reads data again from the beginning position.
try:
    assignments = consumer.assignment()
    if assignments:
        for partition in assignments:
            partition.offset = 0
            consumer.seek(partition)
            print("Assignment seek to beginning successfully.")
except Exception as err:
    print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    raise err
// get assignment
partitions, err := consumer.Assignment()
if err != nil {
log.Fatalf("Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, err.Error())
}
fmt.Println("Now assignment:", partitions)
for i := 0; i < len(partitions); i++ {
// seek to the beginning
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
log.Fatalf(
"Failed to execute seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
topic,
groupID,
clientID,
partitions[i].Partition,
0,
err.Error(),
)
}
}
fmt.Println("Assignment seek to beginning successfully")
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("assignments: {:?}", assignments);
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
println!(
"topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}",
topic, vgroup_id, current, begin, end
);
match consumer.offset_seek(topic, vgroup_id, begin).await {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, begin, err);
return Err(err.into());
}
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
println!("topic assignment: {:?}", topic_assignment);
}
println!("Assignment seek to beginning successfully.");
// after seek offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("After seek offset assignments: {:?}", assignments);
- Obtain the consumer's current partition assignment information by calling the consumer.assignments() method and print the initial assignment status.
- For each partition assignment, extract the topic, vgroup ID (vgroup_id), current offset, beginning offset, and ending offset. Print this information.
- Use the consumer.offset_seek method to set the offset to the beginning position. If the operation fails, print the error information and return the error.
- After adjusting the offset for all partitions, obtain and print the consumer's partition assignment information again to confirm the status after the offset adjustment.
Not supported
try
{
// get assignment
var assignment = consumer.Assignment;
Console.WriteLine($"Now assignment: {assignment}");
// seek to the beginning
foreach (var topicPartition in assignment)
{
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
}
Console.WriteLine("Assignment seek to beginning successfully");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrMessage: {e.Message}");
throw;
}
void consume_repeatly(tmq_t* tmq) {
int32_t numOfAssignment = 0;
tmq_topic_assignment* pAssign = NULL;
// get the topic assignment
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
return;
}
// seek to the earliest offset
for (int32_t i = 0; i < numOfAssignment; ++i) {
tmq_topic_assignment* p = &pAssign[i];
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr, "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, p->vgId, code, tmq_err2str(code));
break;
}
}
if (code == 0)
fprintf(stdout, "Assignment seek to beginning successfully.\n");
// free the assignment array
tmq_free_assignment(pAssign);
// let's consume the messages again
basic_consume_loop(tmq);
}
- Use the tmq_get_topic_assignment function to obtain the assignment information for a specific topic, including the number of assignments and the details of each assignment.
- If fetching the assignment information fails, print an error message and return.
- For each assignment, use the tmq_offset_seek function to set the consumer's offset to the earliest offset.
- If setting the offset fails, print an error message.
- Release the assignment information array to free resources.
- Call the basic_consume_loop function to start a new consumption loop and process messages.
Not supported
Commit Offset
After a consumer has read and processed messages, it can commit the Offset, indicating that the consumer has successfully processed messages up to this Offset. Offset commits can be automatic (committed periodically based on configuration) or manual (the application decides when to commit).
When creating a consumer, if the property enable.auto.commit is set to false, the offset can be manually committed.
Note: Before manually committing the consumption progress, ensure that the message has been processed correctly; otherwise, the incorrectly processed message will not be consumed again. Automatic committing may commit the consumption progress of the previous message during the current poll, so please ensure that message processing is completed before the next poll or message retrieval.
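The reason for committing only after processing can be made concrete with a small simulation. In the sketch below, a handler fails once on one message and the consumer is then restarted from the committed offset. When the commit happens after processing, the failed message is delivered again; when it happens before processing, the message is skipped. CommitLog, consume, and make_flaky_handler are hypothetical names for illustration and do not correspond to any connector API.

```python
class CommitLog:
    """Hypothetical topic log that remembers the committed offset."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0  # offset a restarted consumer resumes from


def make_flaky_handler(fail_once_on):
    """Build a handler that raises the first time it sees fail_once_on."""
    failed = set()

    def handler(msg):
        if msg == fail_once_on and msg not in failed:
            failed.add(msg)
            raise RuntimeError(f"transient failure on {msg}")
        return msg

    return handler


def consume(log, handler, commit_before_processing):
    """Run one consumer session from the committed offset; stop on the first error."""
    processed = []
    offset = log.committed
    while offset < len(log.messages):
        msg = log.messages[offset]
        if commit_before_processing:
            log.committed = offset + 1  # progress recorded too early
        try:
            processed.append(handler(msg))
        except RuntimeError:
            break  # the session ends; a later session restarts from log.committed
        if not commit_before_processing:
            log.committed = offset + 1  # progress recorded after success
        offset += 1
    return processed


safe_log = CommitLog(["m1", "m2", "m3"])
safe_handler = make_flaky_handler("m2")
safe = consume(safe_log, safe_handler, False) + consume(safe_log, safe_handler, False)

lossy_log = CommitLog(["m1", "m2", "m3"])
lossy_handler = make_flaky_handler("m2")
lossy = consume(lossy_log, lossy_handler, True) + consume(lossy_log, lossy_handler, True)

print("commit after processing:", safe)
print("commit before processing:", lossy)
```

The same reasoning applies to automatic commits: if the progress of a message can be committed during the next poll, processing of that message should be finished before poll is called again.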
WebSocket Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
def commit_offset(consumer):
    try:
        for i in range(50):
            records = consumer.poll(timeout=1.0)
            if records:
                for block in records:
                    for row in block:
                        print(f"data: {row}")
                # after processing the data, commit the offset manually
                consumer.commit(records)
                print("Commit offset manually successfully.")
    except Exception as err:
        print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
// commit offset
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
if err != nil {
log.Fatalf(
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.TopicPartition,
err.Error(),
)
}
log.Println("Commit offset manually successfully.")
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async {
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {} of topic {}\n", vgroup_id, topic);
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
match consumer.commit(offset).await{
Ok(_) => println!("Commit offset manually successfully."),
Err(err) => {
eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, err);
return Err(err.into());
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;
You can manually commit the consumption progress using the consumer.commit method.
try {
await consumer.subscribe(['topic_meters']);
for (let i = 0; i < 50; i++) {
let res = await consumer.poll(100);
for (let [key, value] of res) {
// Add your data processing logic here
console.log(`data: ${key} ${value}`);
}
await consumer.commit();
console.log("Commit offset manually successfully.");
}
} catch (err) {
console.error(`Failed to poll data, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
for (int i = 0; i < 5; i++)
{
TopicPartitionOffset topicPartitionOffset = null;
try
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
// commit offset
topicPartitionOffset = cr.TopicPartitionOffset;
consumer.Commit(new List<TopicPartitionOffset>
{
topicPartitionOffset,
});
Console.WriteLine("Commit offset manually successfully.");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrMessage: {e.Message}");
throw;
}
}
void manual_commit(ws_tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// process the message
totalRows += msg_process(tmqmsg);
// commit the message
int32_t code = ws_tmq_commit_sync(tmq, tmqmsg);
if (code) {
fprintf(stderr,
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
// free the message
ws_free_result(tmqmsg);
break;
} else {
fprintf(stdout, "Commit offset manually successfully.\n");
}
// free the message
ws_free_result(tmqmsg);
}
if (msgCnt > 50) {
// stop after more than 50 messages have been consumed
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows.\n", msgCnt, totalRows);
}
You can manually commit the consumption progress using the ws_tmq_commit_sync function.
REST API: Not supported
Native Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
try:
    for i in range(50):
        records = consumer.poll(1)
        if records:
            err = records.error()
            if err is not None:
                print(f"Poll data error, {err}")
                raise err
            val = records.value()
            if val:
                for block in val:
                    print(block.fetchall())
            # after processing the data, commit the offset manually
            consumer.commit(records)
            print("Commit offset manually successfully.")
except Exception as err:
    print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    raise err
// commit offset
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
if err != nil {
log.Fatalf(
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.TopicPartition,
err.Error(),
)
}
log.Println("Commit offset manually successfully.")
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async {
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {} of topic {}\n", vgroup_id, topic);
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
match consumer.commit(offset).await{
Ok(_) => println!("Commit offset manually successfully."),
Err(err) => {
eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, err);
return Err(err.into());
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;
You can manually commit the consumption progress using the consumer.commit method.
Node.js: Not supported
for (int i = 0; i < 5; i++)
{
TopicPartitionOffset topicPartitionOffset = null;
try
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
// commit offset
topicPartitionOffset = cr.TopicPartitionOffset;
consumer.Commit(new List<TopicPartitionOffset>
{
topicPartitionOffset,
});
Console.WriteLine("Commit offset manually successfully.");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrMessage: {e.Message}");
throw;
}
}
void manual_commit(tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// process the message
totalRows += msg_process(tmqmsg);
// commit the message
int32_t code = tmq_commit_sync(tmq, tmqmsg);
if (code) {
fprintf(stderr, "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
// free the message
taos_free_result(tmqmsg);
break;
} else {
fprintf(stdout, "Commit offset manually successfully.\n");
}
// free the message
taos_free_result(tmqmsg);
}
if (msgCnt > 50) {
// stop after more than 50 messages have been consumed
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows.\n", msgCnt, totalRows);
}
You can manually commit the consumption progress using the tmq_commit_sync function.
REST API: Not supported
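The manual-commit examples above all commit only after the polled data has been processed. A common consequence of that ordering, in offset-based consumers generally, is that a message processed just before a crash but not yet committed may be delivered again after a restart. The Python sketch below illustrates this with an in-memory model only: `FakeTopic`, `FakeConsumer`, and `consume` are hypothetical names invented for this illustration and are not part of any TDengine connector, and the single integer offset is a simplification of how a real server tracks progress.

```python
class FakeTopic:
    """Hypothetical in-memory topic that remembers the committed offset."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0  # index of the next message to deliver after a restart


class FakeConsumer:
    """Hypothetical consumer: poll() reads forward, commit() persists progress."""

    def __init__(self, topic):
        self.topic = topic
        self.position = topic.committed  # resume from the last committed offset

    def poll(self):
        if self.position >= len(self.topic.messages):
            return None
        message = self.topic.messages[self.position]
        self.position += 1
        return message

    def commit(self):
        self.topic.committed = self.position


def consume(consumer, processed, fail_on=None):
    """Process each message, then commit; return early to simulate a crash."""
    while (message := consumer.poll()) is not None:
        processed.append(message)  # stands in for real data processing
        if message == fail_on:
            return  # crash after processing, before the commit
        consumer.commit()  # commit only after processing succeeded


topic = FakeTopic(["m1", "m2", "m3"])
processed = []
consume(FakeConsumer(topic), processed, fail_on="m2")  # first run crashes at m2
consume(FakeConsumer(topic), processed)  # a new consumer resumes at m2
print(processed)
```

In this model `m2` is processed twice, which suggests that processing logic paired with manual commits should tolerate duplicate deliveries.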
Unsubscribe and Close Consumption
Consumers can unsubscribe from topics and stop receiving messages. When a consumer is no longer needed, the consumer instance should be closed to release resources and disconnect from the TDengine server.
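The order described here (unsubscribe first, then close) can be wrapped so that the close step always runs, even when processing fails. The following Python sketch is one possible way to do that under stated assumptions: `FakeConsumer` is a hypothetical stand-in that only records calls, `subscribed` is a helper name invented for this illustration, and the only thing assumed about a real consumer is that it exposes `subscribe`, `unsubscribe`, and `close`.

```python
from contextlib import contextmanager


class FakeConsumer:
    """Hypothetical stand-in for a connector consumer; it only records calls."""

    def __init__(self):
        self.calls = []

    def subscribe(self, topics):
        self.calls.append(("subscribe", tuple(topics)))

    def unsubscribe(self):
        self.calls.append(("unsubscribe",))

    def close(self):
        self.calls.append(("close",))


@contextmanager
def subscribed(consumer, topics):
    """Subscribe on entry; on exit unsubscribe, then always close."""
    consumer.subscribe(topics)
    try:
        yield consumer
    finally:
        try:
            consumer.unsubscribe()
        finally:
            # close even if unsubscribe raised, so resources are released
            consumer.close()


consumer = FakeConsumer()
try:
    with subscribed(consumer, ["topic_meters"]):
        raise RuntimeError("processing failed")  # simulate a failure while consuming
except RuntimeError:
    pass

print(consumer.calls)
```

Nesting the two `finally` clauses keeps the release of resources independent of whether unsubscribing succeeded, which mirrors the try/finally structure used in the connector examples in this section.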
WebSocket Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
try:
    consumer.unsubscribe()
    print("Consumer unsubscribed successfully.")
except Exception as err:
    print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    raise err
finally:
    if consumer:
        consumer.close()
        print("Consumer closed successfully.")
// unsubscribe
err = consumer.Unsubscribe()
if err != nil {
log.Fatalf(
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer unsubscribed successfully.")
// close consumer
err = consumer.Close()
if err != nil {
log.Fatalf(
"Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer closed successfully.")
consumer.unsubscribe().await;
println!("Consumer unsubscribed successfully.");
Note: Once the consumer has unsubscribed and been closed, it cannot be reused. To subscribe to a new topic, create a new consumer.
let consumer = null;
try {
await prepare();
consumer = await createConsumer();
const allPromises = [];
allPromises.push(subscribe(consumer));
allPromises.push(insert());
await Promise.all(allPromises);
await consumer.unsubscribe();
console.log("Consumer unsubscribed successfully.");
}
catch (err) {
console.error(`Failed to unsubscribe consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (consumer) {
await consumer.close();
console.log("Consumer closed successfully.");
}
taos.destroy();
}
try
{
// unsubscribe
consumer.Unsubscribe();
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
finally
{
// close consumer
consumer.Close();
Console.WriteLine("Consumer closed successfully.");
}
// unsubscribe the topic
code = ws_tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr,
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Consumer unsubscribed successfully.\n");
}
// close the consumer
code = ws_tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Consumer closed successfully.\n");
}
REST API: Not supported
Native Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
try:
    consumer.unsubscribe()
    print("Consumer unsubscribed successfully.")
except Exception as err:
    print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
    raise err
finally:
    if consumer:
        consumer.close()
        print("Consumer closed successfully.")
// unsubscribe
err = consumer.Unsubscribe()
if err != nil {
log.Fatalf(
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer unsubscribed successfully.")
// close consumer
err = consumer.Close()
if err != nil {
log.Fatalf(
"Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer closed successfully.")
consumer.unsubscribe().await;
println!("Consumer unsubscribed successfully.");
Note: After the consumer unsubscribes, it is closed and cannot be reused. To subscribe to a new topic, create a new consumer.
Node.js: Not supported
try
{
// unsubscribe
consumer.Unsubscribe();
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
finally
{
// close consumer
consumer.Close();
Console.WriteLine("Consumer closed successfully.");
}
// unsubscribe the topic
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Consumer unsubscribed successfully.\n");
}
// close the consumer
code = tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Consumer closed successfully.\n");
}
REST API: Not supported
Complete Examples
WebSocket Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
Complete code example
public class WsConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopThread = false;
static private String groupId = "group1";
static private String clientId = "client1";
public static TaosConsumer<ResultBean> getConsumer() throws Exception {
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.WsConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public double getPhase() {
return phase;
}
public void setPhase(double phase) {
this.phase = phase;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
public static void prepareData() throws SQLException, InterruptedException {
try {
int i = 0;
while (!stopThread) {
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
i++;
Thread.sleep(1);
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void prepareMeta() throws SQLException {
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create db and table, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void initConnection() throws SQLException {
String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
try {
connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex);
}
try {
statement = connection.createStatement();
} catch (SQLException ex) {
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex);
}
System.out.println("Connection created successfully.");
}
public static void closeConnection() throws SQLException {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex);
}
try {
if (connection != null) {
connection.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex);
}
System.out.println("Connection closed successfully.");
}
public static void main(String[] args) throws SQLException, InterruptedException {
initConnection();
prepareMeta();
// create a single thread executor
ExecutorService executor = Executors.newSingleThreadExecutor();
// submit a task
executor.submit(() -> {
try {
prepareData();
} catch (SQLException ex) {
System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + ", ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
return;
}
System.out.println("prepareData executed successfully.");
});
try {
TaosConsumer<ResultBean> consumer = getConsumer();
pollExample(consumer);
System.out.println("pollExample executed successfully.");
consumer.unsubscribe();
seekExample(consumer);
System.out.println("seekExample executed successfully.");
consumer.unsubscribe();
commitExample(consumer);
System.out.println("commitExample executed successfully.");
consumer.unsubscribe();
unsubscribeExample(consumer);
System.out.println("unsubscribeExample executed successfully");
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
return;
}
stopThread = true;
// close the executor, which will make the executor reject new tasks
executor.shutdown();
try {
// wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
System.out.println("Wait executor termination failed.");
}
closeConnection();
System.out.println("program end.");
}
}
Note: The value of the value.deserializer configuration parameter should be adjusted according to the package path in your test environment.
Complete code example
#!/usr/bin/python3
import taosws
db = "power"
topic = "topic_meters"
user = "root"
password = "taosdata"
host = "localhost"
port = 6041
groupId = "group1"
clientId = "1"
tdConnWsScheme = "ws"
autoOffsetReset = "latest"
autoCommitState = "true"
autoCommitIntv = "1000"
def prepareMeta():
    conn = None
    try:
        conn = taosws.connect(user=user, password=password, host=host, port=port)
        # create database
        rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
        assert rowsAffected == 0
        # change database.
        rowsAffected = conn.execute(f"USE {db}")
        assert rowsAffected == 0
        # create super table
        rowsAffected = conn.execute(
            "CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(64))"
        )
        assert rowsAffected == 0
        # create table
        rowsAffected = conn.execute(
            "CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angeles')")
        assert rowsAffected == 0
        # ANCHOR: create_topic
        # create topic
        conn.execute(
            f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
        )
        # ANCHOR_END: create_topic
        sql = """
            INSERT INTO
            power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
                VALUES (NOW + 1a, 10.30000, 219, 0.31000)
                (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
            power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
                VALUES (NOW + 1a, 10.30000, 218, 0.25000)
            """
        affectedRows = conn.execute(sql)
        print(f"Inserted {affectedRows} rows into power.meters successfully.")
    except Exception as err:
        print(f"Failed to prepareMeta, host: {host}:{port}, db: {db}, topic: {topic}, ErrMessage:{err}.")
        raise err
    finally:
        if conn:
            conn.close()
# ANCHOR: create_consumer
def create_consumer():
    try:
        consumer = taosws.Consumer(conf={
            "td.connect.websocket.scheme": tdConnWsScheme,
            "group.id": groupId,
            "client.id": clientId,
            "auto.offset.reset": autoOffsetReset,
            "td.connect.ip": host,
            "td.connect.port": port,
            "enable.auto.commit": autoCommitState,
            "auto.commit.interval.ms": autoCommitIntv,
        })
        print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}.")
        return consumer
    except Exception as err:
        print(f"Failed to create websocket consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# ANCHOR_END: create_consumer
def seek_offset(consumer):
    # ANCHOR: assignment
    try:
        assignments = consumer.assignment()
        for assignment in assignments:
            # use a separate name so the module-level `topic` stays readable in the except block
            assigned_topic = assignment.topic()
            print(f"topic: {assigned_topic}")
            for assign in assignment.assignments():
                print(
                    f"vg_id: {assign.vg_id()}, offset: {assign.offset()}, begin: {assign.begin()}, end: {assign.end()}")
                consumer.seek(assigned_topic, assign.vg_id(), assign.begin())
                print("Assignment seek to beginning successfully.")
    except Exception as err:
        print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
    # ANCHOR_END: assignment
# ANCHOR: subscribe
def subscribe(consumer):
    try:
        consumer.subscribe([topic])
        print("Subscribe topics successfully")
        for i in range(50):
            records = consumer.poll(timeout=1.0)
            if records:
                for block in records:
                    for row in block:
                        print(f"data: {row}")
    except Exception as err:
        print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# ANCHOR_END: subscribe
# ANCHOR: commit_offset
def commit_offset(consumer):
    try:
        for i in range(50):
            records = consumer.poll(timeout=1.0)
            if records:
                for block in records:
                    for row in block:
                        print(f"data: {row}")
                # after processing the data, commit the offset manually
                consumer.commit(records)
                print("Commit offset manually successfully.")
    except Exception as err:
        print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# ANCHOR_END: commit_offset
#
def unsubscribe(consumer):
    # ANCHOR: unsubscribe
    try:
        consumer.unsubscribe()
        print("Consumer unsubscribed successfully.")
    except Exception as err:
        print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
    finally:
        if consumer:
            consumer.close()
            print("Consumer closed successfully.")
    # ANCHOR_END: unsubscribe
if __name__ == "__main__":
    consumer = None
    try:
        prepareMeta()
        consumer = create_consumer()
        subscribe(consumer)
        seek_offset(consumer)
        commit_offset(consumer)
    finally:
        if consumer:
            unsubscribe(consumer)
Complete code example
package main
import (
"database/sql"
"fmt"
"log"
"time"
"github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
_ "github.com/taosdata/driver-go/v3/taosWS"
"github.com/taosdata/driver-go/v3/ws/tmq"
)
var done = make(chan struct{})
var groupID string
var clientID string
var host string
var topic string
func main() {
// init env
taosDSN := "root:taosdata@ws(127.0.0.1:6041)/"
conn, err := sql.Open("taosWS", taosDSN)
if err != nil {
log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
}
defer func() {
conn.Close()
}()
initEnv(conn)
// ANCHOR: create_consumer
// create consumer
wsUrl := "ws://127.0.0.1:6041"
groupID = "group1"
clientID = "client1"
host = "127.0.0.1"
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": wsUrl,
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "latest",
"msg.with.table.name": "true",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"group.id": groupID,
"client.id": clientID,
})
if err != nil {
log.Fatalf(
"Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
host,
groupID,
clientID,
err.Error(),
)
}
log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID)
// ANCHOR_END: create_consumer
// ANCHOR: subscribe
topic = "topic_meters"
err = consumer.Subscribe(topic, nil)
if err != nil {
log.Fatalf(
"Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
log.Println("Subscribe topics successfully")
for i := 0; i < 50; i++ {
ev := consumer.Poll(100)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
// process your data here
fmt.Printf("data:%v\n", e)
// ANCHOR: commit_offset
// commit offset
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
if err != nil {
log.Fatalf(
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.TopicPartition,
err.Error(),
)
}
log.Println("Commit offset manually successfully.")
// ANCHOR_END: commit_offset
case tmqcommon.Error:
log.Fatalf(
"Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.Error(),
)
}
}
}
// ANCHOR_END: subscribe
// ANCHOR: seek
// get assignment
partitions, err := consumer.Assignment()
if err != nil {
log.Fatalf(
"Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Now assignment:", partitions)
for i := 0; i < len(partitions); i++ {
// seek to the beginning
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
log.Fatalf(
"Failed to seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
topic,
groupID,
clientID,
partitions[i].Partition,
0,
err.Error(),
)
}
}
fmt.Println("Assignment seek to beginning successfully")
// ANCHOR_END: seek
// ANCHOR: close
// unsubscribe
err = consumer.Unsubscribe()
if err != nil {
log.Fatalf(
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer unsubscribed successfully.")
// close consumer
err = consumer.Close()
if err != nil {
log.Fatalf(
"Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer closed successfully.")
// ANCHOR_END: close
<-done
}
func initEnv(conn *sql.DB) {
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatal("Failed to create database, ErrMessage: " + err.Error())
}
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
log.Fatal("Failed to create stable, ErrMessage: " + err.Error())
}
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
if err != nil {
log.Fatal("Failed to create topic, ErrMessage: " + err.Error())
}
go func() {
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
if err != nil {
log.Fatal("Failed to insert data, ErrMessage: " + err.Error())
}
}
done <- struct{}{}
}()
}
Complete code example
use chrono::DateTime;
use chrono::Local;
use std::str::FromStr;
use std::thread;
use std::time::Duration;
use taos::*;
use tokio::runtime::Runtime;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
pretty_env_logger::formatted_timed_builder()
.filter_level(log::LevelFilter::Info)
.init();
use taos_query::prelude::*;
// ANCHOR: create_consumer_dsn
let dsn = "ws://localhost:6041".to_string();
println!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
// ANCHOR_END: create_consumer_dsn
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
// prepare database and table
taos.exec_many([
"drop topic if exists topic_meters",
"drop database if exists power",
"create database if not exists power WAL_RETENTION_PERIOD 86400",
"use power",
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))",
"create table if not exists power.d001 using power.meters tags(1,'location')",
])
.await?;
// ANCHOR: create_topic
taos.exec_many([
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters",
])
.await?;
// ANCHOR_END: create_topic
// ANCHOR: create_consumer_ac
let group_id = "group1".to_string();
let client_id = "client1".to_string();
dsn.params
.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params
.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params
.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params
.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), group_id.clone());
dsn.params
.insert("client.id".to_string(), client_id.clone());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = match builder.build().await {
Ok(consumer) => {
println!(
"Create consumer successfully, dsn: {}, groupId: {}, clientId: {}.",
dsn, group_id, client_id
);
consumer
}
Err(err) => {
eprintln!("Failed to create websocket consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err);
return Err(err.into());
}
};
// ANCHOR_END: create_consumer_ac
let handle = thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
tokio::time::sleep(Duration::from_secs(1)).await;
let taos_insert = TaosBuilder::from_dsn(&dsn).unwrap().build().await.unwrap();
for i in 0..50 {
let insert_sql = format!(r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW, 10.30000, {}, 0.31000)"#, i);
if let Err(e) = taos_insert.exec(insert_sql).await {
eprintln!("Failed to execute insert: {:?}", e);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
});
// ANCHOR: consume
let topic = "topic_meters";
match consumer.subscribe([topic]).await {
Ok(_) => println!("Subscribe topics successfully."),
Err(err) => {
eprintln!(
"Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, err
);
return Err(err.into());
}
}
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async move {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await
.map_err(|e| {
eprintln!(
"Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, e
);
e
})?;
// ANCHOR_END: consume
// ANCHOR: consumer_commit_manually
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async {
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {} of topic {}\n", vgroup_id, topic);
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
match consumer.commit(offset).await{
Ok(_) => println!("Commit offset manually successfully."),
Err(err) => {
eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, err);
return Err(err.into());
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;
// ANCHOR_END: consumer_commit_manually
// ANCHOR: seek_offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("assignments: {:?}", assignments);
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
println!(
"topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}",
topic, vgroup_id, current, begin, end
);
match consumer.offset_seek(topic, vgroup_id, begin).await {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, begin, err);
return Err(err.into());
}
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
println!("topic assignment: {:?}", topic_assignment);
}
println!("Assignment seek to beginning successfully.");
// after seek offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("After seek offset assignments: {:?}", assignments);
// ANCHOR_END: seek_offset
// ANCHOR: unsubscribe
consumer.unsubscribe().await;
println!("Consumer unsubscribed successfully.");
// ANCHOR_END: unsubscribe
tokio::time::sleep(Duration::from_secs(1)).await;
handle.join().unwrap();
taos.exec_many(["drop topic topic_meters", "drop database power"])
.await?;
Ok(())
}
Complete code example
const { sleep } = require("@tdengine/websocket");
const taos = require("@tdengine/websocket");
// ANCHOR: create_consumer
const db = 'power';
const stable = 'meters';
const url = 'ws://localhost:6041';
const topic = 'topic_meters'
const topics = [topic];
const groupId = "group1";
const clientId = "client1";
async function createConsumer() {
let groupId = "group1";
let clientId = "client1";
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, groupId],
[taos.TMQConstants.CLIENT_ID, clientId],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.WS_URL, url],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
try {
const conn = await taos.tmqConnect(configMap);
console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`)
return conn;
} catch (err) {
console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
// ANCHOR_END: create_consumer
async function prepare() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`;
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(createDB);
await wsSql.exec(createStable);
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
await wsSql.close();
}
async function insert() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
let wsSql = await taos.sqlConnect(conf);
for (let i = 0; i < 50; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
await sleep(100);
}
await wsSql.close();
}
async function subscribe(consumer) {
// ANCHOR: commit
try {
await consumer.subscribe(['topic_meters']);
for (let i = 0; i < 50; i++) {
let res = await consumer.poll(100);
for (let [key, value] of res) {
// Add your data processing logic here
console.log(`data: ${key} ${value}`);
}
await consumer.commit();
console.log("Commit offset manually successfully.");
}
} catch (err) {
console.error(`Failed to poll data, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
// ANCHOR_END: commit
}
async function test() {
// ANCHOR: unsubscribe
let consumer = null;
try {
await prepare();
consumer = await createConsumer();
const allPromises = [];
allPromises.push(subscribe(consumer));
allPromises.push(insert());
await Promise.all(allPromises);
await consumer.unsubscribe();
console.log("Consumer unsubscribed successfully.");
}
catch (err) {
console.error(`Failed to unsubscribe consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (consumer) {
await consumer.close();
console.log("Consumer closed successfully.");
}
taos.destroy();
}
// ANCHOR_END: unsubscribe
}
test()
Complete code example
using TDengine.Driver;
using TDengine.Driver.Client;
using TDengine.TMQ;
namespace TMQExample
{
internal class SubscribeDemo
{
private static string _host = "";
private static string _groupId = "";
private static string _clientId = "";
private static string _topic = "";
public static void Main(string[] args)
{
try
{
var builder =
new ConnectionStringBuilder(
"protocol=WebSocket;host=127.0.0.1;port=6041;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
client.Exec("CREATE DATABASE IF NOT EXISTS power");
client.Exec("USE power");
client.Exec(
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC IF NOT EXISTS topic_meters as SELECT * from power.meters");
var consumer = CreateConsumer();
// insert data
Task.Run(InsertData);
// consume message
Consume(consumer);
// seek
Seek(consumer);
// commit
CommitOffset(consumer);
// close
Close(consumer);
Console.WriteLine("Done");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(e.Message);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(e.Message);
throw;
}
}
static void InsertData()
{
var builder =
new ConnectionStringBuilder(
"protocol=WebSocket;host=127.0.0.1;port=6041;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
while (true)
{
client.Exec(
"INSERT into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(now,11.5,219,0.30)");
Task.Delay(1000).Wait();
}
}
}
static IConsumer<Dictionary<string, object>> CreateConsumer()
{
// ANCHOR: create_consumer
// consumer config
_host = "127.0.0.1";
_groupId = "group1";
_clientId = "client1";
var cfg = new Dictionary<string, string>()
{
{ "td.connect.type", "WebSocket" },
{ "td.connect.port", "6041" },
{ "auto.offset.reset", "latest" },
{ "msg.with.table.name", "true" },
{ "enable.auto.commit", "true" },
{ "auto.commit.interval.ms", "1000" },
{ "group.id", _groupId },
{ "client.id", _clientId },
{ "td.connect.ip", _host },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
};
IConsumer<Dictionary<string, object>> consumer = null!;
try
{
// create consumer
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
Console.WriteLine(
$"Create consumer successfully, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to create websocket consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to create websocket consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: create_consumer
return consumer;
}
static void Consume(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: subscribe
_topic = "topic_meters";
try
{
// subscribe
consumer.Subscribe(new List<string>() { _topic });
Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++)
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
// handle message
Console.WriteLine(
$"data: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine($"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: subscribe
}
static void Seek(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: seek
try
{
// get assignment
var assignment = consumer.Assignment;
Console.WriteLine($"Now assignment: {assignment}");
// seek to the beginning
foreach (var topicPartition in assignment)
{
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
}
Console.WriteLine("Assignment seek to beginning successfully");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: seek
}
static void CommitOffset(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: commit_offset
for (int i = 0; i < 5; i++)
{
TopicPartitionOffset topicPartitionOffset = null;
try
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
// commit offset
topicPartitionOffset = cr.TopicPartitionOffset;
consumer.Commit(new List<TopicPartitionOffset>
{
topicPartitionOffset,
});
Console.WriteLine("Commit offset manually successfully.");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrMessage: {e.Message}");
throw;
}
}
// ANCHOR_END: commit_offset
}
static void Close(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: close
try
{
// unsubscribe
consumer.Unsubscribe();
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
finally
{
// close consumer
consumer.Close();
Console.WriteLine("Consumer closed successfully.");
}
// ANCHOR_END: close
}
}
}
Complete code example
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// to compile: gcc -o tmq_demo tmq_demo.c -ltaosws -lpthread
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "taosws.h"
volatile int thread_stop = 0;
static int running = 1;
static int count = 0;
const char* topic_name = "topic_meters";
typedef struct {
const char* enable_auto_commit;
const char* auto_commit_interval_ms;
const char* group_id;
const char* client_id;
const char* td_connect_host;
const char* td_connect_port;
const char* td_connect_user;
const char* td_connect_pass;
const char* auto_offset_reset;
} ConsumerConfig;
ConsumerConfig config = {.enable_auto_commit = "true",
.auto_commit_interval_ms = "1000",
.group_id = "group1",
.client_id = "client1",
.td_connect_host = "localhost",
.td_connect_port = "6041",
.td_connect_user = "root",
.td_connect_pass = "taosdata",
.auto_offset_reset = "latest"};
void* prepare_data(void* arg) {
int code = 0;
char* dsn = "ws://localhost:6041";
WS_TAOS* pConn = ws_connect(dsn);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return NULL;
}
WS_RES* pRes;
int i = 1;
while (!thread_stop) {
char buf[200] = {0};
i++;
snprintf(
buf, sizeof(buf),
"INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + %da, 10.30000, "
"219, 0.31000)",
i);
pRes = ws_query(pConn, buf);
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
}
ws_free_result(pRes);
sleep(1);
}
fprintf(stdout, "Prepare data thread exit\n");
return NULL;
}
// ANCHOR: msg_process
int32_t msg_process(WS_RES* msg) {
int32_t rows = 0;
const char* topicName = ws_tmq_get_topic_name(msg);
const char* dbName = ws_tmq_get_db_name(msg);
int32_t vgroupId = ws_tmq_get_vgroup_id(msg);
while (true) {
// get one row data from message
WS_ROW row = ws_fetch_row(msg);
if (row == NULL) break;
// Add your data processing logic here
rows++;
}
return rows;
}
// ANCHOR_END: msg_process
WS_TAOS* init_env() {
int code = 0;
char* dsn = "ws://localhost:6041";
WS_TAOS* pConn = ws_connect(dsn);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
return NULL;
}
WS_RES* pRes;
// drop topic if exists
pRes = ws_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
ws_free_result(pRes);
pRes = ws_query(pConn, "DROP DATABASE IF EXISTS power");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
ws_free_result(pRes);
// create database
pRes = ws_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
ws_free_result(pRes);
// create super table
pRes =
ws_query(pConn,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
goto END;
}
ws_free_result(pRes);
return pConn;
END:
ws_free_result(pRes);
ws_close(pConn);
return NULL;
}
void deinit_env(WS_TAOS* pConn) {
if (pConn) ws_close(pConn);
}
int32_t create_topic(WS_TAOS* pConn) {
WS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = ws_query(pConn, "USE power");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
// free the failed result before returning
ws_free_result(pRes);
return -1;
}
ws_free_result(pRes);
pRes = ws_query(
pConn,
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
// free the failed result before returning
ws_free_result(pRes);
return -1;
}
ws_free_result(pRes);
return 0;
}
int32_t drop_topic(WS_TAOS* pConn) {
WS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = ws_query(pConn, "USE power");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
// free the failed result before returning
ws_free_result(pRes);
return -1;
}
ws_free_result(pRes);
pRes = ws_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = ws_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(pRes));
// free the failed result before returning
ws_free_result(pRes);
return -1;
}
ws_free_result(pRes);
return 0;
}
void tmq_commit_cb_print(ws_tmq_t* tmq, int32_t code, void* param) {
count += 1;
fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p, count: %d.\n", code, tmq, param, count);
}
// ANCHOR: create_consumer_1
ws_tmq_t* build_consumer(const ConsumerConfig* config) {
ws_tmq_conf_res_t code;
ws_tmq_t* tmq = NULL;
// create a configuration object
ws_tmq_conf_t* conf = ws_tmq_conf_new();
// set the configuration parameters
code = ws_tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "group.id", config->group_id);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "client.id", config->client_id);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
code = ws_tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
if (WS_TMQ_CONF_OK != code) {
ws_tmq_conf_destroy(conf);
return NULL;
}
// create a consumer object
tmq = ws_tmq_consumer_new(conf, "taos://localhost:6041", NULL, 0);
_end:
// destroy the configuration object
ws_tmq_conf_destroy(conf);
return tmq;
}
// ANCHOR_END: create_consumer_1
// ANCHOR: build_topic_list
// build a topic list used to subscribe
ws_tmq_list_t* build_topic_list() {
// create an empty topic list
ws_tmq_list_t* topicList = ws_tmq_list_new();
// append topic name to the list
int32_t code = ws_tmq_list_append(topicList, topic_name);
if (code) {
// if failed, destroy the list and return NULL
ws_tmq_list_destroy(topicList);
fprintf(stderr,
"Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(NULL));
return NULL;
}
// if success, return the list
return topicList;
}
// ANCHOR_END: build_topic_list
// ANCHOR: basic_consume_loop
void basic_consume_loop(ws_tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// Add your data processing logic here
totalRows += msg_process(tmqmsg);
// free the message
ws_free_result(tmqmsg);
}
if (msgCnt >= 50) {
// consume 50 messages and break
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
// ANCHOR_END: basic_consume_loop
// ANCHOR: consume_repeatly
void consume_repeatly(ws_tmq_t* tmq) {
int32_t numOfAssignment = 0;
ws_tmq_topic_assignment* pAssign = NULL;
// get the topic assignment
int32_t code = ws_tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
return;
}
// seek to the earliest offset
for (int32_t i = 0; i < numOfAssignment; ++i) {
ws_tmq_topic_assignment* p = &pAssign[i];
code = ws_tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr,
"Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, p->vgId, code, ws_tmq_errstr(tmq));
break;
}
}
if (code == 0) fprintf(stdout, "Assignment seek to beginning successfully.\n");
// free the assignment array
ws_tmq_free_assignment(pAssign, numOfAssignment);
// let's consume the messages again
basic_consume_loop(tmq);
}
// ANCHOR_END: consume_repeatly
// ANCHOR: manual_commit
void manual_commit(ws_tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
WS_RES* tmqmsg = ws_tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// process the message
totalRows += msg_process(tmqmsg);
// commit the message
int32_t code = ws_tmq_commit_sync(tmq, tmqmsg);
if (code) {
fprintf(stderr,
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
// free the message
ws_free_result(tmqmsg);
break;
} else {
fprintf(stdout, "Commit offset manually successfully.\n");
}
// free the message
ws_free_result(tmqmsg);
}
if (msgCnt >= 50) {
// consume 50 messages and break
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d msg consumed, include %d rows.\n", msgCnt, totalRows);
}
// ANCHOR_END: manual_commit
int main(int argc, char* argv[]) {
int32_t code;
pthread_t thread_id;
WS_TAOS* pConn = init_env();
if (pConn == NULL) {
fprintf(stderr, "Failed to init env.\n");
return -1;
}
if (create_topic(pConn) < 0) {
fprintf(stderr, "Failed to create topic.\n");
return -1;
}
if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) {
fprintf(stderr, "Failed to create thread.\n");
return -1;
}
// ANCHOR: create_consumer_2
ws_tmq_t* tmq = build_consumer(&config);
if (NULL == tmq) {
fprintf(stderr, "Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
return -1;
} else {
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n", config.td_connect_host,
config.group_id, config.client_id);
}
// ANCHOR_END: create_consumer_2
// ANCHOR: subscribe_3
ws_tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n", topic_name, config.group_id,
config.client_id);
return -1;
}
if ((code = ws_tmq_subscribe(tmq, topic_list))) {
fprintf(stderr,
"Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
}
ws_tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
// ANCHOR_END: subscribe_3
consume_repeatly(tmq);
manual_commit(tmq);
// ANCHOR: unsubscribe_and_close
// unsubscribe the topic
code = ws_tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr,
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Consumer unsubscribed successfully.\n");
}
// close the consumer
code = ws_tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, ws_tmq_errstr(tmq));
} else {
fprintf(stdout, "Consumer closed successfully.\n");
}
// ANCHOR_END: unsubscribe_and_close
thread_stop = 1;
pthread_join(thread_id, NULL);
if (drop_topic(pConn) < 0) {
fprintf(stderr, "Failed to drop topic.\n");
return -1;
}
deinit_env(pConn);
return 0;
}
Not supported
Native Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
Complete code example
public class ConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopThread = false;
static private String groupId = "group1";
static private String clientId = "client1";
public static TaosConsumer<ResultBean> getConsumer() throws Exception {
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", groupId);
config.setProperty("client.id", clientId);
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.ConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create native consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public double getPhase() {
return phase;
}
public void setPhase(double phase) {
this.phase = phase;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
public static void prepareData() throws SQLException, InterruptedException {
try {
int i = 0;
while (!stopThread) {
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
i++;
Thread.sleep(1);
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void prepareMeta() throws SQLException {
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create db and table, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void initConnection() throws SQLException {
String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
try {
connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex);
}
try {
statement = connection.createStatement();
} catch (SQLException ex) {
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex);
}
System.out.println("Connection created successfully.");
}
public static void closeConnection() throws SQLException {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex);
}
try {
if (connection != null) {
connection.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex);
}
System.out.println("Connection closed successfully.");
}
public static void main(String[] args) throws SQLException, InterruptedException {
initConnection();
prepareMeta();
// create a single thread executor
ExecutorService executor = Executors.newSingleThreadExecutor();
// submit a task
executor.submit(() -> {
try {
prepareData();
} catch (SQLException ex) {
System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + ", ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
return;
}
System.out.println("prepareData executed successfully.");
});
try {
TaosConsumer<ResultBean> consumer = getConsumer();
pollExample(consumer);
System.out.println("pollExample executed successfully.");
consumer.unsubscribe();
seekExample(consumer);
System.out.println("seekExample executed successfully.");
consumer.unsubscribe();
commitExample(consumer);
System.out.println("commitExample executed successfully.");
consumer.unsubscribe();
unsubscribeExample(consumer);
System.out.println("unsubscribeExample executed successfully");
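} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
// stop the data-preparation thread so the JVM can exit
stopThread = true;
executor.shutdown();
return;
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
// stop the data-preparation thread so the JVM can exit
stopThread = true;
executor.shutdown();
return;
}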
stopThread = true;
// close the executor, which will make the executor reject new tasks
executor.shutdown();
try {
// wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
System.out.println("Wait executor termination failed.");
}
closeConnection();
System.out.println("program end.");
}
}
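Note: Set the value.deserializer configuration parameter to the fully qualified name of your deserializer class. The value used in the example (com.taos.example.ConsumerLoopFull$ResultDeserializer) must be adjusted to match the package path in your environment.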
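The commitExample method above commits the offset only after a batch has been processed. The following is a minimal sketch of why that ordering matters, written against a hypothetical in-memory stand-in rather than the TDengine connector: `InMemoryTopic`, `InMemoryConsumer`, and `consume` are illustrative names that do not exist in any TDengine library, and the model assumes a restarted consumer resumes from the last committed offset.

```python
class InMemoryTopic:
    """Hypothetical stand-in for a topic: messages plus one committed offset."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0  # index of the next message a restarted consumer receives


class InMemoryConsumer:
    """Hypothetical consumer that resumes from the topic's committed offset."""

    def __init__(self, topic):
        self.topic = topic
        self.position = topic.committed

    def poll(self):
        """Return the next message, or None when nothing is left."""
        if self.position >= len(self.topic.messages):
            return None
        message = self.topic.messages[self.position]
        self.position += 1
        return message

    def commit(self):
        """Record everything polled so far as consumed."""
        self.topic.committed = self.position


def consume(topic, fail_on=None):
    """Process messages one by one, committing only after each is processed."""
    consumer = InMemoryConsumer(topic)
    processed = []
    while True:
        message = consumer.poll()
        if message is None:
            return processed
        if message == fail_on:
            # Simulate a crash after polling but before processing and commit.
            return processed
        processed.append(message)
        consumer.commit()


topic = InMemoryTopic(["m1", "m2", "m3"])
first_run = consume(topic, fail_on="m2")   # stops before m2 is committed
second_run = consume(topic)                # a new consumer picks up at m2
print(first_run, second_run)
```

In this simplified model the message that was polled but never committed is delivered again on the next run, so nothing is lost, at the cost that processing logic may need to tolerate a repeated message.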
Complete code example
#!/usr/bin/python3
import taos
db = "power"
topic = "topic_meters"
user = "root"
password = "taosdata"
host = "localhost"
port = 6030
groupId = "group1"
clientId = "1"
tdConnWsScheme = "ws"
autoOffsetReset = "latest"
autoCommitState = "true"
autoCommitIntv = "1000"
def prepareMeta():
    conn = None
    try:
        conn = taos.connect(host=host, user=user, password=password, port=port)
        conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
        # change database. same as execute "USE db"
        conn.select_db(db)
        # create super table
        conn.execute(
            "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
        )
        # ANCHOR: create_topic
        # create topic
        conn.execute(
            f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
        )
        # ANCHOR_END: create_topic
        sql = """
            INSERT INTO
            power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
                VALUES (NOW + 1a, 10.30000, 219, 0.31000)
                (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
            power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
                VALUES (NOW + 1a, 10.30000, 218, 0.25000)
            """
        affectedRows = conn.execute(sql)
        print(f"Inserted {affectedRows} rows into power.meters successfully.")
    except Exception as err:
        print(f"Failed to prepareMeta, host: {host}:{port}, db: {db}, topic: {topic}, ErrMessage:{err}.")
        raise err
    finally:
        if conn:
            conn.close()
# ANCHOR: create_consumer
from taos.tmq import Consumer
def create_consumer():
    try:
        consumer = Consumer(
            {
                "group.id": groupId,
                "client.id": clientId,
                "td.connect.user": user,
                "td.connect.pass": password,
                "enable.auto.commit": autoCommitState,
                "auto.commit.interval.ms": autoCommitIntv,
                "auto.offset.reset": autoOffsetReset,
                "td.connect.ip": host,
                "td.connect.port": str(port),
            }
        )
        print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}")
        return consumer
    except Exception as err:
        print(f"Failed to create native consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# ANCHOR_END: create_consumer
# ANCHOR: subscribe
def subscribe(consumer):
    try:
        # subscribe to the topics
        consumer.subscribe(["topic_meters"])
        print("Subscribe topics successfully")
        for i in range(50):
            records = consumer.poll(1)
            if records:
                err = records.error()
                if err is not None:
                    print(f"Poll data error, {err}")
                    raise err
                val = records.value()
                if val:
                    for block in val:
                        data = block.fetchall()
                        print(f"data: {data}")
    except Exception as err:
        print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# ANCHOR_END: subscribe
def commit_offset(consumer):
    # ANCHOR: commit_offset
    try:
        for i in range(50):
            records = consumer.poll(1)
            if records:
                err = records.error()
                if err is not None:
                    print(f"Poll data error, {err}")
                    raise err
                val = records.value()
                if val:
                    for block in val:
                        print(block.fetchall())
                # after processing the data, commit the offset manually
                consumer.commit(records)
                print("Commit offset manually successfully.")
    except Exception as err:
        print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# ANCHOR_END: commit_offset
def seek_offset(consumer):
    # ANCHOR: assignment
    try:
        assignments = consumer.assignment()
        if assignments:
            for partition in assignments:
                partition.offset = 0
                consumer.seek(partition)
                print("Assignment seek to beginning successfully.")
    except Exception as err:
        print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
# ANCHOR_END: assignment
def unsubscribe(consumer):
    # ANCHOR: unsubscribe
    try:
        consumer.unsubscribe()
        print("Consumer unsubscribed successfully.")
    except Exception as err:
        print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
        raise err
    finally:
        if consumer:
            consumer.close()
            print("Consumer closed successfully.")
# ANCHOR_END: unsubscribe
if __name__ == "__main__":
    consumer = None
    try:
        prepareMeta()
        consumer = create_consumer()
        subscribe(consumer)
        seek_offset(consumer)
        commit_offset(consumer)
    finally:
        if consumer:
            unsubscribe(consumer)
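In the Python example above, every consumer configuration value is passed as a string, including the port (`str(port)`) and the auto-commit flag (`"true"`). The following is a minimal sketch of a helper that performs those conversions in one place. `build_consumer_config` is a hypothetical name, not part of the connector; the configuration keys are the ones used in the example above.

```python
def build_consumer_config(host, port, group_id, client_id, user, password,
                          auto_commit=True, auto_commit_interval_ms=1000,
                          offset_reset="latest"):
    """Return a consumer configuration dict in which every value is a string.

    Hypothetical helper for illustration; it only builds the dict and does
    not contact a server.
    """
    return {
        "td.connect.ip": host,
        "td.connect.port": str(port),
        "td.connect.user": user,
        "td.connect.pass": password,
        "group.id": group_id,
        "client.id": client_id,
        "enable.auto.commit": "true" if auto_commit else "false",
        "auto.commit.interval.ms": str(auto_commit_interval_ms),
        "auto.offset.reset": offset_reset,
    }


config = build_consumer_config("localhost", 6030, "group1", "1", "root", "taosdata")
for key, value in config.items():
    print(f"{key} = {value}")
```

Under these assumptions, the resulting dict could be handed to `Consumer(...)` in place of the literal used in `create_consumer`, which keeps typed values such as the port and interval as numbers in the calling code.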
Complete code example
package main
import (
"database/sql"
"fmt"
"log"
"time"
"github.com/taosdata/driver-go/v3/af/tmq"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
var done = make(chan struct{})
var groupID string
var clientID string
var host string
var topic string
func main() {
// init env
taosDSN := "root:taosdata@tcp(127.0.0.1:6030)/"
conn, err := sql.Open("taosSql", taosDSN)
if err != nil {
log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
}
defer func() {
conn.Close()
}()
initEnv(conn)
// ANCHOR: create_consumer
// create consumer
groupID = "group1"
clientID = "client1"
host = "127.0.0.1"
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "latest",
"msg.with.table.name": "true",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"group.id": groupID,
"client.id": clientID,
})
if err != nil {
log.Fatalf(
"Failed to create native consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
host,
groupID,
clientID,
err.Error(),
)
}
log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID)
// ANCHOR_END: create_consumer
// ANCHOR: subscribe
topic = "topic_meters"
err = consumer.Subscribe(topic, nil)
if err != nil {
log.Fatalf(
"Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
log.Println("Subscribe topics successfully")
for i := 0; i < 50; i++ {
ev := consumer.Poll(100)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
// process your data here
fmt.Printf("data:%v\n", e)
// ANCHOR: commit_offset
// commit offset
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
if err != nil {
log.Fatalf(
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
e.TopicPartition,
err.Error(),
)
}
log.Println("Commit offset manually successfully.")
// ANCHOR_END: commit_offset
case tmqcommon.Error:
log.Fatalf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, e.Error())
}
}
}
// ANCHOR_END: subscribe
// ANCHOR: seek
// get assignment
partitions, err := consumer.Assignment()
if err != nil {
log.Fatalf("Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, err.Error())
}
fmt.Println("Now assignment:", partitions)
for i := 0; i < len(partitions); i++ {
// seek to the beginning
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
log.Fatalf(
"Failed to execute seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
topic,
groupID,
clientID,
partitions[i].Partition,
0,
err.Error(),
)
}
}
fmt.Println("Assignment seek to beginning successfully")
// ANCHOR_END: seek
// ANCHOR: close
// unsubscribe
err = consumer.Unsubscribe()
if err != nil {
log.Fatalf(
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer unsubscribed successfully.")
// close consumer
err = consumer.Close()
if err != nil {
log.Fatalf(
"Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer closed successfully.")
// ANCHOR_END: close
<-done
}
func initEnv(conn *sql.DB) {
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatal("Failed to create database, ErrMessage: " + err.Error())
}
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
log.Fatal("Failed to create stable, ErrMessage: " + err.Error())
}
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
if err != nil {
log.Fatal("Failed to create topic, ErrMessage: " + err.Error())
}
go func() {
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
if err != nil {
log.Fatal("Failed to insert data, ErrMessage: " + err.Error())
}
}
done <- struct{}{}
}()
}
Complete code example
use chrono::DateTime;
use chrono::Local;
use std::str::FromStr;
use std::thread;
use std::time::Duration;
use taos::*;
use tokio::runtime::Runtime;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
pretty_env_logger::formatted_timed_builder()
.filter_level(log::LevelFilter::Info)
.init();
use taos_query::prelude::*;
// ANCHOR: create_consumer_dsn
let dsn = "taos://localhost:6030".to_string();
println!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
// ANCHOR_END: create_consumer_dsn
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
// prepare database and table
taos.exec_many([
"drop topic if exists topic_meters",
"drop database if exists power",
"create database if not exists power WAL_RETENTION_PERIOD 86400",
"use power",
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))",
"create table if not exists power.d001 using power.meters tags(1,'location')",
])
.await?;
// ANCHOR: create_topic
taos.exec_many([
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters",
])
.await?;
// ANCHOR_END: create_topic
// ANCHOR: create_consumer_ac
let group_id = "group1".to_string();
let client_id = "client1".to_string();
dsn.params
.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params
.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params
.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params
.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), group_id.clone());
dsn.params
.insert("client.id".to_string(), client_id.clone());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = match builder.build().await {
Ok(consumer) => {
println!(
"Create consumer successfully, dsn: {}, groupId: {}, clientId: {}.",
dsn, group_id, client_id
);
consumer
}
Err(err) => {
eprintln!("Failed to create native consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err);
return Err(err.into());
}
};
// ANCHOR_END: create_consumer_ac
let handle = thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
tokio::time::sleep(Duration::from_secs(1)).await;
let taos_insert = TaosBuilder::from_dsn(&dsn).unwrap().build().await.unwrap();
for i in 0..50 {
let insert_sql = format!(r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW, 10.30000, {}, 0.31000)"#, i);
if let Err(e) = taos_insert.exec(insert_sql).await {
eprintln!("Failed to execute insert: {:?}", e);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
});
// ANCHOR: consume
let topic = "topic_meters";
match consumer.subscribe([topic]).await {
Ok(_) => println!("Subscribe topics successfully."),
Err(err) => {
eprintln!(
"Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, err
);
return Err(err.into());
}
}
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async move {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await
.map_err(|e| {
eprintln!(
"Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, e
);
e
})?;
// ANCHOR_END: consume
// ANCHOR: consumer_commit_manually
consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async {
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {} of topic {}\n", vgroup_id, topic);
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
match consumer.commit(offset).await{
Ok(_) => println!("Commit offset manually successfully."),
Err(err) => {
eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, err);
return Err(err.into());
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;
// ANCHOR_END: consumer_commit_manually
// ANCHOR: seek_offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("assignments: {:?}", assignments);
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
println!(
"topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}",
topic, vgroup_id, current, begin, end
);
match consumer.offset_seek(topic, vgroup_id, begin).await {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, begin, err);
return Err(err.into());
}
}
}
let topic_assignment = consumer.topic_assignment(topic).await;
println!("topic assignment: {:?}", topic_assignment);
}
println!("Assignment seek to beginning successfully.");
// after seek offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("After seek offset assignments: {:?}", assignments);
// ANCHOR_END: seek_offset
// ANCHOR: unsubscribe
consumer.unsubscribe().await;
println!("Consumer unsubscribed successfully.");
// ANCHOR_END: unsubscribe
tokio::time::sleep(Duration::from_secs(1)).await;
handle.join().unwrap();
taos.exec_many(["drop topic topic_meters", "drop database power"])
.await?;
Ok(())
}
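The seek section of the Rust example above reads the begin, current, and end offsets of each assignment and then seeks back to begin. The following is a minimal sketch of that offset model in Python, using a hypothetical in-memory class rather than the connector: `VgroupAssignment` and `drain` are illustrative names, and the sketch assumes offsets are consecutive integers starting at `begin`.

```python
class VgroupAssignment:
    """Hypothetical in-memory model of one assignment with begin/current/end offsets."""

    def __init__(self, rows, begin=0):
        self.rows = list(rows)
        self.begin = begin
        self.current = begin  # offset of the next row to deliver

    @property
    def end(self):
        """Offset just past the last stored row."""
        return self.begin + len(self.rows)

    def poll(self):
        """Return the next row, or None when the consumer has caught up."""
        if self.current >= self.end:
            return None
        row = self.rows[self.current - self.begin]
        self.current += 1
        return row

    def seek(self, offset):
        """Move the read position; offsets outside [begin, end] are rejected."""
        if not self.begin <= offset <= self.end:
            raise ValueError(f"offset {offset} outside [{self.begin}, {self.end}]")
        self.current = offset


def drain(assignment):
    """Poll until no rows remain and return everything that was read."""
    rows = []
    while True:
        row = assignment.poll()
        if row is None:
            return rows
        rows.append(row)


assignment = VgroupAssignment(["r1", "r2", "r3"], begin=100)
first_pass = drain(assignment)
position_after_first_pass = assignment.current
assignment.seek(assignment.begin)  # rewind, as the example does with begin
second_pass = drain(assignment)
print(first_pass, position_after_first_pass, second_pass)
```

In this model, seeking to `begin` makes the same rows available again, which is the behavior the example relies on when it re-reads a topic after the seek.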
Node.js: Not supported
Complete code example
using TDengine.Driver;
using TDengine.Driver.Client;
using TDengine.TMQ;
namespace TMQExample
{
internal class SubscribeDemo
{
private static string _host = "";
private static string _groupId = "";
private static string _clientId = "";
private static string _topic = "";
public static void Main(string[] args)
{
try
{
var builder = new ConnectionStringBuilder("host=127.0.0.1;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
client.Exec("CREATE DATABASE IF NOT EXISTS power");
client.Exec("USE power");
client.Exec(
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC IF NOT EXISTS topic_meters as SELECT * from power.meters");
var consumer = CreateConsumer();
// insert data
Task.Run(InsertData);
// consume message
Consume(consumer);
// seek
Seek(consumer);
// commit
CommitOffset(consumer);
// close
Close(consumer);
Console.WriteLine("Done");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(e.Message);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(e.Message);
throw;
}
}
static void InsertData()
{
var builder = new ConnectionStringBuilder("host=127.0.0.1;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
while (true)
{
client.Exec(
"INSERT into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(now,11.5,219,0.30)");
Task.Delay(1000).Wait();
}
}
}
static IConsumer<Dictionary<string, object>> CreateConsumer()
{
// ANCHOR: create_consumer
// consumer config
_host = "127.0.0.1";
_groupId = "group1";
_clientId = "client1";
var cfg = new Dictionary<string, string>()
{
{ "td.connect.port", "6030" },
{ "auto.offset.reset", "latest" },
{ "msg.with.table.name", "true" },
{ "enable.auto.commit", "true" },
{ "auto.commit.interval.ms", "1000" },
{ "group.id", _groupId },
{ "client.id", _clientId },
{ "td.connect.ip", _host },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
};
IConsumer<Dictionary<string, object>> consumer = null!;
try
{
// create consumer
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
Console.WriteLine(
$"Create consumer successfully, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: create_consumer
return consumer;
}
static void Consume(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: subscribe
_topic = "topic_meters";
try
{
// subscribe
consumer.Subscribe(new List<string>() { _topic });
Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++)
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
// handle message
Console.WriteLine(
$"data: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: subscribe
}
static void Seek(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: seek
try
{
// get assignment
var assignment = consumer.Assignment;
Console.WriteLine($"Now assignment: {assignment}");
// seek to the beginning
foreach (var topicPartition in assignment)
{
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
}
Console.WriteLine("Assignment seek to beginning successfully");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: seek
}
static void CommitOffset(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: commit_offset
for (int i = 0; i < 5; i++)
{
TopicPartitionOffset topicPartitionOffset = null;
try
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
// commit offset
topicPartitionOffset = cr.TopicPartitionOffset;
consumer.Commit(new List<TopicPartitionOffset>
{
topicPartitionOffset,
});
Console.WriteLine("Commit offset manually successfully.");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrMessage: {e.Message}");
throw;
}
}
// ANCHOR_END: commit_offset
}
static void Close(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: close
try
{
// unsubscribe
consumer.Unsubscribe();
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
finally
{
// close consumer
consumer.Close();
Console.WriteLine("Consumer closed successfully.");
}
// ANCHOR_END: close
}
}
}
Complete code example
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// to compile: gcc -o tmq_demo tmq_demo.c -ltaos -lpthread
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "taos.h"
volatile int thread_stop = 0;
static int running = 1;
static int count = 0;
const char* topic_name = "topic_meters";
typedef struct {
const char* enable_auto_commit;
const char* auto_commit_interval_ms;
const char* group_id;
const char* client_id;
const char* td_connect_host;
const char* td_connect_port;
const char* td_connect_user;
const char* td_connect_pass;
const char* auto_offset_reset;
} ConsumerConfig;
ConsumerConfig config = {
.enable_auto_commit = "true",
.auto_commit_interval_ms = "1000",
.group_id = "group1",
.client_id = "client1",
.td_connect_host = "localhost",
.td_connect_port = "6030",
.td_connect_user = "root",
.td_connect_pass = "taosdata",
.auto_offset_reset = "latest"
};
void* prepare_data(void* arg) {
const char* host = "localhost";
const char* user = "root";
const char* password = "taosdata";
uint16_t port = 6030;
int code = 0;
TAOS* pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
return NULL;
}
TAOS_RES* pRes;
int i = 1;
while (!thread_stop) {
char buf[200] = {0};
i++;
snprintf(
buf, sizeof(buf),
"INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + %da, 10.30000, "
"219, 0.31000)",
i);
pRes = taos_query(pConn, buf);
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to insert data to power.meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
}
taos_free_result(pRes);
sleep(1);
}
fprintf(stdout, "Prepare data thread exit\n");
return NULL;
}
// ANCHOR: msg_process
int32_t msg_process(TAOS_RES* msg) {
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
while (true) {
// get one row data from message
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
// Add your data processing logic here
rows++;
}
return rows;
}
// ANCHOR_END: msg_process
TAOS* init_env() {
const char* host = "localhost";
const char* user = "root";
const char* password = "taosdata";
uint16_t port = 6030;
int code = 0;
TAOS* pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
return NULL;
}
TAOS_RES* pRes;
// drop topic if exists
pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "DROP DATABASE IF EXISTS power");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
// create database
pRes = taos_query(pConn, "CREATE DATABASE power PRECISION 'ms' WAL_RETENTION_PERIOD 3600");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
// create super table
pRes = taos_query(
pConn,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create super table meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
return pConn;
END:
taos_free_result(pRes);
taos_close(pConn);
return NULL;
}
void deinit_env(TAOS* pConn) {
if (pConn)
taos_close(pConn);
}
int32_t create_topic(TAOS* pConn) {
TAOS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = taos_query(pConn, "USE power");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
// free the result before returning to avoid leaking it
taos_free_result(pRes);
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to create topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
taos_free_result(pRes);
return -1;
}
taos_free_result(pRes);
return 0;
}
int32_t drop_topic(TAOS* pConn) {
TAOS_RES* pRes;
int code = 0;
if (!pConn) {
fprintf(stderr, "Invalid input parameter.\n");
return -1;
}
pRes = taos_query(pConn, "USE power");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
// free the result before returning to avoid leaking it
taos_free_result(pRes);
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "DROP TOPIC IF EXISTS topic_meters");
code = taos_errno(pRes);
if (code != 0) {
fprintf(stderr, "Failed to drop topic topic_meters, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(pRes));
taos_free_result(pRes);
return -1;
}
taos_free_result(pRes);
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
count +=1;
fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p, count: %d.\n", code, tmq, param, count);
}
// ANCHOR: create_consumer_1
tmq_t* build_consumer(const ConsumerConfig* config) {
tmq_conf_res_t code;
tmq_t* tmq = NULL;
// create a configuration object
tmq_conf_t* conf = tmq_conf_new();
// set the configuration parameters
code = tmq_conf_set(conf, "enable.auto.commit", config->enable_auto_commit);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.commit.interval.ms", config->auto_commit_interval_ms);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "group.id", config->group_id);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "client.id", config->client_id);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.ip", config->td_connect_host);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.port", config->td_connect_port);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.user", config->td_connect_user);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.pass", config->td_connect_pass);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.offset.reset", config->auto_offset_reset);
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
// set the callback function for auto commit
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
// create a consumer object
tmq = tmq_consumer_new(conf, NULL, 0);
// destroy the configuration object
tmq_conf_destroy(conf);
return tmq;
}
// ANCHOR_END: create_consumer_1
// ANCHOR: build_topic_list
// build a topic list used to subscribe
tmq_list_t* build_topic_list() {
// create an empty topic list
tmq_list_t* topicList = tmq_list_new();
// append topic name to the list
int32_t code = tmq_list_append(topicList, topic_name);
if (code) {
// if failed, destroy the list and return NULL
tmq_list_destroy(topicList);
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
return NULL;
}
// if success, return the list
return topicList;
}
// ANCHOR_END: build_topic_list
// ANCHOR: basic_consume_loop
void basic_consume_loop(tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// Add your data processing logic here
totalRows += msg_process(tmqmsg);
// free the message
taos_free_result(tmqmsg);
}
if (msgCnt >= 50) {
// stop after consuming 50 messages
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d messages consumed, including %d rows.\n", msgCnt, totalRows);
}
// ANCHOR_END: basic_consume_loop
// ANCHOR: consume_repeatly
void consume_repeatly(tmq_t* tmq) {
int32_t numOfAssignment = 0;
tmq_topic_assignment* pAssign = NULL;
// get the topic assignment
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
fprintf(stderr, "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
return;
}
// seek to the earliest offset
for (int32_t i = 0; i < numOfAssignment; ++i) {
tmq_topic_assignment* p = &pAssign[i];
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr, "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, vgId: %d, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, p->vgId, code, tmq_err2str(code));
break;
}
}
if (code == 0)
fprintf(stdout, "Assignment seek to beginning successfully.\n");
// free the assignment array
tmq_free_assignment(pAssign);
// let's consume the messages again
basic_consume_loop(tmq);
}
// ANCHOR_END: consume_repeatly
// ANCHOR: manual_commit
void manual_commit(tmq_t* tmq) {
int32_t totalRows = 0; // total rows consumed
int32_t msgCnt = 0; // total messages consumed
int32_t timeout = 5000; // poll timeout
while (running) {
// poll message from TDengine
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
// process the message
totalRows += msg_process(tmqmsg);
// commit the message
int32_t code = tmq_commit_sync(tmq, tmqmsg);
if (code) {
fprintf(stderr, "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
// free the message
taos_free_result(tmqmsg);
break;
} else {
fprintf(stdout, "Commit offset manually successfully.\n");
}
// free the message
taos_free_result(tmqmsg);
}
if (msgCnt >= 50) {
// stop after consuming 50 messages
break;
}
}
// print the result: total messages and total rows consumed
fprintf(stdout, "%d messages consumed, including %d rows.\n", msgCnt, totalRows);
}
// ANCHOR_END: manual_commit
int main(int argc, char* argv[]) {
int32_t code;
pthread_t thread_id;
TAOS* pConn = init_env();
if (pConn == NULL) {
fprintf(stderr, "Failed to init env.\n");
return -1;
}
if (create_topic(pConn) < 0) {
fprintf(stderr, "Failed to create topic.\n");
// close the connection before exiting
deinit_env(pConn);
return -1;
}
if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) {
fprintf(stderr, "Failed to create thread.\n");
deinit_env(pConn);
return -1;
}
// ANCHOR: create_consumer_2
tmq_t* tmq = build_consumer(&config);
if (NULL == tmq) {
fprintf(stderr, "Failed to create native consumer, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
return -1;
} else {
fprintf(stdout, "Create consumer successfully, host: %s, groupId: %s, clientId: %s.\n",
config.td_connect_host, config.group_id, config.client_id);
}
// ANCHOR_END: create_consumer_2
// ANCHOR: subscribe_3
tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list, topic: %s, groupId: %s, clientId: %s.\n",
topic_name, config.group_id, config.client_id);
return -1;
}
if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "Failed to subscribe topic_list, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Subscribe topics successfully.\n");
}
tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
// ANCHOR_END: subscribe_3
consume_repeatly(tmq);
manual_commit(tmq);
// ANCHOR: unsubscribe_and_close
// unsubscribe the topic
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Consumer unsubscribed successfully.\n");
}
// close the consumer
code = tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrCode: 0x%x, ErrMessage: %s.\n",
topic_name, config.group_id, config.client_id, code, tmq_err2str(code));
} else {
fprintf(stdout, "Consumer closed successfully.\n");
}
// ANCHOR_END: unsubscribe_and_close
thread_stop = 1;
pthread_join(thread_id, NULL);
if (drop_topic(pConn) < 0) {
fprintf(stderr, "Failed to drop topic.\n");
return -1;
}
deinit_env(pConn);
return 0;
}
Not supported