
Manage Consumers

TDengine provides data subscription and consumption interfaces similar to those of message queue products. In many scenarios, using TDengine's time-series big data platform eliminates the need to integrate a separate message queue product, which simplifies application design and reduces operational costs. This chapter introduces the data subscription APIs of the various language connectors and shows how to use them. For basic knowledge of data subscription, please refer to Data Subscription.

Creating a Topic

You can use the taos shell, or refer to the Execute SQL chapter, to execute the SQL that creates a topic:

CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters

The above SQL creates a topic named topic_meters. Each record in the messages retrieved through a subscription to this topic consists of the columns selected by the query SELECT ts, current, voltage, phase, groupid, location FROM meters.
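When the topic is created from application code rather than from the taos shell, the statement is simply a string handed to a JDBC Statement. As a small sketch, the hypothetical helper below (TopicSqlSketch and createTopicSql are illustrative names, not connector APIs) assembles the same statement from a topic name, a column list, and a table name. It uses only the JDK, does not contact a server, and assumes the identifiers come from trusted input.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of the TDengine connector.
class TopicSqlSketch {
    // Builds "CREATE TOPIC IF NOT EXISTS <topic> AS SELECT <columns> FROM <table>".
    static String createTopicSql(String topic, List<String> columns, String table) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        return "CREATE TOPIC IF NOT EXISTS " + topic
                + " AS SELECT " + String.join(", ", columns)
                + " FROM " + table;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("ts", "current", "voltage", "phase", "groupid", "location");
        // Prints the statement used in this chapter; pass it to Statement.executeUpdate to run it.
        System.out.println(createTopicSql("topic_meters", columns, "meters"));
    }
}
```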

Note: In the TDengine connector implementation, subscription queries have the following limitations:

  • Query Limitations: Subscription queries can only use select statements and do not support other types of SQL, such as insert, update, or delete.
  • Raw Data Query: Subscription queries can only query raw data and cannot query aggregated or computed results.
  • Time Order Limit: Subscription queries can only query data in chronological order.

Creating a Consumer

The concept of a TDengine consumer is similar to that of Kafka. Consumers receive data streams by subscribing to topics. Consumers can configure various parameters, such as connection methods, server addresses, and automatic offset commits, to suit different data processing needs. Some language connectors' consumers also support advanced features such as automatic reconnection and data transmission compression to ensure efficient and stable data reception.

Creation Parameters

The parameters for creating consumers are numerous and flexible, supporting various connection types, offset commit methods, compression, reconnection, deserialization, and other features. The common basic configuration items applicable to all language connectors are shown in the following table:

| Parameter Name | Type | Parameter Description | Remarks |
| --- | --- | --- | --- |
| td.connect.ip | string | Server's IP address | |
| td.connect.user | string | Username | |
| td.connect.pass | string | Password | |
| td.connect.port | integer | Server's port number | |
| group.id | string | Consumer group ID; consumers in the same group share consumption progress | Required. Maximum length: 192. A maximum of 100 consumer groups can be established for each topic. |
| client.id | string | Client ID | Maximum length: 192 |
| auto.offset.reset | enum | Initial position for the consumer group subscription | earliest: default (version < 3.2.0.0), subscribe from the beginning; latest: default (version >= 3.2.0.0), only start from the latest data; none: cannot subscribe without a committed offset. |
| enable.auto.commit | boolean | Whether to enable automatic offset commits. true: offsets are committed automatically and the client application does not need to commit; false: the client application must commit offsets itself | Default value is true |
| auto.commit.interval.ms | integer | Interval for automatically committing consumed offsets, in milliseconds | Default value is 5000 |
| msg.with.table.name | boolean | Whether to allow parsing the table name from the message; not applicable for column subscriptions (for column subscriptions, tbname can be written as a column in the subquery). This parameter is deprecated from version 3.2.0.0 and is always true | Default is off |
| enable.replay | boolean | Whether to enable data replay functionality | Default is off |
| session.timeout.ms | integer | Timeout period after the consumer's heartbeat is lost; after the timeout, the rebalance logic is triggered, and if it succeeds, the consumer is deleted (supported from version 3.3.3.0) | Default value is 12000, value range [6000, 1800000] |
| max.poll.interval.ms | integer | Maximum interval between two polls by the consumer; if it is exceeded, the consumer is considered offline, the rebalance logic is triggered, and if it succeeds, the consumer is deleted (supported from version 3.3.3.0) | Default value is 300000, value range [1000, INT32_MAX] |
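As a minimal sketch of how the common parameters fit together, the helper below assembles a java.util.Properties object and checks a few of the documented limits before the configuration is handed to a consumer: group.id is required, group.id and client.id may be at most 192 characters long, and session.timeout.ms must lie in [6000, 1800000]. The class and method names (ConsumerConfigSketch, buildConfig) are hypothetical; the connector applies its own validation, and this sketch relies only on the JDK.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the TDengine connector.
class ConsumerConfigSketch {
    static final int MAX_ID_LENGTH = 192;               // documented limit for group.id and client.id
    static final int MIN_SESSION_TIMEOUT_MS = 6000;     // documented lower bound
    static final int MAX_SESSION_TIMEOUT_MS = 1800000;  // documented upper bound

    static Properties buildConfig(String groupId, String clientId, int sessionTimeoutMs) {
        if (groupId == null || groupId.isEmpty()) {
            throw new IllegalArgumentException("group.id is required");
        }
        if (groupId.length() > MAX_ID_LENGTH
                || (clientId != null && clientId.length() > MAX_ID_LENGTH)) {
            throw new IllegalArgumentException("group.id and client.id are limited to 192 characters");
        }
        if (sessionTimeoutMs < MIN_SESSION_TIMEOUT_MS || sessionTimeoutMs > MAX_SESSION_TIMEOUT_MS) {
            throw new IllegalArgumentException("session.timeout.ms must be in [6000, 1800000]");
        }
        Properties config = new Properties();
        config.setProperty("group.id", groupId);
        if (clientId != null) {
            config.setProperty("client.id", clientId);
        }
        config.setProperty("auto.offset.reset", "latest");       // start from the latest data
        config.setProperty("enable.auto.commit", "true");        // documented default
        config.setProperty("auto.commit.interval.ms", "5000");   // documented default
        config.setProperty("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        return config;
    }

    public static void main(String[] args) {
        Properties config = buildConfig("group1", "client1", 12000);
        System.out.println("group.id = " + config.getProperty("group.id"));
        System.out.println("session.timeout.ms = " + config.getProperty("session.timeout.ms"));
    }
}
```

Failing early on an out-of-range value keeps configuration mistakes close to the code that made them, instead of surfacing later as a connection or rebalance problem.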

Here are the creation parameters for various language connectors:

In the Java connector, the parameters for creating a consumer are passed as a Properties object. For the list of configurable parameters, please refer to Consumer. The remaining parameters are described in the common basic configuration items above.

WebSocket Connection

This section shows how to create a consumer over a WebSocket connection. The example specifies the server address, enables automatic commits, starts consuming from the latest messages, and sets the group.id and client.id. Some language connectors also support deserialization parameters.

Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.WsConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");

try {
    TaosConsumer<ResultBean> consumer = new TaosConsumer<>(config);
    System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
            config.getProperty("bootstrap.servers"),
            config.getProperty("group.id"),
            config.getProperty("client.id"));
    return consumer;
} catch (Exception ex) {
    // please refer to the JDBC specifications for detailed exceptions info
    System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
            config.getProperty("bootstrap.servers"),
            config.getProperty("group.id"),
            config.getProperty("client.id"),
            ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
            ex.getMessage());
    // Print stack trace for context in examples. Use logging in production.
    ex.printStackTrace();
    throw ex;
}


Native Connection

This section shows how to create a consumer over a native connection. The example specifies the server address, enables automatic commits, starts consuming from the latest messages, and sets the group.id and client.id. Some language connectors also support deserialization parameters.

Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.ConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");

try {
    TaosConsumer<ResultBean> consumer = new TaosConsumer<>(config);
    System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
            config.getProperty("bootstrap.servers"),
            config.getProperty("group.id"),
            config.getProperty("client.id"));
    return consumer;
} catch (Exception ex) {
    // please refer to the JDBC specifications for detailed exceptions info
    System.out.printf("Failed to create native consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
            config.getProperty("bootstrap.servers"),
            config.getProperty("group.id"),
            config.getProperty("client.id"),
            ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
            ex.getMessage());
    // Print stack trace for context in examples. Use logging in production.
    ex.printStackTrace();
    throw ex;
}


Subscribing to Consume Data

After the consumer subscribes to a topic, it can start receiving and processing messages from that topic. The following examples show how to subscribe and consume data:

WebSocket Connection

List<String> topics = Collections.singletonList("topic_meters");
try {
    // subscribe to the topics
    consumer.subscribe(topics);
    System.out.println("Subscribe topics successfully.");
    for (int i = 0; i < 50; i++) {
        // poll data
        ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<ResultBean> record : records) {
            ResultBean bean = record.value();
            // Add your data processing logic here
            System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
        }
    }
} catch (Exception ex) {
    // please refer to the JDBC specifications for detailed exceptions info
    System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
            topics.get(0),
            groupId,
            clientId,
            ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
            ex.getMessage());
    // Print stack trace for context in examples. Use logging in production.
    ex.printStackTrace();
    throw ex;
}


  • The subscribe method takes a list of topic names, so a consumer can subscribe to multiple topics at once.
  • Each call to the poll method fetches one message, and a single message may contain multiple records.
  • ResultBean is a user-defined nested class whose field names and data types correspond to the names and data types of the selected columns. The deserialization class specified by the value.deserializer property uses it to turn each record into a ResultBean object.
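The name-based mapping described in the last point can be pictured with a small conceptual sketch. The code below is not the connector's ReferenceDeserializer; it uses a plain Map as a stand-in for one received record and a hypothetical MeterRow bean, and it calls the setter whose name matches each column. It only illustrates why the bean's property names and types must line up with the selected columns.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Conceptual sketch only; the connector's own deserializer is not shown here.
class BeanMappingSketch {
    // Hypothetical bean covering two of the topic's columns.
    public static class MeterRow {
        private int voltage;
        private String location;

        public int getVoltage() { return voltage; }
        public void setVoltage(int voltage) { this.voltage = voltage; }
        public String getLocation() { return location; }
        public void setLocation(String location) { this.location = location; }
    }

    // Calls setXxx(value) for every setter whose lower-cased property name is a column in the row.
    static <T> T mapRow(Map<String, Object> row, Class<T> type) throws Exception {
        T bean = type.getDeclaredConstructor().newInstance();
        for (Method method : type.getMethods()) {
            String name = method.getName();
            if (!name.startsWith("set") || method.getParameterCount() != 1) {
                continue;
            }
            String column = name.substring(3).toLowerCase(Locale.ROOT);
            if (row.containsKey(column)) {
                method.invoke(bean, row.get(column)); // boxed values are unboxed for primitive setters
            }
        }
        return bean;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("voltage", 219);
        row.put("location", "California.SanFrancisco");
        MeterRow bean = mapRow(row, MeterRow.class);
        System.out.println(bean.getVoltage() + " " + bean.getLocation());
    }
}
```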

Native Connection

The code for subscribing and polling over a native connection is identical to the WebSocket example above, and the same notes on subscribe, poll, and ResultBean apply.

Specifying the Subscription Offset

Consumers can specify the offset from which to start reading messages in a partition. This allows a consumer to re-read messages or to skip messages that have already been processed. The following examples show how to specify the subscription offset.

WebSocket Connection

List<String> topics = Collections.singletonList("topic_meters");
try {
    // subscribe to the topics
    consumer.subscribe(topics);
    System.out.println("Subscribe topics successfully.");
    Set<TopicPartition> assignment = consumer.assignment();
    System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));

    ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
    // make sure we have got some data
    while (records.isEmpty()) {
        records = consumer.poll(Duration.ofMillis(100));
    }

    consumer.seekToBeginning(assignment);
    System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
    // please refer to the JDBC specifications for detailed exceptions info
    System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
            topics.get(0),
            groupId,
            clientId,
            ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
            ex.getMessage());
    // Print stack trace for context in examples. Use logging in production.
    ex.printStackTrace();
    throw ex;
}


  1. Subscribe to the topic, then retrieve and print the consumer's current partition assignment with the consumer.assignment method.
  2. Use the consumer.poll method to poll data until data is retrieved.
  3. Use the consumer.seekToBeginning method to reset the offsets of all assigned partitions to the starting position, and print a message indicating that the reset succeeded.
  4. Call the consumer.poll method again to read data from the starting position; this last step is not shown in the snippet above.
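The effect of seeking can be pictured with a small in-memory model. The sketch below is a conceptual stand-in, not the connector API: the hypothetical SeekSketch class keeps a list of records in place of one partition and a read position, poll advances the position, and seekToBeginning resets it so the same records are read again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory model for illustration; it does not use or mirror the TDengine connector classes.
class SeekSketch {
    private final List<String> log;  // stands in for the data of one partition
    private int position = 0;        // offset of the next record to read

    SeekSketch(List<String> log) {
        this.log = log;
    }

    // Returns up to maxRecords records starting at the current position and advances it.
    List<String> poll(int maxRecords) {
        int end = Math.min(position + maxRecords, log.size());
        List<String> batch = new ArrayList<>(log.subList(position, end));
        position = end;
        return batch;
    }

    // Resets the read position to the first record.
    void seekToBeginning() {
        position = 0;
    }

    public static void main(String[] args) {
        SeekSketch consumer = new SeekSketch(Arrays.asList("r0", "r1", "r2"));
        System.out.println(consumer.poll(2)); // prints [r0, r1]
        consumer.seekToBeginning();
        System.out.println(consumer.poll(2)); // prints [r0, r1] again
    }
}
```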

Native Connection

The code for seeking over a native connection is identical to the WebSocket example above, and the same steps apply.

Committing Offset

After the consumer reads and processes messages, it can commit the offset, indicating that the consumer has successfully processed messages up to this offset. Offset commits can be automatic (periodically submitted according to configuration) or manual (controlled by the application).

When creating a consumer, if the enable.auto.commit attribute is set to false, the offset can be committed manually.

Note: Before manually committing the consumption progress, make sure that the message has been processed successfully; otherwise, a message whose processing failed will not be consumed again. With automatic commits, the consumption progress of the previous message may be committed during the current poll, so finish processing each message before performing the next poll or message retrieval.
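Why the commit has to come after processing can be shown with a small in-memory model. The sketch below is a conceptual stand-in and not the connector API: the hypothetical ManualCommitSketch class tracks a committed offset separately from the read position, and restart simulates a consumer that resumes from the last committed offset. A message that was polled but never committed is delivered again after the restart.

```java
import java.util.Arrays;
import java.util.List;

// In-memory model for illustration; it does not use or mirror the TDengine connector classes.
class ManualCommitSketch {
    private final List<String> log;  // stands in for the messages of one topic
    private int committed = 0;       // offset stored for the consumer group
    private int position = 0;        // read position of the running consumer

    ManualCommitSketch(List<String> log) {
        this.log = log;
    }

    // Returns the next message, or null when nothing is left.
    String poll() {
        return position < log.size() ? log.get(position++) : null;
    }

    // Marks everything polled so far as successfully processed.
    void commit() {
        committed = position;
    }

    // Simulates a restart: reading resumes from the last committed offset.
    void restart() {
        position = committed;
    }

    public static void main(String[] args) {
        ManualCommitSketch consumer = new ManualCommitSketch(Arrays.asList("m0", "m1", "m2"));
        consumer.poll();    // "m0" is processed successfully ...
        consumer.commit();  // ... so its offset is committed
        consumer.poll();    // "m1" is polled, but processing fails, so nothing is committed
        consumer.restart();
        System.out.println(consumer.poll()); // prints m1: the uncommitted message is delivered again
    }
}
```

Had commit been called right after the second poll, before processing finished, the restart would have resumed at "m2" and the failed message would have been lost.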

WebSocket Connection

List<String> topics = Collections.singletonList("topic_meters");
try {
    consumer.subscribe(topics);
    for (int i = 0; i < 50; i++) {
        ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<ResultBean> record : records) {
            ResultBean bean = record.value();
            // Add your data processing logic here
            System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
        }
        if (!records.isEmpty()) {
            // after processing the data, commit the offset manually
            consumer.commitSync();
            System.out.println("Commit offset manually successfully.");
        }
    }
} catch (Exception ex) {
    // please refer to the JDBC specifications for detailed exceptions info
    System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
            topics.get(0),
            groupId,
            clientId,
            ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
            ex.getMessage());
    // Print stack trace for context in examples. Use logging in production.
    ex.printStackTrace();
    throw ex;
}


Native Connection

The code for committing offsets manually over a native connection is identical to the WebSocket example above.

Unsubscribing and Closing Consumption

Consumers can unsubscribe from topics to stop receiving messages. When a consumer is no longer needed, the application should close the consumer instance to free resources and disconnect from the TDengine server.
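The shutdown order matters: the consumer should be closed even when unsubscribing fails. As a minimal sketch of that pattern, the code below uses a hypothetical RecordingConsumer stand-in (not the connector's consumer class) that only records which calls were made, and places close in a finally block so it runs on both the success and the failure path.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of the try/finally shutdown order; RecordingConsumer is a hypothetical stand-in.
class ShutdownOrderSketch {
    static class RecordingConsumer implements AutoCloseable {
        final List<String> calls = new ArrayList<>();
        private final boolean failOnUnsubscribe;

        RecordingConsumer(boolean failOnUnsubscribe) {
            this.failOnUnsubscribe = failOnUnsubscribe;
        }

        void unsubscribe() {
            calls.add("unsubscribe");
            if (failOnUnsubscribe) {
                throw new IllegalStateException("unsubscribe failed");
            }
        }

        @Override
        public void close() {
            calls.add("close");
        }
    }

    // Unsubscribes first and always closes the consumer afterwards.
    static List<String> shutDown(RecordingConsumer consumer) {
        try {
            consumer.unsubscribe();
        } catch (IllegalStateException ex) {
            System.out.println("Failed to unsubscribe: " + ex.getMessage());
        } finally {
            consumer.close();
        }
        return consumer.calls;
    }

    public static void main(String[] args) {
        System.out.println(shutDown(new RecordingConsumer(false))); // prints [unsubscribe, close]
        System.out.println(shutDown(new RecordingConsumer(true)));  // close still runs after the failure
    }
}
```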

WebSocket Connection

try {
    // unsubscribe the consumer
    consumer.unsubscribe();
    System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
    // please refer to the JDBC specifications for detailed exceptions info
    System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
            topics.get(0),
            groupId,
            clientId,
            ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
            ex.getMessage());
    // Print stack trace for context in examples. Use logging in production.
    ex.printStackTrace();
    throw ex;
} finally {
    // close the consumer
    consumer.close();
    System.out.println("Consumer closed successfully.");
}


Native Connection

The code for unsubscribing and closing the consumer over a native connection is identical to the WebSocket example above.

Complete Example

WebSocket Connection

// Package and imports are not part of the original listing; they are assumed from the
// value.deserializer class name and the taos-jdbcdriver and Jackson packages used below.
package com.taos.example;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.tmq.*;
import com.taosdata.jdbc.utils.JsonUtil;

import java.sql.*;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WsConsumerLoopFull {
    static private Connection connection;
    static private Statement statement;
    static private volatile boolean stopThread = false;
    static private String groupId = "group1";
    static private String clientId = "client1";

    public static TaosConsumer<ResultBean> getConsumer() throws Exception {
        Properties config = new Properties();
        config.setProperty("td.connect.type", "ws");
        config.setProperty("bootstrap.servers", "localhost:6041");
        config.setProperty("auto.offset.reset", "latest");
        config.setProperty("msg.with.table.name", "true");
        config.setProperty("enable.auto.commit", "true");
        config.setProperty("auto.commit.interval.ms", "1000");
        config.setProperty("group.id", "group1");
        config.setProperty("client.id", "client1");
        config.setProperty("td.connect.user", "root");
        config.setProperty("td.connect.pass", "taosdata");
        config.setProperty("value.deserializer", "com.taos.example.WsConsumerLoopFull$ResultDeserializer");
        config.setProperty("value.deserializer.encoding", "UTF-8");

        try {
            TaosConsumer<ResultBean> consumer = new TaosConsumer<>(config);
            System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
                    config.getProperty("bootstrap.servers"),
                    config.getProperty("group.id"),
                    config.getProperty("client.id"));
            return consumer;
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
                    config.getProperty("bootstrap.servers"),
                    config.getProperty("group.id"),
                    config.getProperty("client.id"),
                    ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
        List<String> topics = Collections.singletonList("topic_meters");
        try {
            // subscribe to the topics
            consumer.subscribe(topics);
            System.out.println("Subscribe topics successfully.");
            for (int i = 0; i < 50; i++) {
                // poll data
                ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<ResultBean> record : records) {
                    ResultBean bean = record.value();
                    // Add your data processing logic here
                    System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
                }
            }
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
                    topics.get(0),
                    groupId,
                    clientId,
                    ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
        List<String> topics = Collections.singletonList("topic_meters");
        try {
            // subscribe to the topics
            consumer.subscribe(topics);
            System.out.println("Subscribe topics successfully.");
            Set<TopicPartition> assignment = consumer.assignment();
            System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));

            ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
            // make sure we have got some data
            while (records.isEmpty()) {
                records = consumer.poll(Duration.ofMillis(100));
            }

            consumer.seekToBeginning(assignment);
            System.out.println("Assignment seek to beginning successfully.");
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
                    topics.get(0),
                    groupId,
                    clientId,
                    ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }


    public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
        List<String> topics = Collections.singletonList("topic_meters");
        try {
            consumer.subscribe(topics);
            for (int i = 0; i < 50; i++) {
                ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<ResultBean> record : records) {
                    ResultBean bean = record.value();
                    // Add your data processing logic here
                    System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
                }
                if (!records.isEmpty()) {
                    // after processing the data, commit the offset manually
                    consumer.commitSync();
                    System.out.println("Commit offset manually successfully.");
                }
            }
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
                    topics.get(0),
                    groupId,
                    clientId,
                    ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
        List<String> topics = Collections.singletonList("topic_meters");
        consumer.subscribe(topics);
        try {
            // unsubscribe the consumer
            consumer.unsubscribe();
            System.out.println("Consumer unsubscribed successfully.");
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
                    topics.get(0),
                    groupId,
                    clientId,
                    ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        } finally {
            // close the consumer
            consumer.close();
            System.out.println("Consumer closed successfully.");
        }
    }

    public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {

    }

    // use this class to define the data structure of the result record
    public static class ResultBean {
        private Timestamp ts;
        private double current;
        private int voltage;
        private double phase;
        private int groupid;
        private String location;

        public Timestamp getTs() {
            return ts;
        }

        public void setTs(Timestamp ts) {
            this.ts = ts;
        }

        public double getCurrent() {
            return current;
        }

        public void setCurrent(double current) {
            this.current = current;
        }

        public int getVoltage() {
            return voltage;
        }

        public void setVoltage(int voltage) {
            this.voltage = voltage;
        }

        public double getPhase() {
            return phase;
        }

        public void setPhase(double phase) {
            this.phase = phase;
        }

        public int getGroupid() {
            return groupid;
        }

        public void setGroupid(int groupid) {
            this.groupid = groupid;
        }

        public String getLocation() {
            return location;
        }

        public void setLocation(String location) {
            this.location = location;
        }
    }

    public static void prepareData() throws SQLException, InterruptedException {
        try {
            int i = 0;
            while (!stopThread) {
                String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
                int affectedRows = statement.executeUpdate(insertQuery);
                assert affectedRows == 1;
                i++;
                Thread.sleep(1);
            }
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n",
                    ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    public static void prepareMeta() throws SQLException {
        try {
            statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
            statement.executeUpdate("USE power");
            statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
            statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            System.out.printf("Failed to create db and table, %sErrMessage: %s%n",
                    ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    public static void initConnection() throws SQLException {
        String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
        Properties properties = new Properties();
        properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C");
        properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");

        try {
            connection = DriverManager.getConnection(url, properties);
        } catch (SQLException ex) {
            System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
            throw new SQLException("Failed to create connection", ex);
        }
        try {
            statement = connection.createStatement();
        } catch (SQLException ex) {
            System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
            throw new SQLException("Failed to create statement", ex);
        }
        System.out.println("Connection created successfully.");
    }

    public static void closeConnection() throws SQLException {
        try {
            if (statement != null) {
                statement.close();
            }
        } catch (SQLException ex) {
            System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
            throw new SQLException("Failed to close statement", ex);
        }

        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException ex) {
            System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
            throw new SQLException("Failed to close connection", ex);
        }
        System.out.println("Connection closed successfully.");
    }


    public static void main(String[] args) throws SQLException, InterruptedException {
        initConnection();
        prepareMeta();

        // create a single thread executor
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // submit a task that keeps inserting data until stopThread is set
        executor.submit(() -> {
            try {
                prepareData();
            } catch (SQLException ex) {
                System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + ", ErrMessage: " + ex.getMessage());
                return;
            } catch (Exception ex) {
                System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
                return;
            }
            System.out.println("prepareData executed successfully.");
        });

        try {
            TaosConsumer<ResultBean> consumer = getConsumer();

            pollExample(consumer);
            System.out.println("pollExample executed successfully.");
            consumer.unsubscribe();

            seekExample(consumer);
            System.out.println("seekExample executed successfully.");
            consumer.unsubscribe();

            commitExample(consumer);
            System.out.println("commitExample executed successfully.");
            consumer.unsubscribe();

            unsubscribeExample(consumer);
            System.out.println("unsubscribeExample executed successfully");
        } catch (SQLException ex) {
            // do not return here: the insert thread and the connection still have to be shut down below
            System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
        } catch (Exception ex) {
            System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
        }

        stopThread = true;
        // close the executor, which will make the executor reject new tasks
        executor.shutdown();

        try {
            // wait for the executor to terminate
            boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            assert result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Wait executor termination failed.");
        }

        closeConnection();
        System.out.println("program end.");
    }
}


Note: The value of the value.deserializer configuration parameter should be adjusted to match the package path of the deserializer class in your environment.

Native Connection

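package com.taos.example;

// Imports below are assumed from the taos-jdbcdriver and Jackson packages; adjust them to your driver version.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
import com.taosdata.jdbc.tmq.TaosConsumer;
import com.taosdata.jdbc.tmq.TopicPartition;
import com.taosdata.jdbc.utils.JsonUtil;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerLoopFull {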
private static Connection connection;
private static Statement statement;
// volatile so the writer thread sees the stop request from the main thread
private static volatile boolean stopThread = false;
private static String groupId = "group1";
private static String clientId = "client1";

public static TaosConsumer<ResultBean> getConsumer() throws Exception {
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", groupId);
config.setProperty("client.id", clientId);
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.ConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");

try {
TaosConsumer<ResultBean> consumer = new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// see the JDBC specification for details on the exception
System.out.printf("Failed to create native consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}

public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
}
} catch (Exception ex) {
// see the JDBC specification for details on the exception
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}

public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JsonUtil.getObjectMapper().writeValueAsString(assignment));

ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}

consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// see the JDBC specification for details on the exception
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}


public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException, JsonProcessingException {
List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JsonUtil.getObjectMapper().writeValueAsString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// see the JDBC specification for details on the exception
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}

public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// see the JDBC specification for details on the exception
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
} finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
}

public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {

}

// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;

public Timestamp getTs() {
return ts;
}

public void setTs(Timestamp ts) {
this.ts = ts;
}

public double getCurrent() {
return current;
}

public void setCurrent(double current) {
this.current = current;
}

public int getVoltage() {
return voltage;
}

public void setVoltage(int voltage) {
this.voltage = voltage;
}

public double getPhase() {
return phase;
}

public void setPhase(double phase) {
this.phase = phase;
}

public int getGroupid() {
return groupid;
}

public void setGroupid(int groupid) {
this.groupid = groupid;
}

public String getLocation() {
return location;
}

public void setLocation(String location) {
this.location = location;
}
}

public static void prepareData() throws SQLException, InterruptedException {
try {
int i = 0;
while (!stopThread) {
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
i++;
Thread.sleep(1);
}
} catch (Exception ex) {
// see the JDBC specification for details on the exception
System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}

public static void prepareMeta() throws SQLException {
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (Exception ex) {
// see the JDBC specification for details on the exception
System.out.printf("Failed to create db and table, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}

public static void initConnection() throws SQLException {
String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");

try {
connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex);
}
try {
statement = connection.createStatement();
} catch (SQLException ex) {
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex);
}
System.out.println("Connection created successfully.");
}

public static void closeConnection() throws SQLException {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex);
}

try {
if (connection != null) {
connection.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex);
}
System.out.println("Connection closed successfully.");
}


public static void main(String[] args) throws SQLException, InterruptedException {
initConnection();
prepareMeta();

// create a single thread executor
ExecutorService executor = Executors.newSingleThreadExecutor();

// submit a task
executor.submit(() -> {
try {
prepareData();
} catch (SQLException ex) {
System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + ", ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
return;
}
System.out.println("prepareData executed successfully.");
});

try {
TaosConsumer<ResultBean> consumer = getConsumer();

pollExample(consumer);
System.out.println("pollExample executed successfully.");
consumer.unsubscribe();

seekExample(consumer);
System.out.println("seekExample executed successfully.");
consumer.unsubscribe();

commitExample(consumer);
System.out.println("commitExample executed successfully.");
consumer.unsubscribe();

unsubscribeExample(consumer);
System.out.println("unsubscribeExample executed successfully.");
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
// no return here: fall through so the writer thread is stopped and the connection is closed
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
// no return here: fall through so the writer thread is stopped and the connection is closed
}

stopThread = true;
// close the executor, which will make the executor reject new tasks
executor.shutdown();

try {
// wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
System.out.println("Wait executor termination failed.");
}

closeConnection();
System.out.println("program end.");
}
}

note

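Set the value.deserializer configuration parameter to the fully qualified name of your deserializer class; adjust the package path to match your own environment. In the example above the deserializer is a nested class, so the name uses $ between the outer and inner class: com.taos.example.ConsumerLoopFull$ResultDeserializer.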
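The main method of the example stops its background writer by setting a volatile flag, shutting down the executor, and waiting for it to terminate. The following is a stripped-down sketch of that pattern with no TDengine dependency: a counter stands in for the INSERT statement and a sleep stands in for the consumer examples. StopFlagExample and all of its members are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StopFlagExample {
    // volatile so the worker thread sees the update made by the main thread
    private static volatile boolean stopThread = false;
    private static final AtomicInteger written = new AtomicInteger();

    // Stand-in for prepareData(): counts one "row" per iteration until told to stop.
    static void produce() throws InterruptedException {
        while (!stopThread) {
            written.incrementAndGet();
            Thread.sleep(1);
        }
    }

    public static int rowsWritten() {
        return written.get();
    }

    // Starts the worker, lets it run for runMillis, then stops it.
    // Returns true if the executor terminated within the timeout.
    public static boolean runAndStop(long runMillis) {
        stopThread = false;
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try {
                produce();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            Thread.sleep(runMillis); // stand-in for the consumer examples
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // always request the stop, even if the work above failed
            stopThread = true;
            executor.shutdown(); // reject new tasks; the running task ends via the flag
        }
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean terminated = runAndStop(50);
        System.out.println("terminated: " + terminated + ", rows written: " + rowsWritten());
    }
}
```

Placing the flag update in a finally block is a choice made in this sketch so that the worker stops even when the main-thread work throws. The bounded five-second wait is likewise an assumption of the sketch, not a value taken from the example.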