Data Subscription

This topic describes how to read data from TDengine using data subscription, an advanced feature of TDengine. To consume data through a subscription, you create a topic, create a consumer, subscribe the consumer to the topic, and consume data. This document briefly explains each of these steps.

Create Topic

A topic can be created on a database, on selected columns, or on a supertable.

Topic on Columns

The most common way to create a topic is on specifically selected columns. The syntax is as follows:

CREATE TOPIC topic_name AS subquery;

The topic is defined by a SELECT statement. Statements that specify columns, such as SELECT * and SELECT ts, c1, are supported, as are filtering conditions and scalar functions. Aggregate functions and time window aggregation are not supported. Note:

  • The schema of topics created in this manner is determined by the subscribed data.
  • You cannot modify (ALTER <table> MODIFY) or delete (ALTER <table> DROP) columns or tags that are used in a subscription or calculation.
  • Columns added to a table after the subscription is created are not displayed in the results. Deleting columns will cause an error.

For example:

CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;

Topic on SuperTable

Syntax:

CREATE TOPIC topic_name AS STABLE stb_name;

Creating a topic in this manner differs from subscribing with a SELECT * FROM stb_name statement as follows:

  • The table schema can be modified.
  • Unstructured data is returned. The format of the data returned changes based on the supertable schema.
  • A different table schema may exist for every data block to be processed.
  • The data returned does not include tags.

Topic on Database

Syntax:

CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;

This SQL statement creates a topic that covers all tables in the database. You can add the WITH META parameter to include schema changes in the subscription, including creating and deleting supertables; adding, deleting, and modifying columns; and creating, deleting, and modifying the tags of subtables. Consumers can determine the message type from the API. Note that this differs from Kafka.
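The three CREATE TOPIC forms above can be put side by side in a small sketch. The `TopicSource` type and the `create_topic_sql` function are hypothetical names introduced only for illustration; they are not part of any TDengine connector. The sketch only assembles the statement text and assumes the caller passes trusted identifiers.

```rust
/// What a topic is created on. Hypothetical type, for illustration only.
pub enum TopicSource<'a> {
    /// A SELECT subquery, e.g. "SELECT ts, c1 FROM tmqdb.stb WHERE c1 > 1".
    Query(&'a str),
    /// A supertable name.
    Stable(&'a str),
    /// A database name, optionally including schema changes (WITH META).
    Database { name: &'a str, with_meta: bool },
}

/// Builds the CREATE TOPIC statement text for the given source.
/// Names are interpolated verbatim, so pass trusted identifiers only.
pub fn create_topic_sql(topic: &str, source: &TopicSource) -> String {
    match source {
        TopicSource::Query(subquery) => format!("CREATE TOPIC {topic} AS {subquery}"),
        TopicSource::Stable(stb) => format!("CREATE TOPIC {topic} AS STABLE {stb}"),
        TopicSource::Database { name, with_meta } => {
            // WITH META is placed between the topic name and AS DATABASE.
            let meta = if *with_meta { " WITH META" } else { "" };
            format!("CREATE TOPIC {topic}{meta} AS DATABASE {name}")
        }
    }
}
```

The resulting string could then be passed to whatever statement-execution call the connector provides.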

Programming Model

To subscribe to data from an existing topic, the client program needs to follow the programming model described in this section.

  1. Create Consumer

To create a consumer, you must use the APIs provided by TDengine connectors. Sample code using a connector is given in the Sample Code section below.

  2. Subscribe to a Topic

A single consumer can subscribe to multiple topics.

  3. Consume Data

  4. Close the Consumer

After message consumption is finished, the consumer is unsubscribed.
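When a topic is created WITH META, the consume step has to tell data messages apart from schema-change messages. The following is a minimal, self-contained sketch of that dispatch. The `Message` enum and the `summarize` function are hypothetical stand-ins introduced for illustration; the real connector exposes its own message type, which is not modeled here.

```rust
/// Simplified stand-in for a polled message. Hypothetical type: the real
/// connector exposes its own message type.
pub enum Message {
    /// Rows written to a table.
    Data { table: String, rows: usize },
    /// A schema change, delivered only for topics created WITH META.
    Meta { statement: String },
}

/// Summarizes a batch: total number of rows seen, and the schema changes
/// in the order they arrived.
pub fn summarize(messages: &[Message]) -> (usize, Vec<&str>) {
    let mut total_rows: usize = 0;
    let mut schema_changes = Vec::new();
    for message in messages {
        match message {
            // Data messages contribute their row count.
            Message::Data { rows, .. } => total_rows += *rows,
            // Meta messages are collected so they can be applied or logged.
            Message::Meta { statement } => schema_changes.push(statement.as_str()),
        }
    }
    (total_rows, schema_changes)
}
```

Keeping the dispatch in a single `match` means the compiler flags any message kind the consumer forgets to handle.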

Sample Code

use std::time::Duration;

use chrono::{DateTime, Local};
use taos::*;

// Deserialize each row into a struct with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
    // deserialize timestamp to chrono::DateTime<Local>
    ts: DateTime<Local>,
    // float to f32
    current: Option<f32>,
    // int to i32
    voltage: Option<i32>,
    // float to f32
    phase: Option<f32>,
}

async fn prepare(taos: Taos) -> anyhow::Result<()> {
    let inserted = taos.exec_many([
        "use tmq",
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angeles')",
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
        "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;
    // 1 + 1 + 1 + 3 rows across the four INSERT statements
    assert_eq!(inserted, 6);
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    std::env::set_var("RUST_LOG", "debug");
    pretty_env_logger::init();
    let dsn = std::env::var("TDENGINE_CLOUD_DSN")?;

    let builder = TaosBuilder::from_dsn(&dsn)?;

    let taos = builder.build()?;

    // prepare database
    taos.exec_many([
        "DROP TOPIC IF EXISTS tmq_meters",
        "USE tmq",
        "CREATE STABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))",
        "CREATE TOPIC tmq_meters WITH META AS DATABASE tmq",
    ])
    .await?;

    let task = tokio::spawn(prepare(taos));

    tokio::time::sleep(Duration::from_secs(1)).await;

    // subscribe
    let dsn2 = format!("{dsn}&group.id=test");
    dbg!(&dsn2);
    let tmq = TmqBuilder::from_dsn(dsn2)?;

    let mut consumer = tmq.build()?;
    consumer.subscribe(["tmq_meters"]).await?;
    println!("start subscription");

    {
        let mut stream = consumer.stream();

        while let Some((offset, message)) = stream.try_next().await? {
            // get information from offset

            // the topic
            let topic = offset.topic();
            // the vgroup id, like partition id in kafka.
            let vgroup_id = offset.vgroup_id();
            println!("* in vgroup id {vgroup_id} of topic {topic}\n");

            if let Some(data) = message.into_data() {
                while let Some(block) = data.fetch_raw_block().await? {
                    // one block for one table, get table name if needed
                    let name = block.table_name();
                    let records: Vec<Record> = block.deserialize().try_collect()?;
                    println!(
                        "** table: {}, got {} records: {:#?}\n",
                        name.unwrap(),
                        records.len(),
                        records
                    );
                }
            }
            consumer.commit(offset).await?;
        }
    }

    consumer.unsubscribe().await;

    task.await??;

    Ok(())
}
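The sample above builds the consumer DSN with `format!("{dsn}&group.id=test")`, which works only when the DSN already carries a query string (for example a `?token=...` parameter). A small helper along the following lines could cover both cases. The name `with_group_id` is hypothetical and not part of the connector, and the sketch assumes the group ID needs no URL encoding.

```rust
/// Appends a `group.id` parameter to a DSN, choosing `?` or `&` depending on
/// whether the DSN already carries query parameters. Hypothetical helper;
/// the group ID is inserted verbatim, without URL encoding.
pub fn with_group_id(dsn: &str, group_id: &str) -> String {
    let separator = if dsn.ends_with('?') || dsn.ends_with('&') {
        // The DSN already ends with a separator, so add nothing.
        ""
    } else if dsn.contains('?') {
        // A query string exists: append another parameter.
        "&"
    } else {
        // No query string yet: start one.
        "?"
    };
    format!("{dsn}{separator}group.id={group_id}")
}
```

With this helper, the `dsn2` line in the sample could be written as `let dsn2 = with_group_id(&dsn, "test");`.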

Delete Topic

You can delete topics that are no longer useful. Note that you must unsubscribe all consumers from a topic before deleting it.

DROP TOPIC topic_name;

Check Status

At any time, you can check the status of existing topics and consumers.

  1. Query all existing topics.
SHOW TOPICS;
  2. Query the status and subscribed topics of all consumers.
SHOW CONSUMERS;