Skip to main content

Managing Consumers

TDengine provides data subscription and consumption interfaces similar to those of message queue products. In many scenarios, by adopting TDengine's time-series big data platform, there is no need to integrate additional message queue products, thus simplifying application design and reducing maintenance costs. This chapter introduces the related APIs and usage methods for data subscription with various language connectors. For basic information on data subscription, please refer to Data Subscription

Creating Topics

Please use taos shell or refer to the Execute SQL section to execute the SQL for creating topics: CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters

The above SQL will create a subscription named topic_meters. Each record in the messages obtained using this subscription is composed of the columns selected by this query statement SELECT ts, current, voltage, phase, groupid, location FROM meters.

Note In the implementation of TDengine connectors, there are the following limitations for subscription queries.

  • Query statement limitation: Subscription queries can only use select statements and do not support other types of SQL, such as subscribing to databases, subscribing to supertables (non-select methods), insert, update, or delete, etc.
  • Raw data query: Subscription queries can only query raw data, not aggregated or calculated results.
  • Time order limitation: Subscription queries can only query data in chronological order.

Creating Consumers

The concept of TDengine consumers is similar to Kafka, where consumers receive data streams by subscribing to topics. Consumers can be configured with various parameters, such as connection methods, server addresses, automatic Offset submission, etc., to suit different data processing needs. Some language connectors' consumers also support advanced features such as automatic reconnection and data transmission compression to ensure efficient and stable data reception.

Creation Parameters

There are many parameters for creating consumers, which flexibly support various connection types, Offset submission methods, compression, reconnection, deserialization, and other features. The common basic configuration items applicable to all language connectors are shown in the following table:

Parameter NameTypeDescriptionRemarks
td.connect.ipstringServer IP address
td.connect.userstringUsername
td.connect.passstringPassword
td.connect.portintegerServer port number
group.idstringConsumer group ID, the same consumer group shares consumption progress
Required. Maximum length: 192.
Each topic can have up to 100 consumer groups
client.idstringClient IDMaximum length: 192
auto.offset.resetenumInitial position of the consumer group subscription
earliest: default(version < 3.2.0.0); subscribe from the beginning;
latest: default(version >= 3.2.0.0); only subscribe from the latest data;
none: cannot subscribe without a committed offset
enable.auto.commitbooleanWhether to enable automatic consumption point submission, true: automatic submission, client application does not need to commit; false: client application needs to commit manuallyDefault is true
auto.commit.interval.msintegerTime interval for automatically submitting consumption records, in millisecondsDefault is 5000
msg.with.table.namebooleanWhether to allow parsing the table name from the message, not applicable to column subscription (column subscription can write tbname as a column in the subquery statement) (from version 3.2.0.0 this parameter is deprecated, always true)Default is off
enable.replaybooleanWhether to enable data replay functionDefault is off
session.timeout.msintegerTimeout after consumer heartbeat is lost, after which rebalance logic is triggered, and upon success, that consumer will be removed (supported from version 3.3.3.0)Default is 12000, range [6000, 1800000]
max.poll.interval.msintegerThe longest time interval for consumer poll data fetching, exceeding this time will be considered as the consumer being offline, triggering rebalance logic, and upon success, that consumer will be removed (supported from version 3.3.3.0)Default is 300000, range [1000, INT32_MAX]

Below are the connection parameters for connectors in various languages:

The parameters for creating a consumer with the Java connector are Properties. For a list of parameters you can set, please refer to Consumer Parameters
For other parameters, refer to the common basic configuration items mentioned above.

WebSocket Connection

Introduces how connectors in various languages use WebSocket connection method to create consumers. Specify the server address to connect, set auto-commit, start consuming from the latest message, specify group.id and client.id, etc. Some language connectors also support deserialization parameters.

Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "clinet1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.WsConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");

try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}

view source code

Native Connection

Introduce how connectors in various languages use native connections to create consumers. Specify the server address for the connection, set auto-commit, start consuming from the latest message, and specify information such as group.id and client.id. Some language connectors also support deserialization parameters.

Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "clinet1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.ConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");

try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create native consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}

view source code

Subscribe to Consume Data

After subscribing to a topic, consumers can start receiving and processing messages from these topics. The example code for subscribing to consume data is as follows:

WebSocket Connection

List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}

view source code

  • The parameters of the subscribe method mean: the list of topics subscribed to (i.e., names), supporting subscription to multiple topics simultaneously.
  • poll is called each time to fetch a message, which may contain multiple records.
  • ResultBean is a custom internal class, whose field names and data types correspond one-to-one with the column names and data types, allowing objects of type ResultBean to be deserialized using the value.deserializer property's corresponding deserialization class.

Native Connection

List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}

view source code

  • The parameters of the subscribe method mean: the list of topics (i.e., names) to subscribe to, supporting subscription to multiple topics simultaneously.
  • poll is called each time to get a message, which may contain multiple records.
  • ResultBean is a custom internal class, whose field names and data types correspond one-to-one with the column names and data types, allowing objects of type ResultBean to be deserialized based on the value.deserializer property's corresponding deserialization class.

Specifying the Subscription Offset

Consumers can specify to start reading messages from a specific Offset in the partition, allowing them to reread messages or skip processed messages. Below is how connectors in various languages specify the subscription Offset.

WebSocket Connection

List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));

ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}

consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}

view source code

  1. Use the consumer.poll method to poll data until data is obtained.
  2. For the first batch of polled data, print the content of the first message and obtain the current consumer's partition assignment information.
  3. Use the consumer.seekToBeginning method to reset the offset of all partitions to the starting position and print the successful reset message.
  4. Poll data again using the consumer.poll method and print the content of the first message.

Native Connection

List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));

ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}

consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}

view source code

  1. Use the consumer.poll method to poll data until data is obtained.
  2. For the first batch of polled data, print the content of the first data item and obtain the current consumer's partition assignment information.
  3. Use the consumer.seekToBeginning method to reset the offset of all partitions to the beginning position and print a message of successful reset.
  4. Poll data again using the consumer.poll method and print the content of the first data item.

Commit Offset

After a consumer has read and processed messages, it can commit the Offset, indicating that the consumer has successfully processed messages up to this Offset. Offset commits can be automatic (committed periodically based on configuration) or manual (controlled by the application when to commit). When creating a consumer, if the property enable.auto.commit is set to false, the offset can be manually committed.

Note: Before manually submitting the consumption progress, ensure that the message has been processed correctly; otherwise, the incorrectly processed message will not be consumed again. Automatic submission may commit the consumption progress of the previous message during the current poll, so please ensure that the message processing is completed before the next poll or message retrieval.

WebSocket Connection

List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}

view source code

Native Connection

List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}

view source code

Unsubscribe and Close Consumption

Consumers can unsubscribe from topics and stop receiving messages. When a consumer is no longer needed, the consumer instance should be closed to release resources and disconnect from the TDengine server.

WebSocket Connection