Data Subscription
TDengine provides data subscription and consumption interfaces similar to those of message queue products. These interfaces make it easier for applications to obtain data written to TDengine in real time and to process that data in the order in which events occurred. This simplifies your time-series data processing systems and reduces your costs, because it is no longer necessary to deploy a message queue product such as Kafka.
To use TDengine data subscription, you define topics, as in Kafka. However, a topic in TDengine is based on query conditions for an existing supertable, table, or subtable - in other words, a SELECT statement. You can use SQL to filter data by tag, table name, column, or expression and then perform a scalar function or user-defined function on the data. Aggregate functions are not supported. This gives TDengine data subscription more flexibility than similar products: the granularity of the data can be controlled on demand by applications, while filtering and preprocessing are handled by TDengine instead of the application layer. This implementation reduces the amount of data transmitted and the complexity of applications.
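For reference, a topic of this kind corresponds to a CREATE TOPIC statement built on a SELECT. The following minimal sketch reuses the test.meters supertable and its columns from the samples later in this document; the topic name and tag filter are illustrative:
-- A topic defined by a SELECT statement. Scalar functions such as ABS are
-- allowed in the select list; aggregate functions are not supported.
CREATE TOPIC test_topic AS
SELECT ts, current, voltage, ABS(phase) AS phase
FROM test.meters
WHERE location = 'California.SanFrancisco';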
By subscribing to a topic, a consumer can obtain the latest data in that topic in real time. Multiple consumers can be formed into a consumer group that consumes messages together. Consumer groups enable faster processing through multi-threaded, distributed data consumption. Note that consumers in different groups that are subscribed to the same topic do not consume messages together. A single consumer can subscribe to multiple topics. If the data in a supertable is sharded across multiple vnodes, consumer groups can consume it much more efficiently than single consumers. TDengine also includes an acknowledgement mechanism that ensures at-least-once delivery in complicated environments where machines may crash or restart.
This topic describes how to share data from a TDengine instance through the access control management of TDengine Cloud and the subscription interfaces of each supported client library. The data owner first creates a topic through the topic wizard, then adds the users or user groups with whom they want to share the data as subscribers of the topic. Subscribers can then view the details of how to access the shared data through data subscription. This document briefly explains these main steps of data sharing.
Create Topic
You can create a topic on the Topics page of TDengine Cloud. In the Create Topic dialog, you can create the topic with the wizard or with SQL. In the wizard, you enter the topic name and select a database of the current TDengine instance. Then you select a supertable, or specify a subquery on a supertable or subtable. You can also select fields and set the result set and condition set for each field. The following describes how to create a topic at each of the three levels using the wizard.
To Database
The default selection in the Add New Topic dialog is the Database type. After selecting a database, click the Confirm button to create a topic on that database.
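Behind the wizard, a database-level topic corresponds to SQL like the following sketch; the topic name is illustrative, and test is the sample database used later in this document:
-- Hypothetical topic that subscribes to all data written to the database.
CREATE TOPIC db_topic AS DATABASE test;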
To Super Table
In the Add New Topic dialog, click the STable type and select a supertable from the list. Then click the Confirm button to create a topic on that supertable.
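A supertable-level topic corresponds to SQL like the following sketch; the topic name is illustrative, and test.meters is the sample supertable used later in this document:
-- Hypothetical topic that subscribes to all data in a supertable.
CREATE TOPIC stable_topic AS STABLE test.meters;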
With Subquery
In the Add New Topic dialog, click the Subquery type to show all subquery form items. The first item is Table Type, and the default selection is STable. After you select or enter a supertable name, all fields of that supertable are displayed. You can check or uncheck each field for the subquery, and you can set the result set or condition set for each field. To preview the SQL generated from your choices, click SQL Preview to open a SQL dialog.
You can also select the Table type and then choose a table from the list or enter the name of an existing table. All fields of the selected table are displayed. You can check or uncheck each field for the subquery, and you can set the result set or condition set for each field. To preview the SQL generated from your choices, click SQL Preview to open a SQL dialog, as in the sketch below.
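For a Table type subquery with three fields checked and a condition set on voltage, the previewed SQL might look like the following sketch; the topic name and filter value are illustrative, and test.d991 is the sample subtable used later in this document:
-- Hypothetical preview of a topic on a single subtable, with a column
-- selection (result set) and a row filter (condition set).
CREATE TOPIC table_topic AS
SELECT ts, current, voltage
FROM test.d991
WHERE voltage > 100;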
Share Topic
In each row of the topic list on the Topics page, you can click the Share Topic action icon to open the Share Topic page. You can also click the Share Topic tab directly to switch to it. Initially, the Users tab of the Share Topic page contains only one row, for yourself.
Users
In the Users tab, which is the default tab of the Share Topic page, you can click the Add Users button to add more users who are active in the current organization. In the Add New Users dialog, select the users with whom you want to share the topic, and then set the expiration time for the sharing.
User Groups
You can click the User Groups tab to switch to the User Groups page of Share Topic. There, click the Add User Groups button to add user groups that are active in the current organization. In the Add New User Groups dialog, select the user groups with which you want to share the topic, and then set the expiration time for the sharing.
Consume Shared Topic
On the Topics page of Data Subscription, users can see all topics that have been shared with them. A user can click the Sample Code icon in the Action area of a topic to open the Sample Code page, and then follow the steps in the sample code to consume the shared topic from the TDengine instance.
Data Schema and API
The related schemas and APIs in various languages are described as follows:
- Go
- Rust
- Python
- Java
- C#
- Node.js
func NewConsumer(conf *Config) (*Consumer, error)
func (c *Consumer) Close() error
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
func (c *Consumer) FreeMessage(message unsafe.Pointer)
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
func (c *Consumer) Subscribe(topics []string) error
func (c *Consumer) Unsubscribe() error
impl TBuilder for TmqBuilder
fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
fn build(&self) -> Result<Self::Target, Self::Error>
impl AsAsyncConsumer for Consumer
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
&mut self,
topics: I,
) -> Result<(), Self::Error>;
fn stream(
&self,
) -> Pin<
Box<
dyn '_
+ Send
+ futures::Stream<
Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
>,
>,
>;
async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
async fn unsubscribe(self);
For more information, see Crate taos.
class TaosConsumer():
def __init__(self, *topics, **configs)
def __iter__(self)
def __next__(self)
def sync_next(self)
def subscription(self)
def unsubscribe(self)
def close(self)
def __del__(self)
void subscribe(Collection<String> topics) throws SQLException;
void unsubscribe() throws SQLException;
Set<String> subscription() throws SQLException;
ConsumerRecords<V> poll(Duration timeout) throws SQLException;
void commitSync() throws SQLException;
void close() throws SQLException;
ConsumeResult<TValue> Consume(int millisecondsTimeout);
void Subscribe(IEnumerable<string> topic);
void Subscribe(string topic);
void Unsubscribe();
void Commit(ConsumeResult<TValue> consumerResult);
void Seek(TopicPartitionOffset tpo);
Offset Position(TopicPartition partition);
void Close();
When creating a data subscription with Node.js, make sure that the TDengine version is 3.2.0.0 or later.
subscribe(topics: Array<string>, reqId?: number): Promise<void>;
unsubscribe(reqId?: number): Promise<void>;
poll(timeoutMs: number, reqId?: number): Promise<Map<string, TaosResult>>;
subscription(reqId?: number): Promise<Array<string>>;
commit(reqId?: number): Promise<Array<TopicPartition>>;
committed(partitions: Array<TopicPartition>, reqId?: number): Promise<Array<TopicPartition>>;
commitOffsets(partitions: Array<TopicPartition>): Promise<Array<TopicPartition>>;
commitOffset(partition: TopicPartition, reqId?: number): Promise<void>;
positions(partitions: Array<TopicPartition>, reqId?: number): Promise<Array<TopicPartition>>;
seek(partition: TopicPartition, reqId?: number): Promise<void>;
seekToBeginning(partitions: Array<TopicPartition>): Promise<void>;
seekToEnd(partitions: Array<TopicPartition>): Promise<void>;
assignment(topics?: string[]): Promise<Array<TopicPartition>>;
close(): Promise<void>;
Configure TDengine DSN
You can set the following environment variable for Go, Rust, and Node.js:
- Bash
- CMD
- Powershell
export TDENGINE_CLOUD_TMQ="<TDENGINE_CLOUD_TMQ>"
set TDENGINE_CLOUD_TMQ=<TDENGINE_CLOUD_TMQ>
$env:TDENGINE_CLOUD_TMQ='<TDENGINE_CLOUD_TMQ>'
Replace <TDENGINE_CLOUD_TMQ> with the real value. The format is wss://<cloud_endpoint>/rest/tmq?token=<token>. To obtain the value of TDENGINE_CLOUD_TMQ, log in to TDengine Cloud, click Topics in the left menu, and then click the Sample Code action of the topic to open the Example section.
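For example, in Bash, with an illustrative endpoint (the token remains a placeholder that you must obtain from TDengine Cloud):
export TDENGINE_CLOUD_TMQ="wss://gw.us-central-1.gcp.cloud.tdengine.com/rest/tmq?token=<token>"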
For Python and C#, you need to set the following variables:
- Bash
- CMD
- Powershell
export TDENGINE_CLOUD_ENDPOINT="<TDENGINE_CLOUD_ENDPOINT>"
export TDENGINE_CLOUD_TOKEN="<TDENGINE_CLOUD_TOKEN>"
set TDENGINE_CLOUD_ENDPOINT=<TDENGINE_CLOUD_ENDPOINT>
set TDENGINE_CLOUD_TOKEN=<TDENGINE_CLOUD_TOKEN>
$env:TDENGINE_CLOUD_ENDPOINT='<TDENGINE_CLOUD_ENDPOINT>'
$env:TDENGINE_CLOUD_TOKEN='<TDENGINE_CLOUD_TOKEN>'
Replace <TDENGINE_CLOUD_ENDPOINT> and <TDENGINE_CLOUD_TOKEN> with the real values. To obtain these values, log in to TDengine Cloud, click Topics in the left menu, and then click the Sample Code action of the topic to open the Python or C# tab of the Example section.
Finally, for Java, you need to set the following variable:
- Bash
- CMD
- Powershell
export TDENGINE_JDBC_URL="<TDENGINE_JDBC_URL>"
set TDENGINE_JDBC_URL=<TDENGINE_JDBC_URL>
$env:TDENGINE_JDBC_URL='<TDENGINE_JDBC_URL>'
Replace <TDENGINE_JDBC_URL> with the real value. The format is jdbc:TAOS-RS://<cloud_endpoint>?useSSL=false&token=<token>. To obtain this value, log in to TDengine Cloud, click Topics in the left menu, and then click the Sample Code action of the topic to open the Java tab of the Example section.
Create a Consumer from the Instance
You configure the following parameters when creating a consumer:
Parameter | Type | Description | Remarks |
---|---|---|---|
td.connect.ip | string | TDengine Cloud instance endpoint, used in Python; for example, "gw.us-central-1.gcp.cloud.tdengine.com" | |
td.connect.token | string | TDengine Cloud instance token, used in Python | |
group.id | string | Consumer group ID; consumers with the same ID are in the same group | Required. Maximum length: 192. |
client.id | string | Client ID | Maximum length: 192. |
auto.offset.reset | enum | Initial offset for the consumer group | Specify earliest, latest, or none (default). |
enable.auto.commit | boolean | Commit automatically | Specify true or false. |
auto.commit.interval.ms | integer | Interval for automatic commits, in milliseconds | |
enable.heartbeat.background | boolean | Background heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | |
msg.with.table.name | boolean | Specify whether to deserialize table names from messages | |
The method of specifying these parameters depends on the language used:
- Go
- Rust
- Python
- Java
- C#
- Node.js
import (
"github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
"github.com/taosdata/driver-go/v3/ws/tmq"
)
tmqStr := os.Getenv("TDENGINE_CLOUD_TMQ")
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": tmqStr,
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"group.id": "test_group",
"client.id": "test_consumer_ws",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
let tmq_str = std::env::var("TDENGINE_CLOUD_TMQ")?;
let tmq_uri = format!( "{}&group.id=test_group_rs&client.id=test_consumer_ws", tmq_str);
println!("request tmq URI is {tmq_uri}\n");
let tmq = TmqBuilder::from_dsn(tmq_uri)?;
let mut consumer = tmq.build().await?;
endpoint = os.environ["TDENGINE_CLOUD_ENDPOINT"]
token = os.environ["TDENGINE_CLOUD_TOKEN"]
conf = {
# auth options
"td.connect.websocket.scheme": "wss",
"td.connect.ip": endpoint,
"td.connect.token": token,
# consume options
"group.id": "test_group_py",
"client.id": "test_consumer_ws_py",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"auto.offset.reset": "earliest",
"msg.with.table.name": "true",
}
consumer = Consumer(conf)
String url = System.getenv("TDENGINE_JDBC_URL");
Properties properties = new Properties();
properties.setProperty(TMQConstants.CONNECT_TYPE, "websocket");
properties.setProperty(TMQConstants.CONNECT_URL, url);
properties.setProperty(TMQConstants.CONNECT_TIMEOUT, "10000");
properties.setProperty(TMQConstants.CONNECT_MESSAGE_TIMEOUT, "10000");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
properties.setProperty(TMQConstants.GROUP_ID, "gId");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.jdbc.tmq.MapDeserializer");
TaosConsumer<Map<String, Object>> consumer = new TaosConsumer<>(properties);
var cloudEndPoint = Environment.GetEnvironmentVariable("TDENGINE_CLOUD_ENDPOINT");
var cloudToken = Environment.GetEnvironmentVariable("TDENGINE_CLOUD_TOKEN");
var cfg = new Dictionary<string, string>()
{
{ "td.connect.type", "WebSocket" },
{ "group.id", "group1" },
{ "auto.offset.reset", "latest" },
{ "td.connect.ip", cloudEndPoint.ToString() },
{ "td.connect.port", "443" },
{ "useSSL", "true" },
{ "token", cloudToken.ToString() },
{ "client.id", "tmq_example" },
{ "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" },
};
var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
let url = process.env.TDENGINE_CLOUD_TMQ;
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, 'gId'],
[taos.TMQConstants.CLIENT_ID, 'clientId'],
[taos.TMQConstants.AUTO_OFFSET_RESET, 'earliest'],
[taos.TMQConstants.WS_URL, url],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'],
]);
// create consumer
let consumer = await taos.tmqConnect(configMap);
A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.
Subscribe to a Topic
A single consumer can subscribe to multiple topics.
- Go
- Rust
- Python
- Java
- C#
- Node.js
err = consumer.Subscribe("<TDC_TOPIC>", nil)
if err != nil {
panic(err)
}
consumer.subscribe(["<TDC_TOPIC>"]).await?;
consumer.subscribe(["<TDC_TOPIC>"])
consumer.subscribe(Collections.singletonList("<TDC_TOPIC>"));
consumer.Subscribe(new List<string>() { "<TDC_TOPIC>" });
await consumer.subscribe(['<TDC_TOPIC>']);
Replace <TDC_TOPIC> with the real value. To obtain the value of TDC_TOPIC, log in to TDengine Cloud, click Topics in the left menu, and then copy the name of the topic you want to consume.
Consume Messages
The following code demonstrates how to consume the messages in a queue.
- Go
- Rust
- Python
- Java
- C#
- Node.js
for {
ev := consumer.Poll(0)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e.String())
consumer.Commit()
case tmqcommon.Error:
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
return
default:
fmt.Printf("unexpected event:%v\n", e)
return
}
}
}
// consume loop
consumer
.stream()
.try_for_each_concurrent(10, |(offset, message)| async {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// A two-dimension matrix while each cell is a [taos::Value] object.
let values = block.to_values();
// Number of rows.
assert_eq!(values.len(), block.nrows());
// Number of columns
assert_eq!(values[0].len(), block.ncols());
println!("first row: {}", values[0].iter().join(", "));
}
}
consumer.commit(offset).await?;
Ok(())
})
.await?;
while True:
message = consumer.poll(timeout=1.0)
if message:
id = message.vgroup()
topic = message.topic()
database = message.database()
for block in message:
nrows = block.nrows()
ncols = block.ncols()
for row in block:
print(row)
values = block.fetchall()
print(nrows, ncols)
else:
break
for (int i = 0; i < 100; i++) {
ConsumerRecords<Map<String, Object>> consumerRecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Map<String, Object>> r : consumerRecords) {
Map<String, Object> bean = r.value();
bean.forEach((k, v) -> {
System.out.print(k + " : " + v + " ");
});
System.out.println();
}
}
while (true)
{
using (var cr = consumer.Consume(500))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
Console.WriteLine(
$"message {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
}
}
// poll
for (let i = 0; i < 100; i++) {
let res = await consumer.poll(1000);
for (let [key, value] of res) {
// Add your data processing logic here
console.log(`data: ${JSON.stringify(value, replacer)}`);
}
// commit
await consumer.commit();
}
// Custom replacer function to handle BigInt serialization
function replacer(key, value) {
if (typeof value === 'bigint') {
return value.toString(); // Convert BigInt to string
}
return value;
}
Close the Consumer
After message consumption is finished, unsubscribe the consumer and close it to release resources.
- Go
- Rust
- Python
- Java
- C#
- Node.js
consumer.Close()
consumer.unsubscribe().await;
# Unsubscribe
consumer.unsubscribe()
# Close consumer
consumer.close()
/* Unsubscribe */
consumer.unsubscribe();
/* Close consumer */
consumer.close();
// unsubscribe
consumer.Unsubscribe();
// close consumer
consumer.Close();
await consumer.unsubscribe();
await consumer.close();
Sample Code
The following are complete code samples showing how to consume the shared topic test:
- Go
- Rust
- Python
- Java
- C#
- Node.js
package main
import (
"fmt"
"github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
"github.com/taosdata/driver-go/v3/ws/tmq"
"os"
)
func main() {
tmqStr := os.Getenv("TDENGINE_CLOUD_TMQ")
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": tmqStr,
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"group.id": "test_group",
"client.id": "test_consumer_ws",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
err = consumer.Subscribe("test", nil)
if err != nil {
panic(err)
}
defer consumer.Close()
for {
ev := consumer.Poll(0)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e.String())
consumer.Commit()
case tmqcommon.Error:
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
return
default:
fmt.Printf("unexpected event:%v\n", e)
return
}
}
}
}
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// subscribe
let tmq_str = std::env::var("TDENGINE_CLOUD_TMQ")?;
let tmq_uri = format!( "{}&group.id=test_group_rs&client.id=test_consumer_ws", tmq_str);
println!("request tmq URI is {tmq_uri}\n");
let tmq = TmqBuilder::from_dsn(tmq_uri)?;
let mut consumer = tmq.build().await?;
consumer.subscribe(["test_topic"]).await?;
// consume loop
consumer
.stream()
.try_for_each_concurrent(10, |(offset, message)| async {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// A two-dimension matrix while each cell is a [taos::Value] object.
let values = block.to_values();
// Number of rows.
assert_eq!(values.len(), block.nrows());
// Number of columns
assert_eq!(values[0].len(), block.ncols());
println!("first row: {}", values[0].iter().join(", "));
}
}
consumer.commit(offset).await?;
Ok(())
})
.await?;
consumer.unsubscribe().await;
Ok(())
}
#!/usr/bin/env python
import os
from taosws import Consumer
endpoint = os.environ["TDENGINE_CLOUD_ENDPOINT"]
token = os.environ["TDENGINE_CLOUD_TOKEN"]
conf = {
# auth options
"td.connect.websocket.scheme": "wss",
"td.connect.ip": endpoint,
"td.connect.token": token,
# consume options
"group.id": "test_group_py",
"client.id": "test_consumer_ws_py",
}
consumer = Consumer(conf)
consumer.subscribe(["test_topic"])
while True:
message = consumer.poll(timeout=1.0)
if message:
id = message.vgroup()
topic = message.topic()
database = message.database()
for block in message:
nrows = block.nrows()
ncols = block.ncols()
for row in block:
print(row)
values = block.fetchall()
print(nrows, ncols)
else:
break
consumer.close()
package com.taos.example;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) throws SQLException {
String url = System.getenv("TDENGINE_JDBC_URL");
Properties properties = new Properties();
properties.setProperty(TMQConstants.CONNECT_TYPE, "websocket");
properties.setProperty(TMQConstants.CONNECT_URL, url);
properties.setProperty(TMQConstants.CONNECT_TIMEOUT, "10000");
properties.setProperty(TMQConstants.CONNECT_MESSAGE_TIMEOUT, "10000");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
properties.setProperty(TMQConstants.GROUP_ID, "gId");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.jdbc.tmq.MapDeserializer");
try (TaosConsumer<Map<String, Object>> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList("test"));
for (int i = 0; i < 100; i++) {
ConsumerRecords<Map<String, Object>> consumerRecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Map<String, Object>> r : consumerRecords) {
Map<String, Object> bean = r.value();
bean.forEach((k, v) -> {
System.out.print(k + " : " + v + " ");
});
System.out.println();
}
}
consumer.unsubscribe();
}
}
}
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using TDengine.Driver;
using TDengine.Driver.Client;
using TDengine.TMQ;
using System.Text.Json;
namespace Cloud.Examples
{
public class SubscribeDemo
{
private static string _host = "";
private static string _token = "";
private static string _groupId = "";
private static string _clientId = "";
private static string _topic = "";
static void Main(string[] args)
{
var cloudEndPoint = Environment.GetEnvironmentVariable("TDENGINE_CLOUD_ENDPOINT");
var cloudToken = Environment.GetEnvironmentVariable("TDENGINE_CLOUD_TOKEN");
_host = cloudEndPoint.ToString();
_token = cloudToken.ToString();
try
{
var consumer = CreateConsumer();
// insert data
Task.Run(InsertData);
// consume message
Consume(consumer);
// seek
Seek(consumer);
// commit
CommitOffset(consumer);
// close
Close(consumer);
Console.WriteLine("Done");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine("Failed to insert to table meters using stmt, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine("Failed to insert to table meters using stmt, ErrMessage: " + e.Message);
throw;
}
}
static IConsumer<Dictionary<string, object>> CreateConsumer(){
_groupId = "group1";
_clientId = "client1";
var cfg = new Dictionary<string, string>()
{
{ "td.connect.type", "WebSocket" },
{ "group.id", _groupId },
{ "auto.offset.reset", "latest" },
{ "td.connect.ip", _host},
{ "td.connect.port", "443" },
{ "useSSL", "true" },
{ "token", _token},
{ "client.id", _clientId },
{ "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" },
};
return new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
}
static void Consume(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: subscribe
_topic = "<TDC_TOPIC>";
try
{
// subscribe
consumer.Subscribe(new List<string>() { _topic });
Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++)
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
// handle message
Console.WriteLine($"data: "+JsonSerializer.Serialize(message));
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine($"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: subscribe
}
static void InsertData()
{
var cloudEndPoint = Environment.GetEnvironmentVariable("TDENGINE_CLOUD_ENDPOINT");
var cloudToken = Environment.GetEnvironmentVariable("TDENGINE_CLOUD_TOKEN");
var connectionString = $"protocol=WebSocket;host={cloudEndPoint};port=443;useSSL=true;token={cloudToken};";
// Connect to TDengine server using WebSocket
var builder = new ConnectionStringBuilder(connectionString);
using (var client = DbDriver.Open(builder))
{
while (true)
{
client.Exec(
"INSERT into test.d991 using test.meters tags(2,'California.SanFrancisco') values(now,11.5,219,0.30)");
Task.Delay(1000).Wait();
}
}
}
static void Seek(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: seek
try
{
// get assignment
var assignment = consumer.Assignment;
Console.WriteLine($"Now assignment: {assignment}");
// seek to the beginning
foreach (var topicPartition in assignment)
{
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
}
Console.WriteLine("Assignment seek to beginning successfully");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: seek
}
static void CommitOffset(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: commit_offset
for (int i = 0; i < 5; i++)
{
TopicPartitionOffset topicPartitionOffset = null;
try
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
// commit offset
topicPartitionOffset = cr.TopicPartitionOffset;
consumer.Commit(new List<TopicPartitionOffset>
{
topicPartitionOffset,
});
Console.WriteLine("Commit offset manually successfully.");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrMessage: {e.Message}");
throw;
}
}
// ANCHOR_END: commit_offset
}
static void Close(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: close
try
{
// unsubscribe
consumer.Unsubscribe();
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to execute commit example, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
finally
{
// close consumer
consumer.Close();
Console.WriteLine("Consumer closed successfully.");
}
// ANCHOR_END: close
}
}
}
const taos = require('@tdengine/websocket');
const url = process.env.TDENGINE_CLOUD_TMQ;
const topic = 'topic_meters';
const topics = [topic];
const groupId = 'group1';
const clientId = 'client1';
async function createConsumer() {
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, groupId],
[taos.TMQConstants.CLIENT_ID, clientId],
[taos.TMQConstants.AUTO_OFFSET_RESET, 'earliest'],
[taos.TMQConstants.WS_URL, url],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'],
]);
try {
// create consumer
let consumer = await taos.tmqConnect(configMap);
console.log(
`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`
);
return consumer;
} catch (err) {
console.error(
`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`
);
throw err;
}
}
async function testConsumer() {
let consumer = await createConsumer();
try {
// subscribe
await consumer.subscribe(topics);
console.log(`Subscribe topics successfully, topics: ${topics}`);
for (let i = 0; i < 100; i++) {
// poll
let res = await consumer.poll(1000);
for (let [key, value] of res) {
// Add your data processing logic here
console.log(`data: ${JSON.stringify(value, replacer)}`);
}
// commit
await consumer.commit();
}
// seek
let assignment = await consumer.assignment();
await consumer.seekToBeginning(assignment);
console.log('Assignment seek to beginning successfully');
// clean
await consumer.unsubscribe();
} catch (err) {
console.error(
`Failed to consumer, ErrCode: ${err.code}, ErrMessage: ${err.message}`
);
throw err;
} finally {
if (consumer) {
await consumer.close();
}
taos.destroy();
}
}
// Custom replacer function to handle BigInt serialization
function replacer(key, value) {
if (typeof value === 'bigint') {
return value.toString(); // Convert BigInt to string
}
return value;
}
testConsumer();