Ingesting Data in Parameter Binding Mode
Using parameter binding for writing data can avoid the resource consumption of SQL syntax parsing, thus significantly improving writing performance. The reasons parameter binding can enhance writing efficiency include:
- Reduced Parsing Time: With parameter binding, the structure of the SQL statement is determined upon the first execution. Subsequent executions only need to replace the parameter values, thereby avoiding syntax parsing for each execution, which reduces parsing time.
- Precompilation: When using parameter binding, SQL statements can be precompiled and cached. When executing with different parameter values later, the precompiled version can be used directly, improving execution efficiency.
- Reduced Network Overhead: Parameter binding can also reduce the amount of data sent to the database since only parameter values need to be sent rather than the full SQL statement. This difference is particularly noticeable when executing a large number of similar insert or update operations.
Tips: Data writing is recommended to use parameter binding.
Next, we will continue using smart meters as an example to demonstrate how various language connectors efficiently write data using parameter binding:
- Prepare a parameterized SQL insert statement for inserting data into the supertable
meters
. This statement allows dynamically specifying the subtable name, tags, and column values. - Loop to generate multiple subtables and their corresponding data rows. For each subtable:
- Set the subtable name and tag values (group ID and location).
- Generate multiple rows of data, each including a timestamp, randomly generated current, voltage, and phase values.
- Execute batch insert operations to insert these data rows into the corresponding subtable.
- Finally, print the actual number of rows inserted into the table.
Websocket Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
public class WSParameterBindingBasicDemo {
// modify host to your own
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS-WS://" + host + ":6041";
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
String sql = "INSERT INTO ? USING power.meters TAGS(?,?) VALUES (?,?,?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("d_bind_" + i);
// set tags
pstmt.setTagInt(0, i);
pstmt.setTagString(1, "location_" + i);
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setFloat(2, random.nextFloat() * 30);
pstmt.setInt(3, random.nextInt(300));
pstmt.setFloat(4, random.nextFloat());
pstmt.addBatch();
}
int[] exeResult = pstmt.executeBatch();
// you can check exeResult here
System.out.println("Successfully inserted " + exeResult.length + " rows to power.meters.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
stmt.execute(
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
}
}
Here is a more detailed parameter binding example.
from datetime import datetime
import random
import taosws
numOfSubTable = 10
numOfRow = 10
conn = None
stmt = None
host="localhost"
port=6041
try:
conn = taosws.connect(user="root",
password="taosdata",
host=host,
port=port)
conn.execute("CREATE DATABASE IF NOT EXISTS power")
conn.execute("USE power")
conn.execute(
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
)
# ANCHOR: stmt
sql = "INSERT INTO ? USING meters (groupid, location) TAGS(?,?) VALUES (?,?,?,?)"
stmt = conn.statement()
stmt.prepare(sql)
for i in range(numOfSubTable):
tbname = f"d_bind_{i}"
tags = [
taosws.int_to_tag(i),
taosws.varchar_to_tag(f"location_{i}"),
]
stmt.set_tbname_tags(tbname, tags)
current = int(datetime.now().timestamp() * 1000)
timestamps = []
currents = []
voltages = []
phases = []
for j in range (numOfRow):
timestamps.append(current + i)
currents.append(random.random() * 30)
voltages.append(random.randint(100, 300))
phases.append(random.random())
stmt.bind_param(
[
taosws.millis_timestamps_to_column(timestamps),
taosws.floats_to_column(currents),
taosws.ints_to_column(voltages),
taosws.floats_to_column(phases),
]
)
stmt.add_batch()
stmt.execute()
print(f"Successfully inserted to power.meters.")
except Exception as err:
print(f"Failed to insert to table meters using stmt, ErrMessage:{err}")
raise err
finally:
if stmt:
stmt.close()
if conn:
conn.close()
package main
import (
"database/sql"
"fmt"
"log"
"math/rand"
"time"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
_ "github.com/taosdata/driver-go/v3/taosRestful"
"github.com/taosdata/driver-go/v3/ws/stmt"
)
func main() {
host := "127.0.0.1"
numOfSubTable := 10
numOfRow := 10
taosDSN := fmt.Sprintf("root:taosdata@http(%s:6041)/", host)
db, err := sql.Open("taosRestful", taosDSN)
if err != nil {
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
}
defer db.Close()
// prepare database and table
_, err = db.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
}
_, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
log.Fatalln("Failed to create stable power.meters, ErrMessage: " + err.Error())
}
config := stmt.NewConfig(fmt.Sprintf("ws://%s:6041", host), 0)
config.SetConnectUser("root")
config.SetConnectPass("taosdata")
config.SetConnectDB("power")
config.SetMessageTimeout(common.DefaultMessageTimeout)
config.SetWriteWait(common.DefaultWriteWait)
connector, err := stmt.NewConnector(config)
if err != nil {
log.Fatalln("Failed to create stmt connector,url: " + taosDSN + "; ErrMessage: " + err.Error())
}
// prepare statement
sql := "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
stmt, err := connector.Init()
if err != nil {
log.Fatalln("Failed to init stmt, sql: " + sql + ", ErrMessage: " + err.Error())
}
err = stmt.Prepare(sql)
if err != nil {
log.Fatal("Failed to prepare sql, sql: " + sql + ", ErrMessage: " + err.Error())
}
for i := 1; i <= numOfSubTable; i++ {
tableName := fmt.Sprintf("d_bind_%d", i)
tags := param.NewParam(2).AddInt(i).AddBinary([]byte(fmt.Sprintf("location_%d", i)))
tagsType := param.NewColumnType(2).AddInt().AddBinary(24)
columnType := param.NewColumnType(4).AddTimestamp().AddFloat().AddInt().AddFloat()
// set tableName
err = stmt.SetTableName(tableName)
if err != nil {
log.Fatal("Failed to set table name, tableName: " + tableName + "; ErrMessage: " + err.Error())
}
// set tags
err = stmt.SetTags(tags, tagsType)
if err != nil {
log.Fatal("Failed to set tags, ErrMessage: " + err.Error())
}
// bind column data
current := time.Now()
for j := 0; j < numOfRow; j++ {
columnData := make([]*param.Param, 4)
columnData[0] = param.NewParam(1).AddTimestamp(current.Add(time.Millisecond*time.Duration(j)), common.PrecisionMilliSecond)
columnData[1] = param.NewParam(1).AddFloat(rand.Float32() * 30)
columnData[2] = param.NewParam(1).AddInt(rand.Intn(300))
columnData[3] = param.NewParam(1).AddFloat(rand.Float32())
err = stmt.BindParam(columnData, columnType)
if err != nil {
log.Fatal("Failed to bind params, ErrMessage: " + err.Error())
}
}
// add batch
err = stmt.AddBatch()
if err != nil {
log.Fatal("Failed to add batch, ErrMessage: " + err.Error())
}
// execute batch
err = stmt.Exec()
if err != nil {
log.Fatal("Failed to exec, ErrMessage: " + err.Error())
}
// get affected rows
affected := stmt.GetAffectedRows()
// you can check exeResult here
fmt.Printf("Successfully inserted %d rows to %s.\n", affected, tableName)
}
err = stmt.Close()
if err != nil {
log.Fatal("Failed to close stmt, ErrMessage: " + err.Error())
}
}
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "ws://";
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
taos.exec("DROP DATABASE IF EXISTS power").await?;
taos.create_database("power").await?;
taos.use_database("power").await?;
taos.exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))").await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;
const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d_bind_{}", i);
let tags = vec![Value::Int(i as i32), Value::VarChar(format!("location_{}", i).into())];
// set table name and tags for the prepared statement.
match stmt.set_tbname_tags(&table_name, &tags).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}", table_name, tags, err);
return Err(err.into());
}
}
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_floats(vec![10.3 + j as f32]),
ColumnView::from_ints(vec![219 + j as i32]),
ColumnView::from_floats(vec![0.31 + j as f32]),
];
// bind values to the prepared statement.
match stmt.bind(&values).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to bind values, values:{:?}, ErrMessage: {}", values, err);
return Err(err.into());
}
}
}
match stmt.add_batch().await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to add batch, ErrMessage: {}", err);
return Err(err.into());
}
}
}
// execute.
match stmt.execute().await{
Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows),
Err(err) => {
eprintln!("Failed to insert to table meters using stmt, ErrMessage: {}", err);
return Err(err.into());
}
}
Ok(())
}
const taos = require("@tdengine/websocket");
let db = 'power';
let stable = 'meters';
let numOfSubTable = 10;
let numOfRow = 10;
let dsn = 'ws://localhost:6041'
function getRandomInt(min, max) {
min = Math.ceil(min);
max = Math.floor(max);
return Math.floor(Math.random() * (max - min + 1)) + min;
}
async function prepare() {
let conf = new taos.WSConfig(dsn);
conf.setUser('root')
conf.setPwd('taosdata')
conf.setDb(db)
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(`CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`);
await wsSql.exec(`CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`);
return wsSql
}
async function test() {
let stmt = null;
let connector = null;
try {
connector = await prepare();
stmt = await connector.stmtInit();
await stmt.prepare(`INSERT INTO ? USING ${db}.${stable} (location, groupId) TAGS (?, ?) VALUES (?, ?, ?, ?)`);
for (let i = 0; i < numOfSubTable; i++) {
await stmt.setTableName(`d_bind_${i}`);
let tagParams = stmt.newStmtParam();
tagParams.setVarchar([`location_${i}`]);
tagParams.setInt([i]);
await stmt.setTags(tagParams);
let timestampParams = [];
let currentParams = [];
let voltageParams = [];
let phaseParams = [];
const currentMillis = new Date().getTime();
for (let j = 0; j < numOfRow; j++) {
timestampParams.push(currentMillis + j);
currentParams.push(Math.random() * 30);
voltageParams.push(getRandomInt(100, 300));
phaseParams.push(Math.random());
}
let bindParams = stmt.newStmtParam();
bindParams.setTimestamp(timestampParams);
bindParams.setFloat(currentParams);
bindParams.setInt(voltageParams);
bindParams.setFloat(phaseParams);
await stmt.bind(bindParams);
await stmt.batch();
await stmt.exec();
console.log("Successfully inserted " + stmt.getLastAffected() + " to power.meters.");
}
}
catch (err) {
console.error(`Failed to insert to table meters using stmt, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
finally {
if (stmt) {
await stmt.close();
}
if (connector) {
await connector.close();
}
taos.destroy();
}
}
test()
public static void Main(string[] args)
{
var host = "127.0.0.1";
var numOfSubTable = 10;
var numOfRow = 10;
var random = new Random();
var connectionString = $"protocol=WebSocket;host={host};port=6041;useSSL=false;username=root;password=taosdata";
try
{
var builder = new ConnectionStringBuilder(connectionString);
using (var client = DbDriver.Open(builder))
{
// create database
client.Exec("CREATE DATABASE IF NOT EXISTS power");
// use database
client.Exec("USE power");
// create table
client.Exec(
"CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
using (var stmt = client.StmtInit())
{
String sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
stmt.Prepare(sql);
for (int i = 1; i <= numOfSubTable; i++)
{
var tableName = $"d_bind_{i}";
// set table name
stmt.SetTableName(tableName);
// set tags
stmt.SetTags(new object[] { i, $"location_{i}" });
var current = DateTime.Now;
// bind rows
for (int j = 0; j < numOfRow; j++)
{
stmt.BindRow(new object[]
{
current.Add(TimeSpan.FromMilliseconds(j)),
random.NextSingle() * 30,
random.Next(300),
random.NextSingle()
});
}
// add batch
stmt.AddBatch();
// execute
stmt.Exec();
// get affected rows
var affectedRows = stmt.Affected();
Console.WriteLine($"Successfully inserted {affectedRows} rows to {tableName}.");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine("Failed to insert to table meters using stmt, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine("Failed to insert to table meters using stmt, ErrMessage: " + e.Message);
throw;
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o stmt_insert_demo stmt_insert_demo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "taosws.h"
/**
* @brief execute sql only.
*
* @param taos
* @param sql
*/
void executeSQL(WS_TAOS *taos, const char *sql) {
WS_RES *res = ws_query(taos, sql);
int code = ws_errno(res);
if (code != 0) {
fprintf(stderr, "%s\n", ws_errstr(res));
ws_free_result(res);
ws_close(taos);
exit(EXIT_FAILURE);
}
ws_free_result(res);
}
/**
* @brief check return status and exit program when error occur.
*
* @param stmt
* @param code
* @param msg
*/
void checkErrorCode(WS_STMT *stmt, int code, const char *msg) {
if (code != 0) {
fprintf(stderr, "%s. code: %d, error: %s\n", msg, code, ws_stmt_errstr(stmt));
ws_stmt_close(stmt);
exit(EXIT_FAILURE);
}
}
typedef struct {
int64_t ts;
float current;
int voltage;
float phase;
} Row;
int num_of_sub_table = 10;
int num_of_row = 10;
int total_affected = 0;
/**
* @brief insert data using stmt API
*
* @param taos
*/
void insertData(WS_TAOS *taos) {
// init
WS_STMT *stmt = ws_stmt_init(taos);
if (stmt == NULL) {
fprintf(stderr, "Failed to init ws_stmt, error: %s\n", ws_stmt_errstr(NULL));
exit(EXIT_FAILURE);
}
// prepare
const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
int code = ws_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "Failed to execute ws_stmt_prepare");
for (int i = 1; i <= num_of_sub_table; i++) {
char table_name[20];
sprintf(table_name, "d_bind_%d", i);
char location[20];
sprintf(location, "location_%d", i);
// set table name and tags
WS_MULTI_BIND tags[2];
// groupId
tags[0].buffer_type = TSDB_DATA_TYPE_INT;
tags[0].buffer_length = sizeof(int);
tags[0].length = (int32_t *)&tags[0].buffer_length;
tags[0].buffer = &i;
tags[0].is_null = NULL;
tags[0].num = 1;
// location
tags[1].buffer_type = TSDB_DATA_TYPE_BINARY;
tags[1].buffer_length = strlen(location);
tags[1].length = (int32_t *)&tags[1].buffer_length;
tags[1].buffer = location;
tags[1].is_null = NULL;
tags[1].num = 1;
code = ws_stmt_set_tbname_tags(stmt, table_name, tags, 2);
checkErrorCode(stmt, code, "Failed to set table name and tags\n");
// insert rows
WS_MULTI_BIND params[4];
// ts
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(int64_t);
params[0].length = (int32_t *)¶ms[0].buffer_length;
params[0].is_null = NULL;
params[0].num = 1;
// current
params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[1].buffer_length = sizeof(float);
params[1].length = (int32_t *)¶ms[1].buffer_length;
params[1].is_null = NULL;
params[1].num = 1;
// voltage
params[2].buffer_type = TSDB_DATA_TYPE_INT;
params[2].buffer_length = sizeof(int);
params[2].length = (int32_t *)¶ms[2].buffer_length;
params[2].is_null = NULL;
params[2].num = 1;
// phase
params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[3].buffer_length = sizeof(float);
params[3].length = (int32_t *)¶ms[3].buffer_length;
params[3].is_null = NULL;
params[3].num = 1;
for (int j = 0; j < num_of_row; j++) {
struct timeval tv;
gettimeofday(&tv, NULL);
long long milliseconds = tv.tv_sec * 1000LL + tv.tv_usec / 1000; // current timestamp in milliseconds
int64_t ts = milliseconds + j;
float current = (float)rand() / RAND_MAX * 30;
int voltage = rand() % 300;
float phase = (float)rand() / RAND_MAX;
params[0].buffer = &ts;
params[1].buffer = ¤t;
params[2].buffer = &voltage;
params[3].buffer = &phase;
// bind param
code = ws_stmt_bind_param_batch(stmt, params, 4);
checkErrorCode(stmt, code, "Failed to bind param");
}
// add batch
code = ws_stmt_add_batch(stmt);
checkErrorCode(stmt, code, "Failed to add batch");
// execute batch
int affected_rows = 0;
code = ws_stmt_execute(stmt, &affected_rows);
checkErrorCode(stmt, code, "Failed to exec stmt");
// get affected rows
int affected = ws_stmt_affected_rows_once(stmt);
total_affected += affected;
}
fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected);
ws_stmt_close(stmt);
}
int main() {
int code = 0;
char *dsn = "ws://localhost:6041";
WS_TAOS *taos = ws_connect(dsn);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s, ErrCode: 0x%x, ErrMessage: %s.\n", dsn, ws_errno(NULL), ws_errstr(NULL));
exit(EXIT_FAILURE);
}
// create database and table
executeSQL(taos, "CREATE DATABASE IF NOT EXISTS power");
executeSQL(taos, "USE power");
executeSQL(taos,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
insertData(taos);
ws_close(taos);
}
Not supported
Native Connection
- Java
- Python
- Go
- Rust
- Node.js
- C#
- C
- REST API
public class ParameterBindingBasicDemo {
// modify host to your own
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
String sql = "INSERT INTO ? USING power.meters TAGS(?,?) VALUES (?,?,?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("d_bind_" + i);
// set tags
pstmt.setTagInt(0, i);
pstmt.setTagString(1, "location_" + i);
// set column ts
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
// set column current
ArrayList<Float> currentList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
currentList.add(random.nextFloat() * 30);
pstmt.setFloat(1, currentList);
// set column voltage
ArrayList<Integer> voltageList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
voltageList.add(random.nextInt(300));
pstmt.setInt(2, voltageList);
// set column phase
ArrayList<Float> phaseList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
phaseList.add(random.nextFloat());
pstmt.setFloat(3, phaseList);
// add column
pstmt.columnDataAddBatch();
}
// execute column
pstmt.columnDataExecuteBatch();
// you can check exeResult here
System.out.println("Successfully inserted " + (numOfSubTable * numOfRow) + " rows to power.meters.");
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
stmt.execute("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
}
}
Here is a more detailed parameter binding example.
import taos
from datetime import datetime
import random
numOfSubTable = 10
numOfRow = 10
conn = None
stmt = None
host="localhost"
port=6030
try:
conn = taos.connect(
user="root",
password="taosdata",
host=host,
port=port,
)
conn.execute("CREATE DATABASE IF NOT EXISTS power")
conn.execute("USE power")
conn.execute(
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
)
# ANCHOR: stmt
sql = "INSERT INTO ? USING meters (groupid, location) TAGS(?,?) VALUES (?,?,?,?)"
stmt = conn.statement(sql)
for i in range(numOfSubTable):
tbname = f"d_bind_{i}"
tags = taos.new_bind_params(2)
tags[0].int([i])
tags[1].binary([f"location_{i}"])
stmt.set_tbname_tags(tbname, tags)
current = int(datetime.now().timestamp() * 1000)
timestamps = []
currents = []
voltages = []
phases = []
for j in range (numOfRow):
timestamps.append(current + i)
currents.append(random.random() * 30)
voltages.append(random.randint(100, 300))
phases.append(random.random())
params = taos.new_bind_params(4)
params[0].timestamp(timestamps)
params[1].float(currents)
params[2].int(voltages)
params[3].float(phases)
stmt.bind_param_batch(params)
stmt.execute()
print(f"Successfully inserted to power.meters.")
except Exception as err:
print(f"Failed to insert to table meters using stmt, ErrMessage:{err}")
raise err
finally:
if stmt:
stmt.close()
if conn:
conn.close()
package main
import (
"fmt"
"log"
"math/rand"
"time"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
)
func main() {
host := "127.0.0.1"
numOfSubTable := 10
numOfRow := 10
db, err := af.Open(host, "root", "taosdata", "", 0)
if err != nil {
log.Fatalln("Failed to connect to " + host + "; ErrMessage: " + err.Error())
}
defer db.Close()
// prepare database and table
_, err = db.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
}
_, err = db.Exec("USE power")
if err != nil {
log.Fatalln("Failed to use database power, ErrMessage: " + err.Error())
}
_, err = db.Exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
log.Fatalln("Failed to create stable meters, ErrMessage: " + err.Error())
}
// prepare statement
sql := "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
stmt := db.Stmt()
err = stmt.Prepare(sql)
if err != nil {
log.Fatalln("Failed to prepare sql, sql: " + sql + ", ErrMessage: " + err.Error())
}
for i := 1; i <= numOfSubTable; i++ {
tableName := fmt.Sprintf("d_bind_%d", i)
tags := param.NewParam(2).AddInt(i).AddBinary([]byte(fmt.Sprintf("location_%d", i)))
// set tableName and tags
err = stmt.SetTableNameWithTags(tableName, tags)
if err != nil {
log.Fatalln("Failed to set table name and tags, tableName: " + tableName + "; ErrMessage: " + err.Error())
}
// bind column data
current := time.Now()
for j := 0; j < numOfRow; j++ {
row := param.NewParam(4).
AddTimestamp(current.Add(time.Millisecond*time.Duration(j)), common.PrecisionMilliSecond).
AddFloat(rand.Float32() * 30).
AddInt(rand.Intn(300)).
AddFloat(rand.Float32())
err = stmt.BindRow(row)
if err != nil {
log.Fatalln("Failed to bind params, ErrMessage: " + err.Error())
}
}
// add batch
err = stmt.AddBatch()
if err != nil {
log.Fatalln("Failed to add batch, ErrMessage: " + err.Error())
}
// execute batch
err = stmt.Execute()
if err != nil {
log.Fatalln("Failed to exec, ErrMessage: " + err.Error())
}
// get affected rows
affected := stmt.GetAffectedRows()
// you can check exeResult here
fmt.Printf("Successfully inserted %d rows to %s.\n", affected, tableName)
}
err = stmt.Close()
if err != nil {
log.Fatal("failed to close statement, err:", err)
}
}
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
taos.exec("DROP DATABASE IF EXISTS power").await?;
taos.create_database("power").await?;
taos.use_database("power").await?;
taos.exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))").await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;
const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d_bind_{}", i);
let tags = vec![Value::Int(i as i32), Value::VarChar(format!("location_{}", i).into())];
// set table name and tags for the prepared statement.
match stmt.set_tbname_tags(&table_name, &tags).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}", table_name, tags, err);
return Err(err.into());
}
}
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_floats(vec![10.3 + j as f32]),
ColumnView::from_ints(vec![219 + j as i32]),
ColumnView::from_floats(vec![0.31 + j as f32]),
];
// bind values to the prepared statement.
match stmt.bind(&values).await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to bind values, values:{:?}, ErrMessage: {}", values, err);
return Err(err.into());
}
}
}
match stmt.add_batch().await{
Ok(_) => {},
Err(err) => {
eprintln!("Failed to add batch, ErrMessage: {}", err);
return Err(err.into());
}
}
}
// execute.
match stmt.execute().await{
Ok(affected_rows) => println!("Successfully inserted {} rows to power.meters.", affected_rows),
Err(err) => {
eprintln!("Failed to insert to table meters using stmt, ErrMessage: {}", err);
return Err(err.into());
}
}
Ok(())
}
Not supported
public static void Main(string[] args)
{
var host = "127.0.0.1";
var numOfSubTable = 10;
var numOfRow = 10;
var random = new Random();
var connectionString = $"host={host};port=6030;username=root;password=taosdata";
try
{
var builder = new ConnectionStringBuilder(connectionString);
using (var client = DbDriver.Open(builder))
{
// create database
client.Exec("CREATE DATABASE IF NOT EXISTS power");
// use database
client.Exec("USE power");
// create table
client.Exec(
"CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
using (var stmt = client.StmtInit())
{
String sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
stmt.Prepare(sql);
for (int i = 1; i <= numOfSubTable; i++)
{
var tableName = $"d_bind_{i}";
// set table name
stmt.SetTableName(tableName);
// set tags
stmt.SetTags(new object[] { i, $"location_{i}" });
var current = DateTime.Now;
// bind rows
for (int j = 0; j < numOfRow; j++)
{
stmt.BindRow(new object[]
{
current.Add(TimeSpan.FromMilliseconds(j)),
random.NextSingle() * 30,
random.Next(300),
random.NextSingle()
});
}
// add batch
stmt.AddBatch();
// execute
stmt.Exec();
// get affected rows
var affectedRows = stmt.Affected();
Console.WriteLine($"Successfully inserted {affectedRows} rows to {tableName}.");
}
}
}
}
catch (TDengineError e)
{
// handle TDengine error
Console.WriteLine("Failed to insert to table meters using stmt, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
throw;
}
catch (Exception e)
{
// handle other exceptions
Console.WriteLine("Failed to insert to table meters using stmt, ErrMessage: " + e.Message);
throw;
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o stmt_insert_demo stmt_insert_demo.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include "taos.h"
/**
* @brief execute sql only.
*
* @param taos
* @param sql
*/
void executeSQL(TAOS *taos, const char *sql) {
TAOS_RES *res = taos_query(taos, sql);
int code = taos_errno(res);
if (code != 0) {
fprintf(stderr, "%s\n", taos_errstr(res));
taos_free_result(res);
taos_close(taos);
exit(EXIT_FAILURE);
}
taos_free_result(res);
}
/**
* @brief check return status and exit program when error occur.
*
* @param stmt
* @param code
* @param msg
*/
void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
if (code != 0) {
fprintf(stderr, "%s. code: %d, error: %s\n", msg,code,taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
exit(EXIT_FAILURE);
}
}
typedef struct {
int64_t ts;
float current;
int voltage;
float phase;
} Row;
int num_of_sub_table = 10;
int num_of_row = 10;
int total_affected = 0;
/**
* @brief insert data using stmt API
*
* @param taos
*/
void insertData(TAOS *taos) {
// init
TAOS_STMT *stmt = taos_stmt_init(taos);
if (stmt == NULL) {
fprintf(stderr, "Failed to init taos_stmt, error: %s\n", taos_stmt_errstr(NULL));
exit(EXIT_FAILURE);
}
// prepare
const char *sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
int code = taos_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "Failed to execute taos_stmt_prepare");
for (int i = 1; i <= num_of_sub_table; i++) {
char table_name[20];
sprintf(table_name, "d_bind_%d", i);
char location[20];
sprintf(location, "location_%d", i);
// set table name and tags
TAOS_MULTI_BIND tags[2];
// groupId
tags[0].buffer_type = TSDB_DATA_TYPE_INT;
tags[0].buffer_length = sizeof(int);
tags[0].length = (int32_t *)&tags[0].buffer_length;
tags[0].buffer = &i;
tags[0].is_null = NULL;
tags[0].num = 1;
// location
tags[1].buffer_type = TSDB_DATA_TYPE_BINARY;
tags[1].buffer_length = strlen(location);
tags[1].length =(int32_t *) &tags[1].buffer_length;
tags[1].buffer = location;
tags[1].is_null = NULL;
tags[1].num = 1;
code = taos_stmt_set_tbname_tags(stmt, table_name, tags);
checkErrorCode(stmt, code, "Failed to set table name and tags\n");
// insert rows
TAOS_MULTI_BIND params[4];
// ts
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(int64_t);
params[0].length = (int32_t *)¶ms[0].buffer_length;
params[0].is_null = NULL;
params[0].num = 1;
// current
params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[1].buffer_length = sizeof(float);
params[1].length = (int32_t *)¶ms[1].buffer_length;
params[1].is_null = NULL;
params[1].num = 1;
// voltage
params[2].buffer_type = TSDB_DATA_TYPE_INT;
params[2].buffer_length = sizeof(int);
params[2].length = (int32_t *)¶ms[2].buffer_length;
params[2].is_null = NULL;
params[2].num = 1;
// phase
params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
params[3].buffer_length = sizeof(float);
params[3].length = (int32_t *)¶ms[3].buffer_length;
params[3].is_null = NULL;
params[3].num = 1;
for (int j = 0; j < num_of_row; j++) {
struct timeval tv;
gettimeofday(&tv, NULL);
long long milliseconds = tv.tv_sec * 1000LL + tv.tv_usec / 1000; // current timestamp in milliseconds
int64_t ts = milliseconds + j;
float current = (float)rand() / RAND_MAX * 30;
int voltage = rand() % 300;
float phase = (float)rand() / RAND_MAX;
params[0].buffer = &ts;
params[1].buffer = ¤t;
params[2].buffer = &voltage;
params[3].buffer = &phase;
// bind param
code = taos_stmt_bind_param(stmt, params);
checkErrorCode(stmt, code, "Failed to bind param");
}
// add batch
code = taos_stmt_add_batch(stmt);
checkErrorCode(stmt, code, "Failed to add batch");
// execute batch
code = taos_stmt_execute(stmt);
checkErrorCode(stmt, code, "Failed to exec stmt");
// get affected rows
int affected = taos_stmt_affected_rows_once(stmt);
total_affected += affected;
}
fprintf(stdout, "Successfully inserted %d rows to power.meters.\n", total_affected);
taos_stmt_close(stmt);
}
int main() {
const char *host = "localhost";
const char *user = "root";
const char *password = "taosdata";
uint16_t port = 6030;
TAOS *taos = taos_connect(host, user, password, NULL, port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to %s:%hu, ErrCode: 0x%x, ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
exit(EXIT_FAILURE);
}
// create database and table
executeSQL(taos, "CREATE DATABASE IF NOT EXISTS power");
executeSQL(taos, "USE power");
executeSQL(taos,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
insertData(taos);
taos_close(taos);
taos_cleanup();
}
Not supported