Ingesting Data in Schemaless Mode
In IoT applications, to achieve functions such as automated management, business analysis, and device monitoring, it is often necessary to collect a large number of data items. However, due to reasons such as application logic upgrades and hardware adjustments of the devices themselves, the data collection items may change frequently. To address this challenge, TDengine provides a schemaless writing method, aimed at simplifying the data recording process.
With the schemaless writing method, users do not need to create supertables or subtables in advance, because TDengine automatically creates the corresponding storage structures based on the data actually written. When required, schemaless writing also adds the missing data columns or tag columns automatically so that the data written by users is stored correctly.
It is worth noting that the supertables and their corresponding subtables created through the schemaless writing method have no functional differences from those created directly through SQL. Users can still use SQL to write data directly into them. However, since the table names generated by the schemaless writing method are based on tag values according to a fixed mapping rule, these table names may lack readability and are not easy to understand.
With the schemaless writing method, tables are created automatically. Creating the tables manually may cause unexpected errors.
Schemaless Writing Line Protocol
TDengine's schemaless writing line protocol is compatible with InfluxDB's line protocol, OpenTSDB's telnet line protocol, and OpenTSDB's JSON format protocol. For the standard writing protocols of InfluxDB and OpenTSDB, please refer to their respective official documentation.
Below, we first introduce the protocol content that TDengine extends on top of InfluxDB's line protocol. This protocol gives users finer control over the (supertable) schema. Each data row is expressed as one string, and multiple strings can be passed to the writing API at once to write multiple rows in a batch. The format is as follows.
measurement,tag_set field_set timestamp
The parameters are explained as follows.
- measurement is the table name, separated by a comma from tag_set.
- tag_set is formatted as <tag_key>=<tag_value>,<tag_key>=<tag_value> and represents tag column data. Tags are separated from each other by commas, and tag_set is separated from field_set by a space.
- field_set is formatted as <field_key>=<field_value>,<field_key>=<field_value> and represents ordinary columns. Fields are also separated from each other by commas, and field_set is separated from timestamp by a space.
- timestamp is the primary key timestamp for this row of data.
- Schemaless writing does not support writing data for tables with a second primary key column.
All data in tag_set are automatically converted to nchar data type and do not need to use double quotes. In the schemaless writing line protocol, each data item in field_set needs to describe its own data type, with specific requirements as follows.
- If enclosed in double quotes, it represents varchar type, e.g., "abc".
- If enclosed in double quotes and prefixed with L or l, it represents nchar type, e.g., L" error message ".
- If enclosed in double quotes and prefixed with G or g, it represents geometry type, e.g., G"Point(4.343 89.342)".
- If enclosed in double quotes and prefixed with B or b, it represents varbinary type, the double quotes can contain hexadecimal starting with \x or strings, e.g., B"\x98f46e" and B"hello".
- For spaces, equal signs (=), commas (,), double quotes ("), and backslashes (\), a backslash (\) is needed for escaping (all are half-width ASCII characters). The escape rules for each field of the schemaless writing protocol are shown in the following table.
Number | Field | Characters to Escape |
---|---|---|
1 | Supertable name | comma, space |
2 | Tag name | comma, equal sign, space |
3 | Tag value | comma, equal sign, space |
4 | Column name | comma, equal sign, space |
5 | Column value | double quotes, backslash |
If two consecutive backslashes are used, the first backslash acts as an escape character; if there is only one backslash, no escape is needed. The backslash escape rules for the schemaless writing protocol are shown in the following table.
Number | Backslash | Escapes to |
---|---|---|
1 | \ | \ |
2 | \\ | \ |
3 | \\\ | \\ |
4 | \\\\ | \\ |
5 | \\\\\ | \\\ |
6 | \\\\\\ | \\\ |
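The table above follows one rule: each pair of consecutive backslashes collapses into a single backslash, and a trailing unpaired backslash is kept as it is. The following is a minimal Python sketch of that rule only. The function name collapse_backslashes is hypothetical and is not part of any TDengine API, and the sketch does not handle the other escaped characters (spaces, commas, equal signs, double quotes).

```python
def collapse_backslashes(raw: str) -> str:
    """Collapse backslash runs as described in the table above (illustrative only)."""
    out = []
    i = 0
    while i < len(raw):
        if raw[i] == "\\" and i + 1 < len(raw) and raw[i + 1] == "\\":
            out.append("\\")    # two backslashes: the first escapes the second
            i += 2
        else:
            out.append(raw[i])  # a lone backslash or any other character is kept
            i += 1
    return "".join(out)


# Reproduce the six table rows: number of input backslashes -> number after unescaping.
for n in range(1, 7):
    print(n, len(collapse_backslashes("\\" * n)))
```

Running the loop prints the pairs 1 1, 2 1, 3 2, 4 2, 5 3, and 6 3, which matches the six rows of the table.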
Numeric types are distinguished by suffixes. The type mapping rules for numeric suffixes in the schemaless writing protocol are shown in the following table.
Number | Suffix | Mapped Type | Size (Bytes) |
---|---|---|---|
1 | None or f64 | double | 8 |
2 | f32 | float | 4 |
3 | i8/u8 | TinyInt/UTinyInt | 1 |
4 | i16/u16 | SmallInt/USmallInt | 2 |
5 | i32/u32 | Int/UInt | 4 |
6 | i64/i/u64/u | BigInt/BigInt/UBigInt/UBigInt | 8 |
- t, T, true, True, TRUE, f, F, false, False will be directly treated as BOOL type.
For example, the following data line writes one row to a subtable of the supertable named st. The subtable has the tags t1 = "3" (NCHAR), t2 = "4" (NCHAR), and t3 = "t3" (NCHAR). The row has the columns c1 = 3 (BIGINT), c2 = false (BOOL), c3 = "passit" (BINARY), and c4 = 4 (DOUBLE), and its primary timestamp is 1626006833639000000.
st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000
Note that if there is a case error in describing the data type suffix or the data type specified for the data is incorrect, it may trigger an error message and cause data writing to fail.
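The type rules above can be summarized in a short classifier. The following Python sketch is an illustration under stated assumptions, not TDengine's parser: the name infer_field_type and the returned type labels are hypothetical, quoted values are recognized only by their prefix and closing quote, and escaping inside strings is ignored. Suffix matching is case-sensitive, so a miscased suffix such as 3I64 is rejected, in line with the note above.

```python
BOOL_LITERALS = {"t", "T", "true", "True", "TRUE", "f", "F", "false", "False"}

# Longest suffixes come first so that "i64" is tried before the bare "i".
NUMERIC_SUFFIXES = [
    ("i64", "BIGINT"), ("u64", "UBIGINT"),
    ("i32", "INT"), ("u32", "UINT"),
    ("i16", "SMALLINT"), ("u16", "USMALLINT"),
    ("f64", "DOUBLE"), ("f32", "FLOAT"),
    ("i8", "TINYINT"), ("u8", "UTINYINT"),
    ("i", "BIGINT"), ("u", "UBIGINT"),
]

# Prefix in front of the opening double quote -> string-like type.
# VARCHAR is the type the text also refers to as BINARY.
QUOTED_PREFIXES = {"": "VARCHAR", "L": "NCHAR", "G": "GEOMETRY", "B": "VARBINARY"}


def infer_field_type(value: str) -> str:
    """Return the type name implied by one field_set value (illustrative only)."""
    if value in BOOL_LITERALS:
        return "BOOL"
    if value.endswith('"') and '"' in value[:-1]:
        prefix = value[: value.index('"')].upper()
        if prefix in QUOTED_PREFIXES:
            return QUOTED_PREFIXES[prefix]
        raise ValueError(f"unknown string prefix in {value!r}")
    for suffix, type_name in NUMERIC_SUFFIXES:
        if value.endswith(suffix):
            float(value[: -len(suffix)])  # raises ValueError if the rest is not a number
            return type_name
    float(value)  # no suffix: must be a plain number, which maps to DOUBLE
    return "DOUBLE"


# Field values taken from the example line above.
for name, value in [("c1", "3i64"), ("c3", '"passit"'), ("c2", "false"), ("c4", "4f64")]:
    print(name, infer_field_type(value))
```

For the example line, this prints BIGINT for c1, VARCHAR for c3, BOOL for c2, and DOUBLE for c4.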
TDengine provides idempotence for data writing, meaning you can repeatedly call the API to write data that failed previously. However, it does not provide atomicity for writing multiple rows of data. That is, during the batch writing process of multiple rows of data, some data may be written successfully while others may fail.
Schema-less Write Handling Rules
Schema-less writes handle row data according to the following principles:
1. The subtable name is generated using the following rules. First, the measurement name is combined with the tag keys and values into the following string:
   "measurement,tag_key1=tag_value1,tag_key2=tag_value2"
   Note that tag_key1 and tag_key2 are not in the order entered by the user; they are sorted in ascending order by tag name, so tag_key1 is not necessarily the first tag entered in the line protocol. After sorting, the MD5 hash value "md5_val" of this string is calculated, and the table name is formed by adding the fixed prefix "t_" to the hash: "t_md5_val". Every table generated automatically through this mapping has this prefix.
2. If you do not want to use the automatically generated table name, there are two ways to specify the subtable name (the first method has higher priority).
   - Configure the smlAutoChildTableNameDelimiter parameter in taos.cfg (any delimiter except @ # space CR LF tab). For example, with smlAutoChildTableNameDelimiter=- and the data st,t0=cpu1,t1=4 c1=3 1626006833639000000, the created table name is cpu1-4.
   - Configure the smlChildTableName parameter in taos.cfg. For example, with smlChildTableName=tname and the data st,tname=cpu1,t1=4 c1=3 1626006833639000000, the created table name is cpu1. Note that if multiple rows have the same tname but different tag_sets, the tag_set specified when the table was first created automatically is used, and the tag_sets of the other rows are ignored.
3. If the supertable obtained from parsing the line protocol does not exist, it is created (manually creating supertables is not recommended, because data insertion may then behave abnormally).
4. If the subtable obtained from parsing the line protocol does not exist, it is created with the subtable name determined in step 1 or 2.
5. If the tag columns or regular columns specified in the data row do not exist, they are added to the supertable (columns are only added, never deleted).
6. If some tag columns or regular columns exist in the supertable but are not specified in a data row, their values are set to NULL in that row.
7. For BINARY or NCHAR columns, if the length of a value in the data row exceeds the limit of the column type, the maximum length of the column is increased automatically (it only grows, never shrinks) so that the data is stored completely.
8. Any error encountered during processing interrupts the writing process and returns an error code.
9. To improve writing efficiency, it is assumed by default that the order of the field_set is the same for all rows of a supertable (the first row contains all fields, and subsequent rows follow the same order). If the order differs, set the smlDataFormat parameter to false; otherwise the data is written in the assumed order and the data in the database will be wrong. Starting from version 3.0.3.0, the order is checked automatically and this parameter is deprecated.
10. Since SQL table creation does not support dots (.), schemaless writing replaces dots (.) in automatically created table names with underscores (_). If the subtable name is specified manually and contains a dot (.), it is also converted to an underscore (_).
11. The smlTsDefaultName parameter in taos.cfg (a string value, effective only on the client side) sets the name of the timestamp column in tables created automatically by schemaless writing. If it is not configured, the default is _ts.
12. Supertable and subtable names in schemaless writing are case-sensitive.
13. Schemaless writing still follows TDengine's underlying restrictions on data structures; for example, the total length of each row of data cannot exceed 48KB (64KB from version 3.0.5.0), and the total length of tag values cannot exceed 16KB.
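The automatic subtable naming rule from the first item above can be sketched as follows. This is a minimal Python illustration under stated assumptions: the function names are hypothetical, the string is assumed to be UTF-8 encoded before hashing, the digest is assumed to be rendered as lowercase hexadecimal, and tag names are sorted with Python's default string ordering. The names it produces are therefore not guaranteed to be identical to the ones TDengine generates.

```python
import hashlib


def table_name_key(measurement: str, tags: dict) -> str:
    """Build "measurement,tag_key1=tag_value1,..." with tags sorted by tag name."""
    parts = [measurement] + [f"{key}={tags[key]}" for key in sorted(tags)]
    return ",".join(parts)


def auto_child_table_name(measurement: str, tags: dict) -> str:
    """Prefix the MD5 hash of the sorted key string with "t_" (encoding is an assumption)."""
    digest = hashlib.md5(table_name_key(measurement, tags).encode("utf-8")).hexdigest()
    return "t_" + digest


# The order in which tags were written does not change the generated name.
first = auto_child_table_name("st", {"t1": "3", "t2": "4", "t3": "t3"})
second = auto_child_table_name("st", {"t3": "t3", "t1": "3", "t2": "4"})
print(table_name_key("st", {"t3": "t3", "t1": "3", "t2": "4"}))
print(first == second)
```

Because the tags are sorted before hashing, two rows that carry the same tags in a different order map to the same subtable.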
Time Resolution Recognition
Schemaless writing supports three protocol modes, as shown in the table below:
Number | Value | Description |
---|---|---|
1 | SML_LINE_PROTOCOL | InfluxDB Line Protocol |
2 | SML_TELNET_PROTOCOL | OpenTSDB Text Line Protocol |
3 | SML_JSON_PROTOCOL | JSON Protocol Format |
In the SML_LINE_PROTOCOL parsing mode, users need to specify the time resolution of the input timestamp. The available time resolutions are as follows:
Number | Time Resolution Definition | Meaning |
---|---|---|
1 | TSDB_SML_TIMESTAMP_NOT_CONFIGURED | Undefined (invalid) |
2 | TSDB_SML_TIMESTAMP_HOURS | Hours |
3 | TSDB_SML_TIMESTAMP_MINUTES | Minutes |
4 | TSDB_SML_TIMESTAMP_SECONDS | Seconds |
5 | TSDB_SML_TIMESTAMP_MILLI_SECONDS | Milliseconds |
6 | TSDB_SML_TIMESTAMP_MICRO_SECONDS | Microseconds |
7 | TSDB_SML_TIMESTAMP_NANO_SECONDS | Nanoseconds |
In the SML_TELNET_PROTOCOL and SML_JSON_PROTOCOL modes, the time precision is determined by the length of the timestamp (consistent with the standard operation of OpenTSDB), and the user-specified time resolution will be ignored.
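As an illustration of length-based recognition, the sketch below infers the precision from the number of digits in a timestamp. It assumes the usual OpenTSDB convention that 10-digit timestamps are in seconds and 13-digit timestamps are in milliseconds; the exact lengths TDengine accepts are not stated here, and the function name is hypothetical.

```python
def infer_opentsdb_precision(timestamp: str) -> str:
    """Guess the precision of an OpenTSDB-style timestamp from its digit count."""
    if not timestamp.isdigit():
        raise ValueError(f"not a numeric timestamp: {timestamp!r}")
    if len(timestamp) == 10:
        return "seconds"
    if len(timestamp) == 13:
        return "milliseconds"
    raise ValueError(f"unsupported timestamp length: {len(timestamp)}")


# Timestamps taken from the telnet and JSON samples later in this document.
print(infer_opentsdb_precision("1707095283260"))
print(infer_opentsdb_precision("1626846400"))
```

Under this assumption, the telnet sample timestamp is read as milliseconds and the JSON sample timestamp as seconds, regardless of the resolution passed to the write API.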
Data Schema Mapping Rules
Data from the InfluxDB line protocol is mapped to schema-based data: the measurement maps to the supertable name, tag names in the tag_set map to tag names in the data schema, and names in the field_set map to column names. For example, consider the following data.
st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000
This line of data maps to a supertable named st with three tags of type nchar (t1, t2, t3) and five data columns: _ts (timestamp), c1 (bigint), c3 (binary), c2 (bool), and c4 (double). It corresponds to the following SQL statement:
create stable st (_ts timestamp, c1 bigint, c2 bool, c3 binary(6), c4 double) tags(t1 nchar(1), t2 nchar(1), t3 nchar(2))
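To make the mapping concrete, the following Python sketch splits the sample line into the parts that become the supertable name, the tags, and the columns. It is deliberately simplified and is only an illustration: the function name parse_line is hypothetical, and the sketch assumes that no tag or field value contains escaped spaces, commas, or equal signs.

```python
def parse_line(line: str) -> dict:
    """Split one line-protocol row into its schema-relevant parts (no escape handling)."""
    head, field_set, timestamp = line.split(" ")
    measurement, *tag_pairs = head.split(",")
    tags = dict(pair.split("=", 1) for pair in tag_pairs)
    columns = dict(pair.split("=", 1) for pair in field_set.split(","))
    return {
        "supertable": measurement,    # measurement -> supertable name
        "tags": tags,                 # tag_set names -> tag names
        "columns": columns,           # field_set names -> column names
        "timestamp": int(timestamp),  # primary key timestamp
    }


parsed = parse_line('st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000')
print(parsed["supertable"])
print(parsed["tags"])
print(parsed["columns"])
```

The column values are kept as raw strings here; their types would still have to be derived from the quoting and suffix rules described earlier.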
Data Schema Change Handling
This section explains how the data schema is affected in different line protocol writing scenarios.
When a field is written through the line protocol with an explicit type identifier, any later change to that field's type is a schema error, and the write API reports it. For example:
st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4 1626006833639000000
st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4i 1626006833640000000
The data type mapping of the first line defines the c4 column as Double, but the second line declares the column as BigInt through a numeric suffix, thus triggering a parsing error in schema-less writing.
If an earlier row declares a data column as binary and a later row requires a longer binary length, the supertable schema is changed.
st,t1=3,t2=4,t3=t3 c1=3i64,c5="pass" 1626006833639000000
st,t1=3,t2=4,t3=t3 c1=3i64,c5="passit" 1626006833640000000
Parsing the first line declares column c5 as a binary(4) field. In the second line, c5 is still a binary column, but its value is 6 characters wide, so the width of the binary column is increased to accommodate the new string. Similarly, if a later row contains a column that does not exist yet, the column is added to the supertable:
st,t1=3,t2=4,t3=t3 c1=3i64 1626006833639000000
st,t1=3,t2=4,t3=t3 c1=3i64,c6="passit" 1626006833640000000
Compared with the first line, the second line adds a column c6 of type binary(6), so a column c6 of type binary(6) is added automatically.
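The three scenarios in this section (type conflict, width growth, and new column) can be modeled with a small in-memory simulation. The Python sketch below is only a model of the rules described here, not TDengine code; the names SchemaError and apply_row and the dictionary layout are hypothetical.

```python
class SchemaError(Exception):
    """Raised when a row redefines the type of an existing column."""


def apply_row(schema: dict, row: dict) -> None:
    """Update schema (name -> [type, width]) with a row (name -> (type, width))."""
    # First pass: reject the whole row if any existing column would change type.
    for name, (col_type, _) in row.items():
        if name in schema and schema[name][0] != col_type:
            raise SchemaError(f"column {name}: {schema[name][0]} cannot become {col_type}")
    # Second pass: add new columns and widen string columns; nothing is ever removed.
    for name, (col_type, width) in row.items():
        if name not in schema:
            schema[name] = [col_type, width]
        elif width is not None and width > schema[name][1]:
            schema[name][1] = width


schema = {}
apply_row(schema, {"c1": ("BIGINT", None), "c5": ("BINARY", 4)})
apply_row(schema, {"c1": ("BIGINT", None), "c5": ("BINARY", 6), "c6": ("BINARY", 6)})
print(schema)

try:
    apply_row(schema, {"c1": ("DOUBLE", None)})
except SchemaError as err:
    print("rejected:", err)
```

After the two successful rows, c5 has grown from width 4 to 6 and c6 has been added, while the attempt to turn c1 from BIGINT into DOUBLE is rejected and leaves the schema unchanged.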
Schemaless Writing Example
Below, using smart meters as an example, we introduce code samples for writing data using the schemaless writing interface with various language connectors. This includes three protocols: InfluxDB's line protocol, OpenTSDB's TELNET line protocol, and OpenTSDB's JSON format protocol.
- Since the rules for automatic table creation with schemaless writing differ from those in the previous SQL examples, please ensure that the meters, metric_telnet, and metric_json tables do not exist before running the code samples.
- OpenTSDB's TELNET line protocol and OpenTSDB's JSON format protocol only support one data column, so we have used other examples.
WebSocket Connection
The following examples are given for Java, Python, Go, Rust, Node.js, C#, C, and the REST API, in that order.
Java
import com.taosdata.jdbc.AbstractConnection;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class SchemalessWsTest {
    private static final String host = "127.0.0.1";
    private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
    private static final String telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
    private static final String jsonDemo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";

    public static void main(String[] args) throws SQLException {
        final String url = "jdbc:TAOS-WS://" + host + ":6041?user=root&password=taosdata";
        try (Connection connection = DriverManager.getConnection(url)) {
            // create the database and switch to it
            init(connection);
            AbstractConnection conn = connection.unwrap(AbstractConnection.class);
            // write one sample with each of the three protocols
            conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
            conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
            conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS);
            System.out.println("Inserted data with schemaless successfully.");
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            System.out.printf("Failed to insert data with schemaless, %sErrMessage: %s%n",
                    ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    private static void init(Connection connection) throws SQLException {
        try (Statement stmt = connection.createStatement()) {
            stmt.execute("CREATE DATABASE IF NOT EXISTS power");
            stmt.execute("USE power");
        }
    }
}
To perform a schemaless write with a request ID, pass reqId as the last parameter. The reqId can be used to trace the request.
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS, 1L);
Python
import taosws

host = "localhost"
port = 6041


def prepare():
    conn = None
    try:
        conn = taosws.connect(user="root",
                              password="taosdata",
                              host=host,
                              port=port)
        # create database
        rowsAffected = conn.execute("CREATE DATABASE IF NOT EXISTS power")
        assert rowsAffected == 0
    except Exception as err:
        print(f"Failed to create db and table, db addr:{host}:{port} ; ErrMessage:{err}")
        raise err
    finally:
        if conn:
            conn.close()


def schemaless_insert():
    conn = None
    lineDemo = [
        "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
    ]
    telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
    jsonDemo = [
        '{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
    ]
    try:
        conn = taosws.connect(user="root",
                              password="taosdata",
                              host=host,
                              port=port,
                              database='power')
        # InfluxDB line protocol
        conn.schemaless_insert(
            lines=lineDemo,
            protocol=taosws.PySchemalessProtocol.Line,
            precision=taosws.PySchemalessPrecision.Millisecond,
            ttl=1,
            req_id=1,
        )
        # OpenTSDB telnet line protocol
        conn.schemaless_insert(
            lines=telnetDemo,
            protocol=taosws.PySchemalessProtocol.Telnet,
            precision=taosws.PySchemalessPrecision.Microsecond,
            ttl=1,
            req_id=2,
        )
        # OpenTSDB JSON format protocol
        conn.schemaless_insert(
            lines=jsonDemo,
            protocol=taosws.PySchemalessProtocol.Json,
            precision=taosws.PySchemalessPrecision.Millisecond,
            ttl=1,
            req_id=3,
        )
        print("Inserted data with schemaless successfully.")
    except Exception as err:
        print(f"Failed to insert data with schemaless, ErrMessage:{err}")
        raise err
    finally:
        if conn:
            conn.close()


if __name__ == "__main__":
    prepare()
    schemaless_insert()
Go
package main
import (
"database/sql"
"fmt"
"log"
"time"
"github.com/taosdata/driver-go/v3/common"
_ "github.com/taosdata/driver-go/v3/taosWS"
"github.com/taosdata/driver-go/v3/ws/schemaless"
)
func main() {
host := "127.0.0.1"
lineDemo := "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
telnetDemo := "metric_telnet 1707095283260 4 host=host0 interface=eth0"
jsonDemo := "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"
taosDSN := fmt.Sprintf("root:taosdata@ws(%s:6041)/", host)
db, err := sql.Open("taosWS", taosDSN)
if err != nil {
log.Fatalln("Failed to connect to host: " + host + "; ErrMessage: " + err.Error())
}
defer db.Close()
_, err = db.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
}
s, err := schemaless.NewSchemaless(schemaless.NewConfig("ws://localhost:6041", 1,
schemaless.SetDb("power"),
schemaless.SetReadTimeout(10*time.Second),
schemaless.SetWriteTimeout(10*time.Second),
schemaless.SetUser("root"),
schemaless.SetPassword("taosdata"),
))
if err != nil {
log.Fatalln("Failed to connect to host: " + host + "; ErrMessage: " + err.Error())
}
// insert influxdb line protocol
err = s.Insert(lineDemo, schemaless.InfluxDBLineProtocol, "ms", 0, common.GetReqID())
if err != nil {
log.Fatalln("Failed to insert data with schemaless, data:" + lineDemo + ", ErrMessage: " + err.Error())
}
// insert opentsdb telnet line protocol
err = s.Insert(telnetDemo, schemaless.OpenTSDBTelnetLineProtocol, "ms", 0, common.GetReqID())
if err != nil {
log.Fatalln("Failed to insert data with schemaless, data: " + telnetDemo + ", ErrMessage: " + err.Error())
}
// insert opentsdb json format protocol
err = s.Insert(jsonDemo, schemaless.OpenTSDBJsonFormatProtocol, "s", 0, common.GetReqID())
if err != nil {
log.Fatalln("Failed to insert data with schemaless, data: " + jsonDemo + ", ErrMessage: " + err.Error())
}
fmt.Println("Inserted data with schemaless successfully.")
}
Rust
use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;
use taos::AsyncQueryable;
use taos::AsyncTBuilder;
use taos::TaosBuilder;
use taos::taos_query;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();
let host = "localhost";
let dsn = format!("ws://{}:6041/power", host);
log::debug!("dsn: {:?}", &dsn);
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
let db = "power";
client
.exec(format!("create database if not exists {db}"))
.await?;
// should specify database before insert
client.exec(format!("use {db}")).await?;
// SchemalessProtocol::Line
let data = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(100u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
// SchemalessProtocol::Telnet
let data = [
"metric_telnet 1707095283260 4 host=host0 interface=eth0",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Telnet)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(200u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
// SchemalessProtocol::Json
let data = [
r#"[{
"metric": "metric_json",
"timestamp": 1626846400,
"value": 10.3,
"tags": {
"groupid": 2,
"location": "California.SanFrancisco",
"id": "d1001"
}
}]"#
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Json)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(300u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
println!("Inserted data with schemaless successfully.");
Ok(())
}
Node.js
const taos = require("@tdengine/websocket");
let influxdbData = ["meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"];
let jsonData = ["{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"]
let telnetData = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"];
async function createConnect() {
let dsn = 'ws://localhost:6041'
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec('CREATE DATABASE IF NOT EXISTS power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');
await wsSql.exec('USE power');
return wsSql;
}
async function test() {
let wsSql = null;
let wsRows = null;
let ttl = 0;
try {
wsSql = await createConnect()
await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl);
console.log("Inserted data with schemaless successfully.")
}
catch (err) {
console.error(`Failed to insert data with schemaless, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
taos.destroy();
}
}
test()
C#
public static void Main(string[] args)
{
var host = "127.0.0.1";
var lineDemo =
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
var telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
var jsonDemo =
"{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
try
{
var builder =
new ConnectionStringBuilder(
$"protocol=WebSocket;host={host};port=6041;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
// create database
client.Exec("CREATE DATABASE IF NOT EXISTS power");
// use database
client.Exec("USE power");
// insert influx line protocol data
client.SchemalessInsert(new[] { lineDemo }, TDengineSchemalessProtocol.TSDB_SML_LINE_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
// insert opentsdb telnet protocol data
client.SchemalessInsert(new[] { telnetDemo }, TDengineSchemalessProtocol.TSDB_SML_TELNET_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
// insert json data
client.SchemalessInsert(new[] { jsonDemo }, TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NOT_CONFIGURED, 0, ReqId.GetReqId());
}
Console.WriteLine("Inserted data with schemaless successfully.");
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine("Failed to insert data with schemaless, ErrCode: " + e.Code +
", ErrMessage: " + e.Error);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine("Failed to insert data with schemaless, ErrMessage: " + e.Message);
throw;
}
}
C
#include <stdio.h>
#include <string.h>
#include "taosws.h"  // TDengine WebSocket C client header

int main(void) {
    int   code = 0;
    char *dsn = "ws://localhost:6041";
    // connect
    WS_TAOS *taos = ws_connect(dsn);
    if (taos == NULL) {
        fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
        return -1;
    }
    // create database
    WS_RES *result = ws_query(taos, "CREATE DATABASE IF NOT EXISTS power");
    code = ws_errno(result);
    if (code != 0) {
        fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
        ws_free_result(result);
        ws_close(taos);
        return -1;
    }
    ws_free_result(result);
    // use database
    result = ws_query(taos, "USE power");
    code = ws_errno(result);
    if (code != 0) {
        fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, ws_errstr(result));
        ws_free_result(result);
        ws_close(taos);
        return -1;
    }
    ws_free_result(result);
    // schemaless demo data
    char *line_demo =
        "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 "
        "1626006833639";
    char *telnet_demo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
    char *json_demo =
        "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, "
        "\"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
    // influxdb line protocol
    int totalLines = 0;
    result = ws_schemaless_insert_raw(taos, line_demo, strlen(line_demo), &totalLines, WS_TSDB_SML_LINE_PROTOCOL,
                                      WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
    code = ws_errno(result);
    if (code != 0) {
        fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s.\n", line_demo,
                code, ws_errstr(result));
        ws_free_result(result);
        ws_close(taos);
        return -1;
    }
    fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", totalLines);
    ws_free_result(result);
    // opentsdb telnet protocol
    totalLines = 0;
    result = ws_schemaless_insert_raw(taos, telnet_demo, strlen(telnet_demo), &totalLines, WS_TSDB_SML_TELNET_PROTOCOL,
                                      WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
    code = ws_errno(result);
    if (code != 0) {
        fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s.\n", telnet_demo,
                code, ws_errstr(result));
        ws_free_result(result);
        ws_close(taos);
        return -1;
    }
    fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", totalLines);
    ws_free_result(result);
    // opentsdb json protocol
    totalLines = 0;
    result = ws_schemaless_insert_raw(taos, json_demo, strlen(json_demo), &totalLines, WS_TSDB_SML_JSON_PROTOCOL,
                                      WS_TSDB_SML_TIMESTAMP_MILLI_SECONDS);
    code = ws_errno(result);
    if (code != 0) {
        fprintf(stderr, "Failed to insert schemaless json data, data: %s, ErrCode: 0x%x, ErrMessage: %s.\n", json_demo,
                code, ws_errstr(result));
        ws_free_result(result);
        ws_close(taos);
        return -1;
    }
    fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", totalLines);
    ws_free_result(result);
    // close & clean
    ws_close(taos);
    return 0;
}
REST API: Not supported
Native Connection
The following examples are given for Java, Python, Go, Rust, Node.js, C#, C, and the REST API, in that order.
Java
import com.taosdata.jdbc.AbstractConnection;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class SchemalessJniTest {
    private static final String host = "127.0.0.1";
    private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
    private static final String telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
    private static final String jsonDemo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";

    public static void main(String[] args) throws SQLException {
        final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata";
        try (Connection connection = DriverManager.getConnection(jdbcUrl)) {
            // create the database and switch to it
            init(connection);
            AbstractConnection conn = connection.unwrap(AbstractConnection.class);
            // write one sample with each of the three protocols
            conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
            conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
            conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
            System.out.println("Inserted data with schemaless successfully.");
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            System.out.printf("Failed to insert data with schemaless, %sErrMessage: %s%n",
                    ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    private static void init(Connection connection) throws SQLException {
        try (Statement stmt = connection.createStatement()) {
            stmt.execute("CREATE DATABASE IF NOT EXISTS power");
            stmt.execute("USE power");
        }
    }
}
To perform a schemaless write with a request ID, pass reqId as the last parameter. The reqId can be used to trace the request.
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS, 1L);
Python
import taos

lineDemo = [
    "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
]
telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
jsonDemo = [
    '{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
]
host = "localhost"
port = 6030
# initialize conn so that the finally block is safe if connect() fails
conn = None
try:
    conn = taos.connect(
        user="root",
        password="taosdata",
        host=host,
        port=port
    )
    conn.execute("CREATE DATABASE IF NOT EXISTS power")
    # change database. same as execute "USE db"
    conn.select_db("power")
    # InfluxDB line protocol
    conn.schemaless_insert(
        lineDemo, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
    )
    # OpenTSDB telnet line protocol
    conn.schemaless_insert(
        telnetDemo, taos.SmlProtocol.TELNET_PROTOCOL, taos.SmlPrecision.MICRO_SECONDS
    )
    # OpenTSDB JSON format protocol
    conn.schemaless_insert(
        jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
    )
    print("Inserted data with schemaless successfully.")
except Exception as err:
    print(f"Failed to insert data with schemaless, ErrMessage:{err}")
    raise err
finally:
    if conn:
        conn.close()
Go
package main
import (
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
func main() {
host := "127.0.0.1"
lineDemo := "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
telnetDemo := "metric_telnet 1707095283260 4 host=host0 interface=eth0"
jsonDemo := "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"
conn, err := af.Open(host, "root", "taosdata", "", 0)
if err != nil {
log.Fatalln("Failed to connect to host: " + host + "; ErrMessage: " + err.Error())
}
defer conn.Close()
_, err = conn.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
}
_, err = conn.Exec("USE power")
if err != nil {
log.Fatalln("Failed to use database power, ErrMessage: " + err.Error())
}
// insert influxdb line protocol
err = conn.InfluxDBInsertLines([]string{lineDemo}, "ms")
if err != nil {
log.Fatalln("Failed to insert data with schemaless, data:" + lineDemo + ", ErrMessage: " + err.Error())
}
// insert opentsdb telnet protocol
err = conn.OpenTSDBInsertTelnetLines([]string{telnetDemo})
if err != nil {
log.Fatalln("Failed to insert data with schemaless, data:" + telnetDemo + ", ErrMessage: " + err.Error())
}
// insert opentsdb json protocol
err = conn.OpenTSDBInsertJsonPayload(jsonDemo)
if err != nil {
log.Fatalln("Failed to insert data with schemaless, data:" + jsonDemo + ", ErrMessage: " + err.Error())
}
fmt.Println("Inserted data with schemaless successfully.")
}
use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;
use taos::AsyncQueryable;
use taos::AsyncTBuilder;
use taos::TaosBuilder;
use taos::taos_query;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();
let host = "localhost";
let dsn = format!("taos://{}:6030", host);
log::debug!("dsn: {:?}", &dsn);
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
let db = "power";
client
.exec(format!("create database if not exists {db}"))
.await?;
// should specify database before insert
client.exec(format!("use {db}")).await?;
// SchemalessProtocol::Line
let data = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(100u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
// SchemalessProtocol::Telnet
let data = [
"metric_telnet 1707095283260 4 host=host0 interface=eth0",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Telnet)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(200u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
// SchemalessProtocol::Json
let data = [
r#"[{
"metric": "metric_json",
"timestamp": 1626846400,
"value": 10.3,
"tags": {
"groupid": 2,
"location": "California.SanFrancisco",
"id": "d1001"
}
}]"#
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Json)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(300u64)
.build()?;
match client.put(&sml_data).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to insert data with schemaless, data:{:?}, ErrMessage: {}", data, err);
return Err(err.into());
}
}
println!("Inserted data with schemaless successfully.");
Ok(())
}
Not supported
public static void Main(string[] args)
{
var host = "127.0.0.1";
var lineDemo =
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
var telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
var jsonDemo =
"{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
try
{
var builder =
new ConnectionStringBuilder(
$"host={host};port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
// create database
client.Exec("CREATE DATABASE IF NOT EXISTS power");
// use database
client.Exec("USE power");
// insert influx line protocol data
client.SchemalessInsert(new[]{lineDemo}, TDengineSchemalessProtocol.TSDB_SML_LINE_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
// insert opentsdb telnet protocol data
client.SchemalessInsert(new[]{telnetDemo}, TDengineSchemalessProtocol.TSDB_SML_TELNET_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
// insert json data
client.SchemalessInsert(new []{jsonDemo}, TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NOT_CONFIGURED, 0, ReqId.GetReqId());
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine("Failed to insert data with schemaless, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine("Failed to insert data with schemaless, ErrMessage:" + e.Message);
throw;
}
}
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
int code = 0;
// connect
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL),
taos_errstr(NULL));
taos_cleanup();
return -1;
}
// create database
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to create database power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
taos_free_result(result);
// use database
result = taos_query(taos, "USE power");
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to execute use power, ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
taos_free_result(result);
// schemaless demo data
char *line_demo =
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 "
"1626006833639";
char *telnet_demo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
char *json_demo =
"{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, "
"\"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
// influxdb line protocol
char *lines[] = {line_demo};
result = taos_schemaless_insert(taos, lines, 1, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless line data, data: %s, ErrCode: 0x%x, ErrMessage: %s.\n", line_demo, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
int rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless line data successfully.\n", rows);
taos_free_result(result);
// opentsdb telnet protocol
char *telnets[] = {telnet_demo};
result = taos_schemaless_insert(taos, telnets, 1, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "Failed to insert schemaless telnet data, data: %s, ErrCode: 0x%x, ErrMessage: %s.\n", telnet_demo, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless telnet data successfully.\n", rows);
taos_free_result(result);
// opentsdb json protocol
char *jsons[1] = {0};
// allocate heap memory for the json data; static memory cannot be used here.
size_t size = 1024;
jsons[0] = malloc(size);
if (jsons[0] == NULL) {
fprintf(stderr, "Failed to allocate memory: %zu bytes.\n", size);
taos_close(taos);
taos_cleanup();
return -1;
}
(void)strncpy(jsons[0], json_demo, 1023);
result = taos_schemaless_insert(taos, jsons, 1, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
code = taos_errno(result);
if (code != 0) {
free(jsons[0]);
fprintf(stderr, "Failed to insert schemaless json data, data: %s, ErrCode: 0x%x, ErrMessage: %s.\n", json_demo, code,
taos_errstr(result));
taos_close(taos);
taos_cleanup();
return -1;
}
free(jsons[0]);
rows = taos_affected_rows(result);
fprintf(stdout, "Insert %d rows of schemaless json data successfully.\n", rows);
taos_free_result(result);
// close & clean
taos_close(taos);
taos_cleanup();
return 0;
Not supported
Querying the Written Data
Running the example code from the previous section automatically creates tables in the power database. The data can then be queried with taos shell or from an application. The example below uses taos shell to list the supertables in the power database and to query one row from the meters supertable.
taos> show power.stables;
stable_name |
=================================
meter_current |
stb0_0 |
meters |
Query OK, 3 row(s) in set (0.002527s)
taos> select * from power.meters limit 1 \G;
*************************** 1.row ***************************
_ts: 2021-07-11 20:33:53.639
current: 10.300000199999999
voltage: 219
phase: 0.310000000000000
groupid: 2
location: California.SanFrancisco
Query OK, 1 row(s) in set (0.004501s)
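To make the relationship between the written row and this query result easier to see, the following sketch splits the demo line into its measurement, tags, fields, and timestamp. It is a simplified illustration, assuming a row without escaped spaces or commas, and is not a full line-protocol parser; `parse_line` is a hypothetical helper, not a TDengine API.

```python
from datetime import datetime, timezone


def parse_line(line):
    """Split a simple line-protocol row into its four parts.

    Assumes exactly three space-separated sections and no escaped
    characters, which holds for the demo row but not for arbitrary input.
    """
    head, field_part, ts = line.split(" ")
    measurement, *tag_items = head.split(",")
    tags = dict(item.split("=", 1) for item in tag_items)
    fields = dict(item.split("=", 1) for item in field_part.split(","))
    return measurement, tags, fields, int(ts)


line = (
    "meters,groupid=2,location=California.SanFrancisco "
    "current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
)
measurement, tags, fields, ts_ms = parse_line(line)

# Convert the millisecond timestamp with integer arithmetic to avoid
# floating-point rounding of the fractional part.
ts_utc = datetime.fromtimestamp(ts_ms // 1000, tz=timezone.utc).replace(
    microsecond=(ts_ms % 1000) * 1000
)

print("supertable:", measurement)
print("tag columns:", tags)
print("data columns:", fields)
print("timestamp (UTC):", ts_utc.isoformat(timespec="milliseconds"))
```

The measurement corresponds to the supertable name meters, the two tags to the groupid and location columns, and the three fields to current, voltage, and phase. The timestamp converts to 12:33:53.639 UTC on 2021-07-11; the _ts value of 20:33:53.639 in the output above is consistent with a client displaying the time in a UTC+8 time zone.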