
TDengine Node.js Client Library

@tdengine/websocket is the official Node.js client library for TDengine. Node.js developers can use it to build applications that access data in a TDengine instance.

The source code for the Node.js client library is hosted on GitHub.

Connection types

The Node.js client library supports only WebSocket connections, which go through taosAdapter.

For a detailed introduction of the connection types, please refer to: Establish Connection

Supported platforms

The Node.js client library requires Node.js 14 or later.

Recent update logs

Node.js connector version | Major changes | TDengine version
3.1.0 | New version, supports WebSocket | 3.2.0.0 or later

Supported features

  1. Connection Management
  2. General Query
  3. Continuous Query
  4. Parameter Binding
  5. Subscription
  6. Schemaless

Handling exceptions

When an error is thrown, catch it with try/catch to obtain the error code and error message. Error codes from 100 to 110 come from the Node.js client library itself; all other error codes come from TDengine function modules.

The table below lists each error code with its description and suggested action.

Error Code | Description | Suggested Actions
100 | invalid variables | The parameter is invalid. Check the interface specification and adjust the parameter type and size.
101 | invalid url | URL error. Check that the URL is correct.
102 | received server data but did not find a callback for processing | Client waiting timeout. Check the network and taosAdapter status.
103 | invalid message type | Check that the client version and server version match.
104 | connection creation failed | Connection creation failed. Check the network and taosAdapter status.
105 | websocket request timeout | Increase the execution time by adding the messageWaitTimeout parameter, or check the connection to taosAdapter.
106 | authentication fail | Authentication failed. Check that the username and password are correct.
107 | unknown sql type in tdengine | Check the data types supported by TDengine.
108 | connection has been closed | The connection has been closed. Check the connection status, or recreate the connection before executing the relevant instructions.
109 | fetch block data parse fail | Check that the client version and server version match.
110 | websocket connection has reached its maximum limit | Check that connections are closed after use.
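
The code ranges described above can be checked inside a catch block. The following is a minimal sketch rather than part of the library: describeTaosError is a hypothetical helper, and it assumes only that a thrown error carries a numeric code and a message, as the examples on this page do.

```javascript
// Hypothetical helper, not part of @tdengine/websocket.
// Codes 100 to 110 are treated as client library errors, per the table above.
const CONNECTOR_CODE_MIN = 100;
const CONNECTOR_CODE_MAX = 110;

function describeTaosError(err) {
    const code = err && typeof err.code === "number" ? err.code : NaN;
    let source = "unknown"; // no numeric code, e.g. a plain JavaScript error
    if (Number.isInteger(code)) {
        const inConnectorRange = code >= CONNECTOR_CODE_MIN && code <= CONNECTOR_CODE_MAX;
        source = inConnectorRange ? "connector" : "server";
    }
    return { code, source, message: String((err && err.message) || "") };
}

// Usage with a simulated error, so the sketch runs without a server:
try {
    const simulated = new Error("websocket request timeout");
    simulated.code = 105;
    throw simulated;
} catch (err) {
    const info = describeTaosError(err);
    console.log(`[${info.source}] ErrCode: ${info.code}, ErrMessage: ${info.message}`);
}
```

Separating the two sources this way makes it easier to decide whether to look at the network and taosAdapter first or at the SQL statement and server state.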

Data Type Mapping

The table below shows the mapping between TDengine data types and Node.js data types.

TDengine Data Type | Node.js Data Type
TIMESTAMP | bigint
TINYINT | number
SMALLINT | number
INT | number
BIGINT | bigint
TINYINT UNSIGNED | number
SMALLINT UNSIGNED | number
INT UNSIGNED | number
BIGINT UNSIGNED | bigint
FLOAT | number
DOUBLE | number
BOOL | boolean
BINARY | string
NCHAR | string
JSON | string
VARBINARY | ArrayBuffer
GEOMETRY | ArrayBuffer

Note: The JSON type is supported only for tags.
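
Because TIMESTAMP values arrive as bigint, they usually need converting before they can be used with Date. The sketch below shows one way to do that. timestampToDate is a hypothetical helper, and it assumes the database uses millisecond precision (the TDengine default); a microsecond or nanosecond database would need the value divided first.

```javascript
// Hypothetical helper: convert a TIMESTAMP value (bigint) to a JavaScript Date.
// Assumes millisecond precision.
function timestampToDate(ts) {
    if (typeof ts !== "bigint") {
        throw new TypeError("expected a bigint TIMESTAMP value");
    }
    const millis = Number(ts);
    if (!Number.isSafeInteger(millis)) {
        throw new RangeError("timestamp does not fit in a safe integer");
    }
    return new Date(millis);
}

// Usage with the timestamp from the query output shown later on this page:
console.log(timestampToDate(1714013737536n).toISOString());
```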

Installation Steps

Pre-installation preparation

Install Node.js client library via npm

npm install @tdengine/websocket

Verify

After installing the client library, use the nodejsChecker.js program to verify that the current environment can access TDengine from Node.js.

Verification steps:

  • Create an installation test folder such as ~/tdengine-test. Download the nodejsChecker.js to your local machine.

  • Execute the following command from the command-line.

    npm init -y
    npm install @tdengine/websocket
    node nodejsChecker.js host=localhost
  • After executing the above steps, the command-line will output the result of nodejsChecker.js connecting to the TDengine instance and performing a simple insert and query.

Establishing a connection

Install and import the @tdengine/websocket package.

Note: When you have finished using the Node.js client library, call taos.destroy() to release connector resources.

const taos = require("@tdengine/websocket");

//database operations......

taos.destroy();

WSConfig configures the WebSocket parameters through the following methods:

getToken(): string | undefined | null;
setToken(token: string): void;
getUser(): string | undefined | null;
setUser(user: string): void;
getPwd(): string | undefined | null;
setPwd(pws: string): void;
getDb(): string | undefined | null;
setDb(db: string): void;
getUrl(): string;
setUrl(url: string): void;
setTimeOut(ms: number): void;
getTimeOut(): number | undefined | null;

const taos = require("@tdengine/websocket");

let dsn = 'ws://localhost:6041';

// Connect to the power database through taosAdapter and return the connection.
async function createConnect() {
    try {
        let conf = new taos.WSConfig(dsn);
        conf.setUser('root');
        conf.setPwd('taosdata');
        conf.setDb('power');
        // Declared locally so that it does not leak as an implicit global.
        const conn = await taos.sqlConnect(conf);
        console.log("Connected to " + dsn + " successfully.");
        return conn;
    } catch (err) {
        console.log("Failed to connect to " + dsn + ", ErrCode: " + err.code + ", ErrMessage: " + err.message);
        throw err;
    }
}

view source code
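
Every example on this page closes its connection in a finally block. That pattern can be factored into a small wrapper, sketched below. withConnection is a hypothetical helper, not a library function; it assumes only that connect resolves to an object with an async close() method, as the createConnect() function shown above does. A stand-in connection is used so that the sketch runs without a server.

```javascript
// Hypothetical helper: run `work` with a connection and always close it afterwards.
async function withConnection(connect, work) {
    const conn = await connect();
    try {
        return await work(conn);
    } finally {
        // Runs whether `work` succeeded or threw.
        await conn.close();
    }
}

// Stand-in for createConnect(); a real program would pass createConnect itself.
const fakeConnect = async () => ({
    exec: async (sql) => `executed: ${sql}`,
    close: async () => console.log("connection closed"),
});

withConnection(fakeConnect, (conn) => conn.exec("SELECT 1"))
    .then((result) => console.log(result));
```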

Usage examples

Create database and tables

async function createDbAndTable() {
    let wsSql = null;
    try {
        let conf = new taos.WSConfig(dsn);
        conf.setUser('root');
        conf.setPwd('taosdata');
        // No default database is set here: power may not exist yet,
        // and every statement below uses the power. prefix.
        wsSql = await taos.sqlConnect(conf);
        console.log("Connected to " + dsn + " successfully.");
        // create database
        await wsSql.exec('CREATE DATABASE IF NOT EXISTS power');
        console.log("Create database power successfully.");
        // create table
        await wsSql.exec('CREATE STABLE IF NOT EXISTS power.meters ' +
            '(ts timestamp, current float, voltage int, phase float) ' +
            'TAGS (location binary(64), groupId int);');

        console.log("Create stable power.meters successfully");
    } catch (err) {
        console.error(`Failed to create database power or stable meters, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    } finally {
        if (wsSql) {
            await wsSql.close();
        }
    }
}

view source code

Note: If you do not use USE power to specify the database, all subsequent operations on the table need to add the database name as a prefix, such as power.meters.

Insert data

async function insertData() {
    let wsSql = null;
    // One statement writes to two subtables, creating them from power.meters if needed.
    let insertQuery = "INSERT INTO " +
        "power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
        "VALUES " +
        "(NOW + 1a, 10.30000, 219, 0.31000) " +
        "(NOW + 2a, 12.60000, 218, 0.33000) " +
        "(NOW + 3a, 12.30000, 221, 0.31000) " +
        "power.d1002 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 3) " +
        "VALUES " +
        "(NOW + 1a, 10.30000, 218, 0.25000) ";

    try {
        wsSql = await createConnect();
        // Declared locally so that it does not leak as an implicit global.
        const taosResult = await wsSql.exec(insertQuery);
        console.log("Successfully inserted " + taosResult.getAffectRows() + " rows to power.meters.");
    } catch (err) {
        console.error(`Failed to insert data to power.meters, sql: ${insertQuery}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    } finally {
        if (wsSql) {
            await wsSql.close();
        }
    }
}

view source code

NOW is an internal function that defaults to the current time of the client's computer. NOW + 1s means the client's current time plus 1 second. The number is followed by a letter giving the unit of time: a (milliseconds), s (seconds), m (minutes), h (hours), d (days), w (weeks), n (months), y (years).
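
To make the unit letters concrete, the sketch below converts an offset such as 1a or 2h into milliseconds on the client side. durationToMillis is a hypothetical helper, not part of the library. It covers only the fixed-length units from the list above; n (months) and y (years) vary in length, so this sketch rejects them.

```javascript
// Hypothetical helper: convert a fixed-length duration such as "1a" or "2h" to milliseconds.
const UNIT_TO_MILLIS = {
    a: 1,
    s: 1000,
    m: 60 * 1000,
    h: 60 * 60 * 1000,
    d: 24 * 60 * 60 * 1000,
    w: 7 * 24 * 60 * 60 * 1000,
};

function durationToMillis(text) {
    const match = /^(\d+)([a-z])$/.exec(text);
    if (!match) {
        throw new Error(`invalid duration: ${text}`);
    }
    const amount = Number(match[1]);
    const unit = match[2];
    if (!Object.prototype.hasOwnProperty.call(UNIT_TO_MILLIS, unit)) {
        throw new Error(`unit "${unit}" has no fixed length in milliseconds`);
    }
    return amount * UNIT_TO_MILLIS[unit];
}

console.log(durationToMillis("1a"), durationToMillis("1s"), durationToMillis("1h"));
```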

Querying data

async function queryData() {
    let wsRows = null;
    let wsSql = null;
    let sql = 'SELECT ts, current, location FROM power.meters limit 100';
    try {
        wsSql = await createConnect();
        wsRows = await wsSql.query(sql);
        // Iterate over the result set one row at a time.
        while (await wsRows.next()) {
            let row = wsRows.getData();
            console.log('ts: ' + row[0] + ', current: ' + row[1] + ', location: ' + row[2]);
        }
    } catch (err) {
        console.error(`Failed to query data from power.meters, sql: ${sql}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    } finally {
        if (wsRows) {
            await wsRows.close();
        }
        if (wsSql) {
            await wsSql.close();
        }
    }
}

view source code

Discovered data structure

wsRow:meta:=> [
{ name: 'ts', type: 'TIMESTAMP', length: 8 },
{ name: 'current', type: 'FLOAT', length: 4 },
{ name: 'voltage', type: 'INT', length: 4 },
{ name: 'phase', type: 'FLOAT', length: 4 },
{ name: 'location', type: 'VARCHAR', length: 64},
{ name: 'groupid', type: 'INT', length: 4 }
]
wsRow:data:=> [
[ 1714013737536n, 12.3, 221, 0.31, 'California.SanFrancisco', 3 ]
]
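
Rows in this structure are positional arrays, so column names have to be looked up in meta. The sketch below pairs the two into plain objects. rowsToObjects is a hypothetical helper that works only on arrays shaped like the output above; it does not call the library.

```javascript
// Hypothetical helper: pair column names from `meta` with the values of each row in `data`.
function rowsToObjects(meta, data) {
    return data.map((row) => {
        const record = {};
        meta.forEach((column, index) => {
            record[column.name] = row[index];
        });
        return record;
    });
}

// Usage with the structure printed above:
const sampleMeta = [
    { name: 'ts', type: 'TIMESTAMP', length: 8 },
    { name: 'current', type: 'FLOAT', length: 4 },
    { name: 'voltage', type: 'INT', length: 4 },
    { name: 'phase', type: 'FLOAT', length: 4 },
    { name: 'location', type: 'VARCHAR', length: 64 },
    { name: 'groupid', type: 'INT', length: 4 },
];
const sampleData = [[1714013737536n, 12.3, 221, 0.31, 'California.SanFrancisco', 3]];

console.log(rowsToObjects(sampleMeta, sampleData));
```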

Execute SQL with reqId

The reqId is very similar to TraceID in distributed tracing systems. In a distributed system, a request may need to pass through multiple services or modules to be completed. The reqId is used to identify and associate all related operations of this request, allowing us to track and understand the complete execution path of the request. The primary uses of reqId are:

  • Request Tracing: By associating the same reqId with all related operations of a request, we can trace the complete path of the request within the system.
  • Performance Analysis: By analyzing a request's reqId, we can understand the processing time of the request across various services or modules, thereby identifying performance bottlenecks.
  • Fault Diagnosis: When a request fails, we can identify the location of the issue by examining the reqId associated with that request.

If the user does not set a reqId, the client library generates a random one internally. Setting it explicitly is still recommended, because it ties the database operation more closely to the user's own request.
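
One simple way to supply explicit reqId values is a per-process counter, sketched below. createReqIdGenerator is a hypothetical helper, and the sketch assumes that a reqId is a positive integer small enough to be represented exactly as a JavaScript number. An application that already has its own trace ID would pass that instead.

```javascript
// Hypothetical helper: hand out increasing reqId values for this process.
function createReqIdGenerator(start = 1) {
    if (!Number.isSafeInteger(start) || start < 1) {
        throw new RangeError("start must be a positive safe integer");
    }
    let next = start;
    return function nextReqId() {
        const id = next;
        // Wrap around instead of leaving the safe integer range.
        next = next >= Number.MAX_SAFE_INTEGER ? 1 : next + 1;
        return id;
    };
}

const nextReqId = createReqIdGenerator();
const loggedReqId = nextReqId();
// Log the same value in the application so it can be matched with database operations.
console.log(`handling request with reqId ${loggedReqId}`);
```

The generated value is passed as the second argument of query, as in the sqlWithReqid example.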

async function sqlWithReqid() {
    let wsRows = null;
    let wsSql = null;
    let reqId = 1;
    try {
        wsSql = await createConnect();
        // The reqId is passed as the second argument of query.
        wsRows = await wsSql.query('SELECT ts, current, location FROM power.meters limit 100', reqId);
        while (await wsRows.next()) {
            let row = wsRows.getData();
            console.log('ts: ' + row[0] + ', current: ' + row[1] + ', location: ' + row[2]);
        }
    } catch (err) {
        console.error(`Failed to query data from power.meters, reqId: ${reqId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    } finally {
        if (wsRows) {
            await wsRows.close();
        }
        if (wsSql) {
            await wsSql.close();
        }
    }
}

view source code

Writing data via parameter binding

The Node.js client library provides a parameter binding API for inserting data. Like most databases, TDengine currently supports only the question mark ? as the placeholder for bound parameters.

Note: When the table name must include the database, do not write db.? in the SQL passed to prepare. Use ? alone and specify the database in setTableName, for example: stmt.setTableName("db.t1").

Sample Code:

const taos = require("@tdengine/websocket");

let db = 'power';
let stable = 'meters';
let numOfSubTable = 10;
let numOfRow = 10;
let dsn = 'ws://localhost:6041';

// Return a random integer between min and max, inclusive.
function getRandomInt(min, max) {
    min = Math.ceil(min);
    max = Math.floor(max);
    return Math.floor(Math.random() * (max - min + 1)) + min;
}

// Connect, then create the database and supertable used by the statement.
async function prepare() {
    let conf = new taos.WSConfig(dsn);
    conf.setUser('root');
    conf.setPwd('taosdata');
    conf.setDb(db);
    let wsSql = await taos.sqlConnect(conf);
    await wsSql.exec(`CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`);
    await wsSql.exec(`CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`);
    return wsSql;
}

async function test() {
    let stmt = null;
    let connector = null;
    try {
        connector = await prepare();
        stmt = await connector.stmtInit();
        await stmt.prepare(`INSERT INTO ? USING ${db}.${stable} (location, groupId) TAGS (?, ?) VALUES (?, ?, ?, ?)`);
        for (let i = 0; i < numOfSubTable; i++) {
            // Bind the subtable name and its tag values.
            await stmt.setTableName(`d_bind_${i}`);
            let tagParams = stmt.newStmtParam();
            tagParams.setVarchar([`location_${i}`]);
            tagParams.setInt([i]);
            await stmt.setTags(tagParams);
            // Build one array of values per column.
            let timestampParams = [];
            let currentParams = [];
            let voltageParams = [];
            let phaseParams = [];
            const currentMillis = new Date().getTime();
            for (let j = 0; j < numOfRow; j++) {
                timestampParams.push(currentMillis + j);
                currentParams.push(Math.random() * 30);
                voltageParams.push(getRandomInt(100, 300));
                phaseParams.push(Math.random());
            }
            // Bind the columns in the same order as the VALUES placeholders.
            let bindParams = stmt.newStmtParam();
            bindParams.setTimestamp(timestampParams);
            bindParams.setFloat(currentParams);
            bindParams.setInt(voltageParams);
            bindParams.setFloat(phaseParams);
            await stmt.bind(bindParams);
            await stmt.batch();
            await stmt.exec();
            console.log("Successfully inserted " + stmt.getLastAffected() + " rows to power.meters.");
        }
    } catch (err) {
        console.error(`Failed to insert to table meters using stmt, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    } finally {
        if (stmt) {
            await stmt.close();
        }
        if (connector) {
            await connector.close();
        }
        taos.destroy();
    }
}

test();

view source code

The methods to set TAGS values or VALUES columns:

setBoolean(params: any[]): void;
setTinyInt(params: any[]): void;
setUTinyInt(params: any[]): void;
setSmallInt(params: any[]): void;
setUSmallInt(params: any[]): void;
setInt(params: any[]): void;
setUInt(params: any[]): void;
setBigint(params: any[]): void;
setUBigint(params: any[]): void;
setFloat(params: any[]): void;
setDouble(params: any[]): void;
setVarchar(params: any[]): void;
setBinary(params: any[]): void;
setNchar(params: any[]): void;
setJson(params: any[]): void;
setVarBinary(params: any[]): void;
setGeometry(params: any[]): void;
setTimestamp(params: any[]): void;

Note: The JSON type is supported only for tags.
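
Each of these methods takes one array of values for a single column, which is why the sample above builds a separate array per column. When application data is held as row objects, it has to be pivoted first. The sketch below shows that step. rowsToColumns is a hypothetical helper and does not call the library.

```javascript
// Hypothetical helper: turn row objects into one array of values per column.
function rowsToColumns(rows, columnNames) {
    const columns = {};
    for (const name of columnNames) {
        columns[name] = rows.map((row) => {
            if (!(name in row)) {
                throw new Error(`row is missing column "${name}"`);
            }
            return row[name];
        });
    }
    return columns;
}

const readings = [
    { ts: 1714013737536, current: 10.3, voltage: 219, phase: 0.31 },
    { ts: 1714013737537, current: 12.6, voltage: 218, phase: 0.33 },
];
const columns = rowsToColumns(readings, ["ts", "current", "voltage", "phase"]);
console.log(columns);
// The arrays would then be bound as in the sample above, for example:
//   bindParams.setTimestamp(columns.ts);
//   bindParams.setFloat(columns.current);
```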

Schemaless Writing

TDengine supports schemaless writing. It is compatible with InfluxDB's Line Protocol, OpenTSDB's telnet line protocol, and OpenTSDB's JSON format protocol. For more information, see Schemaless Writing.

const taos = require("@tdengine/websocket");

let influxdbData = ["meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"];
let jsonData = ["{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"];
let telnetData = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"];

// Connect without a default database, create power, then select it with USE.
async function createConnect() {
    let dsn = 'ws://localhost:6041';
    let conf = new taos.WSConfig(dsn);
    conf.setUser('root');
    conf.setPwd('taosdata');
    let wsSql = await taos.sqlConnect(conf);
    await wsSql.exec('CREATE DATABASE IF NOT EXISTS power KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');
    await wsSql.exec('USE power');
    return wsSql;
}

async function test() {
    let wsSql = null;
    let ttl = 0;
    try {
        wsSql = await createConnect();
        // The precision argument must match the timestamps in the data being written.
        await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
        await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
        await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl);
        console.log("Inserted data with schemaless successfully.");
    } catch (err) {
        console.error(`Failed to insert data with schemaless, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    } finally {
        if (wsSql) {
            await wsSql.close();
        }
        taos.destroy();
    }
}

test();

view source code
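
The InfluxDB line in the sample data above has the shape measurement, tags, fields, timestamp. The sketch below assembles such a line from its parts. toInfluxLine and escapeKey are hypothetical helpers, not part of the library. Field values are passed as already-typed strings (for example "219i32"), and only the backslash escaping of commas, spaces, and equals signs is handled.

```javascript
// Hypothetical helpers for building one InfluxDB line protocol record.
function escapeKey(text) {
    // Commas, equals signs and spaces are backslash-escaped in tag keys, tag values and field keys.
    return String(text).replace(/[,= ]/g, "\\$&");
}

function toInfluxLine(measurement, tags, fields, timestamp) {
    // Measurement names escape commas and spaces only.
    const name = String(measurement).replace(/[, ]/g, "\\$&");
    const tagPart = Object.entries(tags)
        .map(([key, value]) => `,${escapeKey(key)}=${escapeKey(value)}`)
        .join("");
    const fieldPart = Object.entries(fields)
        .map(([key, value]) => `${escapeKey(key)}=${value}`)
        .join(",");
    if (fieldPart === "") {
        throw new Error("at least one field is required");
    }
    return `${name}${tagPart} ${fieldPart} ${timestamp}`;
}

// Rebuilds the line used in the sample above.
console.log(toInfluxLine(
    "meters",
    { groupid: 2, location: "California.SanFrancisco" },
    { current: "10.3000002f64", voltage: "219i32", phase: "0.31f64" },
    1626006833639
));
```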

Schemaless with reqId

The reqId can be used for request tracing. Pass it as the last argument of schemalessInsert; the other arguments are the same as in the sample above:

await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl, reqId);
await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl, reqId);
await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl, reqId);

Data Subscription

The TDengine Node.js client library supports subscription functionality with the following application API.

Create a Topic

const { sleep } = require("@tdengine/websocket");
const taos = require("@tdengine/websocket");

const db = 'power';
const stable = 'meters';
const url = 'ws://localhost:6041';
const topic = 'topic_meters'
const topics = [topic];
const groupId = "group1";
const clientId = "client1";

// Create the database, the supertable, and the topic that the consumer subscribes to.
async function prepare() {
    let conf = new taos.WSConfig('ws://localhost:6041');
    conf.setUser('root');
    conf.setPwd('taosdata');
    // No default database is set: power may not exist yet, and all names below are qualified.
    const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`;
    const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;

    let wsSql = await taos.sqlConnect(conf);
    await wsSql.exec(createDB);
    await wsSql.exec(createStable);

    let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
    await wsSql.exec(createTopic);
    await wsSql.close();
}

view source code

Create a Consumer

const db = 'power';
const stable = 'meters';
const url = 'ws://localhost:6041';
const topic = 'topic_meters';
const topics = [topic];
const groupId = "group1";
const clientId = "client1";

// Create a consumer that connects to taosAdapter over WebSocket.
async function createConsumer() {
    let configMap = new Map([
        [taos.TMQConstants.GROUP_ID, groupId],
        [taos.TMQConstants.CLIENT_ID, clientId],
        [taos.TMQConstants.CONNECT_USER, "root"],
        [taos.TMQConstants.CONNECT_PASS, "taosdata"],
        [taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
        [taos.TMQConstants.WS_URL, url],
        [taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
        [taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
    ]);
    try {
        // Declared locally so that it does not leak as an implicit global.
        const consumer = await taos.tmqConnect(configMap);
        console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`);
        return consumer;
    } catch (err) {
        console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    }
}

view source code

Parameter Description

  • taos.TMQConstants.CONNECT_USER: username.
  • taos.TMQConstants.CONNECT_PASS: password.
  • taos.TMQConstants.GROUP_ID: Specifies the group that the consumer is in.
  • taos.TMQConstants.CLIENT_ID: client id.
  • taos.TMQConstants.WS_URL: The URL of taosAdapter.
  • taos.TMQConstants.AUTO_OFFSET_RESET: Where to start consuming when no offset exists. Valid values are earliest and latest; the default is latest.
  • taos.TMQConstants.ENABLE_AUTO_COMMIT: Specifies whether to commit automatically.
  • taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS: Automatic submission interval, the default value is 5000 ms.
  • taos.TMQConstants.CONNECT_MESSAGE_TIMEOUT: socket timeout in milliseconds, the default value is 10000 ms. It only takes effect when using WebSocket type.

For more information, see Consumer Parameters. Note that the default value of auto.offset.reset in data subscription on the TDengine server has changed since version 3.2.0.0.
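
The parameters above can be collected by a small builder that fills in defaults and rejects missing required values. The following is a sketch under stated assumptions, not library code. buildConsumerConfig is a hypothetical helper. The constants argument is expected to be taos.TMQConstants and is injected so that the sketch can run with a stand-in object. The 5000 ms interval and the latest offset follow the list above, while enabling auto commit by default is an assumption of this sketch.

```javascript
// Hypothetical helper: build the Map passed to taos.tmqConnect().
function buildConsumerConfig(constants, options) {
    const { url, user, password, groupId, clientId } = options;
    for (const [name, value] of Object.entries({ url, user, password, groupId })) {
        if (!value) {
            throw new Error(`missing required option: ${name}`);
        }
    }
    const config = new Map([
        [constants.WS_URL, url],
        [constants.CONNECT_USER, user],
        [constants.CONNECT_PASS, password],
        [constants.GROUP_ID, groupId],
        [constants.AUTO_OFFSET_RESET, options.autoOffsetReset || "latest"],
        // Assumption of this sketch: auto commit is on unless explicitly disabled.
        [constants.ENABLE_AUTO_COMMIT, String(options.autoCommit !== false)],
        [constants.AUTO_COMMIT_INTERVAL_MS, String(options.autoCommitIntervalMs || 5000)],
    ]);
    if (clientId) {
        config.set(constants.CLIENT_ID, clientId);
    }
    return config;
}

// Stand-in keys for demonstration only; a real program would pass taos.TMQConstants.
const standInConstants = {
    WS_URL: "WS_URL", CONNECT_USER: "CONNECT_USER", CONNECT_PASS: "CONNECT_PASS",
    GROUP_ID: "GROUP_ID", CLIENT_ID: "CLIENT_ID", AUTO_OFFSET_RESET: "AUTO_OFFSET_RESET",
    ENABLE_AUTO_COMMIT: "ENABLE_AUTO_COMMIT", AUTO_COMMIT_INTERVAL_MS: "AUTO_COMMIT_INTERVAL_MS",
};
console.log(buildConsumerConfig(standInConstants, {
    url: "ws://localhost:6041", user: "root", password: "taosdata", groupId: "group1",
}));
```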

Subscribe to consume data

async function subscribe(consumer) {
    try {
        await consumer.subscribe(['topic_meters']);
        for (let i = 0; i < 50; i++) {
            // Wait up to 100 ms for new data.
            let res = await consumer.poll(100);
            for (let [key, value] of res) {
                // Add your data processing logic here
                console.log(`data: ${key} ${value}`);
            }
            await consumer.commit();
            console.log("Commit offset manually successfully.");
        }
    } catch (err) {
        console.error(`Failed to poll data, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    }
}

view source code

Assignment subscription Offset

view source code
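
As a rough illustration of what this section's heading describes, the sketch below reads a consumer's assignment and moves its offset back to the beginning. It is a sketch under stated assumptions, not verified library usage: rewindToBeginning is a hypothetical helper, and it assumes that the consumer object exposes assignment() and seekToBeginning() methods, which should be checked against the installed library version. A stand-in consumer is used so that the sketch runs without a server.

```javascript
// Hypothetical helper. Assumes the consumer exposes subscribe(), assignment()
// and seekToBeginning(); verify these against the installed library version.
async function rewindToBeginning(consumer, topicNames) {
    await consumer.subscribe(topicNames);
    const assignment = await consumer.assignment();
    await consumer.seekToBeginning(assignment);
    return assignment;
}

// Stand-in consumer that records the calls it receives.
const callLog = [];
const fakeConsumer = {
    subscribe: async (names) => { callLog.push(`subscribe ${names.join(",")}`); },
    assignment: async () => { callLog.push("assignment"); return [{ topic: "topic_meters" }]; },
    seekToBeginning: async (parts) => { callLog.push(`seekToBeginning ${parts.length}`); },
};

rewindToBeginning(fakeConsumer, ["topic_meters"])
    .then(() => console.log(callLog));
```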

Close subscriptions

// Unsubscribe
await consumer.unsubscribe();
// Close the consumer
await consumer.close();
// Release connector resources
taos.destroy();

For more information, see Data Subscription.

Full Sample Code

const { sleep } = require("@tdengine/websocket");
const taos = require("@tdengine/websocket");

const db = 'power';
const stable = 'meters';
const url = 'ws://localhost:6041';
const topic = 'topic_meters';
const topics = [topic];
const groupId = "group1";
const clientId = "client1";

// Create a consumer that connects to taosAdapter over WebSocket.
async function createConsumer() {
    let configMap = new Map([
        [taos.TMQConstants.GROUP_ID, groupId],
        [taos.TMQConstants.CLIENT_ID, clientId],
        [taos.TMQConstants.CONNECT_USER, "root"],
        [taos.TMQConstants.CONNECT_PASS, "taosdata"],
        [taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
        [taos.TMQConstants.WS_URL, url],
        [taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
        [taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
    ]);
    try {
        // Declared locally so that it does not leak as an implicit global.
        const consumer = await taos.tmqConnect(configMap);
        console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`);
        return consumer;
    } catch (err) {
        console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    }
}

// Create the database, the supertable, and the topic.
async function prepare() {
    let conf = new taos.WSConfig(url);
    conf.setUser('root');
    conf.setPwd('taosdata');
    // No default database is set: power may not exist yet, and all names below are qualified.
    const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`;
    const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;

    let wsSql = await taos.sqlConnect(conf);
    await wsSql.exec(createDB);
    await wsSql.exec(createStable);

    let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
    await wsSql.exec(createTopic);
    await wsSql.close();
}

// Write 50 rows, one every 100 ms, so the consumer has data to poll.
async function insert() {
    let conf = new taos.WSConfig(url);
    conf.setUser('root');
    conf.setPwd('taosdata');
    conf.setDb(db);
    let wsSql = await taos.sqlConnect(conf);
    for (let i = 0; i < 50; i++) {
        await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
        await sleep(100);
    }
    await wsSql.close();
}

// Subscribe, poll 50 times, and commit the offset after each poll.
async function subscribe(consumer) {
    try {
        await consumer.subscribe(topics);
        for (let i = 0; i < 50; i++) {
            let res = await consumer.poll(100);
            for (let [key, value] of res) {
                // Add your data processing logic here
                console.log(`data: ${key} ${value}`);
            }
            await consumer.commit();
            console.log("Commit offset manually successfully.");
        }
    } catch (err) {
        console.error(`Failed to poll data, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    }
}

async function test() {
    let consumer = null;
    try {
        await prepare();
        consumer = await createConsumer();
        // Run the consumer and the writer concurrently.
        const allPromises = [];
        allPromises.push(subscribe(consumer));
        allPromises.push(insert());
        await Promise.all(allPromises);
        await consumer.unsubscribe();
        console.log("Consumer unsubscribed successfully.");
    } catch (err) {
        console.error(`Failed to unsubscribe consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
        throw err;
    } finally {
        if (consumer) {
            await consumer.close();
            console.log("Consumer closed successfully.");
        }
        taos.destroy();
    }
}

test();

view source code

More sample programs

Sample Program | Description
sql_example | Basic operations such as establishing connections and running SQL commands.
stmt_example | Inserting multiple rows with parameter binding.
line_example | Schemaless insert
telnet_line_example | OpenTSDB Telnet insert
json_line_example | OpenTSDB JSON insert
tmq_example | Using data subscription

Usage limitations

  • Node.js client library (@tdengine/websocket) supports Node.js 14 or higher.
  • It supports only WebSocket connection, so taosAdapter needs to be started in advance.
  • After you finish using the connection, call taos.destroy() to release connector resources.

Frequently Asked Questions

  1. "Unable to establish connection" or "Unable to resolve FQDN"

    Solution: Usually, the root cause is an incorrect FQDN configuration. You can refer to this section in the FAQ to troubleshoot.