Skip to main content

Data Subscription

TDengine provides data subscription and consumption interfaces similar to message queue products. These interfaces make it easier for applications to obtain data written to TDengine either in real time and to process data in the order that events occurred. This simplifies your time-series data processing systems and reduces your costs because it is no longer necessary to deploy a message queue product such as Kafka.

To use TDengine data subscription, you define topics like in Kafka. However, a topic in TDengine is based on query conditions for an existing supertable, table, or subtable - in other words, a SELECT statement. You can use SQL to filter data by tag, table name, column, or expression and then perform a scalar function or user-defined function on the data. Aggregate functions are not supported. This gives TDengine data subscription more flexibility than similar products. The granularity of data can be controlled on demand by applications, while filtering and preprocessing are handled by TDengine instead of the application layer. This implementation reduces the amount of data transmitted and the complexity of applications.

By subscribing to a topic, a consumer can obtain the latest data in that topic in real time. Multiple consumers can be formed into a consumer group that consumes messages together. Consumer groups enable faster speed through multi-threaded, distributed data consumption. Note that consumers in different groups that are subscribed to the same topic do not consume messages together. A single consumer can subscribe to multiple topics. If the data in a supertable is sharded across multiple vnodes, consumer groups can consume it much more efficiently than single consumers. TDengine also includes an acknowledgement mechanism that ensures at-least-once delivery in complicated environments where machines may crash or restart.

The topic introduces how to share data from TDengine instance through the access control management of TDengine Cloud and the subscription interfaces of each supported client library. The data owner first creates the topic through the topic wizard. Then adds the users or user groups which he wants to share the data with to the subscribers of the topic. The subscriber of the topic can get the details about how to access the shared data from TDengine in the data subscription way. In this document we will briefly explain these main steps of data sharing.

Create Topic

You can create the topic in Topics of TDengine Cloud. In the Create Topic dialog, you can choose wizard or SQL way to create the topic. In the wizard way, you need to input the topic name and select the database of the current TDengine instance. Then select the super table or specify the subquery with the super table or sub table. Also you can add fields selections or add result set and condition set for each field. In the following, you can get the detail of how to create the topic in three levels through wizard way.

To Database

The default selection in the Add New Topic dialog is database type. After select a database in the selection, you can click Confirm button to create a topic to a database.

To Super Table

In the opened Add New Topic dialog, you can click STable type and select a specified super table from the selections. Then click Confirm button to create a topic to a super table.

With Subquery

In the opened Add New Topic dialog, you can click Subquery type to show all subquery form items. The first item is Table Type and the default selection is STable. After you select or input a super table name, the following will show you all fields from the super table. You can check or uncheck each field for the sub query and also you can set the result set or condition set for each field. If you want to preview the SQL based on your chooses, click SQL Preiview to open a SQL dialog to view.

You can select another Table Table Table and then select a table from the selections or input an existed table name. You can get all fields of the selected table. You can check or uncheck each field for the sub query and also you can set the result set or condition set for each field. If you want to preview the SQL based on your chooses, click SQL Preiview to open a SQL dialog to view.

Share Topic

In each row of the topic list in the Topics page, you can click Share Topic action icon to the Share Topic page. Also you can directly click Share Topic tab to switch to the right location. In the Share Topic tab, you can get only one row for yourself in the Users page.

Users

In the default tab Users of the Share Topic page, you can click Add Users button to add more users who are active in the current organization. In the opened Add New Users dialog, you can select the new users who you want to share the topic with. Then you can set the expired time for the sharing to these users.

User Groups

You can click User Groups tab to switch to the User Groups page of the Share Topic. Then you can click Add User Groups button to add more user groups which are active in the current organization. In the opened Add New User Groups dialog, you can select the new user groups which you want to share the topic with. Then you can set the expired time for the sharing to these user groups.

Consume Shared Topic

The shared user can get all topics which the creator shared with him, when he goes to the Topic page of Data Subscription. The user can click Sample Code icon of each topic Action area to the Sample Code page. Then he can follow the steps of the sample code how to consume the shared topic from TDengine instance.

Data Schema and API

The related schemas and APIs in various languages are described as follows:

func NewConsumer(conf *Config) (*Consumer, error)

func (c *Consumer) Close() error

func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error

func (c *Consumer) FreeMessage(message unsafe.Pointer)

func (c *Consumer) Poll(timeout time.Duration) (*Result, error)

func (c *Consumer) Subscribe(topics []string) error

func (c *Consumer) Unsubscribe() error

Configure TDengine DSN

You can set the following for Go, Rust and JavaScript:

export TDENGINE_CLOUD_TMQ="<TDENGINE_CLOUD_TMQ>"
IMPORTANT

Replace <TDENGINE_CLOUD_TMQ> with the real value, the format should be wss://<cloud_endpoint>)/rest/tmq?token=<token>. To obtain the value of TDENGINE_CLOUD_TMQ, please log in TDengine Cloud and click Topcis on the left menu, then click Sample Code action of the each topic to Example part.

For Python and C#, you need to set the following variables:

export TDENGINE_CLOUD_ENDPOINT="<TDENGINE_CLOUD_ENDPOINT>"
export TDENGINE_CLOUD_TOKEN="<TDENGINE_CLOUD_TOKEN>"
IMPORTANT

Replace <TDENGINE_CLOUD_ENDPOINT> and <TDENGINE_CLOUD_TOKEN> with the real values. To obtain the value of these, please log in TDengine Cloud and click Topcis on the left menu, then click Sample Code action of the each topic to the Python tab of the Example part.

Last, for Java, you need to set the following variables:

export TDENGINE_JDBC_URL="<TDENGINE_JDBC_URL>"
IMPORTANT

Replace <TDENGINE_JDBC_URL> with the real value, the format should be jdbc:TAOS-RS://<cloud_endpoint>)?useSSL=false&token=<token>. To obtain the value of these, please log in TDengine Cloud and click Topcis on the left menu, then click Sample Code action of the each topic to the Java tab of the Example part.

Create a Consumer from Instance

You configure the following parameters when creating a consumer:

ParameterTypeDescriptionRemarks
td.connect.ipstringTDengine Cloud instance endpoint used in Python, such as "gw.us-central-1.gcp.cloud.tdengine.com";
td.connect.tokenstringThe Cloud instance token used in Python;
group.idstringConsumer group ID; consumers with the same ID are in the same groupRequired. Maximum length: 192.
client.idstringClient IDMaximum length: 192.
auto.offset.resetenumInitial offset for the consumer groupSpecify earliest, latest, or none(default)
enable.auto.commitbooleanCommit automaticallySpecify true or false.
auto.commit.interval.msintegerInterval for automatic commits, in milliseconds
enable.heartbeat.backgroundbooleanBackend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time
msg.with.table.namebooleanSpecify whether to deserialize table names from messages

The method of specifying these parameters depends on the language used:

import (
"github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
"github.com/taosdata/driver-go/v3/ws/tmq"
)
tmqStr := os.Getenv("TDENGINE_CLOUD_TMQ")
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": tmqStr,
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"group.id": "test_group",
"client.id": "test_consumer_ws",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}

A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.

Subscribe to a Topic

A single consumer can subscribe to multiple topics.

err = consumer.Subscribe("<TDC_TOPIC>", nil)
if err != nil {
panic(err)
}
IMPORTANT

Replace <TDC_TOPIC> with the real value. To obtain the value of TDC_TOPIC, please log in TDengine Cloud and click Topcis on the left menu, then copy the topic name you want to consume.

Consume messages

The following code demonstrates how to consume the messages in a queue.

for {
ev := consumer.Poll(0)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e.String())
consumer.Commit()
case tmqcommon.Error:
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
return
default:
fmt.Printf("unexpected event:%v\n", e)
return
}
}
}

Close the consumer

After message consumption is finished, the consumer is unsubscribed.

consumer.Close()

Sample Code

The following are full sample codes about how to consume the shared topic test:

package main

import (
"fmt"
"github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
"github.com/taosdata/driver-go/v3/ws/tmq"
"os"
)

func main() {
tmqStr := os.Getenv("TDENGINE_CLOUD_TMQ")
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": tmqStr,
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"group.id": "test_group",
"client.id": "test_consumer_ws",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
err = consumer.Subscribe("test", nil)
if err != nil {
panic(err)
}
defer consumer.Close()
for {
ev := consumer.Poll(0)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e.String())
consumer.Commit()
case tmqcommon.Error:
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
return
default:
fmt.Printf("unexpected event:%v\n", e)
return
}
}
}
}

view source code