
TDengine Node.js Client Library

@tdengine/websocket is the official Node.js client library for TDengine. Node.js developers can use it to build applications that access data in a TDengine instance.

The source code for the Node.js client library is hosted on GitHub.

Connection types

The Node.js client library supports only WebSocket connections, which go through taosAdapter.

For a detailed introduction of the connection types, please refer to: Establish Connection

Supported platforms

The Node.js client library requires Node.js 14 or later.
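
The version requirement can be checked at startup. The following is a minimal sketch; the helper name isSupportedNode and the constant MIN_MAJOR are hypothetical and not part of the client library.

```javascript
// Minimal sketch: fail fast when the runtime is older than Node.js 14.
// MIN_MAJOR mirrors the requirement stated above; the helper name is hypothetical.
const MIN_MAJOR = 14;

function isSupportedNode(version = process.versions.node) {
    // process.versions.node looks like "18.19.0"; the major version is the first field.
    const major = Number.parseInt(version.split(".")[0], 10);
    return Number.isInteger(major) && major >= MIN_MAJOR;
}

if (!isSupportedNode()) {
    console.error(`Node.js ${MIN_MAJOR}+ is required, found ${process.versions.node}`);
    process.exitCode = 1;
}
```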

Recent update logs

Node.js connector version | Major changes | TDengine version
3.1.0 | New version, supports WebSocket | 3.2.0.0 or later

Supported features

  1. Connection Management
  2. General Query
  3. Continuous Query
  4. Parameter Binding
  5. Subscription
  6. Schemaless

Handling exceptions

When an error is reported, the error message and error code can be obtained through try/catch. Error codes from 100 to 110 come from the Node.js client library itself; all other error codes come from TDengine function modules.

Please refer to the table below for error code, error description and corresponding suggestions.

Error Code | Description | Suggested Actions
100 | invalid variables | The parameter is invalid. Check the interface specification and adjust the parameter type and size.
101 | invalid url | URL error, please check if the url is correct.
102 | received server data but did not find a callback for processing | Client waiting timeout, please check network and TaosAdapter status.
103 | invalid message type | Please check if the client version and server version match.
104 | connection creation failed | Connection creation failed. Please check the network and TaosAdapter status.
105 | websocket request timeout | Increase the execution time by adding the messageWaitTimeout parameter, or check the connection to the TaosAdapter.
106 | authentication fail | Authentication failed, please check if the username and password are correct.
107 | unknown sql type in tdengine | Check the data type supported by TDengine.
108 | connection has been closed | The connection has been closed, check the connection status, or recreate the connection to execute the relevant instructions.
109 | fetch block data parse fail | Please check if the client version and server version match.
110 | websocket connection has reached its maximum limit | Please check if the connection has been closed after use.
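
The split between connector codes (100 to 110) and TDengine codes can be applied inside a catch block. The sketch below assumes err.code is a number, as the examples on this page suggest; the helper describeError and the lookup table are hypothetical and only restate the descriptions from the table above.

```javascript
// Descriptions copied from the table above; this map is illustrative,
// not something exported by @tdengine/websocket.
const CONNECTOR_ERRORS = new Map([
    [100, "invalid variables"],
    [101, "invalid url"],
    [102, "received server data but did not find a callback for processing"],
    [103, "invalid message type"],
    [104, "connection creation failed"],
    [105, "websocket request timeout"],
    [106, "authentication fail"],
    [107, "unknown sql type in tdengine"],
    [108, "connection has been closed"],
    [109, "fetch block data parse fail"],
    [110, "websocket connection has reached its maximum limit"],
]);

// Classify a caught error as coming from the connector or from TDengine.
function describeError(err) {
    const code = err ? err.code : undefined;
    if (CONNECTOR_ERRORS.has(code)) {
        return { source: "connector", code, description: CONNECTOR_ERRORS.get(code) };
    }
    return { source: "tdengine", code, description: err ? err.message : undefined };
}

console.log(describeError({ code: 105, message: "websocket request timeout" }));
```

In a catch block this would be called as describeError(err) before deciding whether to retry, reconnect, or report the failure.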

Data Type Mapping

The table below depicts the mapping between TDengine data types and Node.js data types.

TDengine Data Type | Node.js Data Type
TIMESTAMP | bigint
TINYINT | number
SMALLINT | number
INT | number
BIGINT | bigint
TINYINT UNSIGNED | number
SMALLINT UNSIGNED | number
INT UNSIGNED | number
BIGINT UNSIGNED | bigint
FLOAT | number
DOUBLE | number
BOOL | boolean
BINARY | string
NCHAR | string
JSON | string
VARBINARY | ArrayBuffer
GEOMETRY | ArrayBuffer

Note: The JSON type is supported only for tags.
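
Because TIMESTAMP and BIGINT values map to bigint, they need some care when converted to dates or serialized. The sketch below uses only standard JavaScript; the helper names are hypothetical, and the timestamp is assumed to have millisecond precision.

```javascript
// Convert a bigint TIMESTAMP (assumed to be in milliseconds) to a Date.
// Number() is safe here because millisecond epochs are far below
// Number.MAX_SAFE_INTEGER.
function timestampToDate(ts) {
    return new Date(Number(ts));
}

// JSON.stringify throws a TypeError on bigint values, so convert them
// to strings with a replacer before logging or serializing a row.
function toJson(value) {
    return JSON.stringify(value, (_key, v) => (typeof v === "bigint" ? v.toString() : v));
}

console.log(timestampToDate(1714013737536n).toISOString());
console.log(toJson([1714013737536n, 12.3, 221]));
```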

Installation Steps

Pre-installation preparation

Install Node.js 14 or later.

Install Node.js client library via npm

npm install @tdengine/websocket

Verify

After installing the client library, use the nodejsChecker.js program to verify that the current environment can access TDengine from Node.js.

Verification steps:

  • Create an installation test folder such as ~/tdengine-test, and download nodejsChecker.js into it.

  • Execute the following commands from the command line.

    npm init -y
    npm install @tdengine/websocket
    node nodejsChecker.js host=localhost

  • After these steps complete, the command line shows the result of nodejsChecker.js connecting to the TDengine instance and performing a simple insert and query.

Establishing a connection

Install and import the @tdengine/websocket package.

Note: After you finish using the Node.js client library, call taos.destroy() to release connector resources.

const taos = require("@tdengine/websocket");

//database operations......

taos.destroy();

WSConfig configures the WebSocket parameters as follows:

getToken(): string | undefined | null;
setToken(token: string): void;
getUser(): string | undefined | null;
setUser(user: string): void;
getPwd(): string | undefined | null;
setPwd(pws: string): void;
getDb(): string | undefined | null;
setDb(db: string): void;
getUrl(): string;
setUrl(url: string): void;
setTimeOut(ms: number): void;
getTimeOut(): number | undefined | null;

async function createConnect() {
    let dsn = 'ws://localhost:6041';
    let conf = new taos.WSConfig(dsn);
    conf.setUser('root');
    conf.setPwd('taosdata');
    conf.setDb('power');
    return await taos.sqlConnect(conf);
}
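
Connection settings are often read from the environment rather than hard-coded. The sketch below is one possible approach; the environment variable names (TDENGINE_URL and so on) and the helper loadSettings are hypothetical, and accepting wss:// is an assumption about TLS-terminated deployments.

```javascript
// Read connection settings from environment variables, falling back to
// the defaults used in the examples on this page.
// The variable names are hypothetical; choose your own.
function loadSettings(env = process.env) {
    const settings = {
        url: env.TDENGINE_URL || "ws://localhost:6041",
        user: env.TDENGINE_USER || "root",
        password: env.TDENGINE_PASSWORD || "taosdata",
        db: env.TDENGINE_DB || "power",
    };
    // new URL() throws a TypeError on a malformed URL.
    const { protocol } = new URL(settings.url);
    if (protocol !== "ws:" && protocol !== "wss:") {
        throw new Error(`expected a ws:// or wss:// URL, got ${settings.url}`);
    }
    return settings;
}

// Applying the settings uses only the WSConfig setters listed above:
//   const s = loadSettings();
//   const conf = new taos.WSConfig(s.url);
//   conf.setUser(s.user); conf.setPwd(s.password); conf.setDb(s.db);
console.log(loadSettings({}).url);
```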


Usage examples

Create database and tables

async function createDbAndTable() {
    let wsSql = null;
    try {
        wsSql = await createConnect();
        await wsSql.exec('CREATE DATABASE IF NOT EXISTS power ' +
            'KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');

        await wsSql.exec('USE power');

        await wsSql.exec('CREATE STABLE IF NOT EXISTS meters ' +
            '(ts timestamp, current float, voltage int, phase float) ' +
            'TAGS (location binary(64), groupId int);');

        // Declare the result locally instead of leaking an implicit global.
        let taosResult = await wsSql.exec('describe meters');
        console.log(taosResult);
    } catch (err) {
        console.error(err.code, err.message);
    } finally {
        if (wsSql) {
            await wsSql.close();
        }
    }
}


Note: If you do not use USE power to specify the database, all subsequent operations on the table need to add the database name as a prefix, such as power.meters.

Insert data

async function insertData() {
    let wsSql = null;
    try {
        wsSql = await createConnect();
        let insertQuery = "INSERT INTO " +
            "power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
            "VALUES " +
            "(NOW + 1a, 10.30000, 219, 0.31000) " +
            "(NOW + 2a, 12.60000, 218, 0.33000) " +
            "(NOW + 3a, 12.30000, 221, 0.31000) " +
            "power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) " +
            "VALUES " +
            "(NOW + 1a, 10.30000, 218, 0.25000) ";
        // Declare the result locally instead of leaking an implicit global.
        let taosResult = await wsSql.exec(insertQuery);
        console.log(taosResult);
    } catch (err) {
        console.error(err.code, err.message);
    } finally {
        if (wsSql) {
            await wsSql.close();
        }
    }
}


NOW is an internal function that defaults to the current time of the client's computer. NOW + 1s means the client's current time plus 1 second. The letter after the number is the time unit: a (milliseconds), s (seconds), m (minutes), h (hours), d (days), w (weeks), n (months), y (years).
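
To illustrate how the unit letters translate into offsets, the sketch below converts a duration such as 1a or 2s into milliseconds on the client side. It is not part of the client library; the helper name offsetToMs is hypothetical, and months (n) and years (y) are rejected because they have no fixed length.

```javascript
// Milliseconds per fixed-length unit, using the letters listed above.
const UNIT_MS = { a: 1, s: 1000, m: 60000, h: 3600000, d: 86400000, w: 604800000 };

// Convert a duration like "3a" or "1h" to milliseconds.
function offsetToMs(text) {
    const match = /^(\d+)([a-z])$/.exec(text);
    if (!match) {
        throw new Error(`not a duration: ${text}`);
    }
    const count = Number(match[1]);
    const unitMs = UNIT_MS[match[2]];
    if (unitMs === undefined) {
        // Covers n (months) and y (years), which vary in length, and unknown letters.
        throw new Error(`unit "${match[2]}" has no fixed length in this sketch`);
    }
    return count * unitMs;
}

console.log(offsetToMs("1a"), offsetToMs("2s"), offsetToMs("1d"));
```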

Querying data

async function queryData() {
    let wsRows = null;
    let wsSql = null;
    try {
        wsSql = await createConnect();
        wsRows = await wsSql.query('select * from meters');
        // Column metadata: name, type and length of each column.
        let meta = wsRows.getMeta();
        console.log("wsRow:meta:=>", meta);
        // Iterate over the result set one row at a time.
        while (await wsRows.next()) {
            let result = wsRows.getData();
            console.log("wsRow:data:=>", result);
        }
    } catch (err) {
        console.error(err.code, err.message);
    } finally {
        if (wsRows) {
            await wsRows.close();
        }
        if (wsSql) {
            await wsSql.close();
        }
    }
}


Structure of the returned data

wsRow:meta:=> [
{ name: 'ts', type: 'TIMESTAMP', length: 8 },
{ name: 'current', type: 'FLOAT', length: 4 },
{ name: 'voltage', type: 'INT', length: 4 },
{ name: 'phase', type: 'FLOAT', length: 4 },
{ name: 'location', type: 'VARCHAR', length: 64},
{ name: 'groupid', type: 'INT', length: 4 }
]
wsRow:data:=> [
[ 1714013737536n, 12.3, 221, 0.31, 'California.SanFrancisco', 3 ]
]
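
Rows are positional, so it is often convenient to combine the metadata with each row to get objects keyed by column name. The sketch below assumes that getMeta() returns an array of { name, type, length } entries and that getData() yields one row as a flat array in column order, as the output above suggests; rowToObject is a hypothetical helper.

```javascript
// Build an object keyed by column name from column metadata and one row.
function rowToObject(meta, row) {
    const record = {};
    meta.forEach((column, index) => {
        record[column.name] = row[index];
    });
    return record;
}

// Sample metadata and row copied from the output above.
const meta = [
    { name: "ts", type: "TIMESTAMP", length: 8 },
    { name: "current", type: "FLOAT", length: 4 },
    { name: "voltage", type: "INT", length: 4 },
    { name: "phase", type: "FLOAT", length: 4 },
    { name: "location", type: "VARCHAR", length: 64 },
    { name: "groupid", type: "INT", length: 4 },
];
const row = [1714013737536n, 12.3, 221, 0.31, "California.SanFrancisco", 3];

console.log(rowToObject(meta, row));
```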

Execute SQL with reqId

The reqId is very similar to a TraceID in distributed tracing systems. In a distributed system, a request may pass through multiple services or modules before it completes. The reqId identifies and associates all operations that belong to one request, so that its complete execution path can be tracked and understood. The primary uses of reqId are:

  • Request Tracing: By associating the same reqId with all related operations of a request, we can trace the complete path of the request within the system.
  • Performance Analysis: By analyzing a request's reqId, we can understand the processing time of the request across various services or modules, thereby identifying performance bottlenecks.
  • Fault Diagnosis: When a request fails, we can identify the location of the issue by examining the reqId associated with that request.

If you do not set a reqId, the client library generates a random one internally. Setting it yourself is still recommended, because it lets you associate the database operation with your own request.

async function sqlWithReqid() {
    let insertQuery = "INSERT INTO " +
        "power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
        "VALUES " +
        "(NOW + 1a, 10.30000, 219, 0.31000) " +
        "(NOW + 2a, 12.60000, 218, 0.33000) " +
        "(NOW + 3a, 12.30000, 221, 0.31000) " +
        "power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) " +
        "VALUES " +
        "(NOW + 1a, 10.30000, 218, 0.25000) ";

    let wsRows = null;
    let wsSql = null;
    try {
        wsSql = await createConnect();
        // The second argument is the reqId attached to each statement.
        let taosResult = await wsSql.exec(insertQuery, 1);
        console.log(taosResult);
        wsRows = await wsSql.query('select * from meters', 2);
        let meta = wsRows.getMeta();
        console.log("wsRow:meta:=>", meta);
        while (await wsRows.next()) {
            let result = wsRows.getData();
            console.log("wsRow:data:=>", result);
        }
    } catch (err) {
        console.error(err.code, err.message);
    } finally {
        if (wsRows) {
            await wsRows.close();
        }
        if (wsSql) {
            await wsSql.close();
        }
    }
}
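
The example above uses the fixed reqId values 1 and 2. In an application, each request usually gets its own reqId, and logging it next to the elapsed time supports the tracing and performance analysis described earlier. The sketch below is one way to do that; createReqIdGenerator and traced are hypothetical helpers, and the range of reqId values the server accepts is not stated on this page, so the generator simply stays within JavaScript's safe integer range.

```javascript
// Process-local reqId generator: returns 1, 2, 3, ... and wraps around
// before leaving the safe integer range.
function createReqIdGenerator(start = 1) {
    let next = start;
    return () => {
        const id = next;
        next = next >= Number.MAX_SAFE_INTEGER ? 1 : next + 1;
        return id;
    };
}

// Run an async operation with a reqId and log how long it took.
async function traced(reqId, label, operation) {
    const startedAt = Date.now();
    try {
        return await operation(reqId);
    } finally {
        console.log(`reqId=${reqId} ${label} took ${Date.now() - startedAt} ms`);
    }
}

const nextReqId = createReqIdGenerator();
// With an open connection, usage would look like:
//   await traced(nextReqId(), "insert", (id) => wsSql.exec(insertQuery, id));
```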


Writing data via parameter binding

The Node.js client library provides a parameter binding API for inserting data. Like most databases, TDengine currently supports only the question mark (?) as the placeholder for bound parameters.

Note: When the table name must be qualified with a database name, do not write db.? in the SQL passed to prepare. Use ? alone and pass the qualified name to setTableName, for example: stmt.setTableName("db.t1").

Sample Code:

const taos = require("@tdengine/websocket");

let db = 'power';
let stable = 'meters';
let tags = ['California.SanFrancisco', 3];
let values = [
[1706786044994, 1706786044995, 1706786044996],
[10.2, 10.3, 10.4],
[292, 293, 294],
[0.32, 0.33, 0.34],
];

async function prepare() {
let dsn = 'ws://localhost:6041'
let conf = new taos.WSConfig(dsn);
conf.setUser('root')
conf.setPwd('taosdata')
conf.setDb(db)
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(`CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`);
await wsSql.exec(`CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`);
return wsSql
}

(async () => {
let stmt = null;
let connector = null;
try {
connector = await prepare();
stmt = await connector.stmtInit();
await stmt.prepare(`INSERT INTO ? USING ${db}.${stable} (location, groupId) TAGS (?, ?) VALUES (?, ?, ?, ?)`);
await stmt.setTableName('d1001');
let tagParams = stmt.newStmtParam();
tagParams.setVarchar([tags[0]]);
tagParams.setInt([tags[1]]);
await stmt.setTags(tagParams);

let bindParams = stmt.newStmtParam();
bindParams.setTimestamp(values[0]);
bindParams.setFloat(values[1]);
bindParams.setInt(values[2]);
bindParams.setFloat(values[3]);
await stmt.bind(bindParams);
await stmt.batch();
await stmt.exec();
console.log(stmt.getLastAffected());
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (stmt) {
await stmt.close();
}
if (connector) {
await connector.close();
}
taos.destroy();
}
})();


The methods to set TAGS values or VALUES columns:

setBoolean(params: any[]): void;
setTinyInt(params: any[]): void;
setUTinyInt(params: any[]): void;
setSmallInt(params: any[]): void;
setUSmallInt(params: any[]): void;
setInt(params: any[]): void;
setUInt(params: any[]): void;
setBigint(params: any[]): void;
setUBigint(params: any[]): void;
setFloat(params: any[]): void;
setDouble(params: any[]): void;
setVarchar(params: any[]): void;
setBinary(params: any[]): void;
setNchar(params: any[]): void;
setJson(params: any[]): void;
setVarBinary(params: any[]): void;
setGeometry(params: any[]): void;
setTimestamp(params: any[]): void;

Note: The JSON type is supported only for tags.
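
Each setter takes an array holding the values of one column, so row-oriented records have to be transposed into per-column arrays before binding. The sketch below shows that step in plain JavaScript; toColumns is a hypothetical helper, and the sample readings reuse the values from the sample code above.

```javascript
// Transpose row-oriented records into one array per column,
// in the order given by `fields`.
function toColumns(rows, fields) {
    return fields.map((field) => rows.map((row) => row[field]));
}

const readings = [
    { ts: 1706786044994, current: 10.2, voltage: 292, phase: 0.32 },
    { ts: 1706786044995, current: 10.3, voltage: 293, phase: 0.33 },
    { ts: 1706786044996, current: 10.4, voltage: 294, phase: 0.34 },
];

const [tsCol, currentCol, voltageCol, phaseCol] =
    toColumns(readings, ["ts", "current", "voltage", "phase"]);

// The column arrays would then be passed to the setters, for example:
//   bindParams.setTimestamp(tsCol);
//   bindParams.setFloat(currentCol);
//   bindParams.setInt(voltageCol);
//   bindParams.setFloat(phaseCol);
console.log(tsCol, currentCol, voltageCol, phaseCol);
```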

Schemaless Writing

TDengine supports schemaless writing. It is compatible with InfluxDB's Line Protocol, OpenTSDB's telnet line protocol, and OpenTSDB's JSON format protocol. For more information, see Schemaless Writing.

const taos = require("@tdengine/websocket");

let influxdbData = ["meters,location=California.LosAngeles,groupId=2 current=11.8,voltage=221,phase=0.28 1648432611249",
"meters,location=California.LosAngeles,groupId=2 current=13.4,voltage=223,phase=0.29 1648432611250",
"meters,location=California.LosAngeles,groupId=3 current=10.8,voltage=223,phase=0.29 1648432611249"];

let jsonData = ["{\"metric\": \"meter_current\",\"timestamp\": 1626846402,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}",
"{\"metric\": \"meter_current\",\"timestamp\": 1626846403,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1002\"}}",
"{\"metric\": \"meter_current\",\"timestamp\": 1626846404,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1003\"}}"]

let telnetData = ["meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
"meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
"meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3"];

async function createConnect() {
let dsn = 'ws://localhost:6041'
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec('CREATE DATABASE IF NOT EXISTS power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');
await wsSql.exec('USE power');
return wsSql;
}

async function test() {
let wsSql = null;
let wsRows = null;
let ttl = 0;
try {
wsSql = await createConnect()
await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl);
await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (wsRows) {
await wsRows.close();
}
if (wsSql) {
await wsSql.close();
}
taos.destroy();
}
}
test()
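
The payloads in the example above are hand-written strings. They can also be generated from structured data, which avoids manual quote escaping in the JSON case. The sketch below builds an InfluxDB line protocol record and an OpenTSDB JSON record; both helper names are hypothetical, and no escaping of spaces or commas inside tag or field values is attempted, so it only suits simple values like those shown above.

```javascript
// Build one InfluxDB line protocol record:
//   measurement,tag=value,... field=value,... timestamp
function toInfluxLine(measurement, tags, fields, timestamp) {
    const join = (obj) => Object.entries(obj).map(([k, v]) => `${k}=${v}`).join(",");
    return `${measurement},${join(tags)} ${join(fields)} ${timestamp}`;
}

// Build one OpenTSDB JSON record; JSON.stringify handles the quoting.
function toOpenTsdbJson(metric, timestamp, value, tags) {
    return JSON.stringify({ metric, timestamp, value, tags });
}

const line = toInfluxLine(
    "meters",
    { location: "California.LosAngeles", groupId: 2 },
    { current: 11.8, voltage: 221, phase: 0.28 },
    1648432611249
);
const json = toOpenTsdbJson("meter_current", 1626846402, 10.3,
    { groupid: 2, location: "California.SanFrancisco", id: "d1001" });

console.log(line);
console.log(json);
```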


Schemaless with reqId

The reqId can be used for request tracing.

await wsSchemaless.schemalessInsert([influxdbData], SchemalessProto.InfluxDBLineProtocol, Precision.NANO_SECONDS, ttl, reqId);
await wsSchemaless.schemalessInsert([telnetData], SchemalessProto.OpenTSDBTelnetLineProtocol, Precision.NANO_SECONDS, ttl, reqId);
await wsSchemaless.schemalessInsert([jsonData], SchemalessProto.OpenTSDBJsonFormatProtocol, Precision.NANO_SECONDS, ttl, reqId);

Data Subscription

The TDengine Node.js client library supports subscription functionality with the following application API.

Create a Topic

let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);


Create a Consumer

async function createConsumer() {
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, "gId"],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.CLIENT_ID, 'test_tmq_client'],
[taos.TMQConstants.WS_URL, 'ws://localhost:6041'],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
return await taos.tmqConnect(configMap);
}


Parameter Description

  • taos.TMQConstants.CONNECT_USER: username.
  • taos.TMQConstants.CONNECT_PASS: password.
  • taos.TMQConstants.GROUP_ID: Specifies the group that the consumer is in.
  • taos.TMQConstants.CLIENT_ID: client id.
  • taos.TMQConstants.WS_URL: The URL address of TaosAdapter.
  • taos.TMQConstants.AUTO_OFFSET_RESET: Where to start consuming when no offset exists. Valid values are earliest and latest; the default is latest.
  • taos.TMQConstants.ENABLE_AUTO_COMMIT: Specifies whether to commit automatically.
  • taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS: Auto-commit interval; the default value is 5000 ms.
  • taos.TMQConstants.CONNECT_MESSAGE_TIMEOUT: Socket timeout in milliseconds; the default value is 10000 ms. It takes effect only for WebSocket connections.

For more information, see Consumer Parameters. Note that the default value of auto.offset.reset in data subscription on the TDengine server has changed since version 3.2.0.0.
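
The parameters above can be assembled and validated in one place before calling taos.tmqConnect. The sketch below is a hypothetical helper, not part of the client library: it takes the constants object as an argument (pass taos.TMQConstants in real use), applies the defaults stated above, and treats groupId as required, which is an assumption.

```javascript
// Build the consumer configuration Map from plain options.
// `constants` is expected to be taos.TMQConstants; it is a parameter so
// that the helper has no dependency on the library itself.
function buildConsumerConfig(constants, options) {
    const offsetReset = options.offsetReset || "latest";
    if (offsetReset !== "earliest" && offsetReset !== "latest") {
        throw new Error(`offsetReset must be "earliest" or "latest", got "${offsetReset}"`);
    }
    if (!options.groupId) {
        throw new Error("groupId is required");
    }
    return new Map([
        [constants.GROUP_ID, options.groupId],
        [constants.CLIENT_ID, options.clientId || "node_client"],
        [constants.WS_URL, options.url || "ws://localhost:6041"],
        [constants.CONNECT_USER, options.user || "root"],
        [constants.CONNECT_PASS, options.password || "taosdata"],
        [constants.AUTO_OFFSET_RESET, offsetReset],
        [constants.ENABLE_AUTO_COMMIT, String(options.autoCommit !== false)],
        [constants.AUTO_COMMIT_INTERVAL_MS, String(options.autoCommitIntervalMs || 5000)],
    ]);
}

// Usage with the library would look like:
//   const config = buildConsumerConfig(taos.TMQConstants, { groupId: "gId" });
//   const consumer = await taos.tmqConnect(config);
```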

Subscribe to consume data

async function subscribe(consumer) {
await consumer.subscribe(topics);
for (let i = 0; i < 5; i++) {
let res = await consumer.poll(500);
for (let [key, value] of res) {
console.log(key, value);
}
if (res.size == 0) {
break;
}
await consumer.commit();
}
}
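
The loop above can be generalized into a helper that reports how many entries it handled. The sketch below relies only on the two calls used above, poll(timeoutMs) returning a Map and commit(); the helper name consumeUntilIdle and its options are hypothetical. A small in-memory stand-in for the consumer is included so that the sketch runs without a TDengine instance.

```javascript
// Poll until an empty result arrives or maxPolls is reached,
// committing after each non-empty batch.
async function consumeUntilIdle(consumer, { maxPolls = 5, timeoutMs = 500, onRecord = console.log } = {}) {
    let handled = 0;
    for (let i = 0; i < maxPolls; i++) {
        const res = await consumer.poll(timeoutMs);
        if (res.size === 0) {
            break;
        }
        for (const [key, value] of res) {
            onRecord(key, value);
            handled++;
        }
        await consumer.commit();
    }
    return handled;
}

// In-memory stand-in for a real consumer, for demonstration only.
function createFakeConsumer(batches) {
    const queue = batches.slice();
    return {
        commits: 0,
        async poll() { return queue.shift() || new Map(); },
        async commit() { this.commits++; },
    };
}

const demo = createFakeConsumer([new Map([["power_meters_topic", ["row 1"]]])]);
consumeUntilIdle(demo).then((count) => console.log("handled", count));
```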


Subscription assignment and offset

let assignment = await consumer.assignment();
console.log(assignment);

assignment = await consumer.seekToBeginning(assignment);
for(let i in assignment) {
console.log("seek after:", assignment[i])
}


Close subscriptions

// Unsubscribe
await consumer.unsubscribe();
// Close the consumer
await consumer.close();
// Release connector resources
taos.destroy();

For more information, see Data Subscription.
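
When several resources are open at once (result sets, connections, consumers), one failing close() should not prevent the others from being closed. The sketch below is a hypothetical helper that relies only on each resource exposing an async close() method, as the objects on this page do.

```javascript
// Close every non-null resource in order and collect errors instead of
// stopping at the first failure.
async function closeAll(...resources) {
    const errors = [];
    for (const resource of resources) {
        if (!resource) {
            continue;
        }
        try {
            await resource.close();
        } catch (err) {
            errors.push(err);
        }
    }
    return errors;
}

// In a finally block, usage would look like:
//   const errors = await closeAll(wsRows, wsSql, consumer);
//   taos.destroy();
```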

Full Sample Code

const taos = require("@tdengine/websocket");

const db = 'power';
const stable = 'meters';
const topics = ['power_meters_topic'];

// ANCHOR: create_consumer
async function createConsumer() {
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, "gId"],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.CLIENT_ID, 'test_tmq_client'],
[taos.TMQConstants.WS_URL, 'ws://localhost:6041'],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
return await taos.tmqConnect(configMap);
}
// ANCHOR_END: create_consumer

async function prepare() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root')
conf.setPwd('taosdata')
conf.setDb('power')
const createDB = `CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`;
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;

let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(createDB);
await wsSql.exec(createStable);

// ANCHOR: create_topic
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
// ANCHOR_END: create_topic

for (let i = 0; i < 10; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
}
await wsSql.close();
}

// ANCHOR: subscribe
async function subscribe(consumer) {
await consumer.subscribe(topics);
for (let i = 0; i < 5; i++) {
let res = await consumer.poll(500);
for (let [key, value] of res) {
console.log(key, value);
}
if (res.size == 0) {
break;
}
await consumer.commit();
}
}
// ANCHOR_END: subscribe

async function test() {
let consumer = null;
try {
await prepare();
consumer = await createConsumer();
await subscribe(consumer)
// ANCHOR: assignment
let assignment = await consumer.assignment();
console.log(assignment);

assignment = await consumer.seekToBeginning(assignment);
for(let i in assignment) {
console.log("seek after:", assignment[i])
}
// ANCHOR_END: assignment
await consumer.unsubscribe();
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (consumer) {
await consumer.close();
}
taos.destroy();
}
}

test()


More sample programs

Sample Program | Description
sql_example | Basic operations such as establishing connections and running SQL commands.
stmt_example | Inserting multiple rows with parameter binding.
line_example | Schemaless insert
telnet_line_example | OpenTSDB Telnet insert
json_line_example | OpenTSDB JSON insert
tmq_example | Using data subscription

Usage limitations

  • Node.js client library (@tdengine/websocket) supports Node.js 14 or higher.
  • It supports only WebSocket connection, so taosAdapter needs to be started in advance.
  • After you finish using the connector, call taos.destroy() to release connector resources.

Frequently Asked Questions

  1. "Unable to establish connection" or "Unable to resolve FQDN"

    Solution: Usually, the root cause is an incorrect FQDN configuration. You can refer to this section in the FAQ to troubleshoot.