Ingesting Data in Schemaless Mode
In Internet of Things (IoT) applications, it is often necessary to collect a large number of data points to achieve various functionalities such as automated management, business analysis, and device monitoring. However, due to reasons like version upgrades of application logic and adjustments in the hardware of devices, the data collection items may change frequently. To address this challenge, TDengine provides a schemaless writing mode aimed at simplifying the data recording process.
By using schemaless writing, users do not need to pre-create supertables or subtables, as TDengine automatically creates the corresponding storage structure based on the actual data written. Additionally, when necessary, schemaless writing can also automatically add required data columns or tag columns to ensure that the data written by users can be stored correctly.
It is worth noting that supertables and their corresponding subtables created through schemaless writing are functionally indistinguishable from those created directly via SQL. Users can still use SQL to write data into these tables directly. However, the table names generated through schemaless writing are based on the tag values and are created according to a fixed mapping rule, which may result in names that lack readability and are not easy to understand.
When using schemaless writing, tables are automatically created, and there is no need to manually create tables. Manual table creation may result in unknown errors.
Schemaless Writing Line Protocol
TDengine's schemaless writing line protocol is compatible with InfluxDB's line protocol, OpenTSDB's telnet line protocol, and OpenTSDB's JSON format protocol. For the standard writing protocols of InfluxDB and OpenTSDB, please refer to their respective official documentation.
The following introduces the protocol based on InfluxDB's line protocol, along with the extensions made by TDengine. This protocol allows users to control (supertable) schemas in a more granular way. A string can express a data row, and multiple rows can be passed to the writing API at once as multiple strings. The format is specified as follows.
measurement,tag_set field_set timestamp
The parameters are described as follows:
- measurement is the table name, separated from tag_set by a comma.
- tag_set has the format <tag_key>=<tag_value>,<tag_key>=<tag_value> and holds the tag column data. Tags are separated from each other by commas, and tag_set is separated from field_set by a space.
- field_set has the format <field_key>=<field_value>,<field_key>=<field_value> and holds the ordinary columns. Fields are separated from each other by commas, and field_set is separated from the timestamp by a space.
- timestamp is the primary key timestamp corresponding to this data row.
- Schemaless writing does not support writing data to tables with a second primary key column.
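As a minimal sketch of the row format described above, the following Python helper assembles one row from its parts. The function name build_line is hypothetical and not part of any TDengine connector; it assumes the caller has already formatted and escaped every value.

```python
def build_line(measurement, tags, fields, timestamp):
    """Assemble one row as 'measurement,tag_set field_set timestamp'.

    tags and fields are dicts whose values are already formatted
    (type suffixes, quotes) and escaped by the caller.
    """
    # Pairs inside a set are joined with commas, without spaces.
    tag_set = ",".join(f"{key}={value}" for key, value in tags.items())
    field_set = ",".join(f"{key}={value}" for key, value in fields.items())
    # A single space separates tag_set, field_set, and the timestamp.
    return f"{measurement},{tag_set} {field_set} {timestamp}"


line = build_line(
    "st",
    {"t1": "3", "t2": "4", "t3": "t3"},
    {"c1": "3i64", "c3": '"passit"', "c2": "false", "c4": "4f64"},
    1626006833639000000,
)
print(line)
```

The resulting string can be passed to any of the connector calls shown in the examples later in this document.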
All data in tag_set is automatically converted to the nchar data type and does not require double quotes. In the schemaless writing data row protocol, each data item in field_set needs to describe its own data type with specific requirements as follows:
- If surrounded by double quotes, it indicates varchar type, e.g., "abc".
- If surrounded by double quotes and prefixed with L or l, it indicates nchar type, e.g., L" error message ".
- If surrounded by double quotes and prefixed with G or g, it indicates geometry type, e.g., G"Point(4.343 89.342)".
- If surrounded by double quotes and prefixed with B or b, it indicates varbinary type, where the quoted string can start with \x for hexadecimal or be a regular string, e.g., B"\x98f46e" and B"hello".
- Spaces, equals signs (=), commas (,), double quotes ("), and backslashes (\) must be escaped with a preceding backslash (\). All of these are half-width ASCII characters. The escaping rules for each domain of the schemaless writing protocol are shown in the following table.
No. | Domain | Characters to Escape |
---|---|---|
1 | Supertable Name | Comma, space |
2 | Tag Name | Comma, equals sign, space |
3 | Tag Value | Comma, equals sign, space |
4 | Column Name | Comma, equals sign, space |
5 | Column Value | Double quotes, backslash |
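The per-domain rules in the table can be sketched as small Python helpers. The function names are hypothetical, and the sketch assumes that a string column value is written as a double-quoted varchar.

```python
def _escape(text, chars):
    """Prefix every character of text that appears in chars with a backslash."""
    return "".join("\\" + ch if ch in chars else ch for ch in text)


def escape_measurement(name):
    # Supertable name: escape comma and space.
    return _escape(name, ", ")


def escape_key_or_tag_value(text):
    # Tag name, tag value, column name: escape comma, equals sign, and space.
    return _escape(text, ",= ")


def escape_string_field_value(text):
    # Column value: escape double quote and backslash, then wrap in quotes.
    return '"' + _escape(text, '"\\') + '"'


print(escape_measurement("my table,1"))
print(escape_key_or_tag_value("San Francisco,CA"))
print(escape_string_field_value('say "hi"'))
```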
If two consecutive backslashes are used, the first backslash acts as an escape character; if only one backslash is used, it does not need to be escaped. The backslash escape rules for the schemaless writing protocol are shown in the following table.
No. | Backslash | Escaped as |
---|---|---|
1 | \ | \ |
2 | \\ | \ |
3 | \\\ | \\ |
4 | \\\\ | \\ |
5 | \\\\\ | \\\ |
6 | \\\\\\ | \\\ |
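The table above amounts to collapsing every pair of consecutive backslashes into one and leaving a lone trailing backslash unchanged. The following Python sketch illustrates that reading of the table; it is not TDengine's parser, and the function name is hypothetical.

```python
def collapse_backslashes(text):
    """Collapse each pair of consecutive backslashes into a single backslash.

    str.replace scans left to right without overlapping matches, so an odd
    backslash left over at the end of a run is kept as it is.
    """
    return text.replace("\\\\", "\\")


# Print how many backslashes remain for runs of 1 to 6 backslashes.
for count in range(1, 7):
    print(count, "->", len(collapse_backslashes("\\" * count)))
```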
Numerical types are distinguished by suffixes. The suffix-to-type mapping rules for the schemaless writing protocol are shown in the following table.
No. | Suffix | Mapped Type | Size (bytes) |
---|---|---|---|
1 | None or f64 | double | 8 |
2 | f32 | float | 4 |
3 | i8/u8 | TinyInt/UTinyInt | 1 |
4 | i16/u16 | SmallInt/USmallInt | 2 |
5 | i32/u32 | Int/UInt | 4 |
6 | i64/i/u64/u | BigInt/BigInt/UBigInt/UBigInt | 8 |
- t, T, true, True, TRUE, f, F, false, False will be treated directly as BOOL type.
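To make the typing rules above concrete, here is a hedged Python sketch that classifies a single field_set value by its quotes, prefix, or suffix. The function infer_field_type and the uppercase type labels are illustrative assumptions; the real parser may accept or reject inputs that this sketch does not model.

```python
BOOL_LITERALS = {"t", "T", "true", "True", "TRUE", "f", "F", "false", "False"}
STRING_PREFIXES = {"L": "NCHAR", "G": "GEOMETRY", "B": "VARBINARY"}
# Suffixes are case-sensitive. "i64"/"u64" are listed before the bare "i"/"u".
NUMERIC_SUFFIXES = [
    ("f64", "DOUBLE"), ("f32", "FLOAT"),
    ("i8", "TINYINT"), ("u8", "UTINYINT"),
    ("i16", "SMALLINT"), ("u16", "USMALLINT"),
    ("i32", "INT"), ("u32", "UINT"),
    ("i64", "BIGINT"), ("u64", "UBIGINT"),
    ("i", "BIGINT"), ("u", "UBIGINT"),
]


def infer_field_type(value):
    """Return the type implied by one field value, or raise ValueError."""
    if value in BOOL_LITERALS:
        return "BOOL"
    if len(value) >= 2 and value.endswith('"'):
        if value.startswith('"'):
            return "VARCHAR"
        prefix = value[0].upper()  # L/l, G/g, B/b are all accepted
        if len(value) >= 3 and value[1] == '"' and prefix in STRING_PREFIXES:
            return STRING_PREFIXES[prefix]
    for suffix, type_name in NUMERIC_SUFFIXES:
        if value.endswith(suffix):
            float(value[: -len(suffix)])  # raises ValueError if not numeric
            return type_name
    float(value)  # no suffix: must be a plain number
    return "DOUBLE"


for sample in ["3i64", "4", "4f32", "false", '"passit"', 'L"error"']:
    print(sample, "->", infer_field_type(sample))
```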
For example, the following data row writes to a subtable under the supertable named st, with tags t1 as "3" (NCHAR), t2 as "4" (NCHAR), and t3 as "t3" (NCHAR), writing column c1 as 3 (BIGINT), column c2 as false (BOOL), column c3 as "passit" (BINARY), and column c4 as 4 (DOUBLE), with a primary key timestamp of 1626006833639000000.
st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000
Note that data type suffixes are case-sensitive: using the wrong case in a suffix, or specifying a data type incorrectly, produces an error message and causes the write to fail.
TDengine provides idempotency guarantees for data writing, meaning you can call the API repeatedly to rewrite data whose earlier write failed. However, it does not provide atomicity guarantees for writing multiple rows of data: during a batch write, some rows may be written successfully while others fail.
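Because writes are idempotent but batches are not atomic, one simple recovery strategy is to resend the whole batch after a failure. The sketch below assumes a hypothetical callable write_batch that wraps a connector's schemaless insert call and raises an exception on failure; the helper itself is not part of any TDengine API.

```python
import time


def write_with_retry(write_batch, lines, attempts=3, delay_seconds=0.0):
    """Call write_batch(lines) until it succeeds or the attempts run out.

    Returns the number of the attempt that succeeded. Resending rows that
    were already stored is safe only because writes are idempotent.
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            write_batch(lines)
            return attempt
        except Exception as err:  # the concrete error type depends on the connector
            last_error = err
            if attempt < attempts:
                time.sleep(delay_seconds)
    raise last_error
```

A caller would pass a small function that forwards lines to the connector in use, for example one that invokes the schemaless insert method of an open connection.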
Schemaless Writing Processing Rules
Schemaless writing processes row data according to the following principles:
1. Subtable names are generated by the following rule: first, the measurement name and the tag keys and values are combined into the following string.
   "measurement,tag_key1=tag_value1,tag_key2=tag_value2"
   Note that tag_key1 and tag_key2 are not in the order in which the tags were entered; they are sorted alphabetically by tag name, so tag_key1 is not necessarily the first tag in the row. After sorting, the MD5 hash value "md5_val" of the string is calculated, and the prefix "t_" is prepended to it to form the table name: "t_md5_val". The prefix "t_" is fixed, and every table generated by this mapping has it.
2. If you do not want to use the automatically generated table name, there are two ways to specify subtable names (the first has higher priority).
   - Configure the smlAutoChildTableNameDelimiter parameter in taos.cfg (any delimiter except @, #, space, carriage return, newline, and tab).
     - For example: if smlAutoChildTableNameDelimiter=- is configured and the inserted data is st,t0=cpu1,t1=4 c1=3 1626006833639000000, the created table name will be cpu1-4.
   - Configure the smlChildTableName parameter in taos.cfg.
     - For example: if smlChildTableName=tname is configured and the inserted data is st,tname=cpu1,t1=4 c1=3 1626006833639000000, the created table name will be cpu1. Note that if multiple rows have the same tname but different tag sets, only the tag set of the first row, which creates the table, is used; the tag sets of the other rows are ignored.
3. If the supertable obtained from parsing the row protocol does not exist, it will be created (manually creating supertables is not recommended; otherwise, data insertion may be abnormal).
4. If the subtable obtained from parsing the row protocol does not exist, schemaless writing creates the subtable with the name determined in step 1 or 2.
5. If the tag columns or ordinary columns specified in the data row do not exist, the corresponding tag columns or ordinary columns will be added to the supertable (only additions are allowed).
6. If some tag columns or ordinary columns exist in the supertable but are not given values in a data row, those columns will be set to NULL in that row.
7. For BINARY or NCHAR columns, if the provided value's length exceeds the column type limit, the allowable character length of that column will be automatically increased (only increases are allowed) to ensure complete data storage.
8. Any error encountered during the entire process interrupts the writing process and returns an error code.
9. To improve writing efficiency, it is assumed by default that the order of field_set within the same supertable is the same (the first row contains all fields, and subsequent rows follow this order). If the order differs, you need to set the smlDataFormat parameter to false; otherwise, data is written assuming the same order, and the data stored in the database will be wrong. Starting from version 3.0.3.0, order consistency is detected automatically, and this configuration is deprecated.
10. Since SQL table names do not support dots (.), schemaless writing also handles dots (.). If the table name generated by schemaless writing contains a dot (.), it will be automatically replaced with an underscore (_). If a subtable name is specified manually, dots (.) in the name will also be converted to underscores (_).
11. The taos.cfg configuration file provides the smlTsDefaultName setting (a string value), which only takes effect on the client side. When it is configured, it sets the timestamp column name of automatically created supertables. If it is not configured, the name defaults to _ts.
12. The names of supertables or subtables created through schemaless writing are case-sensitive.
13. Schemaless writing still adheres to TDengine's underlying limitations on data structures, such as the total length of each row of data not exceeding 48KB (64KB from version 3.0.5.0), and the total length of tag values not exceeding 16KB.
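The default subtable naming rule (sort the tags by name, hash the combined string, prefix with "t_") can be sketched in Python as follows. Both function names are hypothetical. The exact bytes TDengine hashes (encoding, escaping) are not specified here, so names produced by this sketch may not match the names the server generates; it only illustrates the shape of the rule. The second helper mirrors the smlAutoChildTableNameDelimiter example and assumes tag values are joined in input order.

```python
import hashlib


def default_subtable_name(measurement, tags):
    """Return 't_' + MD5 of 'measurement,k1=v1,k2=v2' with tags sorted by name."""
    parts = [f"{key}={tags[key]}" for key in sorted(tags)]
    canonical = ",".join([measurement] + parts)
    return "t_" + hashlib.md5(canonical.encode("utf-8")).hexdigest()


def delimiter_subtable_name(tag_values, delimiter):
    """Join tag values with the configured delimiter (assumed: input order)."""
    return delimiter.join(tag_values)


# The name does not depend on the order in which the tags were supplied.
print(default_subtable_name("st", {"t1": "3", "t2": "4", "t3": "t3"}))
print(default_subtable_name("st", {"t3": "t3", "t1": "3", "t2": "4"}))
print(delimiter_subtable_name(["cpu1", "4"], "-"))
```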
Time Resolution Identification
Schemaless writing supports three protocol modes, as shown in the table below:
No. | Value | Description |
---|---|---|
1 | SML_LINE_PROTOCOL | InfluxDB Line Protocol |
2 | SML_TELNET_PROTOCOL | OpenTSDB Telnet Protocol |
3 | SML_JSON_PROTOCOL | JSON Format Protocol |
In SML_LINE_PROTOCOL parsing mode, users need to specify the time resolution of the input timestamps. Available time resolutions are as follows:
No. | Time Resolution Definition | Meaning |
---|---|---|
1 | TSDB_SML_TIMESTAMP_NOT_CONFIGURED | Undefined (Invalid) |
2 | TSDB_SML_TIMESTAMP_HOURS | Hours |
3 | TSDB_SML_TIMESTAMP_MINUTES | Minutes |
4 | TSDB_SML_TIMESTAMP_SECONDS | Seconds |
5 | TSDB_SML_TIMESTAMP_MILLI_SECONDS | Milliseconds |
6 | TSDB_SML_TIMESTAMP_MICRO_SECONDS | Microseconds |
7 | TSDB_SML_TIMESTAMP_NANO_SECONDS | Nanoseconds |
In SML_TELNET_PROTOCOL and SML_JSON_PROTOCOL modes, the time precision is determined by the length of the timestamps (this is the same as the standard operation method for OpenTSDB), and the user-specified time resolution will be ignored.
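For the telnet and JSON modes, inferring precision from timestamp length can be sketched as below. This assumes the usual OpenTSDB convention of 10-digit timestamps for seconds and 13-digit timestamps for milliseconds; whether TDengine accepts other lengths is not covered here, and the function name is hypothetical.

```python
def precision_from_timestamp(timestamp):
    """Guess the time precision from the number of digits in the timestamp."""
    digits = len(str(abs(int(timestamp))))
    if digits == 10:
        return "s"   # e.g. 1626846400
    if digits == 13:
        return "ms"  # e.g. 1707095283260
    raise ValueError(f"cannot infer precision from a {digits}-digit timestamp")


print(precision_from_timestamp(1626846400))
print(precision_from_timestamp(1707095283260))
```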
Data Schema Mapping Rules
The data from InfluxDB line protocol will be mapped to a structured format, where the measurement maps to the supertable name, the tag names in tag_set map to the tag names in the data structure, and the names in field_set map to column names. For example, the following data:
st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000
This data row generates a supertable named st, which includes three nchar-type tags (t1, t2, t3) and five data columns: _ts (timestamp), c1 (bigint), c3 (binary), c2 (bool), and c4 (double). It maps to the following SQL statement:
create stable st (_ts timestamp, c1 bigint, c2 bool, c3 binary(6), c4 double) tags(t1 nchar(1), t2 nchar(1), t3 nchar(2))
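The mapping from a data row to a supertable definition can be illustrated with a short Python sketch. It handles only the value forms that appear in the example row, does no escape processing, and lists columns alphabetically purely for readability; the column order TDengine actually uses is not specified in this section. All function names are hypothetical.

```python
def tag_type(value):
    """Tags are always NCHAR, sized here to the value's character length."""
    return f"nchar({len(value)})"


def column_type(value):
    """Map a field literal to a SQL type (only the forms used in the example)."""
    if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
        return f"binary({len(value) - 2})"  # length without the quotes
    if value in ("true", "false"):
        return "bool"
    if value.endswith("i64"):
        return "bigint"
    return "double"  # no suffix, or f64


def create_stable_sql(line, ts_name="_ts"):
    """Build a CREATE STABLE statement from one unescaped line protocol row."""
    head, field_set, _timestamp = line.split(" ")
    measurement, *tag_pairs = head.split(",")
    tags = [pair.split("=", 1) for pair in tag_pairs]
    fields = sorted(pair.split("=", 1) for pair in field_set.split(","))
    column_defs = [f"{ts_name} timestamp"]
    column_defs += [f"{name} {column_type(value)}" for name, value in fields]
    tag_defs = [f"{name} {tag_type(value)}" for name, value in tags]
    columns_sql = ", ".join(column_defs)
    tags_sql = ", ".join(tag_defs)
    return f"create stable {measurement} ({columns_sql}) tags({tags_sql})"


row = 'st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000'
print(create_stable_sql(row))
```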
Data Schema Change Handling
This section explains how the data schema is affected under different row data writing conditions.
When a field is first written with an explicitly identified type using the row protocol, changing that field's type later results in a schema error, and the writing API reports an error. For example:
st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4 1626006833639000000
st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4i 1626006833640000000
The first row maps the data type of column c4 to Double, but the second row specifies this column as BigInt through the numerical suffix, triggering a parsing error in schemaless writing.
If a row declares a column as binary but a subsequent row requires a longer binary length, this triggers a change to the supertable schema.
st,t1=3,t2=4,t3=t3 c1=3i64,c5="pass" 1626006833639000000
st,t1=3,t2=4,t3=t3 c1=3i64,c5="passit" 1626006833640000000
In the first row, the row protocol parsing declares c5 as a binary(4) field. The second row also parses c5 as a binary column, but its width is increased to accommodate the longer string.
st,t1=3,t2=4,t3=t3 c1=3i64 1626006833639000000
st,t1=3,t2=4,t3=t3 c1=3i64,c6="passit" 1626006833640000000
In the second row, compared to the first, an additional column c6 of type binary(6) appears, so a new column c6 of type binary(6) is automatically added to the supertable.
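The three cases in this section (type conflict, widening a binary column, adding a column) can be summarized in one hedged Python sketch. The schema representation, a dict from column name to a (type, length) pair, and the function name merge_schema are assumptions made for illustration only.

```python
def merge_schema(schema, row_schema):
    """Return the schema after applying one row's column declarations.

    Both arguments map column name -> (type, length), where length is None
    for fixed-size types. Raises ValueError when a column changes type.
    """
    merged = dict(schema)
    for name, (col_type, length) in row_schema.items():
        if name not in merged:
            merged[name] = (col_type, length)  # new column: add it
            continue
        old_type, old_length = merged[name]
        if old_type != col_type:
            raise ValueError(f"column {name}: {old_type} cannot become {col_type}")
        if length is not None and length > old_length:
            merged[name] = (col_type, length)  # widen, never shrink
    return merged


schema = {"c1": ("bigint", None), "c5": ("binary", 4)}
schema = merge_schema(schema, {"c1": ("bigint", None), "c5": ("binary", 6)})
schema = merge_schema(schema, {"c1": ("bigint", None), "c6": ("binary", 6)})
print(schema)
```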
Example of Schemaless Writing
Using the smart meter as an example, here are code samples demonstrating how various language connectors use the schemaless writing interface to write data, covering three protocols: InfluxDB's line protocol, OpenTSDB's TELNET line protocol, and OpenTSDB's JSON format protocol.
- Since the automatic table creation rules for schemaless writing differ from those in previous SQL example sections, please ensure that the meters, metric_telnet, and metric_json tables do not exist before running the code samples.
- The TELNET line protocol and JSON format protocol of OpenTSDB only support a single data column, so other examples have been used.
WebSocket Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import com.taosdata.jdbc.AbstractConnection;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;

public class SchemalessWsTest {
    private static final String host = "127.0.0.1";
    private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
    private static final String telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
    private static final String jsonDemo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";

    public static void main(String[] args) throws SQLException {
        final String url = "jdbc:TAOS-WS://" + host + ":6041?user=root&password=taosdata";
        try (Connection connection = DriverManager.getConnection(url)) {
            init(connection);
            // unwrap the JDBC connection to reach the schemaless write API
            AbstractConnection conn = connection.unwrap(AbstractConnection.class);
            conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
            conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
            conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS);
            System.out.println("Inserted data with schemaless successfully.");
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            System.out.printf("Failed to insert data with schemaless, %sErrMessage: %s%n",
                    ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    // create the target database and switch to it
    private static void init(Connection connection) throws SQLException {
        try (Statement stmt = connection.createStatement()) {
            stmt.execute("CREATE DATABASE IF NOT EXISTS power");
            stmt.execute("USE power");
        }
    }
}
To execute schemaless writing with a reqId, pass it as the last parameter; the reqId can be used for request tracing.
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS, 1L);
import taosws

host = "localhost"
port = 6041


def prepare():
    conn = None
    try:
        conn = taosws.connect(user="root",
                              password="taosdata",
                              host=host,
                              port=port)

        # create database
        rowsAffected = conn.execute("CREATE DATABASE IF NOT EXISTS power")
        assert rowsAffected == 0

    except Exception as err:
        print(f"Failed to create db and table, db addr:{host}:{port} ; ErrMessage:{err}")
        raise err
    finally:
        if conn:
            conn.close()


def schemaless_insert():
    conn = None

    lineDemo = [
        "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
    ]

    telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]

    jsonDemo = [
        '{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
    ]

    try:
        conn = taosws.connect(user="root",
                              password="taosdata",
                              host=host,
                              port=port,
                              database='power')

        # insert influxdb line protocol data
        conn.schemaless_insert(
            lines=lineDemo,
            protocol=taosws.PySchemalessProtocol.Line,
            precision=taosws.PySchemalessPrecision.Millisecond,
            ttl=1,
            req_id=1,
        )

        # insert opentsdb telnet protocol data
        conn.schemaless_insert(
            lines=telnetDemo,
            protocol=taosws.PySchemalessProtocol.Telnet,
            precision=taosws.PySchemalessPrecision.Microsecond,
            ttl=1,
            req_id=2,
        )

        # insert opentsdb json protocol data
        conn.schemaless_insert(
            lines=jsonDemo,
            protocol=taosws.PySchemalessProtocol.Json,
            precision=taosws.PySchemalessPrecision.Millisecond,
            ttl=1,
            req_id=3,
        )
        print("Inserted data with schemaless successfully.")
    except Exception as err:
        print(f"Failed to insert data with schemaless, ErrMessage:{err}")
        raise err

    finally:
        if conn:
            conn.close()


if __name__ == "__main__":
    prepare()
    schemaless_insert()
package main

import (
	"database/sql"
	"fmt"
	"log"
	"time"

	"github.com/taosdata/driver-go/v3/common"
	_ "github.com/taosdata/driver-go/v3/taosWS"
	"github.com/taosdata/driver-go/v3/ws/schemaless"
)

func main() {
	host := "127.0.0.1"
	lineDemo := "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
	telnetDemo := "metric_telnet 1707095283260 4 host=host0 interface=eth0"
	jsonDemo := "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"

	taosDSN := fmt.Sprintf("root:taosdata@ws(%s:6041)/", host)
	db, err := sql.Open("taosWS", taosDSN)
	if err != nil {
		log.Fatalln("Failed to connect to host: " + host + "; ErrMessage: " + err.Error())
	}
	defer db.Close()
	_, err = db.Exec("CREATE DATABASE IF NOT EXISTS power")
	if err != nil {
		log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
	}
	s, err := schemaless.NewSchemaless(schemaless.NewConfig("ws://localhost:6041", 1,
		schemaless.SetDb("power"),
		schemaless.SetReadTimeout(10*time.Second),
		schemaless.SetWriteTimeout(10*time.Second),
		schemaless.SetUser("root"),
		schemaless.SetPassword("taosdata"),
	))
	if err != nil {
		log.Fatalln("Failed to connect to host: " + host + "; ErrMessage: " + err.Error())
	}
	// insert influxdb line protocol
	err = s.Insert(lineDemo, schemaless.InfluxDBLineProtocol, "ms", 0, common.GetReqID())
	if err != nil {
		log.Fatalln("Failed to insert data with schemaless, data: " + lineDemo + ", ErrMessage: " + err.Error())
	}
	// insert opentsdb telnet line protocol
	err = s.Insert(telnetDemo, schemaless.OpenTSDBTelnetLineProtocol, "ms", 0, common.GetReqID())
	if err != nil {
		log.Fatalln("Failed to insert data with schemaless, data: " + telnetDemo + ", ErrMessage: " + err.Error())
	}
	// insert opentsdb json format protocol
	err = s.Insert(jsonDemo, schemaless.OpenTSDBJsonFormatProtocol, "s", 0, common.GetReqID())
	if err != nil {
		log.Fatalln("Failed to insert data with schemaless, data: " + jsonDemo + ", ErrMessage: " + err.Error())
	}
	fmt.Println("Inserted data with schemaless successfully.")
}
use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;

use taos::AsyncQueryable;
use taos::AsyncTBuilder;
use taos::TaosBuilder;
use taos::taos_query;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    std::env::set_var("RUST_LOG", "taos=debug");
    pretty_env_logger::init();
    let host = "localhost";
    let dsn = format!("ws://{}:6041/power", host);
    log::debug!("dsn: {:?}", &dsn);

    let client = TaosBuilder::from_dsn(dsn)?.build().await?;

    let db = "power";

    client
        .exec(format!("create database if not exists {db}"))
        .await?;

    // should specify database before insert
    client.exec(format!("use {db}")).await?;

    // SchemalessProtocol::Line
    let data = [
        "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639",
    ]
    .map(String::from)
    .to_vec();

    let sml_data = SmlDataBuilder::default()
        .protocol(SchemalessProtocol::Line)
        .precision(SchemalessPrecision::Millisecond)
        .data(data.clone())
        .ttl(1000)
        .req_id(100u64)
        .build()?;
    match client.put(&sml_data).await {
        Ok(_) => {}
        Err(err) => {
            eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
            return Err(err.into());
        }
    }

    // SchemalessProtocol::Telnet
    let data = [
        "metric_telnet 1707095283260 4 host=host0 interface=eth0",
    ]
    .map(String::from)
    .to_vec();

    let sml_data = SmlDataBuilder::default()
        .protocol(SchemalessProtocol::Telnet)
        .precision(SchemalessPrecision::Millisecond)
        .data(data.clone())
        .ttl(1000)
        .req_id(200u64)
        .build()?;
    match client.put(&sml_data).await {
        Ok(_) => {}
        Err(err) => {
            eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
            return Err(err.into());
        }
    }

    // SchemalessProtocol::Json
    let data = [
        r#"[{
            "metric": "metric_json",
            "timestamp": 1626846400,
            "value": 10.3,
            "tags": {
                "groupid": 2,
                "location": "California.SanFrancisco",
                "id": "d1001"
            }
        }]"#
    ]
    .map(String::from)
    .to_vec();

    let sml_data = SmlDataBuilder::default()
        .protocol(SchemalessProtocol::Json)
        .precision(SchemalessPrecision::Millisecond)
        .data(data.clone())
        .ttl(1000)
        .req_id(300u64)
        .build()?;
    match client.put(&sml_data).await {
        Ok(_) => {}
        Err(err) => {
            eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
            return Err(err.into());
        }
    }

    println!("Inserted data with schemaless successfully.");
    Ok(())
}
const taos = require("@tdengine/websocket");

let influxdbData = ["meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"];
let jsonData = ["{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"];
let telnetData = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"];

async function createConnect() {
    let dsn = 'ws://localhost:6041';
    let conf = new taos.WSConfig(dsn);
    conf.setUser('root');
    conf.setPwd('taosdata');
    let wsSql = await taos.sqlConnect(conf);
    await wsSql.exec('CREATE DATABASE IF NOT EXISTS power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');
    await wsSql.exec('USE power');
    return wsSql;
}

async function test() {
    let wsSql = null;
    let wsRows = null;
    let ttl = 0;
    try {
        wsSql = await createConnect();
        await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
        await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
        await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl);
        console.log("Inserted data with schemaless successfully.");
    }
    catch (err) {
        console.error(`Failed to insert data with schemaless, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    }
    finally {
        if (wsRows) {
            await wsRows.close();
        }
        if (wsSql) {
            await wsSql.close();
        }
        taos.destroy();
    }
}

test()
public static void Main(string[] args)
{
    var host = "127.0.0.1";

    var lineDemo =
        "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";

    var telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";

    var jsonDemo =
        "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
    try
    {
        var builder =
            new ConnectionStringBuilder(
                $"protocol=WebSocket;host={host};port=6041;username=root;password=taosdata");
        using (var client = DbDriver.Open(builder))
        {
            // create database
            client.Exec("CREATE DATABASE IF NOT EXISTS power");
            // use database
            client.Exec("USE power");
            // insert influx line protocol data
            client.SchemalessInsert(new[] { lineDemo }, TDengineSchemalessProtocol.TSDB_SML_LINE_PROTOCOL,
                TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
            // insert opentsdb telnet protocol data
            client.SchemalessInsert(new[] { telnetDemo }, TDengineSchemalessProtocol.TSDB_SML_TELNET_PROTOCOL,
                TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
            // insert json data
            client.SchemalessInsert(new[] { jsonDemo }, TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL,
                TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NOT_CONFIGURED, 0, ReqId.GetReqId());
        }

        Console.WriteLine("Inserted data with schemaless successfully.");
    }
    catch (TDengineError e)
    {
        // handle TDengine error
        Console.WriteLine("Failed to insert data with schemaless, ErrCode: " + e.Code +
                          ", ErrMessage: " + e.Error);
        throw;
    }
    catch (Exception e)
    {
        // handle other exceptions
        Console.WriteLine("Failed to insert data with schemaless, ErrMessage: " + e.Message);
        throw;
    }
}
int   code = 0;
char *dsn = "ws://localhost:6041";

// connect
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
    fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
    return -1;
}

// create database
WS_RES *result = ws_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = ws_errno(result);
if (code != 0) {
    fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
    ws_close(taos);
    return -1;
}
ws_free_result(result);

// use database
result = ws_query(taos, "USE power");
code = ws_errno(result);
if (code != 0) {
    fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
    ws_close(taos);
    return -1;
}
ws_free_result(result);

// schemaless demo data
char *line_demo =
    "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 "
    "1626006833639";
char *telnet_demo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
char *json_demo =
    "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, "
    "\"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";

// influxdb line protocol
int totalLines = 0;
result = ws_schemaless_insert_raw(taos, line_demo, strlen(line_demo), &totalLines, WS_TSDB_SML_LINE_PROTOCOL,
                                  WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = ws_errno(result);
if (code != 0) {
    fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s.\n", line_demo,
            code, ws_errstr(result));
    ws_close(taos);
    return -1;
}
fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", totalLines);
ws_free_result(result);

// opentsdb telnet protocol
totalLines = 0;
result = ws_schemaless_insert_raw(taos, telnet_demo, strlen(telnet_demo), &totalLines, WS_TSDB_SML_TELNET_PROTOCOL,
                                  WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = ws_errno(result);
if (code != 0) {
    fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s.\n", telnet_demo,
            code, ws_errstr(result));
    ws_close(taos);
    return -1;
}
fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", totalLines);
ws_free_result(result);

// opentsdb json protocol (json_demo is passed directly, so no extra buffer is allocated)
totalLines = 0;
result = ws_schemaless_insert_raw(taos, json_demo, strlen(json_demo), &totalLines, WS_TSDB_SML_JSON_PROTOCOL,
                                  WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = ws_errno(result);
if (code != 0) {
    fprintf(stderr, "Failed to insert schemaless json data, data: %s, ErrCode: 0x%x, ErrMessage: %s.\n", json_demo,
            code, ws_errstr(result));
    ws_close(taos);
    return -1;
}
fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", totalLines);
ws_free_result(result);

// close & clean
ws_close(taos);
return 0;
REST API: Not supported
Native Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import com.taosdata.jdbc.AbstractConnection;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;

public class SchemalessJniTest {
    private static final String host = "127.0.0.1";
    private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
    private static final String telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
    private static final String jsonDemo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";

    public static void main(String[] args) throws SQLException {
        final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata";
        try (Connection connection = DriverManager.getConnection(jdbcUrl)) {
            init(connection);
            // unwrap the JDBC connection to reach the schemaless write API
            AbstractConnection conn = connection.unwrap(AbstractConnection.class);
            conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
            conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
            conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
            System.out.println("Inserted data with schemaless successfully.");
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            System.out.printf("Failed to insert data with schemaless, %sErrMessage: %s%n",
                    ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    // create the target database and switch to it
    private static void init(Connection connection) throws SQLException {
        try (Statement stmt = connection.createStatement()) {
            stmt.execute("CREATE DATABASE IF NOT EXISTS power");
            stmt.execute("USE power");
        }
    }
}
To execute schemaless writing with a reqId, pass it as the last parameter; the reqId can be used for request tracing.
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS, 1L);
import taos

lineDemo = [
    "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
]

telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]

jsonDemo = [
    '{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
]

host = "localhost"
port = 6030
# initialize conn so the finally block is safe if connect() fails
conn = None
try:
    conn = taos.connect(
        user="root",
        password="taosdata",
        host=host,
        port=port
    )

    conn.execute("CREATE DATABASE IF NOT EXISTS power")
    # change database. same as execute "USE db"
    conn.select_db("power")

    conn.schemaless_insert(
        lineDemo, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
    )
    conn.schemaless_insert(
        telnetDemo, taos.SmlProtocol.TELNET_PROTOCOL, taos.SmlPrecision.MICRO_SECONDS
    )
    conn.schemaless_insert(
        jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
    )
    print("Inserted data with schemaless successfully.")
except Exception as err:
    print(f"Failed to insert data with schemaless, ErrMessage:{err}")
    raise err
finally:
    if conn:
        conn.close()
package main

import (
	"fmt"
	"log"

	"github.com/taosdata/driver-go/v3/af"
)

func main() {
	host := "127.0.0.1"
	lineDemo := "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
	telnetDemo := "metric_telnet 1707095283260 4 host=host0 interface=eth0"
	jsonDemo := "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"

	conn, err := af.Open(host, "root", "taosdata", "", 0)
	if err != nil {
		log.Fatalln("Failed to connect to host: " + host + "; ErrMessage: " + err.Error())
	}
	defer conn.Close()
	_, err = conn.Exec("CREATE DATABASE IF NOT EXISTS power")
	if err != nil {
		log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
	}
	_, err = conn.Exec("USE power")
	if err != nil {
		log.Fatalln("Failed to use database power, ErrMessage: " + err.Error())
	}
	// insert influxdb line protocol
	err = conn.InfluxDBInsertLines([]string{lineDemo}, "ms")
	if err != nil {
		log.Fatalln("Failed to insert data with schemaless, data: " + lineDemo + ", ErrMessage: " + err.Error())
	}
	// insert opentsdb telnet protocol
	err = conn.OpenTSDBInsertTelnetLines([]string{telnetDemo})
	if err != nil {
		log.Fatalln("Failed to insert data with schemaless, data: " + telnetDemo + ", ErrMessage: " + err.Error())
	}
	// insert opentsdb json protocol
	err = conn.OpenTSDBInsertJsonPayload(jsonDemo)
	if err != nil {
		log.Fatalln("Failed to insert data with schemaless, data: " + jsonDemo + ", ErrMessage: " + err.Error())
	}
	fmt.Println("Inserted data with schemaless successfully.")
}
use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;
use taos::AsyncQueryable;
use taos::AsyncTBuilder;
use taos::TaosBuilder;
use taos::taos_query;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();
let host = "localhost";
let dsn = format!("taos://{}:6030", host);
log::debug!("dsn: {:?}", &dsn);
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
let db = "power";
client
.exec(format!("create database if not exists {db}"))
.await?;
// should specify database before insert
client.exec(format!("use {db}")).await?;
// SchemalessProtocol::Line
let data = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(100u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
// SchemalessProtocol::Telnet
let data = [
"metric_telnet 1707095283260 4 host=host0 interface=eth0",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Telnet)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(200u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
// SchemalessProtocol::Json
let data = [
r#"[{
"metric": "metric_json",
"timestamp": 1626846400,
"value": 10.3,
"tags": {
"groupid": 2,
"location": "California.SanFrancisco",
"id": "d1001"
}
}]"#
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Json)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(300u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
println!("Inserted data with schemaless successfully.");
Ok(())
}
Not supported
public static void Main(string[] args)
{
var host = "127.0.0.1";
var lineDemo =
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
var telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
var jsonDemo =
"{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
try
{
var builder =
new ConnectionStringBuilder(
$"host={host};port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
// create database
client.Exec("CREATE DATABASE IF NOT EXISTS power");
// use database
client.Exec("USE power");
// insert influx line protocol data
client.SchemalessInsert(new[]{lineDemo}, TDengineSchemalessProtocol.TSDB_SML_LINE_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
// insert opentsdb telnet protocol data
client.SchemalessInsert(new[]{telnetDemo}, TDengineSchemalessProtocol.TSDB_SML_TELNET_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
// insert json data
client.SchemalessInsert(new []{jsonDemo}, TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NOT_CONFIGURED, 0, ReqId.GetReqId());
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine("Failed to insert data with schemaless, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine("Failed to insert data with schemaless, ErrMessage: " + e.Message);
throw;
}
}
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL));
taos_cleanup();
return -1;
}
// create database
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result));
// release the failed result before leaving
taos_free_result(result);
taos_close(taos);
taos_cleanup();
return -1;
}
taos_free_result(result);
// use database
result = taos_query(taos, "USE power");
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result));
// release the failed result before leaving
taos_free_result(result);
taos_close(taos);
taos_cleanup();
return -1;
}
taos_free_result(result);
// schemaless demo data
char *line_demo =
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 "
"1626006833639";
char *telnet_demo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
char *json_demo =
"{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, "
"\"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
// influxdb line protocol
char *lines[] = {line_demo};
result = taos_schemaless_insert(taos, lines, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s.\n", line_demo, code,
taos_errstr(result));
// release the failed result before leaving
taos_free_result(result);
taos_close(taos);
taos_cleanup();
return -1;
}
int rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", rows);
taos_free_result(result);
// opentsdb telnet protocol
char *telnets[] = {telnet_demo};
result = taos_schemaless_insert(taos, telnets, 1, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s.\n", telnet_demo, code,
taos_errstr(result));
// release the failed result before leaving
taos_free_result(result);
taos_close(taos);
taos_cleanup();
return -1;
}
rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", rows);
taos_free_result(result);
// opentsdb json protocol
char *jsons[1] = {0};
// allocate heap memory for the json data; static memory cannot be used here.
size_t size = 1024;
jsons[0] = malloc(size);
if (jsons[0] == NULL) {
fprintf(stderr, "Failed to allocate memory: %zu bytes.\n", size);
taos_close(taos);
taos_cleanup();
return -1;
}
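(void)strncpy(jsons[0], json_demo, size - 1);
// guarantee null termination even if json_demo ever grows to fill the buffer
jsons[0][size - 1] = '\0';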
result = taos_schemaless_insert(taos, jsons, 1, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
code = taos_errno(result);
if (code != 0) {
free(jsons[0]);
fprintf(stderr, "Failed to insert schemaless json data, data: %s, ErrCode: 0x%x, ErrMessage: %s.\n", json_demo, code,
taos_errstr(result));
// release the failed result before leaving
taos_free_result(result);
taos_close(taos);
taos_cleanup();
return -1;
}
free(jsons[0]);
rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", rows);
taos_free_result(result);
// close & clean
taos_close(taos);
taos_cleanup();
return 0;
Not supported
Querying Written Data
Running the code samples from the previous section automatically creates tables in the power database. You can query the data through the taos shell or from an application. The examples below use the taos shell to list the supertables and to query data from the meters table.
taos> show power.stables;
          stable_name           |
=================================
 meter_current                  |
 stb0_0                         |
 meters                         |
Query OK, 3 row(s) in set (0.002527s)
taos> select * from power.meters limit 1 \G;
*************************** 1.row ***************************
     _ts: 2021-07-11 20:33:53.639
 current: 10.300000199999999
 voltage: 219
   phase: 0.310000000000000
 groupid: 2
location: California.SanFrancisco
Query OK, 1 row(s) in set (0.004501s)
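The `_ts` value in this output corresponds to the timestamp `1626006833639` written by the line protocol sample, interpreted as milliseconds. The sketch below checks that arithmetic using only the Python standard library. The UTC+8 offset is an assumption inferred from the displayed time, presumably the time zone of the client that produced the output; it is not stated in the output itself.

```python
from datetime import datetime, timedelta, timezone

ts_ms = 1626006833639  # timestamp of the line protocol sample, in milliseconds

# Split into whole seconds and the millisecond remainder to avoid float rounding.
seconds, millis = divmod(ts_ms, 1000)
utc_time = datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
    microsecond=millis * 1000
)

# Assumed client time zone (UTC+8), inferred from the shell output above.
local_time = utc_time.astimezone(timezone(timedelta(hours=8)))

fmt = "%Y-%m-%d %H:%M:%S.%f"
print(utc_time.strftime(fmt)[:-3])    # 2021-07-11 12:33:53.639
print(local_time.strftime(fmt)[:-3])  # 2021-07-11 20:33:53.639
```

A client in a different time zone would see the same row with a correspondingly shifted `_ts` display, while the stored value stays the same.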