Skip to main content

TDengine Rust Connector

libtaos is the official Rust language connector for TDengine. Rust developers can develop applications to access the TDengine instance data.

libtaos provides two ways to establish connections. One is the Native Connection, which connects to TDengine instances via the TDengine client driver (taosc). The other is REST connection, which connects to TDengine instances via taosAdapter's REST interface.

The source code for libtaos is hosted on GitHub.

Supported platforms

The platforms supported by native connections are the same as those supported by the TDengine client driver. REST connections are supported on all platforms that can run Rust.

Version support

Please refer to version support list.

The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 2.4 or higher to avoid known issues.

Installation

Pre-installation

  • Install the Rust development toolchain
  • If using the native connection, please install the TDengine client driver. Please refer to install client driver

Adding libtaos dependencies

Add the libtaos dependency to the Rust project as follows, depending on the connection method selected.

Add libtaos to the Cargo.toml file.

[dependencies]
# use default feature
libtaos = "*"

Using connection pools

Please enable the r2d2 feature in Cargo.toml.

[dependencies]
# with taosc
libtaos = { version = "*", features = ["r2d2"] }
# or rest
libtaos = { version = "*", features = ["rest", "r2d2"] }

Create a connection

The TaosCfgBuilder provides the user with an API in the form of a constructor for the subsequent creation of connections or use of connection pools.

let cfg: TaosCfg = TaosCfgBuilder::default()
.ip("127.0.0.1")
.user("root")
.pass("taosdata")
.db("log") // do not set if not require a default database.
.port(6030u16)
.build()
.expect("TaosCfg builder error");
}

You can now use this object to create the connection.

let conn = cfg.connect()? ;

The connection object can create more than one.

let conn = cfg.connect()? ;
let conn2 = cfg.connect()? ;

You can use connection pools in applications.

let pool = r2d2::Pool::builder()
.max_size(10000) // max connections
.build(cfg)? ;

// ...
// Use pool to get connection
let conn = pool.get()? ;

After that, you can perform the following operations on the database.

async fn demo() -> Result<(), Error> {
// get connection ...

// create database
conn.exec("create database if not exists demo").await?
// change database context
conn.exec("use demo").await?
// create table
conn.exec("create table if not exists tb1 (ts timestamp, v int)").await?
// insert
conn.exec("insert into tb1 values(now, 1)").await?
// query
let rows = conn.query("select * from tb1").await?
for row in rows.rows {
println!("{}", row.into_iter().join(","));
}
}

Usage examples

Write data

SQL Write

use libtaos::*;

#[tokio::main]
async fn main() -> Result<(), Error> {
let taos = TaosCfg::default().connect().expect("fail to connect");
taos.create_database("power").await?;
taos.exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
let sql = "INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
let result = taos.query(sql).await?;
println!("{:?}", result);
Ok(())
}

// output:
// TaosQueryData { column_meta: [ColumnMeta { name: "affected_rows", type_: Int, bytes: 4 }], rows: [[Int(8)]] }

view source code

InfluxDB line protocol write

use libtaos::schemaless::*;
use libtaos::*;

fn main() {
let taos = TaosCfg::default().connect().expect("fail to connect");
taos.raw_query("CREATE DATABASE test").unwrap();
taos.raw_query("USE test").unwrap();
let lines = ["meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249",
"meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250",
"meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249",
"meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250"];
let affected_rows = taos
.schemaless_insert(
&lines,
TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLISECONDS,
)
.unwrap();
println!("affected_rows={}", affected_rows);
}

// run with: cargo run --example influxdb_line_example

view source code

OpenTSDB Telnet line protocol write

use libtaos::schemaless::*;
use libtaos::*;

fn main() {
let taos = TaosCfg::default().connect().expect("fail to connect");
taos.raw_query("CREATE DATABASE test").unwrap();
taos.raw_query("USE test").unwrap();
let lines = [
"meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
"meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
"meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3",
"meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3",
"meters.voltage 1648432611249 219 location=California.SanFrancisco groupid=2",
"meters.voltage 1648432611250 218 location=California.SanFrancisco groupid=2",
"meters.voltage 1648432611249 221 location=California.LosAngeles groupid=3",
"meters.voltage 1648432611250 217 location=California.LosAngeles groupid=3",
];
let affected_rows = taos
.schemaless_insert(
&lines,
TSDB_SML_TELNET_PROTOCOL,
TSDB_SML_TIMESTAMP_NOT_CONFIGURED,
)
.unwrap();
println!("affected_rows={}", affected_rows); // affected_rows=8
}

// run with: cargo run --example opentsdb_telnet_example

view source code

OpenTSDB JSON line protocol write

use libtaos::schemaless::*;
use libtaos::*;

fn main() {
let taos = TaosCfg::default().connect().expect("fail to connect");
taos.raw_query("CREATE DATABASE test").unwrap();
taos.raw_query("USE test").unwrap();
let lines = [
r#"[{"metric": "meters.current", "timestamp": 1648432611249, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
{"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}},
{"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}},
{"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#,
];

let affected_rows = taos
.schemaless_insert(
&lines,
TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NOT_CONFIGURED,
)
.unwrap();
println!("affected_rows={}", affected_rows); // affected_rows=4
}

// run with: cargo run --example opentsdb_json_example

view source code

Query data

use libtaos::*;

fn taos_connect() -> Result<Taos, Error> {
TaosCfgBuilder::default()
.ip("localhost")
.user("root")
.pass("taosdata")
.db("power")
.port(6030u16)
.build()
.expect("TaosCfg builder error")
.connect()
}

#[tokio::main]
async fn main() -> Result<(), Error> {
let taos = taos_connect().expect("connect error");
let result = taos.query("SELECT ts, current FROM meters LIMIT 2").await?;
// print column names
let meta: Vec<ColumnMeta> = result.column_meta;
for column in meta {
print!("{}\t", column.name)
}
println!();
// print rows
let rows: Vec<Vec<Field>> = result.rows;
for row in rows {
for field in row {
print!("{}\t", field);
}
println!();
}
Ok(())
}

// output:
// ts current
// 2022-03-28 09:56:51.249 10.3
// 2022-03-28 09:56:51.749 12.6

view source code

More sample programs

Program PathProgram Description
demo.rsBasic API Usage Examples
bailongma-rsUsing TDengine as the Prometheus remote storage API adapter for the storage backend, using the r2d2 connection pool

API Reference

Connection constructor API

The Builder Pattern constructor pattern is Rust's solution for handling complex data types or optional configuration types. The libtaos implementation uses the connection constructor TaosCfgBuilder as the entry point for the TDengine Rust connector. The TaosCfgBuilder provides optional configuration of servers, ports, databases, usernames, passwords, etc.

Using the default() method, you can construct a TaosCfg with default parameters for subsequent connections to the database or establishing connection pools.

let cfg = TaosCfgBuilder::default().build()? ;

Using the constructor pattern, the user can set on-demand.

let cfg = TaosCfgBuilder::default()
.ip("127.0.0.1")
.user("root")
.pass("taosdata")
.db("log")
.port(6030u16)
.build()? ;

Create TDengine connection using TaosCfg object.

let conn: Taos = cfg.connect();

Connection pooling

In complex applications, we recommend enabling connection pools. Connection pool for libtaos is implemented using r2d2.

As follows, a connection pool with default parameters can be generated.

let pool = r2d2::Pool::new(cfg)? ;

You can set the same connection pool parameters using the connection pool's constructor.

    use std::time::Duration;
let pool = r2d2::Pool::builder()
.max_size(5000) // max connections
.max_lifetime(Some(Duration::from_minutes(100))) // lifetime of each connection
.min_idle(Some(1000)) // minimal idle connections
.connection_timeout(Duration::from_minutes(2))
.build(cfg);

In the application code, use pool.get()? to get a connection object Taos.

let taos = pool.get()? ;

The Taos structure is the connection manager in libtaos and provides two main APIs.

  1. exec: Execute some non-query SQL statements, such as CREATE, ALTER, INSERT, etc.

    taos.exec().await?
  2. query: Execute the query statement and return the TaosQueryData object.

    let q = taos.query("select * from log.logs").await?

    The TaosQueryData object stores the query result data and basic information about the returned columns (column name, type, length).

    Column information is stored using [ColumnMeta].

    let cols = &q.column_meta;
    for col in cols {
    println!("name: {}, type: {:?} , bytes: {}", col.name, col.type_, col.bytes);
    }

    It fetches data line by line.

    for (i, row) in q.rows.iter().enumerate() {
    for (j, cell) in row.iter().enumerate() {
    println!("cell({}, {}) data: {}", i, j, cell);
    }
    }

Note that Rust asynchronous functions and an asynchronous runtime are required.

Taos provides a few Rust methods that encapsulate SQL to reduce the frequency of format! code blocks.

  • .describe(table: &str): Executes DESCRIBE and returns a Rust data structure.
  • .create_database(database: &str): Executes the CREATE DATABASE statement.
  • .use_database(database: &str): Executes the USE statement.

In addition, this structure is also the entry point for [Parameter Binding](#Parameter Binding Interface) and [Line Protocol Interface](#Line Protocol Interface). Please refer to the specific API descriptions for usage.

Bind Interface

Similar to the C interface, Rust provides the bind interface's wrapping. First, create a bind object Stmt for a SQL command from the Taos object.

let mut stmt: Stmt = taos.stmt("insert into ? values(? ,?)") ? ;

The bind object provides a set of interfaces for implementing parameter binding.

.set_tbname(tbname: impl ToCString)

To bind table names.

.set_tbname_tags(tbname: impl ToCString, tags: impl IntoParams)

Bind sub-table table names and tag values when the SQL statement uses a super table.

let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)") ? ;
// tags can be created with any supported type, here is an example using JSON
let v = Field::Json(serde_json::from_str("{\"tag1\":\"one, two, three, four, five, six, seven, eight, nine, ten\"}").unwrap());
stmt.set_tbname_tags("tb0", [&tag])? ;
.bind(params: impl IntoParams)

Bind value types. Use the Field structure to construct the desired type and bind.

let ts = Field::Timestamp(Timestamp::now());
let value = Field::Float(0.0);
stmt.bind(vec![ts, value].iter())? ;
.execute()

Execute SQL.Stmt objects can be reused, re-binded, and executed after execution.

stmt.execute()? ;

// next bind cycle.
// stmt.set_tbname()? ;
//stmt.bind()? ;
//stmt.execute()? ;

Line protocol interface

The line protocol interface supports multiple modes and different precision and requires the introduction of constants in the schemaless module to set.

use libtaos::*;
use libtaos::schemaless::*;
  • InfluxDB line protocol

    let lines = [
    "st,t1=abc,t2=def,t3=anything c1=3i64,c3=L\"pass\",c2=false 1626006833639000000"
    "st,t1=abc,t2=def,t3=anything c1=3i64,c3=L\"abc\",c4=4f64 1626006833639000000"
    ];
    taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANOSECONDS)? ;
  • OpenTSDB Telnet Protocol

    let lines = ["sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0"];
    taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_SECONDS)? ;
  • OpenTSDB JSON protocol

    let lines = [r#"
    {
    "metric": "st",
    "timestamp": 1626006833,
    "value": 10,
    "tags": {
    "t1": true,
    "t2": false,
    "t3": 10,
    "t4": "123_abc_.! @#$%^&*:;,. /? |+-=()[]{}<>"
    }
    }"#];
    taos.schemaless_insert(&lines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_SECONDS)? ;

Please move to the Rust documentation hosting page for other related structure API usage instructions: https://docs.rs/libtaos.