Ingesting Data in Parameter Binding Mode
When inserting data using parameter binding, it can avoid the resource consumption of SQL syntax parsing, thereby significantly improving the write performance. The reasons why parameter binding can improve writing efficiency include:
- Reduced parsing time: With parameter binding, the structure of the SQL statement is determined at the first execution, and subsequent executions only need to replace parameter values, thus avoiding syntax parsing each time and reducing parsing time.
- Precompilation: When using parameter binding, the SQL statement can be precompiled and cached. When executed later with different parameter values, the precompiled version can be used directly, improving execution efficiency.
- Reduced network overhead: Parameter binding also reduces the amount of data sent to the database because only parameter values need to be sent, not the complete SQL statement, especially when performing a large number of similar insert or update operations, this difference is particularly noticeable.
Tips: It is recommended to use parameter binding for data insertion
Next, we continue to use smart meters as an example to demonstrate the efficient writing functionality of parameter binding with various language connectors:
- Prepare a parameterized SQL insert statement for inserting data into the supertable
meters
. This statement allows dynamically specifying subtable names, tags, and column values. - Loop to generate multiple subtables and their corresponding data rows. For each subtable:
- Set the subtable's name and tag values (group ID and location).
- Generate multiple rows of data, each including a timestamp, randomly generated current, voltage, and phase values.
- Perform batch insertion operations to insert these data rows into the corresponding subtable.
- Finally, print the actual number of rows inserted into the table.
WebSocket Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
There are two kinds of interfaces for parameter binding: one is the standard JDBC interface, and the other is an extended interface. The extended interface offers better performance.
public class WSParameterBindingStdInterfaceDemo {
// modify host to your own
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS-WS://" + host + ":6041";
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
// If you are certain that the child table exists, you can avoid binding the tag column to improve performance.
String sql = "INSERT INTO power.meters (tbname, groupid, location, ts, current, voltage, phase) VALUES (?,?,?,?,?,?,?)";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
long current = System.currentTimeMillis();
for (int i = 1; i <= numOfSubTable; i++) {
for (int j = 0; j < numOfRow; j++) {
pstmt.setString(1, "d_bind_" + i);
pstmt.setInt(2, i);
pstmt.setString(3, "location_" + i);
pstmt.setTimestamp(4, new Timestamp(current + j));
pstmt.setFloat(5, random.nextFloat() * 30);
pstmt.setInt(6, random.nextInt(300));
pstmt.setFloat(7, random.nextFloat());
pstmt.addBatch();
}
}
int[] exeResult = pstmt.executeBatch();
// you can check exeResult here
System.out.println("Successfully inserted " + exeResult.length + " rows to power.meters.");
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
stmt.execute(
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
}
}
public class WSParameterBindingExtendInterfaceDemo {
// modify host to your own
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS-WS://" + host + ":6041";
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
String sql = "INSERT INTO ? USING power.meters TAGS(?,?) VALUES (?,?,?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("d_bind_" + i);
// set tags
pstmt.setTagInt(0, i);
pstmt.setTagString(1, "location_" + i);
// set column ts
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
// set column current
ArrayList<Float> currentList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
currentList.add(random.nextFloat() * 30);
pstmt.setFloat(1, currentList);
// set column voltage
ArrayList<Integer> voltageList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
voltageList.add(random.nextInt(300));
pstmt.setInt(2, voltageList);
// set column phase
ArrayList<Float> phaseList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
phaseList.add(random.nextFloat());
pstmt.setFloat(3, phaseList);
// add column
pstmt.columnDataAddBatch();
}
// execute column
pstmt.columnDataExecuteBatch();
// you can check exeResult here
System.out.println("Successfully inserted " + (numOfSubTable * numOfRow) + " rows to power.meters.");
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
stmt.execute(
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
}
}
This is a more detailed parameter binding example
from datetime import datetime
import random
import taosws
numOfSubTable = 10
numOfRow = 10
conn = None
stmt = None
host="localhost"
port=6041
try:
conn = taosws.connect(user="root",
password="taosdata",
host=host,
port=port)
conn.execute("CREATE DATABASE IF NOT EXISTS power")
conn.execute("USE power")
conn.execute(
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
)
# ANCHOR: stmt
sql = "INSERT INTO ? USING meters (groupid, location) TAGS(?,?) VALUES (?,?,?,?)"
stmt = conn.statement()
stmt.prepare(sql)
for i in range(numOfSubTable):
tbname = f"d_bind_{i}"
tags = [
taosws.int_to_tag(i),
taosws.varchar_to_tag(f"location_{i}"),
]
stmt.set_tbname_tags(tbname, tags)
current = int(datetime.now().timestamp() * 1000)
timestamps = []
currents = []
voltages = []
phases = []
for j in range (numOfRow):
timestamps.append(current + i)
currents.append(random.random() * 30)
voltages.append(random.randint(100, 300))
phases.append(random.random())
stmt.bind_param(
[
taosws.millis_timestamps_to_column(timestamps),
taosws.floats_to_column(currents),
taosws.ints_to_column(voltages),
taosws.floats_to_column(phases),
]
)
stmt.add_batch()
stmt.execute()
print(f"Successfully inserted to power.meters.")
except Exception as err:
print(f"Failed to insert to table meters using stmt, ErrMessage:{err}")
raise err
finally:
if stmt:
stmt.close()
if conn:
conn.close()
package main
import (
"database/sql"
"fmt"
"log"
"math/rand"
"time"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
_ "github.com/taosdata/driver-go/v3/taosRestful"
"github.com/taosdata/driver-go/v3/ws/stmt"
)
func main() {
host := "127.0.0.1"
numOfSubTable := 10
numOfRow := 10
taosDSN := fmt.Sprintf("root:taosdata@http(%s:6041)/", host)
db, err := sql.Open("taosRestful", taosDSN)
if err != nil {
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
}
defer db.Close()
// prepare database and table
_, err = db.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
}
_, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
log.Fatalln("Failed to create stable power.meters, ErrMessage: " + err.Error())
}
config := stmt.NewConfig(fmt.Sprintf("ws://%s:6041", host), 0)
config.SetConnectUser("root")
config.SetConnectPass("taosdata")
config.SetConnectDB("power")
config.SetMessageTimeout(common.DefaultMessageTimeout)
config.SetWriteWait(common.DefaultWriteWait)
connector, err := stmt.NewConnector(config)
if err != nil {
log.Fatalln("Failed to create stmt connector,url: " + taosDSN + "; ErrMessage: " + err.Error())
}
// prepare statement
sql := "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
stmt, err := connector.Init()
if err != nil {
log.Fatalln("Failed to init stmt, sql: " + sql + ", ErrMessage: " + err.Error())
}
err = stmt.Prepare(sql)
if err != nil {
log.Fatal("Failed to prepare sql, sql: " + sql + ", ErrMessage: " + err.Error())
}
for i := 1; i <= numOfSubTable; i++ {
tableName := fmt.Sprintf("d_bind_%d", i)
tags := param.NewParam(2).AddInt(i).AddBinary([]byte(fmt.Sprintf("location_%d", i)))
tagsType := param.NewColumnType(2).AddInt().AddBinary(24)
columnType := param.NewColumnType(4).AddTimestamp().AddFloat().AddInt().AddFloat()
// set tableName
err = stmt.SetTableName(tableName)
if err != nil {
log.Fatal("Failed to set table name, tableName: " + tableName + "; ErrMessage: " + err.Error())
}
// set tags
err = stmt.SetTags(tags, tagsType)
if err != nil {
log.Fatal("Failed to set tags, ErrMessage: " + err.Error())
}
// bind column data
current := time.Now()
for j := 0; j < numOfRow; j++ {
columnData := make([]*param.Param, 4)
columnData[0] = param.NewParam(1).AddTimestamp(current.Add(time.Millisecond*time.Duration(j)), common.PrecisionMilliSecond)
columnData[1] = param.NewParam(1).AddFloat(rand.Float32() * 30)
columnData[2] = param.NewParam(1).AddInt(rand.Intn(300))
columnData[3] = param.NewParam(1).AddFloat(rand.Float32())
err = stmt.BindParam(columnData, columnType)
if err != nil {
log.Fatal("Failed to bind params, ErrMessage: " + err.Error())
}
}
// add batch
err = stmt.AddBatch()
if err != nil {
log.Fatal("Failed to add batch, ErrMessage: " + err.Error())
}
// execute batch
err = stmt.Exec()
if err != nil {
log.Fatal("Failed to exec, ErrMessage: " + err.Error())
}
// get affected rows
affected := stmt.GetAffectedRows()
// you can check exeResult here
fmt.Printf("Successfully inserted %d rows to %s.\n", affected, tableName)
}
err = stmt.Close()
if err != nil {
log.Fatal("Failed to close stmt, ErrMessage: " + err.Error())
}
}
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "ws://";
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
taos.exec("DROP DATABASE IF EXISTS power").await?;
taos.create_database("power").await?;
taos.use_database("power").await?;
taos.exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))").await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;
const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d_bind_{}", i);
let tags = vec![Value::Int(i as i32), Value::VarChar(format!("location_{}", i).into())];
// set table name and tags for the prepared statement.
match stmt.set_tbname_tags(&table_name, &tags).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}", table_name, tags, err);
return Err(err.into());
}
}
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_floats(vec![10.3 + j as f32]),
ColumnView::from_ints(vec![219 + j as i32]),
ColumnView::from_floats(vec![0.31 + j as f32]),
];
// bind values to the prepared statement.
match stmt.bind(&values).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to bind values, values:{:?}, ErrMessage: {}", values, err);
return Err(err.into());
}
}
}
match stmt.add_batch().await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to add batch, ErrMessage: {}", err);
return Err(err.into());
}
}
}
// execute.
match stmt.execute().await{
Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows),
Err(err) => {
eprintln!("Failed to insert to table meters using stmt, ErrMessage: {}", err);
return Err(err.into());
}
}
Ok(())
}
const taos = require("@tdengine/websocket");
let db = 'power';
let stable = 'meters';
let numOfSubTable = 10;
let numOfRow = 10;
let dsn = 'ws://localhost:6041'
function getRandomInt(min, max) {
min = Math.ceil(min);
max = Math.floor(max);
return Math.floor(Math.random() * (max - min + 1)) + min;
}
async function prepare() {
let conf = new taos.WSConfig(dsn);
conf.setUser('root')
conf.setPwd('taosdata')
conf.setDb(db)
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(`CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`);
await wsSql.exec(`CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`);
return wsSql
}
async function test() {
let stmt = null;
let connector = null;
try {
connector = await prepare();
stmt = await connector.stmtInit();
await stmt.prepare(`INSERT INTO ? USING ${db}.${stable} (location, groupId) TAGS (?, ?) VALUES (?, ?, ?, ?)`);
for (let i = 0; i < numOfSubTable; i++) {
await stmt.setTableName(`d_bind_${i}`);
let tagParams = stmt.newStmtParam();
tagParams.setVarchar([`location_${i}`]);
tagParams.setInt([i]);
await stmt.setTags(tagParams);
let timestampParams = [];
let currentParams = [];
let voltageParams = [];
let phaseParams = [];
const currentMillis = new Date().getTime();
for (let j = 0; j < numOfRow; j++) {
timestampParams.push(currentMillis + j);
currentParams.push(Math.random() * 30);
voltageParams.push(getRandomInt(100, 300));
phaseParams.push(Math.random());
}
let bindParams = stmt.newStmtParam();
bindParams.setTimestamp(timestampParams);
bindParams.setFloat(currentParams);
bindParams.setInt(voltageParams);
bindParams.setFloat(phaseParams);
await stmt.bind(bindParams);
await stmt.batch();
await stmt.exec();
console.log("Successfully inserted " + stmt.getLastAffected() + " to power.meters.");
}
}
catch (err) {
console.error(`Failed to insert to table meters using stmt, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (stmt) {
await stmt.close();
}
if (connector) {
await connector.close();
}
taos.destroy();
}
}
test()
public static void Main(string[] args)
{
var host = "127.0.0.1";
var numOfSubTable = 10;
var numOfRow = 10;
var random = new Random();
var connectionString = $"protocol=WebSocket;host={host};port=6041;useSSL=false;username=root;password=taosdata";
try
{
var builder = new ConnectionStringBuilder(connectionString);
using (var client = DbDriver.Open(builder))
{
// create database
client.Exec("CREATE DATABASE IF NOT EXISTS power");
// use database
client.Exec("USE power");
// create table
client.Exec(
"CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
using (var stmt = client.StmtInit())
{
String sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
stmt.Prepare(sql);
for (int i = 1; i <= numOfSubTable; i++)
{
var tableName = $"d_bind_{i}";
// set table name
stmt.SetTableName(tableName);
// set tags
stmt.SetTags(new object[] { i, $"location_{i}" });
var current = DateTime.Now;
// bind rows
for (int j = 0; j < numOfRow; j++)
{
stmt.BindRow(new object[]
{
current.Add(TimeSpan.FromMilliseconds(j)),
random.NextSingle() * 30,
random.Next(300),
random.NextSingle()
});
}
// add batch
stmt.AddBatch();
// execute
stmt.Exec();
// get affected rows
var affectedRows = stmt.Affected();
Console.WriteLine($"Successfully inserted {affectedRows} rows to {tableName}.");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine("Failed to insert to table meters using stmt, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine("Failed to insert to table meters using stmt, ErrMessage: " + e.Message);
throw;
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o stmt_insert_demo stmt_insert_demo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "taosws.h"
/**
* @brief execute sql only.
*
* @param taos
* @param sql
*/
void executeSQL(WS_TAOS *taos, const char *sql) {
WS_RES *res = ws_query(taos, sql);
int code = ws_errno(res);
if (code != 0) {
fprintf(stderr, "%s\n", ws_errstr(res));
ws_free_result(res);
ws_close(taos);
exit(EXIT_FAILURE);
}
ws_free_result(res);
}
/**
* @brief check return status and exit program when error occur.
*
* @param stmt
* @param code
* @param msg
*/
void checkErrorCode(WS_STMT *stmt, int code, const char *msg) {
if (code != 0) {
fprintf(stderr, "%s. code: %d, error: %s\n", msg, code, ws_stmt_errstr(stmt));
ws_stmt_close(stmt);
exit(EXIT_FAILURE);
}
}
typedef struct {
int64_t ts;
float current;
int voltage;
float phase;
} Row;
int num_of_sub_table = 10;
int num_of_row = 10;
int total_affected = 0;
/**
* @brief insert data using stmt API
*
* @param taos
*/
void insertData(WS_TAOS *taos) {
// init
WS_STMT *stmt = ws_stmt_init(taos);
if (stmt == NULL) {
fprintf(stderr, "Failed to init ws_stmt, error: %s\n", ws_stmt_errstr(NULL));
exit(EXIT_FAILURE);
}
// prepare
const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
int code = ws_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "Failed to execute ws_stmt_prepare");
for (int i = 1; i <= num_of_sub_table; i++) {
char table_name[20];
sprintf(table_name, "d_bind_%d", i);
char location[20];
sprintf(location, "location_%d", i);
// set table name and tags
WS_MULTI_BIND tags[2];
// groupId
tags[0].buffer_type = TSDB_DATA_TYPE_INT;
tags[0].buffer_length = sizeof(int);
tags[0].length = (int32_t *)&tags[0].buffer_length;
tags[0].buffer = &i;
tags[0].is_null = NULL;
tags[0].num = 1;
// location
tags[1].buffer_type = TSDB_DATA_TYPE_BINARY;
tags[1].buffer_length = strlen(location);
tags[1].length = (int32_t *)&tags[1].buffer_length;
tags[1].buffer = location;
tags[1].is_null = NULL;
tags[1].num = 1;
code = ws_stmt_set_tbname_tags(stmt, table_name, tags, 2);
checkErrorCode(stmt, code, "Failed to set table name and tags\n");
// insert rows
WS_MULTI_BIND params[4];
// ts
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(int64_t);
params[0].length = (int32_t *)¶ms[0].buffer_length;
params[0].is_null = NULL;
params[0].num = 1;
// current
params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[1].buffer_length = sizeof(float);
params[1].length = (int32_t *)¶ms[1].buffer_length;
params[1].is_null = NULL;
params[1].num = 1;
// voltage
params[2].buffer_type = TSDB_DATA_TYPE_INT;
params[2].buffer_length = sizeof(int);
params[2].length = (int32_t *)¶ms[2].buffer_length;
params[2].is_null = NULL;
params[2].num = 1;
// phase
params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[3].buffer_length = sizeof(float);
params[3].length = (int32_t *)¶ms[3].buffer_length;
params[3].is_null = NULL;
params[3].num = 1;
for (int j = 0; j < num_of_row; j++) {
struct timeval tv;
gettimeofday(&tv, NULL);
long long milliseconds = tv.tv_sec * 1000LL + tv.tv_usec / 1000; // current timestamp in milliseconds
int64_t ts = milliseconds + j;
float current = (float)rand() / RAND_MAX * 30;
int voltage = rand() % 300;
float phase = (float)rand() / RAND_MAX;
params[0].buffer = &ts;
params[1].buffer = ¤t;
params[2].buffer = &voltage;
params[3].buffer = &phase;
// bind param
code = ws_stmt_bind_param_batch(stmt, params, 4);
checkErrorCode(stmt, code, "Failed to bind param");
}
// add batch
code = ws_stmt_add_batch(stmt);
checkErrorCode(stmt, code, "Failed to add batch");
// execute batch
int affected_rows = 0;
code = ws_stmt_execute(stmt, &affected_rows);
checkErrorCode(stmt, code, "Failed to exec stmt");
// get affected rows
int affected = ws_stmt_affected_rows_once(stmt);
total_affected += affected;
}
fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected);
ws_stmt_close(stmt);
}
int main() {
int code = 0;
char *dsn = "ws://localhost:6041";
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
exit(EXIT_FAILURE);
}
// create database and table
executeSQL(taos, "CREATE DATABASE IF NOT EXISTS power");
executeSQL(taos, "USE power");
executeSQL(taos,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
insertData(taos);
ws_close(taos);
}
Not supported
Native Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
public class ParameterBindingBasicDemo {
// modify host to your own
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
String sql = "INSERT INTO ? USING power.meters TAGS(?,?) VALUES (?,?,?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("d_bind_" + i);
// set tags
pstmt.setTagInt(0, i);
pstmt.setTagString(1, "location_" + i);
// set column ts
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
// set column current
ArrayList<Float> currentList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
currentList.add(random.nextFloat() * 30);
pstmt.setFloat(1, currentList);
// set column voltage
ArrayList<Integer> voltageList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
voltageList.add(random.nextInt(300));
pstmt.setInt(2, voltageList);
// set column phase
ArrayList<Float> phaseList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
phaseList.add(random.nextFloat());
pstmt.setFloat(3, phaseList);
// add column
pstmt.columnDataAddBatch();
}
// execute column
pstmt.columnDataExecuteBatch();
// you can check exeResult here
System.out.println("Successfully inserted " + (numOfSubTable * numOfRow) + " rows to power.meters.");
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
stmt.execute("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
}
}
This is a more detailed parameter binding example
import taos
from datetime import datetime
import random
numOfSubTable = 10
numOfRow = 10
conn = None
stmt2 = None
host="localhost"
port=6030
try:
# 1 connect
conn = taos.connect(
user="root",
password="taosdata",
host=host,
port=port,
)
# 2 create db and table
conn.execute("CREATE DATABASE IF NOT EXISTS power")
conn.execute("USE power")
conn.execute(
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
)
# 3 prepare
sql = "INSERT INTO ? USING meters (groupid, location) TAGS(?,?) VALUES (?,?,?,?)"
stmt2 = conn.statement2(sql)
tbnames = []
tags = []
datas = []
for i in range(numOfSubTable):
# tbnames
tbnames.append(f"d_bind_{i}")
# tags
tags.append([i, f"location_{i}"])
# datas
current = int(datetime.now().timestamp() * 1000)
timestamps = []
currents = []
voltages = []
phases = []
for j in range (numOfRow):
timestamps.append(current + i*1000 + j)
currents.append(float(random.random() * 30))
voltages.append(random.randint(100, 300))
phases.append(float(random.random()))
data = [timestamps, currents, voltages, phases]
datas.append(data)
# 4 bind param
stmt2.bind_param(tbnames, tags, datas)
# 5 execute
stmt2.execute()
# show
print(f"Successfully inserted with stmt2 to power.meters. child={numOfSubTable} rows={numOfRow} \n")
except Exception as err:
print(f"Failed to insert to table meters using stmt2, ErrMessage:{err}")
raise err
finally:
if stmt2:
stmt2.close()
if conn:
conn.close()
The example code for binding parameters with stmt2 (Go connector v3.6.0 and above, TDengine v3.3.5.0 and above) is as follows:
package main
import (
"database/sql/driver"
"fmt"
"log"
"math/rand"
"time"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/stmt"
)
func main() {
host := "127.0.0.1"
numOfSubTable := 10
numOfRow := 10
db, err := af.Open(host, "root", "taosdata", "", 0)
if err != nil {
log.Fatalln("Failed to connect to " + host + "; ErrMessage: " + err.Error())
}
defer db.Close()
// prepare database and table
_, err = db.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
}
_, err = db.Exec("USE power")
if err != nil {
log.Fatalln("Failed to use database power, ErrMessage: " + err.Error())
}
_, err = db.Exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
log.Fatalln("Failed to create stable meters, ErrMessage: " + err.Error())
}
// prepare statement
sql := "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
reqID := common.GetReqID()
stmt2 := db.Stmt2(reqID, false)
err = stmt2.Prepare(sql)
if err != nil {
log.Fatalln("Failed to prepare sql, sql: " + sql + ", ErrMessage: " + err.Error())
}
for i := 1; i <= numOfSubTable; i++ {
// generate column data
current := time.Now()
columns := make([][]driver.Value, 4)
for j := 0; j < numOfRow; j++ {
columns[0] = append(columns[0], current.Add(time.Millisecond*time.Duration(j)))
columns[1] = append(columns[1], rand.Float32()*30)
columns[2] = append(columns[2], rand.Int31n(300))
columns[3] = append(columns[3], rand.Float32())
}
// generate bind data
tableName := fmt.Sprintf("d_bind_%d", i)
tags := []driver.Value{int32(i), []byte(fmt.Sprintf("location_%d", i))}
bindData := []*stmt.TaosStmt2BindData{
{
TableName: tableName,
Tags: tags,
Cols: columns,
},
}
// bind params
err = stmt2.Bind(bindData)
if err != nil {
log.Fatalln("Failed to bind params, ErrMessage: " + err.Error())
}
// execute batch
err = stmt2.Execute()
if err != nil {
log.Fatalln("Failed to exec, ErrMessage: " + err.Error())
}
// get affected rows
affected := stmt2.GetAffectedRows()
// you can check exeResult here
fmt.Printf("Successfully inserted %d rows to %s.\n", affected, tableName)
}
err = stmt2.Close()
if err != nil {
log.Fatal("failed to close statement, err:", err)
}
}
The example code for binding parameters with stmt is as follows:
package main
import (
"fmt"
"log"
"math/rand"
"time"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
)
func main() {
host := "127.0.0.1"
numOfSubTable := 10
numOfRow := 10
db, err := af.Open(host, "root", "taosdata", "", 0)
if err != nil {
log.Fatalln("Failed to connect to " + host + "; ErrMessage: " + err.Error())
}
defer db.Close()
// prepare database and table
_, err = db.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
}
_, err = db.Exec("USE power")
if err != nil {
log.Fatalln("Failed to use database power, ErrMessage: " + err.Error())
}
_, err = db.Exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
log.Fatalln("Failed to create stable meters, ErrMessage: " + err.Error())
}
// prepare statement
sql := "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
stmt := db.Stmt()
err = stmt.Prepare(sql)
if err != nil {
log.Fatalln("Failed to prepare sql, sql: " + sql + ", ErrMessage: " + err.Error())
}
for i := 1; i <= numOfSubTable; i++ {
tableName := fmt.Sprintf("d_bind_%d", i)
tags := param.NewParam(2).AddInt(i).AddBinary([]byte(fmt.Sprintf("location_%d", i)))
// set tableName and tags
err = stmt.SetTableNameWithTags(tableName, tags)
if err != nil {
log.Fatalln("Failed to set table name and tags, tableName: " + tableName + "; ErrMessage: " + err.Error())
}
// bind column data
current := time.Now()
for j := 0; j < numOfRow; j++ {
row := param.NewParam(4).
AddTimestamp(current.Add(time.Millisecond*time.Duration(j)), common.PrecisionMilliSecond).
AddFloat(rand.Float32() * 30).
AddInt(rand.Intn(300)).
AddFloat(rand.Float32())
err = stmt.BindRow(row)
if err != nil {
log.Fatalln("Failed to bind params, ErrMessage: " + err.Error())
}
}
// add batch
err = stmt.AddBatch()
if err != nil {
log.Fatalln("Failed to add batch, ErrMessage: " + err.Error())
}
// execute batch
err = stmt.Execute()
if err != nil {
log.Fatalln("Failed to exec, ErrMessage: " + err.Error())
}
// get affected rows
affected := stmt.GetAffectedRows()
// you can check exeResult here
fmt.Printf("Successfully inserted %d rows to %s.\n", affected, tableName)
}
err = stmt.Close()
if err != nil {
log.Fatal("failed to close statement, err:", err)
}
}
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
taos.exec("DROP DATABASE IF EXISTS power").await?;
taos.create_database("power").await?;
taos.use_database("power").await?;
taos.exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))").await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;
const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d_bind_{}", i);
let tags = vec![Value::Int(i as i32), Value::VarChar(format!("location_{}", i).into())];
// set table name and tags for the prepared statement.
match stmt.set_tbname_tags(&table_name, &tags).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}", table_name, tags, err);
return Err(err.into());
}
}
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_floats(vec![10.3 + j as f32]),
ColumnView::from_ints(vec![219 + j as i32]),
ColumnView::from_floats(vec![0.31 + j as f32]),
];
// bind values to the prepared statement.
match stmt.bind(&values).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to bind values, values:{:?}, ErrMessage: {}", values, err);
return Err(err.into());
}
}
}
match stmt.add_batch().await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to add batch, ErrMessage: {}", err);
return Err(err.into());
}
}
}
// execute.
match stmt.execute().await{
Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows),
Err(err) => {
eprintln!("Failed to insert to table meters using stmt, ErrMessage: {}", err);
return Err(err.into());
}
}
Ok(())
}
Not supported
public static void Main(string[] args)
{
var host = "127.0.0.1";
var numOfSubTable = 10;
var numOfRow = 10;
var random = new Random();
var connectionString = $"host={host};port=6030;username=root;password=taosdata";
try
{
var builder = new ConnectionStringBuilder(connectionString);
using (var client = DbDriver.Open(builder))
{
// create database
client.Exec("CREATE DATABASE IF NOT EXISTS power");
// use database
client.Exec("USE power");
// create table
client.Exec(
"CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
using (var stmt = client.StmtInit())
{
String sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
stmt.Prepare(sql);
for (int i = 1; i <= numOfSubTable; i++)
{
var tableName = $"d_bind_{i}";
// set table name
stmt.SetTableName(tableName);
// set tags
stmt.SetTags(new object[] { i, $"location_{i}" });
var current = DateTime.Now;
// bind rows
for (int j = 0; j < numOfRow; j++)
{
stmt.BindRow(new object[]
{
current.Add(TimeSpan.FromMilliseconds(j)),
random.NextSingle() * 30,
random.Next(300),
random.NextSingle()
});
}
// add batch
stmt.AddBatch();
// execute
stmt.Exec();
// get affected rows
var affectedRows = stmt.Affected();
Console.WriteLine($"Successfully inserted {affectedRows} rows to {tableName}.");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine("Failed to insert to table meters using stmt, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine("Failed to insert to table meters using stmt, ErrMessage: " + e.Message);
throw;
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o stmt_insert_demo stmt_insert_demo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "taos.h"
/**
* @brief execute sql only.
*
* @param taos
* @param sql
*/
void executeSQL(TAOS *taos, const char *sql) {
TAOS_RES *res = taos_query(taos, sql);
int code = taos_errno(res);
if (code != 0) {
fprintf(stderr, "%s\n", taos_errstr(res));
taos_free_result(res);
taos_close(taos);
exit(EXIT_FAILURE);
}
taos_free_result(res);
}
/**
* @brief check return status and exit program when error occur.
*
* @param stmt
* @param code
* @param msg
*/
void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
if (code != 0) {
fprintf(stderr, "%s. code: %d, error: %s\n", msg,code,taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
}
typedef struct {
int64_t ts;
float current;
int voltage;
float phase;
} Row;
int num_of_sub_table = 10;
int num_of_row = 10;
int total_affected = 0;
/**
* @brief insert data using stmt API
*
* @param taos
*/
void insertData(TAOS *taos) {
// init
TAOS_STMT *stmt = taos_stmt_init(taos);
if (stmt == NULL) {
fprintf(stderr, "Failed to init taos_stmt, error: %s\n", taos_stmt_errstr(NULL));
exit(EXIT_FAILURE);
}
// prepare
const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
int code = taos_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "Failed to execute taos_stmt_prepare");
for (int i = 1; i <= num_of_sub_table; i++) {
char table_name[20];
sprintf(table_name, "d_bind_%d", i);
char location[20];
sprintf(location, "location_%d", i);
// set table name and tags
TAOS_MULTI_BIND tags[2];
// groupId
tags[0].buffer_type = TSDB_DATA_TYPE_INT;
tags[0].buffer_length = sizeof(int);
tags[0].length = (int32_t *)&tags[0].buffer_length;
tags[0].buffer = &i;
tags[0].is_null = NULL;
tags[0].num = 1;
// location
tags[1].buffer_type = TSDB_DATA_TYPE_BINARY;
tags[1].buffer_length = strlen(location);
tags[1].length =(int32_t *) &tags[1].buffer_length;
tags[1].buffer = location;
tags[1].is_null = NULL;
tags[1].num = 1;
code = taos_stmt_set_tbname_tags(stmt, table_name, tags);
checkErrorCode(stmt, code, "Failed to set table name and tags\n");
// insert rows
TAOS_MULTI_BIND params[4];
// ts
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(int64_t);
params[0].length = (int32_t *)¶ms[0].buffer_length;
params[0].is_null = NULL;
params[0].num = 1;
// current
params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[1].buffer_length = sizeof(float);
params[1].length = (int32_t *)¶ms[1].buffer_length;
params[1].is_null = NULL;
params[1].num = 1;
// voltage
params[2].buffer_type = TSDB_DATA_TYPE_INT;
params[2].buffer_length = sizeof(int);
params[2].length = (int32_t *)¶ms[2].buffer_length;
params[2].is_null = NULL;
params[2].num = 1;
// phase
params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[3].buffer_length = sizeof(float);
params[3].length = (int32_t *)¶ms[3].buffer_length;
params[3].is_null = NULL;
params[3].num = 1;
for (int j = 0; j < num_of_row; j++) {
struct timeval tv;
gettimeofday(&tv, NULL);
long long milliseconds = tv.tv_sec * 1000LL + tv.tv_usec / 1000; // current timestamp in milliseconds
int64_t ts = milliseconds + j;
float current = (float)rand() / RAND_MAX * 30;
int voltage = rand() % 300;
float phase = (float)rand() / RAND_MAX;
params[0].buffer = &ts;
params[1].buffer = ¤t;
params[2].buffer = &voltage;
params[3].buffer = &phase;
// bind param
code = taos_stmt_bind_param(stmt, params);
checkErrorCode(stmt, code, "Failed to bind param");
}
// add batch
code = taos_stmt_add_batch(stmt);
checkErrorCode(stmt, code, "Failed to add batch");
// execute batch
code = taos_stmt_execute(stmt);
checkErrorCode(stmt, code, "Failed to exec stmt");
// get affected rows
int affected = taos_stmt_affected_rows_once(stmt);
total_affected += affected;
}
fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected);
taos_stmt_close(stmt);
}
int main() {
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
exit(EXIT_FAILURE);
}
// create database and table
executeSQL(taos, "CREATE DATABASE IF NOT EXISTS power");
executeSQL(taos, "USE power");
executeSQL(taos,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
insertData(taos);
taos_close(taos);
taos_cleanup();
}
Not supported