
TDengine Rust Client Library


taos is the official Rust client library for TDengine. Rust developers can use it to build applications that access data in TDengine instances.

The source code for the Rust client library is located on GitHub.

Connection types

taos provides two ways to establish connections; of the two, we recommend the WebSocket connection.

  • Native Connection, which connects to TDengine instances via the TDengine client driver (taosc).
  • WebSocket connection, which connects to TDengine instances via the WebSocket interface provided by taosAdapter.

You can specify a connection type with Cargo features. By default, both types are supported.

For a detailed introduction to the connection types, please refer to Establish Connection.

Supported platforms

Native connections are supported on the same platforms as the TDengine client driver. WebSocket connections are supported on all platforms that can run Rust.

Version history

| connector-rust version | TDengine version | major features |
| ---------------------- | ---------------- | -------------- |
| v0.12.0 | 3.2.3.0 or later | WS supports compression |
| v0.11.0 | 3.2.0.0 | TMQ feature optimization |
| v0.10.0 | 3.1.0.0 | WS endpoint changes |
| v0.9.2 | 3.0.7.0 | STMT: Get tag_fields and col_fields under ws. |
| v0.8.12 | 3.0.5.0 | TMQ: Get consuming progress and seek offset to consume. |
| v0.8.0 | 3.0.4.0 | Support schemaless insert. |
| v0.7.6 | 3.0.3.0 | Support req_id in query. |
| v0.6.0 | 3.0.0.0 | Base features. |

The Rust client library is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues.

Handling exceptions

When an error occurs, you can obtain the specific error information as follows:

match conn.exec(sql) {
    Ok(_) => Ok(()),
    Err(e) => {
        eprintln!("ERROR: {:?}", e);
        Err(e)
    }
}

TDengine DataType vs. Rust DataType

TDengine currently supports timestamp, numeric, character, and Boolean types. They map to Rust types as follows:

| TDengine DataType | Rust DataType |
| ----------------- | ------------- |
| TIMESTAMP | Timestamp |
| INT | i32 |
| BIGINT | i64 |
| FLOAT | f32 |
| DOUBLE | f64 |
| SMALLINT | i16 |
| TINYINT | i8 |
| BOOL | bool |
| BINARY | Vec<u8> |
| NCHAR | String |
| JSON | serde_json::Value |

Note: The JSON type is supported only in tags.

Installation Steps

Pre-installation preparation

  • Install the Rust development toolchain
  • If using the native connection, install the TDengine client driver; please refer to install client driver

Install the client library

Depending on the connection method, add the taos dependency in your Rust project as follows:

In Cargo.toml, add taos:

[dependencies]
# use default feature
taos = "*"
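If you only need one connection type, you can disable the default features and enable just the one you want. A minimal sketch, assuming the crate exposes the feature names ws (WebSocket) and native (native connection):

[dependencies]
# WebSocket connection only (assumed feature name: "ws")
taos = { version = "*", default-features = false, features = ["ws"] }

# Native connection only (assumed feature name: "native")
# taos = { version = "*", default-features = false, features = ["native"] }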

Establishing a connection

TaosBuilder creates a connection builder from a DSN connection description string.

let builder = TaosBuilder::from_dsn("taos://")?;

You can now use this object to create the connection.

let conn = builder.build()?;

You can create more than one connection object from the same builder.

let conn1 = builder.build()?;
let conn2 = builder.build()?;

The structure of the DSN description string is as follows:

<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver| protocol | | username | password | host | port | database | params |

The parameters are described as follows:

  • driver: Specify a driver name so that the client library can choose which method to use to establish the connection. Supported driver names are as follows:
    • taos: Use the TDengine native connection driver.
    • tmq: Use TMQ to subscribe to data.
    • http/ws: Use WebSocket to establish connections.
    • https/wss: Use WebSocket to establish connections, and enable SSL/TLS.
  • protocol: Specify which connection method to use. For example, taos+ws://localhost:6041 uses WebSocket to establish connections.
  • username/password: Username and password used to create connections.
  • host/port: Specifies the server and port to establish a connection. If you do not specify a hostname or port, native connections default to localhost:6030 and WebSocket connections default to localhost:6041.
  • database: Specify the default database to connect to. It's optional.
  • params: Optional parameters.

A sample DSN description string is as follows:

taos+ws://localhost:6041/test

This indicates that the WebSocket connection method is used on port 6041 to connect to the server localhost and use the database test by default.

You can create DSNs to connect to servers in your environment.

use taos::*;

// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();

// use websocket protocol.
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();

After the connection is established, you can perform operations on your database.

async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
    // prepare database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
         TAGS (`groupid` INT, `location` BINARY(24))",
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
        "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;

    assert_eq!(inserted, 6);
    let mut result = taos.query("select * from `meters`").await?;

    for field in result.fields() {
        println!("got field: {}", field.name());
    }

    // The rows in `result` can be consumed in either of the two ways shown below.
    Ok(())
}

There are two ways to query data: using built-in types or the serde deserialization framework.

// Query option 1, use rows stream.
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
    for (name, value) in row {
        println!("got value of {}: {}", name, value);
    }
}

// Query option 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
    // deserialize timestamp to chrono::DateTime<Local>
    ts: DateTime<Local>,
    // float to f32
    current: Option<f32>,
    // int to i32
    voltage: Option<i32>,
    phase: Option<f32>,
    groupid: i32,
    // binary/varchar to String
    location: String,
}

let records: Vec<Record> = taos
    .query("select * from `meters`")
    .await?
    .deserialize()
    .try_collect()
    .await?;

dbg!(records);
Ok(())

Usage examples

Create database and tables
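The full program below connects to TDengine and runs queries. As a minimal sketch, the database and super table used throughout these examples can be created first with exec (the statements mirror those used in the parameter-binding example later on this page):

// Assumes an async context and a connected `taos` object.
taos.exec("CREATE DATABASE IF NOT EXISTS power").await?;
taos.exec(
    "CREATE STABLE IF NOT EXISTS power.meters \
     (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \
     TAGS (groupId INT, location BINARY(24))",
)
.await?;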

use taos::*;
use chrono::Local;
use chrono::DateTime;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;

let taos = builder.build().await?;

// query data, make sure the database and table are created before
let sql = "SELECT ts, current, location FROM power.meters limit 100";
match taos.query(sql).await{
Ok(mut result) => {
for field in result.fields() {
println!("got field: {}", field.name());
}

let mut rows = result.rows();
let mut nrows = 0;
while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}
}
Err(err) => {
eprintln!("Failed to query data from power.meters, sql: {}, ErrMessage: {}", sql, err);
return Err(err.into());
}
}



// query data, make sure the database and table are created before
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// binary/varchar to String
location: String,
}

let sql = "SELECT ts, current, location FROM power.meters limit 100";
match taos.query("SELECT ts, current, location FROM power.meters limit 100").await {
Ok(mut query) => {
match query.deserialize::<Record>().try_collect::<Vec<_>>().await {
Ok(records) => {
dbg!(records);
}
Err(err) => {
eprintln!("Failed to deserialize query results; ErrMessage: {}", err);
return Err(err.into());
}
}
}
Err(err) => {
eprintln!("Failed to query data from power.meters, sql: {}, ErrMessage: {}", sql, err);
return Err(err.into());
}
}


let req_id :u64 = 3;
match taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", req_id).await{
Ok(mut result) => {
for field in result.fields() {
println!("got field: {}", field.name());
}

let mut rows = result.rows();
let mut nrows = 0;
while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}
}
Err(err) => {
eprintln!("Failed to execute sql with reqId: {}, ErrMessage: {}", req_id, err);
return Err(err.into());
}
}

Ok(())
}

view source code

Querying data works much like querying a relational database. When using subscripts to get the contents of the returned fields, you have to start from 1. However, we recommend using the field names to get the values of the fields in the result set.

Insert data
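A minimal sketch of inserting rows with exec, assuming the power database and meters super table from the examples above already exist (the INSERT statement follows the same pattern used elsewhere on this page):

// Assumes an async context and a connected `taos` object.
// The child table is created automatically from the super table if it does not exist.
let affected = taos
    .exec(
        "INSERT INTO power.d1001 USING power.meters TAGS(2, 'California.SanFrancisco') \
         VALUES (NOW, 10.3, 219, 0.31) (NOW + 1s, 12.6, 218, 0.33)",
    )
    .await?;
println!("inserted {} rows", affected);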


view source code

Query data

// query data, make sure the database and table are created before
let sql = "SELECT ts, current, location FROM power.meters limit 100";
match taos.query(sql).await{
Ok(mut result) => {
for field in result.fields() {
println!("got field: {}", field.name());
}

let mut rows = result.rows();
let mut nrows = 0;
while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}
}
Err(err) => {
eprintln!("Failed to query data from power.meters, sql: {}, ErrMessage: {}", sql, err);
return Err(err.into());
}
}


view source code

Execute SQL with req_id

The reqId is very similar to TraceID in distributed tracing systems. In a distributed system, a request may need to pass through multiple services or modules to be completed. The reqId is used to identify and associate all related operations of this request, allowing us to track and understand the complete execution path of the request. Here are some primary uses of reqId:

  • Request Tracing: By associating the same reqId with all related operations of a request, we can trace the complete path of the request within the system.
  • Performance Analysis: By analyzing a request's reqId, we can understand the processing time of the request across various services or modules, thereby identifying performance bottlenecks.
  • Fault Diagnosis: When a request fails, we can identify the location of the issue by examining the reqId associated with that request.

If the user does not set a reqId, the client library generates one randomly. However, we still recommend setting it explicitly, as it can then be better associated with the user's request.


let req_id :u64 = 3;
match taos.query_with_req_id("SELECT ts, current, location FROM power.meters limit 1", req_id).await{
Ok(mut result) => {
for field in result.fields() {
println!("got field: {}", field.name());
}

let mut rows = result.rows();
let mut nrows = 0;
while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}
}
Err(err) => {
eprintln!("Failed to execute sql with reqId: {}, ErrMessage: {}", req_id, err);
return Err(err.into());
}
}

view source code

Writing data via parameter binding

TDengine has significantly improved the bind APIs to support data writing (INSERT) scenarios. Writing data in this way avoids the resource consumption of SQL syntax parsing, resulting in significant write performance improvements in many cases.

For details on parameter binding, see the API Reference.

use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;

taos.exec("DROP DATABASE IF EXISTS power").await?;
taos.create_database("power").await?;
taos.use_database("power").await?;
taos.exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))").await?;

let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;

const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d_bind_{}", i);
let tags = vec![Value::Int(i as i32), Value::VarChar(format!("location_{}", i).into())];

// set table name and tags for the prepared statement.
match stmt.set_tbname_tags(&table_name, &tags).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}", table_name, tags, err);
return Err(err.into());
}
}
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_floats(vec![10.3 + j as f32]),
ColumnView::from_ints(vec![219 + j as i32]),
ColumnView::from_floats(vec![0.31 + j as f32]),
];
// bind values to the prepared statement.
match stmt.bind(&values).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to bind values, values:{:?}, ErrMessage: {}", values, err);
return Err(err.into());
}
}
}

match stmt.add_batch().await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to add batch, ErrMessage: {}", err);
return Err(err.into());
}
}
}

// execute.
match stmt.execute().await{
Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows),
Err(err) => {
eprintln!("Failed to insert to table meters using stmt, ErrMessage: {}", err);
return Err(err.into());
}
}

Ok(())
}

view source code

Schemaless Writing

TDengine supports schemaless writing. It is compatible with InfluxDB's Line Protocol, OpenTSDB's telnet line protocol, and OpenTSDB's JSON format protocol. For more information, see Schemaless Writing.

use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;

use taos::AsyncQueryable;
use taos::AsyncTBuilder;
use taos::TaosBuilder;
use taos::taos_query;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();
let host = "localhost";
let dsn = format!("taos://{}:6030", host);
log::debug!("dsn: {:?}", &dsn);

let client = TaosBuilder::from_dsn(dsn)?.build().await?;

let db = "power";

client
.exec(format!("create database if not exists {db}"))
.await?;

// should specify database before insert
client.exec(format!("use {db}")).await?;

// SchemalessProtocol::Line
let data = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639",
]
.map(String::from)
.to_vec();

let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(100u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}

// SchemalessProtocol::Telnet
let data = [
"metric_telnet 1707095283260 4 host=host0 interface=eth0",
]
.map(String::from)
.to_vec();

let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Telnet)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(200u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}

// SchemalessProtocol::Json
let data = [
r#"[{
"metric": "metric_json",
"timestamp": 1626846400,
"value": 10.3,
"tags": {
"groupid": 2,
"location": "California.SanFrancisco",
"id": "d1001"
}
}]"#
]
.map(String::from)
.to_vec();

let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Json)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(300u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}

println!("Inserted data with schemaless successfully.");
Ok(())
}

view source code

Schemaless with req_id

The req_id can be used for request tracing.

let sml_data = SmlDataBuilder::default()
    .protocol(SchemalessProtocol::Line)
    .data(data)
    .req_id(100u64)
    .build()?;

client.put(&sml_data).await?;

Data Subscription

TDengine provides data subscription through TMQ (the TDengine message queue).

Create a Topic

taos.exec_many([
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters",
])
.await?;

view source code

Create a Consumer

Create a consumer:

let dsn = "taos://localhost:6030".to_string();
println!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;

view source code
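The DSN is then given the TMQ parameters (at minimum group.id) and passed to TmqBuilder to build the consumer, as in the full example in the next section. A condensed sketch:

dsn.params.insert("group.id".to_string(), "group1".to_string());
dsn.params.insert("client.id".to_string(), "client1".to_string());

let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;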

Subscribe to consume data

A single consumer can subscribe to one or more topics.

use chrono::DateTime;
use chrono::Local;
use std::str::FromStr;
use std::thread;
use std::time::Duration;
use taos::*;
use tokio::runtime::Runtime;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
pretty_env_logger::formatted_timed_builder()
.filter_level(log::LevelFilter::Info)
.init();
use taos_query::prelude::*;
let dsn = "taos://localhost:6030".to_string();
println!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;

let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;

// prepare database and table
taos.exec_many([
"drop topic if exists topic_meters",
"drop database if exists power",
"create database if not exists power WAL_RETENTION_PERIOD 86400",
"use power",
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))",
"create table if not exists power.d001 using power.meters tags(1,'location')",
])
.await?;

taos.exec_many([
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters",
])
.await?;

let group_id = "group1".to_string();
let client_id = "client1".to_string();
dsn.params
.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params
.insert("msg.with.table.name".to_string(), "true".to_string());
dsn.params
.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params
.insert("auto.commit.interval.ms".to_string(), "1000".to_string());
dsn.params.insert("group.id".to_string(), group_id.clone());
dsn.params
.insert("client.id".to_string(), client_id.clone());

let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = match builder.build().await {
Ok(consumer) => {
println!(
"Create consumer successfully, dsn: {}, groupId: {}, clientId: {}.",
dsn, group_id, client_id
);
consumer
}
Err(err) => {
eprintln!("Failed to create native consumer, dsn: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", dsn, group_id, client_id, err);
return Err(err.into());
}
};

let handle = thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
tokio::time::sleep(Duration::from_secs(1)).await;
let taos_insert = TaosBuilder::from_dsn(&dsn).unwrap().build().await.unwrap();
for i in 0..50 {
let insert_sql = format!(r#"INSERT INTO
power.d1001 USING power.meters TAGS(2,'California.SanFrancisco')
VALUES
(NOW, 10.30000, {}, 0.31000)"#, i);
if let Err(e) = taos_insert.exec(insert_sql).await {
eprintln!("Failed to execute insert: {:?}", e);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
});

let topic = "topic_meters";
match consumer.subscribe([topic]).await {
Ok(_) => println!("Subscribe topics successfully."),
Err(err) => {
eprintln!(
"Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, err
);
return Err(err.into());
}
}

#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}

consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async move {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");

if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await
.map_err(|e| {
eprintln!(
"Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, e
);
e
})?;


consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async {
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {} of topic {}\n", vgroup_id, topic);

if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
// commit offset manually when you have processed the message.
match consumer.commit(offset).await{
Ok(_) => println!("Commit offset manually successfully."),
Err(err) => {
eprintln!("Failed to commit offset manually, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, err);
return Err(err.into());
}
}
Ok(())
})
.await.map_err(|e| {
eprintln!("Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}", topic, group_id, client_id, e);
e
})?;

let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("assignments: {:?}", assignments);

// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
println!(
"topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}",
topic, vgroup_id, current, begin, end
);

match consumer.offset_seek(topic, vgroup_id, begin).await {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, begin, err);
return Err(err.into());
}
}
}

let topic_assignment = consumer.topic_assignment(topic).await;
println!("topic assignment: {:?}", topic_assignment);
}
println!("Assignment seek to beginning successfully.");
// after seek offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("After seek offset assignments: {:?}", assignments);

consumer.unsubscribe().await;
println!("Consumer unsubscribed successfully.");

tokio::time::sleep(Duration::from_secs(1)).await;

handle.join().unwrap();

taos.exec_many(["drop topic topic_meters", "drop database power"])
.await?;
Ok(())
}

view source code

The TMQ message stream implements futures::Stream. You can use the corresponding API to consume each message in the queue and then use .commit to mark it as consumed.

let topic = "topic_meters";
match consumer.subscribe([topic]).await {
Ok(_) => println!("Subscribe topics successfully."),
Err(err) => {
eprintln!(
"Failed to subscribe topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, err
);
return Err(err.into());
}
}

#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}

consumer
.stream_with_timeout(Timeout::from_secs(10))
.try_for_each(|(offset, message)| async move {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");

if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
let records: Vec<Record> = block.deserialize().try_collect()?;
// Add your data processing logic here
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
Ok(())
})
.await
.map_err(|e| {
eprintln!(
"Failed to poll data, topic: {}, groupId: {}, clientId: {}, ErrMessage: {:?}",
topic, group_id, client_id, e
);
e
})?;

view source code

Get assignments:

Version requirements: connector-rust >= v0.8.8, TDengine >= 3.0.5.0

let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("assignments: {:?}", assignments);


view source code

Assignment subscription Offset

Seek offset:

Version requirements: connector-rust >= v0.8.8, TDengine >= 3.0.5.0

let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("assignments: {:?}", assignments);

// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
println!(
"topic: {}, vgroup_id: {}, current offset: {}, begin {}, end: {}",
topic, vgroup_id, current, begin, end
);

match consumer.offset_seek(topic, vgroup_id, begin).await {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to seek offset, topic: {}, groupId: {}, clientId: {}, vGroupId: {}, begin: {}, ErrMessage: {:?}",
topic, group_id, client_id, vgroup_id, begin, err);
return Err(err.into());
}
}
}

let topic_assignment = consumer.topic_assignment(topic).await;
println!("topic assignment: {:?}", topic_assignment);
}
println!("Assignment seek to beginning successfully.");
// after seek offset
let assignments = match consumer.assignments().await {
Some(assignments) => assignments,
None => {
let error_message = format!(
"Failed to get assignments. topic: {}, groupId: {}, clientId: {}",
topic, group_id, client_id
);
eprintln!("{}", error_message);
return Err(anyhow::anyhow!(error_message));
}
};
println!("After seek offset assignments: {:?}", assignments);

view source code

Close subscriptions

consumer.unsubscribe().await;
println!("Consumer unsubscribed successfully.");

view source code

The following parameters can be configured for the TMQ DSN. Only group.id is mandatory.

  • group.id: Consumer group ID. Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
  • client.id: Subscriber client ID.
  • auto.offset.reset: Initial point of subscription. earliest subscribes from the beginning, and latest subscribes from the newest message. The default value varies depending on the TDengine version. For details, see Data Subscription. Note: This parameter is set per consumer group.
  • enable.auto.commit: Whether to commit offsets automatically. This can be enabled when data consistency is not essential.
  • auto.commit.interval.ms: Interval for automatic commits.
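As a sketch (reusing the pattern from the subscription example above), these parameters can be supplied in the DSN query string or inserted into a parsed Dsn before building the consumer:

use std::str::FromStr;

let mut dsn = Dsn::from_str("taos://localhost:6030?group.id=group1&client.id=client1")?;
dsn.params.insert("auto.offset.reset".to_string(), "latest".to_string());
dsn.params.insert("enable.auto.commit".to_string(), "true".to_string());
dsn.params.insert("auto.commit.interval.ms".to_string(), "1000".to_string());

let mut consumer = TmqBuilder::from_dsn(&dsn)?.build().await?;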

Full Sample Code

For more information, see GitHub sample file.

Use with connection pool

For complex applications, we recommend using a connection pool. taos implements connection pooling based on r2d2.

A connection pool with default parameters can be created as follows:

let pool = TaosBuilder::from_dsn(dsn)?.pool()?;

You can also customize the connection pool parameters using the pool builder:

let dsn = "taos://localhost:6030";

let opts = PoolBuilder::new()
.max_size(5000) // max connections
.max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
.min_idle(Some(1000)) // minimal idle connections
.connection_timeout(Duration::from_secs(2));

let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;

In your application code, use pool.get()? to get a Taos connection object.

let taos = pool.get()?;

More sample programs

The source code of the sample application is under TDengine/docs/examples/rust:

rust example

Frequently Asked Questions

For additional troubleshooting, see FAQ.

API Reference

The Taos object provides an API to perform operations on multiple databases.

  1. exec: Execute some non-query SQL statements, such as CREATE, ALTER, INSERT, etc.

    let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
  2. exec_many: Run multiple SQL statements simultaneously or in order.

    taos.exec_many([
    "CREATE DATABASE test",
    "USE test",
    "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    ]).await?;
  3. query: Run a query statement and return a [ResultSet] object.

    let mut q = taos.query("select * from log.logs").await?;

    The [ResultSet] object stores query result data and the names, types, and lengths of the returned columns.

    You can obtain column information by using [.fields()].

    let cols = q.fields();
    for col in cols {
    println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
    }

    Rows can also be fetched one at a time:

    let mut rows = result.rows();
    let mut nrows = 0;
    while let Some(row) = rows.try_next().await? {
    for (col, (name, value)) in row.enumerate() {
    println!(
    "[{}] got value in col {} (named `{:>8}`): {}",
    nrows, col, name, value
    );
    }
    nrows += 1;
    }

    Or use the serde deserialization framework.

    #[derive(Debug, Deserialize)]
    struct Record {
    // deserialize timestamp to chrono::DateTime<Local>
    ts: DateTime<Local>,
    // float to f32
    current: Option<f32>,
    // int to i32
    voltage: Option<i32>,
    phase: Option<f32>,
    groupid: i32,
    // binary/varchar to String
    location: String,
    }

    let records: Vec<Record> = taos
    .query("select * from `meters`")
    .await?
    .deserialize()
    .try_collect()
    .await?;

Note that Rust asynchronous functions and an asynchronous runtime are required.

Taos provides Rust methods for some common SQL statements, reducing the need to build statements with format!.

  • .describe(table: &str): Executes DESCRIBE and returns a Rust data structure.
  • .create_database(database: &str): Executes the CREATE DATABASE statement.
  • .use_database(database: &str): Executes the USE statement.
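A minimal sketch of these helpers, assuming an async context, a connected taos object, and the async variants of the methods; the exact structure returned by .describe() is defined by the crate:

taos.create_database("power").await?;
taos.use_database("power").await?;

let description = taos.describe("meters").await?;
dbg!(description);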

In addition, this structure is also the entry point for Parameter Binding and Line Protocol Interface. Please refer to the specific API descriptions for usage.

Bind Interface

Similar to the C interface, the Rust client library wraps the parameter binding (bind) interface. First, a Stmt parameter binding object is created from the Taos object for an SQL statement.

let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;

The bind object provides a set of interfaces for implementing parameter binding.

.set_tbname(name)

To bind table names.

let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;

.set_tags(&[tag])

Binds tag values when the SQL statement uses a super table; use it together with .set_tbname to set the subtable name.

let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("taos".to_string())])?;

.bind(&[column])

Bind value types. Use the [ColumnView] structure to create and bind the required types.

let params = vec![
ColumnView::from_millis_timestamp(vec![164000000000]),
ColumnView::from_bools(vec![true]),
ColumnView::from_tiny_ints(vec![i8::MAX]),
ColumnView::from_small_ints(vec![i16::MAX]),
ColumnView::from_ints(vec![i32::MAX]),
ColumnView::from_big_ints(vec![i64::MAX]),
ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
ColumnView::from_unsigned_ints(vec![u32::MAX]),
ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
ColumnView::from_floats(vec![f32::MAX]),
ColumnView::from_doubles(vec![f64::MAX]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;

.execute()

Execute SQL. A Stmt object can be reused, re-bound, and executed again after execution. Before executing, ensure that all data has been added to the queue with .add_batch.

stmt.execute()?;

// next bind cycle.
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;

For a working example, see GitHub.

For information about other structure APIs, see the Rust documentation.