Data Subscription
TDengine provides data subscription and consumption interfaces similar to message queue products. These interfaces make it easier for applications to obtain data written to TDengine in real time and to process data in the order that events occurred. This simplifies your time-series data processing systems and reduces your costs because it is no longer necessary to deploy a message queue product such as Kafka.
To use TDengine data subscription, you define topics like in Kafka. However, a topic in TDengine is based on query conditions for an existing supertable, table, or subtable - in other words, a SELECT statement. You can use SQL to filter data by tag, table name, column, or expression and then perform a scalar function or user-defined function on the data. Aggregate functions are not supported. This gives TDengine data subscription more flexibility than similar products. The granularity of data can be controlled on demand by applications, while filtering and preprocessing are handled by TDengine instead of the application layer. This implementation reduces the amount of data transmitted and the complexity of applications.
By subscribing to a topic, a consumer can obtain the latest data in that topic in real time. Multiple consumers can be formed into a consumer group that consumes messages together. Consumer groups increase throughput through multi-threaded, distributed data consumption. Note that consumers in different groups that are subscribed to the same topic do not consume messages together. A single consumer can subscribe to multiple topics. If the data in a supertable is sharded across multiple vnodes, a consumer group can consume it much more efficiently than a single consumer. TDengine also includes an acknowledgement mechanism that ensures at-least-once delivery in complicated environments where machines may crash or restart.
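The at-least-once guarantee mentioned above can be illustrated with a small simulation. This is only a sketch in plain Python, not TDengine code: the function name `consume_with_crash` and the in-memory list standing in for a topic are assumptions made for illustration. It shows that when offsets are acknowledged only after a record is processed, a crash causes a record to be delivered again rather than lost.

```python
def consume_with_crash(log, crash_after):
    """Process records from `log`, committing only after each one is handled.

    Simulates a single crash: once `crash_after` records have been processed,
    the consumer dies before committing and restarts from the committed offset.
    """
    committed = 0            # next offset to fetch after a restart
    position = committed     # next offset to fetch in the current run
    processed = []           # every record the application saw, in order
    crashed = False
    while position < len(log):
        processed.append(log[position])       # application handles the record
        if not crashed and len(processed) == crash_after:
            crashed = True
            position = committed              # restart from the last commit
            continue
        position += 1
        committed = position                  # acknowledge after processing
    return processed


seen = consume_with_crash(["r0", "r1", "r2", "r3"], crash_after=2)
print(seen)  # ['r0', 'r1', 'r1', 'r2', 'r3']
```

The record `r1` appears twice because the crash happened before its offset was committed. Applications that cannot tolerate duplicates would need to make their processing idempotent.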
To implement these features, TDengine indexes its write-ahead log (WAL) file for fast random access and provides configurable methods for replacing and retaining this file. You can define a retention period and size for this file. For information, see the CREATE DATABASE statement. In this way, the WAL file is transformed into a persistent storage engine that remembers the order in which events occur. However, note that configuring an overly long retention period for your WAL files makes database compression inefficient. TDengine then uses the WAL file instead of the time-series database as its storage engine for queries in the form of topics. TDengine reads the data from the WAL file; uses a unified query engine instance to perform filtering, transformations, and other operations; and finally pushes the data to consumers.
The following notes explain data subscription in more detail. They assume some familiarity with the TDengine architecture and with the connector interfaces for the various languages (you can come back to them when you need them).
- A consumer group consumes all data under a topic, and different consumer groups are independent of each other;
- A consumer group consumes all vgroups of a topic and can be composed of multiple consumers, but a vgroup is consumed by only one consumer in the group. If the number of consumers exceeds the number of vgroups, the excess consumers do not consume data;
- On the server side, only one offset is saved for each vgroup. The offsets within a vgroup increase monotonically but are not necessarily continuous. There is no correlation between the offsets of different vgroups;
- Each poll returns a result block from the server. The block belongs to one vgroup and may contain data from multiple WAL versions. The offset of the block can be obtained through the offset interface, which returns the offset of the first record in the block;
- If a consumer group has never committed an offset, its member consumers start consuming from the position set by the auto.offset.reset parameter when they restart and pull data again. Within a consumer's lifecycle, the client locally records the offset of the most recently pulled data and does not pull duplicate data;
- If a consumer terminates abnormally (without calling tmq_consumer_close), it takes about 12 seconds to trigger a rebalance of its consumer group. The consumer's status on the server changes to LOST, and after about 1 day the consumer is automatically deleted. If a consumer exits normally, it is deleted after exiting. When a new consumer is added, a rebalance is triggered after about 2 seconds, and the consumer's status on the server changes to ready;
- A consumer group rebalance reassigns vgroups to all consumers in the group that are in the ready state. A consumer can perform assignment, seek, commit, and poll operations only on the vgroups it is responsible for;
- Consumers can call the position interface to obtain the current consumption offset, seek to a specified offset, and consume again;
- Seek moves the position to the specified offset without performing a commit. Once the seek succeeds, poll returns the data at the specified offset and the data that follows it;
- Position returns the current consumption position, which is the position to be fetched next, not the position of the most recently consumed data;
- Commit submits a consumption position. Without parameters, it submits the current consumption position (the position to be fetched next, not the position of the most recently consumed data). With parameters, it submits the position given in the parameters (that is, the position to be fetched after the next exit and restart);
- Seek sets the consumer's consumption position. Position returns whatever position the seek set; both refer to the position to be fetched next;
- Seek and commit are different concepts and are independent of each other: seek does not affect commit, and commit does not affect seek;
- The begin interface returns the offset of the first record in the WAL, and the end interface returns the offset of the last record in the WAL plus 1;
- Before a seek operation, the assignment interface must be called. The assignment interface obtains the vgroup IDs and offset ranges of the consumer. The seek operation checks whether the vgroup ID and offset are valid and reports an error if they are not;
- Because expired WAL data is deleted, the offset may already have expired when data is polled, even if the seek operation succeeded. If the offset being polled is less than the minimum WAL version number, consumption starts from the minimum WAL version number;
- The offset interface returns the offset of the first record in the result block that contains a given record. Seeking to this offset consumes all the data in that block again. See the fourth point above;
- Data subscription consumes data from the WAL. If WAL files are deleted according to the WAL retention policy, the deleted data can no longer be consumed. Set a reasonable value for the WAL_RETENTION_PERIOD or WAL_RETENTION_SIZE parameter when creating the database, and make sure your application consumes data in a timely way so that no data is lost. This behavior is similar to Kafka and other widely used message queue products.
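The relationship between position, seek, commit, and WAL expiration described in the list above can be summarized with a toy model. This is a sketch in plain Python, not a TDengine API: the class `VgroupCursor` and its attributes are hypothetical. It also simplifies in several ways that are assumptions: each poll returns a single offset rather than a block, offsets are treated as continuous, a consumer without a committed offset starts at the beginning, and a seek to the end offset is accepted.

```python
class VgroupCursor:
    """Toy model of one consumer's view of a single vgroup (not a TDengine API)."""

    def __init__(self, begin, end, committed=None):
        self.begin = begin          # offset of the first record still in the WAL
        self.end = end              # offset of the last record in the WAL + 1
        self.committed = committed  # offset saved on the server, if any
        self.position = committed if committed is not None else begin

    def seek(self, offset):
        # Seek validates the offset and moves the position only;
        # the committed offset is left untouched.
        if not self.begin <= offset <= self.end:
            raise ValueError("offset out of range")
        self.position = offset

    def poll(self):
        # If the WAL was trimmed past the position, resume from the minimum.
        start = max(self.position, self.begin)
        if start >= self.end:
            return None
        self.position = start + 1   # position is the offset to fetch next
        return start

    def commit(self, offset=None):
        # Without an argument, commit the current position.
        self.committed = self.position if offset is None else offset


cur = VgroupCursor(begin=0, end=10)
first, second = cur.poll(), cur.poll()   # consumes offsets 0 and 1
cur.commit()                             # commits 2, the next offset to fetch
cur.seek(7)                              # moves the position, not the commit
print(cur.position, cur.committed)       # 7 2
```

After the seek, the position is 7 while the committed offset is still 2, which is the independence between seek and commit that the list describes.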
This document does not cover message queue concepts in general. Consult other resources if you need that background.
Note: Starting from version 3.2.0.0, data subscription supports vnode migration and splitting. Because data subscription depends on WAL files and the WAL is not synchronized during vnode migration and splitting, WAL data that was not consumed before the migration or split can no longer be consumed afterward. Make sure that all data has been consumed before migrating or splitting a vnode; otherwise, data may be lost during consumption.
Data Schema and API
The related schemas and APIs in various languages are described as follows. Note that the consumer structure is not thread-safe: when using a consumer on one thread, do not close the consumer on another thread.
- C
- Java
- Python
- Go
- Rust
- Node.JS
- C#
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param));
typedef enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
} tmq_conf_res_t;
typedef struct tmq_topic_assignment {
int32_t vgId;
int64_t currentOffset;
int64_t begin;
int64_t end; // The last version of wal + 1
} tmq_topic_assignment;
DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); //Commit the msg’s offset + 1
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment);
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); // The current offset is the offset of the last consumed message + 1
DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); // Get current offset of the result
DLL_EXPORT const char *tmq_err2str(int32_t code);
The following example is based on the smart meter table described in Data Models. For complete sample code, see the C language section below.
void subscribe(Collection<String> topics) throws SQLException;
void unsubscribe() throws SQLException;
Set<String> subscription() throws SQLException;
ConsumerRecords<V> poll(Duration timeout) throws SQLException;
Set<TopicPartition> assignment() throws SQLException;
long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException;
void seek(TopicPartition partition, long offset) throws SQLException;
void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException;
void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;
void commitSync() throws SQLException;
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException;
void close() throws SQLException;
class Consumer:
    def subscribe(self, topics):
        pass
    def unsubscribe(self):
        pass
    def poll(self, timeout: float = 1.0):
        pass
    def assignment(self):
        pass
    def close(self):
        pass
    def commit(self, message):
        pass
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
// rebalanceCb is reserved for compatibility purpose
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
// rebalanceCb is reserved for compatibility purpose
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
func (c *Consumer) Poll(timeoutMs int) tmq.Event
// tmq.TopicPartition is reserved for compatibility purpose
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
func (c *Consumer) Unsubscribe() error
func (c *Consumer) Close() error
impl TBuilder for TmqBuilder
fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
fn build(&self) -> Result<Self::Target, Self::Error>
impl AsAsyncConsumer for Consumer
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
&mut self,
topics: I,
) -> Result<(), Self::Error>;
fn stream(
&self,
) -> Pin<
Box<
dyn '_
+ Send
+ futures::Stream<
Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
>,
>,
>;
async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
async fn unsubscribe(self);
For more information, see Crate taos.
function TMQConsumer(config)
function subscribe(topic)
function consume(timeout)
function subscription()
function unsubscribe()
function commit(msg)
function close()
class ConsumerBuilder<TValue>
ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
public IConsumer<TValue> Build()
void Subscribe(IEnumerable<string> topics)
void Subscribe(string topic)
ConsumeResult<TValue> Consume(int millisecondsTimeout)
List<string> Subscription()
void Unsubscribe()
List<TopicPartitionOffset> Commit()
void Close()
Insert Data into TDengine
A database including one supertable and two subtables is created as follows:
DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600;
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
Create a Topic
The following SQL statement creates a topic in TDengine:
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
- The number of topics that can be created is limited by the tmqMaxTopicNum parameter, which defaults to 20.
Multiple subscription types are supported.
Subscribe to a Column
Syntax:
CREATE TOPIC topic_name as subquery
You can subscribe to a topic through a SELECT statement. Statements that specify columns, such as SELECT * and SELECT ts, c1, are supported, as are filtering conditions and scalar functions. Aggregate functions and time window aggregation are not supported. Note:
- The schema of topics created in this manner is determined by the subscribed data.
- You cannot modify (ALTER <table> MODIFY) or delete (ALTER <table> DROP) columns or tags that are used in a subscription or calculation.
- Columns added to a table after the subscription is created are not displayed in the results. Deleting columns will cause an error.
Subscribe to a Supertable
Syntax:
CREATE TOPIC topic_name [with meta] AS STABLE stb_name [where_condition]
Creating a topic in this manner differs from a SELECT * from stbName statement as follows:
- The table schema can be modified.
- Unstructured data is returned. The format of the data returned changes based on the supertable schema.
- The 'with meta' parameter is optional. When specified, statements such as those that create supertables and subtables are also returned. This is mainly used by taosX for supertable migration.
- The 'where_condition' parameter is optional and filters the subtables that are subscribed to. The WHERE condition can reference only tags or tbname, not ordinary columns. Functions can be used in the WHERE condition to filter tags, but aggregate functions cannot, because subtable tag values cannot be aggregated. The condition can also be a constant expression, such as 2>1 (subscribe to all subtables) or false (subscribe to no subtables).
- The data returned does not include tags.
Subscribe to a Database
Syntax:
CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
This SQL statement creates a subscription to all tables in the database.
- The 'with meta' parameter is optional. When specified, the statements that create all supertables and subtables in the database are also returned. This is mainly used by taosX for database migration.
Create a Consumer
You configure the following parameters when creating a consumer:
Parameter | Type | Description | Remarks |
---|---|---|---|
td.connect.ip | string | IP address of the server side | |
td.connect.user | string | User Name | |
td.connect.pass | string | Password | |
td.connect.port | string | Port of the server side | |
group.id | string | Consumer group ID; consumers with the same ID are in the same group | Required. Maximum length: 192. Each topic can create up to 100 consumer groups. |
client.id | string | Client ID | Maximum length: 192. |
auto.offset.reset | enum | Initial offset for the consumer group | earliest: subscribe from the earliest data (default in versions earlier than 3.2.0.0); latest: subscribe from the latest data (default in version 3.2.0.0 and later); none: cannot subscribe without a committed offset |
enable.auto.commit | boolean | Commit automatically; true: the application does not need to commit explicitly; false: the application must commit by itself | Default value: true |
auto.commit.interval.ms | integer | Interval for automatic commits, in milliseconds | |
msg.with.table.name | boolean | Specify whether to deserialize table names from messages. Not applicable to column subscriptions (in a column subscription, tbname can be written as a column in the subquery). This parameter has been deprecated since version 3.2.0.0 and is always true | Default value: false |
enable.replay | boolean | Specify whether the data replay function is enabled | Default value: false |
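The constraints in the table above can be checked before a consumer is created. The following is a minimal sketch in plain Python; the helper `check_consumer_config` is hypothetical and is not part of any TDengine connector. It assumes that the 192 limit counts characters and that boolean parameters are passed as the strings "true" and "false", as in the configuration examples in this document.

```python
VALID_RESET = {"earliest", "latest", "none"}
MAX_ID_LEN = 192  # assumed to be a character count


def check_consumer_config(conf):
    """Return a list of problems found in a consumer configuration dict."""
    problems = []
    group_id = conf.get("group.id")
    if not group_id:
        problems.append("group.id is required")
    elif len(group_id) > MAX_ID_LEN:
        problems.append("group.id exceeds 192 characters")
    if len(conf.get("client.id", "")) > MAX_ID_LEN:
        problems.append("client.id exceeds 192 characters")
    reset = conf.get("auto.offset.reset")
    if reset is not None and reset not in VALID_RESET:
        problems.append("auto.offset.reset must be earliest, latest, or none")
    for key in ("enable.auto.commit", "enable.replay"):
        if key in conf and conf[key] not in ("true", "false"):
            problems.append(f"{key} must be 'true' or 'false'")
    return problems


print(check_consumer_config({"group.id": "cgrpName", "auto.offset.reset": "latest"}))
print(check_consumer_config({"auto.offset.reset": "newest"}))
```

The first call prints an empty list; the second reports the missing group.id and the unsupported reset value.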
The method of specifying these parameters depends on the language used:
- C
- Java
- Go
- Rust
- Python
- Node.JS
- C#
/* Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit),
an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass) */
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "latest");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
Java programs use the following parameters:
| Parameter | Type | Description | Remarks |
| --- | --- | --- | --- |
| td.connect.type | string | connection type: "jni" means native connection, "ws" means websocket connection, the default is "jni" | |
| bootstrap.servers | string | Connection address, such as localhost:6030 | |
| value.deserializer | string | Value deserializer; to use this method, implement the com.taosdata.jdbc.tmq.Deserializer interface or inherit the com.taosdata.jdbc.tmq.ReferenceDeserializer type | |
| value.deserializer.encoding | string | Specify the encoding for string deserialization | |
Note: The bootstrap.servers parameter is used instead of td.connect.ip and td.connect.port to provide an interface that is consistent with Kafka.
Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);
/* value deserializer definition. */
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
conf := &tmq.ConfigMap{
"group.id": "test",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
}
consumer, err := NewConsumer(conf)
let mut dsn: Dsn = "taos://".parse()?;
dsn.set("group.id", "group1");
dsn.set("client.id", "test");
dsn.set("auto.offset.reset", "latest");
let tmq = TmqBuilder::from_dsn(dsn)?;
let mut consumer = tmq.build()?;
from taos.tmq import Consumer
# Syntax: `consumer = Consumer(configs)`
#
# Example:
consumer = Consumer(
{
"group.id": "local",
"client.id": "1",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "latest",
"msg.with.table.name": "true",
}
)
// Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit),
// an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass)
let consumer = taos.consumer({
  'enable.auto.commit': 'true',
  'auto.commit.interval.ms': '1000',
  'group.id': 'tg2',
  'td.connect.user': 'root',
  'td.connect.pass': 'taosdata',
  'auto.offset.reset': 'latest',
  'msg.with.table.name': 'true',
  'td.connect.ip': '127.0.0.1',
  'td.connect.port': '6030'
});
var cfg = new Dictionary<string, string>()
{
{ "group.id", "group1" },
{ "auto.offset.reset", "latest" },
{ "td.connect.ip", "127.0.0.1" },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
{ "td.connect.port", "6030" },
{ "client.id", "tmq_example" },
{ "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" },
};
var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.
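How the vgroups of a topic are shared inside one consumer group can be pictured with a short sketch. This is plain Python, not TDengine code, and the function `assign_vgroups` is hypothetical. The round-robin policy is an assumption chosen for illustration; the actual policy used by the server during a rebalance is not described here. What the sketch does reflect is that each vgroup goes to exactly one consumer and that surplus consumers receive nothing.

```python
def assign_vgroups(vgroup_ids, consumers):
    """Give each vgroup to exactly one consumer; extra consumers get nothing."""
    assignment = {c: [] for c in consumers}
    if not consumers:
        return assignment
    # Round-robin is an illustrative choice, not the documented server policy.
    for i, vg in enumerate(sorted(vgroup_ids)):
        assignment[consumers[i % len(consumers)]].append(vg)
    return assignment


print(assign_vgroups([2, 3, 4], ["c1", "c2"]))     # {'c1': [2, 4], 'c2': [3]}
print(assign_vgroups([2, 3], ["c1", "c2", "c3"]))  # {'c1': [2], 'c2': [3], 'c3': []}
```

In the second call, `c3` is idle because there are more consumers than vgroups, so adding consumers beyond the number of vgroups does not increase throughput.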
Data replay function description:
- Subscription supports a replay function, which replays data according to the time at which it was written. For example, suppose three records are written at the following times. After the first record is returned, the second record is returned 5 seconds later, and the third record is returned 3 seconds after the second.
2023/09/22 00:00:00.000
2023/09/22 00:00:05.000
2023/09/22 00:00:08.000
- Only column subscriptions support data replay.
- Replay needs to ensure an independent timeline.
- If it is a subtable subscription or a normal table subscription, only one vnode has data, which ensures a single timeline.
- If subscribing to a supertable, the database must have only one vnode; otherwise an error is reported (because the data subscribed to on multiple vnodes is not on the same timeline).
- Supertable and database subscriptions do not support replay.
- The enable.replay parameter controls replay: true enables the subscription replay function, and false (the default) disables it.
- Replay does not support saving progress, so when enable.replay is true, auto commit is automatically turned off.
- Because replaying data takes processing time, replay timing has an error of tens of milliseconds.
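The replay timing in the example above can be worked out from the write timestamps alone. The following sketch is plain Python and does not call TDengine; the function `replay_delays` is a hypothetical helper that only computes the gap between consecutive write times, which is the delay a replaying consumer would observe under the description above.

```python
from datetime import datetime


def replay_delays(write_times):
    """Seconds to wait before each record is returned, relative to the previous one."""
    fmt = "%Y/%m/%d %H:%M:%S.%f"
    stamps = [datetime.strptime(t, fmt) for t in write_times]
    if not stamps:
        return []
    # The first record is returned immediately; later ones follow the write gaps.
    return [0.0] + [(b - a).total_seconds() for a, b in zip(stamps, stamps[1:])]


delays = replay_delays([
    "2023/09/22 00:00:00.000",
    "2023/09/22 00:00:05.000",
    "2023/09/22 00:00:08.000",
])
print(delays)  # [0.0, 5.0, 3.0]
```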
Subscribe to a Topic
A single consumer can subscribe to multiple topics.
- C
- Java
- Go
- Rust
- Python
- Node.JS
- C#
// Create a list of subscribed topics
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
// Enable subscription
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
consumer.subscribe(["tmq_meters"]).await?;
consumer.subscribe(['topic1', 'topic2'])
// Create a list of subscribed topics
let topics = ['topic_test']
// Enable subscription
consumer.subscribe(topics);
// Create a list of subscribed topics
List<string> topics = new List<string>();
topics.Add("tmq_topic");
// Enable subscription
consumer.Subscribe(topics);
Consume messages
The following code demonstrates how to consume the messages in a queue.
- C
- Java
- Go
- Rust
- Python
- Node.JS
- C#
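// Consume data
while (running) {
  TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
  if (msg == NULL) continue;  // poll timed out, nothing to process
  msg_process(msg);
  taos_free_result(msg);      // release the message after processing
}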
The while loop obtains a message each time it calls tmq_consumer_poll(). This message is exactly the same as the result returned by a query, and the same deserialization API can be used on it.
while(running){
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) {
processMsg(meter);
}
}
for {
ev := consumer.Poll(0)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Println(e.Value())
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
while True:
res = consumer.poll(100)
if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
while(true){
msg = consumer.consume(200);
// process message(consumeResult)
console.log(msg.topicPartition);
console.log(msg.block);
console.log(msg.fields)
}
// Consume data
while (true)
{
using (var result = consumer.Consume(500))
{
if (result == null) continue;
ProcessMsg(result);
consumer.Commit();
}
}
Close the consumer
After message consumption is finished, unsubscribe the consumer and close it.
- C
- Java
- Go
- Rust
- Python
- Node.JS
- C#
/* Unsubscribe */
tmq_unsubscribe(tmq);
/* Close consumer object */
tmq_consumer_close(tmq);
/* Unsubscribe */
consumer.unsubscribe();
/* Close consumer */
consumer.close();
/* Unsubscribe */
_ = consumer.Unsubscribe()
/* Close consumer */
_ = consumer.Close()
consumer.unsubscribe().await;
# Unsubscribe
consumer.unsubscribe()
# Close consumer
consumer.close()
consumer.unsubscribe();
consumer.close();
// Unsubscribe
consumer.Unsubscribe();
// Close consumer
consumer.Close();
Delete a Topic
You can delete topics that are no longer useful. Note that you must unsubscribe all consumers from a topic before deleting it.
/* Delete topic */
DROP TOPIC topic_name;
Check Status
- Query all existing topics.
SHOW TOPICS;
- Query the status and subscribed topics of all consumers.
SHOW CONSUMERS;
- Query the relationships between consumers and vgroups.
SHOW SUBSCRIPTIONS;
Examples
The following section shows sample code in various languages.
- C
- Java
- Go
- Rust
- Python
- Node.JS
- C#
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
static int running = 1;
static char dbName[64] = "tmqdb";
static char stbName[64] = "stb";
static char topicName[64] = "topicname";
static int32_t msg_process(TAOS_RES* msg) {
char buf[1024];
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
printf("topic: %s\n", topicName);
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content: %s\n", buf);
}
return rows;
}
static int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes;
// drop database if exists
printf("create database\n");
pRes = taos_query(pConn, "drop database if exists tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create database
pRes = taos_query(pConn, "create database tmqdb wal_retention_period 3600");
if (taos_errno(pRes) != 0) {
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create super table
printf("create super table\n");
pRes = taos_query(
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create sub tables
printf("create sub tables\n");
pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
if (taos_errno(pRes) != 0) {
printf("failed to create sub table ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
if (taos_errno(pRes) != 0) {
printf("failed to create sub table ctb1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
if (taos_errno(pRes) != 0) {
printf("failed to create sub table ctb2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
if (taos_errno(pRes) != 0) {
printf("failed to create sub table ctb3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// insert data
printf("insert data into sub tables\n");
pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
int32_t create_topic() {
printf("create topic\n");
TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
pRes = taos_query(pConn, "use tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
}
tmq_t* build_consumer() {
tmq_conf_res_t code;
tmq_conf_t* conf = tmq_conf_new();
code = tmq_conf_set(conf, "enable.auto.commit", "true");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "group.id", "cgrpName");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "client.id", "user defined name");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.user", "root");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) {
tmq_conf_destroy(conf);
return NULL;
}
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list() {
tmq_list_t* topicList = tmq_list_new();
int32_t code = tmq_list_append(topicList, "topicname");
if (code) {
// free the list on failure so it is not leaked
tmq_list_destroy(topicList);
return NULL;
}
return topicList;
}
void basic_consume_loop(tmq_t* tmq) {
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 5000;
while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
totalRows += msg_process(tmqmsg);
taos_free_result(tmqmsg);
} else {
break;
}
}
fprintf(stderr, "%d msg consumed, including %d rows\n", msgCnt, totalRows);
}
int main(int argc, char* argv[]) {
int32_t code;
if (init_env() < 0) {
return -1;
}
if (create_topic() < 0) {
return -1;
}
tmq_t* tmq = build_consumer();
if (NULL == tmq) {
fprintf(stderr, "%% build_consumer() fail!\n");
return -1;
}
tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
return -1;
}
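if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
// do not enter the consume loop without a subscription
tmq_list_destroy(topic_list);
tmq_consumer_close(tmq);
return -1;
}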
tmq_list_destroy(topic_list);
basic_consume_loop(tmq);
code = tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
} else {
fprintf(stderr, "%% Consumer closed\n");
}
return 0;
}
- native connection
- WebSocket connection
package com.taos.example;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
public class SubscribeDemo {
private static final String TOPIC = "tmq_topic";
private static final String DB_NAME = "meters";
private static final AtomicBoolean shutdown = new AtomicBoolean(false);
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
shutdown.set(true);
}
}, 3_000);
try {
// prepare
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
try (Connection connection = DriverManager.getConnection(jdbcUrl);
Statement statement = connection.createStatement()) {
statement.executeUpdate("drop topic if exists " + TOPIC);
statement.executeUpdate("drop database if exists " + DB_NAME);
statement.executeUpdate("create database " + DB_NAME + " wal_retention_period 3600");
statement.executeUpdate("use " + DB_NAME);
statement.executeUpdate(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))");
statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngeles')");
statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
statement.executeUpdate(
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)");
statement.executeUpdate(
"INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
// create topic
statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
}
// create consumer
Properties properties = new Properties();
properties.setProperty(TMQConstants.CONNECT_TYPE, "jni");
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6030");
properties.setProperty(TMQConstants.CONNECT_USER, "root");
properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "1000");
properties.setProperty(TMQConstants.GROUP_ID, "test1");
properties.setProperty(TMQConstants.CLIENT_ID, "1");
properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.taos.example.MetersDeserializer");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
// poll data
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (!shutdown.get()) {
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Meters> r : meters) {
Meters meter = r.value();
System.out.println(meter);
}
}
consumer.unsubscribe();
}
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
}
timer.cancel();
}
}
package com.taos.example;
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
package com.taos.example;
import java.sql.Timestamp;
public class Meters {
private Timestamp ts;
private float current;
private int voltage;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public float getCurrent() {
return current;
}
public void setCurrent(float current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
@Override
public String toString() {
return "Meters{" +
"ts=" + ts +
", current=" + current +
", voltage=" + voltage +
", groupid=" + groupid +
", location='" + location + '\'' +
'}';
}
}
package com.taos.example;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
public class WebsocketSubscribeDemo {
private static final String TOPIC = "tmq_topic_ws";
private static final String DB_NAME = "meters_ws";
private static final AtomicBoolean shutdown = new AtomicBoolean(false);
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
shutdown.set(true);
}
}, 3_000);
try {
// prepare
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
String jdbcUrl = "jdbc:TAOS-RS://127.0.0.1:6041/?user=root&password=taosdata&batchfetch=true";
try (Connection connection = DriverManager.getConnection(jdbcUrl);
Statement statement = connection.createStatement()) {
statement.executeUpdate("drop topic if exists " + TOPIC);
statement.executeUpdate("drop database if exists " + DB_NAME);
statement.executeUpdate("create database " + DB_NAME + " wal_retention_period 3600");
statement.executeUpdate("use " + DB_NAME);
statement.executeUpdate(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))");
statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngeles')");
statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
statement.executeUpdate(
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)");
statement.executeUpdate(
"INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
// create topic
statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
}
// create consumer
Properties properties = new Properties();
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6041");
properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
properties.setProperty(TMQConstants.CONNECT_USER, "root");
properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "1000");
properties.setProperty(TMQConstants.GROUP_ID, "test2");
properties.setProperty(TMQConstants.CLIENT_ID, "1");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.taos.example.MetersDeserializer");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
// poll data
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (!shutdown.get()) {
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Meters> r : meters) {
Meters meter = r.value();
System.out.println(meter);
}
}
consumer.unsubscribe();
}
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
}
timer.cancel();
}
}
package main
import (
"fmt"
"os"
"time"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
)
func main() {
db, err := af.Open("", "root", "taosdata", "", 0)
if err != nil {
panic(err)
}
defer db.Close()
_, err = db.Exec("create database if not exists example_tmq wal_retention_period 3600")
if err != nil {
panic(err)
}
_, err = db.Exec("create topic if not exists example_tmq_topic as DATABASE example_tmq")
if err != nil {
panic(err)
}
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)
}
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
_, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
if err != nil {
panic(err)
}
go func() {
for {
// use a goroutine-local err to avoid racing with the main goroutine
_, err := db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 100)
}
}()
for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Println(e.String())
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}
err = consumer.Unsubscribe()
if err != nil {
panic(err)
}
err = consumer.Close()
if err != nil {
panic(err)
}
}
use std::time::Duration;
use chrono::{DateTime, Local};
use taos::*;
// Rows from the subscribed topic are deserialized into this struct with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
}
async fn prepare(taos: Taos) -> anyhow::Result<()> {
let inserted = taos.exec_many([
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngeles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build().await?;
let db = "tmq";
// prepare database
taos.exec_many([
format!("DROP TOPIC IF EXISTS tmq_meters"),
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"),
format!("USE `{db}`"),
// create super table
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
// create topic for subscription
format!("CREATE TOPIC tmq_meters AS SELECT * FROM `meters`")
])
.await?;
let task = tokio::spawn(prepare(taos));
tokio::time::sleep(Duration::from_secs(1)).await;
// subscribe
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
let mut consumer = tmq.build().await?;
consumer.subscribe(["tmq_meters"]).await?;
// ANCHOR: consumer_commit_manually
consumer
.stream()
.try_for_each(|(offset, message)| async {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
consumer.commit(offset).await?;
Ok(())
})
.await?;
// ANCHOR_END: consumer_commit_manually
// ANCHOR: assignments
let assignments = consumer.assignments().await.unwrap();
log::info!("assignments: {:?}", assignments);
// ANCHOR_END: assignments
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
log::debug!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
topic,
vgroup_id,
current,
begin,
end
);
// ANCHOR: seek_offset
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
// ANCHOR_END: seek_offset
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::debug!("topic assignment: {:?}", topic_assignment);
}
// after seek offset
let assignments = consumer.assignments().await.unwrap();
log::info!("after seek offset assignments: {:?}", assignments);
consumer.unsubscribe().await;
task.await??;
Ok(())
}
from taos.tmq import Consumer
import taos
def init_tmq_env(db, topic):
    conn = taos.connect()
    conn.execute("drop topic if exists {}".format(topic))
    conn.execute("drop database if exists {}".format(db))
    conn.execute("create database if not exists {} wal_retention_period 3600 keep 36500".format(db))
    conn.select_db(db)
    conn.execute(
        "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
    conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
    conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
    conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
    conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
    conn.execute("insert into tb1 values (now, 1, 1.0, 'tmq test')")
    conn.execute("insert into tb2 values (now, 2, 2.0, 'tmq test')")
    conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')")
    conn.close()
def cleanup(db, topic):
    conn = taos.connect()
    conn.execute("drop topic if exists {}".format(topic))
    conn.execute("drop database if exists {}".format(db))
    conn.close()
if __name__ == '__main__':
    init_tmq_env("tmq_test", "tmq_test_topic")  # init env
    consumer = Consumer(
        {
            "group.id": "tg2",
            "td.connect.user": "root",
            "td.connect.pass": "taosdata",
            "enable.auto.commit": "true",
        }
    )
    consumer.subscribe(["tmq_test_topic"])
    try:
        while True:
            # poll() returns nothing if no message arrives within the timeout (seconds)
            res = consumer.poll(1)
            if not res:
                break
            err = res.error()
            if err is not None:
                raise err
            val = res.value()
            for block in val:
                print(block.fetchall())
    finally:
        consumer.unsubscribe()
        consumer.close()
        cleanup("tmq_test", "tmq_test_topic")
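The Python sample above relies on auto-commit. As a rough sketch of the alternative, the loop below commits each message only after its rows have been handled, which is the usual way to get at-least-once behavior from a poll loop. It runs without a TDengine server: `FakeBlock`, `FakeMessage`, and `FakeConsumer` are hypothetical stand-ins that only mimic the `poll()`, `error()`, `value()`, and `fetchall()` shape used in the sample, and `commit(message)` is assumed here rather than taken from the sample. `drain` is likewise an illustrative helper, not part of any connector.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class FakeBlock:
    """Hypothetical stand-in for one data block of a polled message."""
    rows: List[tuple]

    def fetchall(self) -> List[tuple]:
        return self.rows


@dataclass
class FakeMessage:
    """Hypothetical stand-in exposing error() and value() like a poll result."""
    blocks: List[FakeBlock]
    err: Optional[Exception] = None

    def error(self) -> Optional[Exception]:
        return self.err

    def value(self) -> List[FakeBlock]:
        return self.blocks


@dataclass
class FakeConsumer:
    """Hypothetical in-memory consumer: poll() returns queued messages, then None."""
    queue: List[FakeMessage]
    committed: int = 0

    def poll(self, timeout: float) -> Optional[FakeMessage]:
        return self.queue.pop(0) if self.queue else None

    def commit(self, message: FakeMessage) -> None:
        # Assumed commit-per-message call; here it only counts commits.
        self.committed += 1


def drain(consumer, handle_row: Callable[[tuple], None], timeout: float = 1.0) -> int:
    """Poll until no message arrives within `timeout`; commit after processing."""
    total = 0
    while True:
        msg = consumer.poll(timeout)
        if not msg:
            break
        err = msg.error()
        if err is not None:
            raise err
        for block in msg.value():
            for row in block.fetchall():
                handle_row(row)
                total += 1
        # Committing only after the rows are handled means a crash before
        # this point leads to redelivery instead of silent data loss.
        consumer.commit(msg)
    return total


if __name__ == "__main__":
    demo = FakeConsumer([
        FakeMessage([FakeBlock([(1, 1.0, "tmq test")])]),
        FakeMessage([FakeBlock([(2, 2.0, "tmq test"), (3, 3.0, "tmq test")])]),
    ])
    seen: List[tuple] = []
    print(drain(demo, seen.append), demo.committed)
```

With a real consumer, the same loop would presumably be used with `enable.auto.commit` set to `false`, so that the explicit commit is the only place offsets advance. Because a message can be redelivered after a crash, `handle_row` should tolerate seeing the same row twice.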
const taos = require("@tdengine/client");
const conn = taos.connect({ host: "localhost", database: "power" });
var cursor = conn.cursor();
function runConsumer() {
// create topic
cursor.execute("create topic topic_name_example as select * from meters");
let consumer = taos.consumer({
'group.id': 'tg2',
'td.connect.user': 'root',
'td.connect.pass': 'taosdata',
'msg.with.table.name': 'true',
'enable.auto.commit': 'true'
});
// subscribe the topic just created.
consumer.subscribe("topic_name_example");
// get subscribe topic list
let topicList = consumer.subscription();
console.log(topicList);
for (let i = 0; i < 5; i++) {
let msg = consumer.consume(100);
console.log(msg.topicPartition);
console.log(msg.block);
console.log(msg.fields)
consumer.commit(msg);
console.log(`=======consumer ${i} done`)
}
consumer.unsubscribe();
consumer.close();
// drop topic
cursor.execute("drop topic topic_name_example");
}
try {
runConsumer();
} finally {
setTimeout(() => {
cursor.close();
conn.close();
}, 2000);
}
using TDengine.Driver;
using TDengine.Driver.Client;
using TDengine.TMQ;
namespace TMQExample
{
internal class SubscribeDemo
{
private static string _host = "";
private static string _groupId = "";
private static string _clientId = "";
private static string _topic = "";
public static void Main(string[] args)
{
try
{
var builder = new ConnectionStringBuilder("host=127.0.0.1;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
client.Exec("CREATE DATABASE IF NOT EXISTS power");
client.Exec("USE power");
client.Exec(
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC IF NOT EXISTS topic_meters as SELECT * from power.meters");
var consumer = CreateConsumer();
// insert data
Task.Run(InsertData);
// consume message
Consume(consumer);
// seek
Seek(consumer);
// commit
CommitOffset(consumer);
// close
Close(consumer);
Console.WriteLine("Done");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(e.Message);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(e.Message);
throw;
}
}
static void InsertData()
{
var builder = new ConnectionStringBuilder("host=127.0.0.1;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
while (true)
{
client.Exec(
"INSERT into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(now,11.5,219,0.30)");
Task.Delay(1000).Wait();
}
}
}
static IConsumer<Dictionary<string, object>> CreateConsumer()
{
// ANCHOR: create_consumer
// consumer config
_host = "127.0.0.1";
_groupId = "group1";
_clientId = "client1";
var cfg = new Dictionary<string, string>()
{
{ "td.connect.port", "6030" },
{ "auto.offset.reset", "latest" },
{ "msg.with.table.name", "true" },
{ "enable.auto.commit", "true" },
{ "auto.commit.interval.ms", "1000" },
{ "group.id", _groupId },
{ "client.id", _clientId },
{ "td.connect.ip", _host },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
};
IConsumer<Dictionary<string, object>> consumer = null!;
try
{
// create consumer
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
Console.WriteLine(
$"Create consumer successfully, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: create_consumer
return consumer;
}
static void Consume(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: subscribe
_topic = "topic_meters";
try
{
// subscribe
consumer.Subscribe(new List<string>() { _topic });
Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++)
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
// handle message
Console.WriteLine(
$"data: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: subscribe
}
static void Seek(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: seek
try
{
// get assignment
var assignment = consumer.Assignment;
Console.WriteLine($"Now assignment: {assignment}");
// seek to the beginning
foreach (var topicPartition in assignment)
{
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
}
Console.WriteLine("Assignment seek to beginning successfully");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrMessage: {e.Message}");
throw;
}
// ANCHOR_END: seek
}
static void CommitOffset(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: commit_offset
for (int i = 0; i < 5; i++)
{
TopicPartitionOffset topicPartitionOffset = null;
try
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
// commit offset
topicPartitionOffset = cr.TopicPartitionOffset;
consumer.Commit(new List<TopicPartitionOffset>
{
topicPartitionOffset,
});
Console.WriteLine("Commit offset manually successfully.");
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrMessage: {e.Message}");
throw;
}
}
// ANCHOR_END: commit_offset
}
static void Close(IConsumer<Dictionary<string, object>> consumer)
{
// ANCHOR: close
try
{
// unsubscribe
consumer.Unsubscribe();
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw;
}
finally
{
// close consumer
consumer.Close();
Console.WriteLine("Consumer closed successfully.");
}
// ANCHOR_END: close
}
}
}