Data Subscription

TDengine provides data subscription and consumption interfaces similar to those of message queue products. These interfaces make it easier for applications to obtain data written to TDengine in real time and to process that data in the order in which events occurred. This simplifies your time-series data processing system and reduces your costs, because you no longer need to deploy a separate message queue product such as Kafka.

To use TDengine data subscription, you define topics, as in Kafka. However, a topic in TDengine is based on query conditions for an existing supertable, table, or subtable; in other words, a SELECT statement. You can use SQL to filter data by tag, table name, column, or expression and then apply a scalar function or user-defined function to the data. Aggregate functions are not supported. This gives TDengine data subscription more flexibility than similar products. Applications can control the granularity of data on demand, while filtering and preprocessing are handled by TDengine instead of the application layer. This reduces both the amount of data transmitted and the complexity of applications.

By subscribing to a topic, a consumer can obtain the latest data in that topic in real time. Multiple consumers can be formed into a consumer group that consumes messages together. Consumer groups increase throughput through multi-threaded, distributed data consumption. Note that consumers in different groups that are subscribed to the same topic do not consume messages together. A single consumer can subscribe to multiple topics. If the data in a supertable is sharded across multiple vnodes, a consumer group can consume it much more efficiently than a single consumer. TDengine also includes an acknowledgement mechanism that ensures at-least-once delivery in complicated environments where machines may crash or restart.
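The division of work inside a consumer group can be pictured with a small model. The sketch below is illustrative only: it assumes a simple round-robin split of vgroups among the consumers of one group, which is not necessarily how TDengine balances load, and the class name GroupAssignmentSketch, the consumer names, and the vgroup IDs are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model: spread the vgroups of one topic over the consumers of one group. */
final class GroupAssignmentSketch {

    /**
     * Assigns each vgroup to exactly one consumer, round-robin.
     * Hypothetical helper for illustration; not a TDengine API.
     */
    static Map<String, List<Integer>> assign(List<String> consumers, List<Integer> vgroups) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a consumer group needs at least one consumer");
        }
        Map<String, List<Integer>> plan = new LinkedHashMap<>();
        for (String consumer : consumers) {
            plan.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < vgroups.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            plan.get(owner).add(vgroups.get(i));
        }
        return plan;
    }

    public static void main(String[] args) {
        // Two consumers in the same group share four vgroups.
        System.out.println(assign(Arrays.asList("consumer-a", "consumer-b"),
                Arrays.asList(2, 3, 4, 5)));
    }
}
```

In this model each consumer in a group receives a disjoint share of the vgroups, which is why a group can drain a sharded supertable faster than a single consumer. A consumer in a different group would be planned separately.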

To implement these features, TDengine indexes its write-ahead log (WAL) file for fast random access and provides configurable methods for replacing and retaining this file. You can define a retention period and size for this file. For more information, see the CREATE DATABASE statement. In this way, the WAL file is transformed into a persistent storage engine that remembers the order in which events occur. However, note that configuring an overly long retention period for your WAL files makes database compression inefficient. For queries in the form of topics, TDengine then uses the WAL file instead of the time-series database as its storage engine. TDengine reads the data from the WAL file; uses a unified query engine instance to perform filtering, transformations, and other operations; and finally pushes the data to consumers.

Tip: Data subscription consumes data from the WAL. If WAL files are deleted under the WAL retention policy, the deleted data can no longer be consumed. Set a reasonable value for the WAL_RETENTION_PERIOD or WAL_RETENTION_SIZE parameter when you create the database, and make sure that your application consumes data promptly so that no data is lost. This behavior is similar to Kafka and other widely used message queue products.
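The retention rule can be expressed as a simple age check. The sketch below is a simplified model: it assumes that WAL_RETENTION_PERIOD is given in seconds and that data older than the period is no longer available. Real deletion happens per WAL file, and WAL_RETENTION_SIZE is ignored here. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified model of the WAL retention period (hypothetical helper, not a TDengine API). */
final class WalRetentionSketch {

    /** Returns true while data written at writtenAt is still inside the retention window. */
    static boolean isStillConsumable(Instant writtenAt, Instant now, Duration retention) {
        Duration age = Duration.between(writtenAt, now);
        return age.compareTo(retention) <= 0;
    }

    public static void main(String[] args) {
        // Assumption: mirrors WAL_RETENTION_PERIOD 3600, interpreted as seconds.
        Duration retention = Duration.ofSeconds(3600);
        Instant now = Instant.now();
        System.out.println(isStillConsumable(now.minusSeconds(600), now, retention));
        System.out.println(isStillConsumable(now.minusSeconds(7200), now, retention));
    }
}
```

A consumer that falls further behind than the retention period risks missing data, so the period should be longer than the longest outage the application is expected to tolerate.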

Data Schema and API

The related schemas and APIs are described as follows. The Java consumer interface is shown here:

void subscribe(Collection<String> topics) throws SQLException;

void unsubscribe() throws SQLException;

Set<String> subscription() throws SQLException;

ConsumerRecords<V> poll(Duration timeout) throws SQLException;

void commitAsync();

void commitAsync(OffsetCommitCallback callback);

void commitSync() throws SQLException;

void close() throws SQLException;

Insert Data into TDengine

A database including one supertable and two subtables is created as follows:

DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600;
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');

Create a Topic

The following SQL statement creates a topic in TDengine:

CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;

Multiple subscription types are supported.

Subscribe to a Column

Syntax:

CREATE TOPIC topic_name as subquery

You can create a topic from a SELECT statement. Statements that specify columns, such as SELECT * and SELECT ts, c1, are supported, as are filtering conditions and scalar functions. Aggregate functions and time window aggregation are not supported. Note:

  • The schema of topics created in this manner is determined by the subscribed data.
  • You cannot modify (ALTER <table> MODIFY) or delete (ALTER <table> DROP) columns or tags that are used in a subscription or calculation.
  • Columns added to a table after the subscription is created are not displayed in the results. Deleting columns will cause an error.

Subscribe to a Supertable

Syntax:

CREATE TOPIC topic_name AS STABLE stb_name

Creating a topic in this manner differs from using a SELECT * FROM stb_name statement as follows:

  • The table schema can be modified.
  • Unstructured data is returned. The format of the data returned changes based on the supertable schema.
  • A different table schema may exist for every data block to be processed.
  • The data returned does not include tags.

Subscribe to a Database

Syntax:

CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;

This SQL statement creates a subscription to all tables in the database. You can add the WITH META parameter to include schema changes in the subscription, including creating and deleting supertables; adding, deleting, and modifying columns; and creating, deleting, and modifying the tags of subtables. Consumers can determine the message type from the API. Note that this differs from Kafka.
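The three topic forms differ only in the clause that follows AS. As a rough sketch, the helper below assembles each statement as a string. The class name, the method names, and the conservative identifier check are assumptions made for this example and are not part of any TDengine API; the resulting strings would be executed like any other SQL statement.

```java
import java.util.regex.Pattern;

/** Builds the three CREATE TOPIC forms as strings (hypothetical helper). */
final class TopicSqlSketch {

    // Conservative check chosen for this sketch; TDengine's own naming rules may differ.
    private static final Pattern NAME =
            Pattern.compile("[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)?");

    private static String name(String candidate) {
        if (candidate == null || !NAME.matcher(candidate).matches()) {
            throw new IllegalArgumentException("unexpected identifier: " + candidate);
        }
        return candidate;
    }

    /** Column subscription: the topic schema follows the SELECT statement. */
    static String fromQuery(String topic, String select) {
        return "CREATE TOPIC " + name(topic) + " AS " + select;
    }

    /** Supertable subscription. */
    static String fromSupertable(String topic, String supertable) {
        return "CREATE TOPIC " + name(topic) + " AS STABLE " + name(supertable);
    }

    /** Database subscription, optionally including schema changes. */
    static String fromDatabase(String topic, String database, boolean withMeta) {
        return "CREATE TOPIC " + name(topic) + (withMeta ? " WITH META" : "")
                + " AS DATABASE " + name(database);
    }

    public static void main(String[] args) {
        System.out.println(fromQuery("topic_q", "SELECT ts, c1 FROM tmqdb.stb WHERE c1 > 1"));
        System.out.println(fromSupertable("topic_s", "tmqdb.stb"));
        System.out.println(fromDatabase("topic_d", "tmqdb", true));
    }
}
```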

Create a Consumer

You configure the following parameters when creating a consumer:

Parameter | Type | Description | Remarks
td.connect.ip | string | IP address of the server side |
td.connect.user | string | User name |
td.connect.pass | string | Password |
td.connect.port | string | Port of the server side |
group.id | string | Consumer group ID; consumers with the same ID are in the same group | Required. Maximum length: 192.
client.id | string | Client ID | Maximum length: 192.
auto.offset.reset | enum | Initial offset for the consumer group | Specify earliest, latest, or none (default)
enable.auto.commit | boolean | Commit automatically; true: the application does not need to commit explicitly; false: the application must handle commits itself | Default value: true
auto.commit.interval.ms | integer | Interval for automatic commits, in milliseconds |
msg.with.table.name | boolean | Specify whether to deserialize table names from messages | Default value: false

The method of specifying these parameters depends on the language used:

Java programs use the following parameters:

Parameter | Type | Description
td.connect.type | string | Connection type: "jni" means native connection, "ws" means WebSocket connection; the default is "jni"
bootstrap.servers | string | Connection address, such as localhost:6030
value.deserializer | string | Value deserializer; to use this method, implement the com.taosdata.jdbc.tmq.Deserializer interface or inherit the com.taosdata.jdbc.tmq.ReferenceDeserializer type
value.deserializer.encoding | string | Specify the encoding for string deserialization

Note: The bootstrap.servers parameter is used instead of td.connect.ip and td.connect.port to provide an interface that is consistent with Kafka.

Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");

TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);

/* value deserializer definition. */
import com.taosdata.jdbc.tmq.ReferenceDeserializer;

public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}

A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.
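Because group.id is required and both IDs have a maximum length, it can be convenient to check a configuration before creating the consumer. The following is a minimal sketch of such a pre-flight check. The class ConsumerConfigCheck is hypothetical and not part of the driver, and it assumes that the limit of 192 counts characters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Pre-flight check for consumer properties (hypothetical helper, not part of the driver). */
final class ConsumerConfigCheck {

    // Assumption: the documented maximum length of 192 is measured in characters.
    private static final int MAX_ID_LENGTH = 192;
    private static final List<String> RESET_VALUES = Arrays.asList("earliest", "latest", "none");

    /** Returns a list of problems; an empty list means every check passed. */
    static List<String> validate(Properties properties) {
        List<String> problems = new ArrayList<>();

        String groupId = properties.getProperty("group.id");
        if (groupId == null || groupId.isEmpty()) {
            problems.add("group.id is required");
        } else if (groupId.length() > MAX_ID_LENGTH) {
            problems.add("group.id is longer than " + MAX_ID_LENGTH);
        }

        String clientId = properties.getProperty("client.id");
        if (clientId != null && clientId.length() > MAX_ID_LENGTH) {
            problems.add("client.id is longer than " + MAX_ID_LENGTH);
        }

        String reset = properties.getProperty("auto.offset.reset");
        if (reset != null && !RESET_VALUES.contains(reset)) {
            problems.add("auto.offset.reset must be earliest, latest, or none");
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("auto.offset.reset", "earliest");
        // group.id was never set, so one problem is reported.
        System.out.println(validate(properties));
    }
}
```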

Subscribe to a Topic

A single consumer can subscribe to multiple topics.

List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);

Consume messages

The following code demonstrates how to consume the messages in a queue.

while (running) {
    ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<Meters> record : meters) {
        processMsg(record.value());
    }
}
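When enable.auto.commit is false, the application decides when its progress is recorded. The following toy model runs entirely in memory and does not use the TDengine client. It sketches why committing only after processing gives at-least-once delivery: anything that was polled but not committed before a crash is delivered again. The class AtLeastOnceSketch and its methods are hypothetical stand-ins for a real consumer's poll and commitSync calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** In-memory stand-in for a consumer with manual commits (hypothetical, for illustration). */
final class AtLeastOnceSketch {
    private final List<String> log;
    private int committed = 0; // first offset that is delivered after a restart
    private int position = 0;  // next offset to deliver in the current session

    AtLeastOnceSketch(List<String> log) {
        this.log = new ArrayList<>(log);
    }

    /** Returns up to max messages starting at the current position. */
    List<String> poll(int max) {
        int end = Math.min(log.size(), position + max);
        List<String> batch = new ArrayList<>(log.subList(position, end));
        position = end;
        return batch;
    }

    /** Records everything polled so far as processed. */
    void commit() {
        committed = position;
    }

    /** Simulates a crash and restart: uncommitted progress is forgotten. */
    void restart() {
        position = committed;
    }

    public static void main(String[] args) {
        AtLeastOnceSketch consumer =
                new AtLeastOnceSketch(Arrays.asList("m1", "m2", "m3", "m4"));
        System.out.println("processed and committed: " + consumer.poll(2));
        consumer.commit();
        System.out.println("processed, crash before commit: " + consumer.poll(2));
        consumer.restart();
        System.out.println("redelivered after restart: " + consumer.poll(2));
    }
}
```

Since a message can be delivered more than once under this scheme, processing logic should tolerate duplicates.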

Close the consumer

After message consumption is finished, unsubscribe the consumer and then close it.

/* Unsubscribe */
consumer.unsubscribe();

/* Close consumer */
consumer.close();

Delete a Topic

You can delete topics that are no longer useful. Note that you must unsubscribe all consumers from a topic before deleting it.

/* Delete topic */
DROP TOPIC topic_name;

Check Status

  1. Query all existing topics.
     SHOW TOPICS;
  2. Query the status and subscribed topics of all consumers.
     SHOW CONSUMERS;
  3. Query the relationships between consumers and vgroups.
     SHOW SUBSCRIPTIONS;

Examples

The following section shows complete sample code in Java.

package com.taos.example;

import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class SubscribeDemo {
    private static final String TOPIC = "tmq_topic";
    private static final String DB_NAME = "meters";
    private static final AtomicBoolean shutdown = new AtomicBoolean(false);

    public static void main(String[] args) {
        // stop polling after three seconds
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            public void run() {
                shutdown.set(true);
            }
        }, 3_000);
        try {
            // prepare
            Class.forName("com.taosdata.jdbc.TSDBDriver");
            String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
            // the connection and statement are closed once the setup is done
            try (Connection connection = DriverManager.getConnection(jdbcUrl);
                 Statement statement = connection.createStatement()) {
                statement.executeUpdate("drop topic if exists " + TOPIC);
                statement.executeUpdate("drop database if exists " + DB_NAME);
                statement.executeUpdate("create database " + DB_NAME + " wal_retention_period 3600");
                statement.executeUpdate("use " + DB_NAME);
                statement.executeUpdate(
                        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))");
                statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')");
                statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
                statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
                statement.executeUpdate(
                        "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)");
                statement.executeUpdate(
                        "INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
                // create topic
                statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
            }

            // create consumer
            Properties properties = new Properties();
            properties.setProperty(TMQConstants.CONNECT_TYPE, "jni");
            properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6030");
            properties.setProperty(TMQConstants.CONNECT_USER, "root");
            properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
            properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
            properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
            properties.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "1000");
            properties.setProperty(TMQConstants.GROUP_ID, "test1");
            properties.setProperty(TMQConstants.CLIENT_ID, "1");
            properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
            properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
                    "com.taos.example.MetersDeserializer");
            properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
            properties.setProperty(TMQConstants.EXPERIMENTAL_SNAPSHOT_ENABLE, "true");

            // poll data
            try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
                consumer.subscribe(Collections.singletonList(TOPIC));
                while (!shutdown.get()) {
                    ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<Meters> r : meters) {
                        Meters meter = r.value();
                        System.out.println(meter);
                    }
                }
                consumer.unsubscribe();
            }
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        timer.cancel();
    }
}


package com.taos.example;

import com.taosdata.jdbc.tmq.ReferenceDeserializer;

public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}


package com.taos.example;

import java.sql.Timestamp;

public class Meters {
    private Timestamp ts;
    private float current;
    private int voltage;
    private int groupid;
    private String location;

    public Timestamp getTs() {
        return ts;
    }

    public void setTs(Timestamp ts) {
        this.ts = ts;
    }

    public float getCurrent() {
        return current;
    }

    public void setCurrent(float current) {
        this.current = current;
    }

    public int getVoltage() {
        return voltage;
    }

    public void setVoltage(int voltage) {
        this.voltage = voltage;
    }

    public int getGroupid() {
        return groupid;
    }

    public void setGroupid(int groupid) {
        this.groupid = groupid;
    }

    public String getLocation() {
        return location;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    @Override
    public String toString() {
        return "Meters{" +
                "ts=" + ts +
                ", current=" + current +
                ", voltage=" + voltage +
                ", groupid=" + groupid +
                ", location='" + location + '\'' +
                '}';
    }
}
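
MetersDeserializer has an empty body, so the mapping from columns to the fields of Meters is presumably inherited. As a rough idea of how such a mapping could work, the sketch below fills a bean through its setters by reflection. This is an assumption made for illustration, not the actual implementation of ReferenceDeserializer; the class ReflectiveMappingSketch, the Reading bean, and the map-based row are hypothetical.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Fills a bean from column values by calling its setters (hypothetical sketch). */
final class ReflectiveMappingSketch {

    /** Hypothetical bean that mirrors two of the columns of the meters table. */
    public static class Reading {
        private float current;
        private int voltage;

        public void setCurrent(float current) { this.current = current; }
        public void setVoltage(int voltage) { this.voltage = voltage; }
        public float getCurrent() { return current; }
        public int getVoltage() { return voltage; }
    }

    /** Creates a bean and calls setXxx for every non-null column named xxx. */
    static <T> T map(Map<String, Object> row, Class<T> type) {
        try {
            T bean = type.getDeclaredConstructor().newInstance();
            for (Method method : type.getMethods()) {
                boolean isSetter = method.getName().startsWith("set")
                        && method.getParameterCount() == 1;
                if (!isSetter) {
                    continue;
                }
                String column = method.getName().substring(3).toLowerCase(Locale.ROOT);
                Object value = row.get(column);
                if (value != null) {
                    method.invoke(bean, value);
                }
            }
            return bean;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot map row to " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("current", 10.1f);
        row.put("voltage", 119);
        Reading reading = map(row, Reading.class);
        System.out.println(reading.getCurrent() + " A, " + reading.getVoltage() + " V");
    }
}
```

In this sketch, columns that are NULL are skipped, so primitive fields keep their default values.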
