Ingesting Data Efficiently
This section introduces how to efficiently write data to TDengine.
Principles of Efficient Writing
From the Perspective of the Client Program
From the perspective of the client program, efficient data writing should consider the following factors:
- The amount of data written per batch. Generally, the larger the batch, the more efficient the writing (though the advantage may diminish beyond a certain threshold). When using SQL to write to TDengine, try to concatenate more rows into a single SQL statement; TDengine currently limits a single SQL statement to 1,048,576 characters (1 MB). A sketch of such a batched INSERT appears below.
- The number of concurrent connections. Generally, the more concurrent connections writing data simultaneously, the more efficient the writing (though performance may decline beyond a certain threshold, depending on server capacity).
- The distribution of data across different tables (or subtables), i.e., the proximity of the data being written. Generally, writing data only to the same table (or subtable) in each batch is more efficient than writing to multiple tables (or subtables).
- The method of writing. In general:
- Parameter binding is more efficient than SQL writing because it avoids SQL parsing (though it increases the number of C interface calls, which can incur performance overhead).
- SQL writing without automatic table creation is more efficient than with automatic table creation because the latter frequently checks for table existence.
- SQL writing is more efficient than schema-less writing, as the latter automatically creates tables and supports dynamic changes to table structures.
The client program should make full and appropriate use of these factors. Ideally, each batch should write data to only one table (or subtable), and both the batch size and the number of concurrent write connections should be determined through testing and tuning so that they best match the system's processing capacity and yield the highest write speed.
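For example, a batched write carries rows for several subtables in one INSERT statement, so a single round trip delivers many rows. Below is a minimal sketch, not part of the example program; it assumes the `test` database and the subtables referenced (here `d1001` and `d1002`) already exist, and all values are illustrative.

```java
import java.sql.*;

// A minimal sketch of batching: one INSERT statement carries rows for several
// subtables. Keep the total statement length under the 1 MB limit mentioned above.
public class BatchInsertSketch {
    public static void main(String[] args) throws SQLException {
        String jdbcURL = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(jdbcURL);
             Statement stmt = conn.createStatement()) {
            stmt.execute("USE test");
            // Each table name is followed by its own VALUES list; the whole batch
            // is sent to the server in a single call.
            String sql = "INSERT INTO "
                    + "d1001 VALUES (1648432611249, 10.3, 219, 0.31) (1648432611749, 12.6, 218, 0.33) "
                    + "d1002 VALUES (1648432611249, 10.2, 220, 0.23)";
            stmt.executeUpdate(sql);
        }
    }
}
```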
From the Perspective of the Data Source
Client programs typically need to read data from a data source and then write it to TDengine. From the data source perspective, the following scenarios necessitate a queue between the read and write threads:
- Multiple data sources generate data at a rate that is significantly lower than the single-threaded write speed, but the overall data volume is considerable. In this case, the queue's role is to aggregate data from multiple sources to increase the amount of data written in a single operation.
- A single data source generates data at a rate significantly higher than the single-threaded write speed. Here, the queue's role is to increase the write concurrency.
- Data for a single table is scattered across multiple data sources. In this case, the queue's role is to aggregate data for the same table in advance, enhancing the proximity of data during writing.
If the data source for the writing application is Kafka, and the writing application itself is a Kafka consumer, the characteristics of Kafka can be leveraged for efficient writing. For example (a consumer sketch follows this list):
- Write data for the same table to the same partition of the same topic, which increases data proximity.
- Aggregate data by subscribing to multiple topics.
- Increase write concurrency by adding more consumer threads.
- Increase the maximum amount of data fetched per poll to raise the maximum amount written in a single batch.
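The sketch below illustrates the consumer side of this approach. It is not part of the example program and rests on a few assumptions: the Apache Kafka Java client is on the classpath, the records live in a topic named `meters`, and the producer keyed each record by its (sub)table name so that Kafka hashes all rows of one table to the same partition. Raising `max.poll.records` lets each poll return enough rows to form a large write batch.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// A sketch of a Kafka consumer tuned for batched writing; the topic name,
// group id, and record format are illustrative assumptions.
public class KafkaConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "meters-writers");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Fetch up to 3000 records per poll so each write to TDengine can batch more rows.
        props.put("max.poll.records", "3000");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("meters"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // Records keyed by table name arrive grouped by partition, preserving
                    // data proximity; hand record.value() to a writer such as SQLWriter.
                }
            }
        }
    }
}
```

Write concurrency then scales by running one such consumer per thread: within a consumer group, Kafka assigns each partition to at most one consumer, so all rows of a given table are still handled by a single writer.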
From the Perspective of Server Configuration
From the perspective of server configuration, it's important to set an appropriate number of vgroups when creating the database based on the number of disks in the system, their I/O capabilities, and processor capacity to fully utilize system performance. If there are too few vgroups, the system performance cannot be fully realized; if there are too many vgroups, unnecessary resource contention may occur. A general recommendation is to set the number of vgroups to twice the number of CPU cores, but tuning should still be based on the specific system resource configuration.
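For example, following the rule of thumb above, a host with 16 CPU cores would get 32 vgroups. A minimal sketch, assuming a locally deployed server with default credentials (the database name and vgroup count are illustrative):

```java
import java.sql.*;

// A sketch of creating a database with an explicit vgroup count.
// VGROUPS 32 assumes a 16-core host; tune the value for your hardware.
public class CreateDatabaseSketch {
    public static void main(String[] args) throws SQLException {
        String jdbcURL = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(jdbcURL);
             Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE DATABASE IF NOT EXISTS test VGROUPS 32");
        }
    }
}
```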
For more tuning parameters, refer to Manage Databases and taosd Reference.
Examples of Efficient Writing
Scenario Design
The following example program demonstrates how to efficiently write data, with the scenario designed as follows:
- The TDengine client program continuously reads data from other data sources, simulated in this example by generating mock data.
- A single connection cannot keep up with the read speed, so the client program starts multiple threads, each establishing its own connection to TDengine with a dedicated fixed-size message queue.
- The client program hashes received data based on the associated table name (or subtable name) to determine the corresponding Queue index, ensuring that data belonging to a specific table (or subtable) is processed by a designated thread.
- Each sub-thread flushes the data in its message queue to TDengine whenever the queue is drained or a preset batch-size threshold is reached, and then continues processing the data received afterward.
Example Code
This part provides example code for the above scenario. The principles of efficient writing are the same for other scenarios, but the code needs to be modified accordingly.
This example code assumes that the source data belongs to different subtables of the same supertable (`meters`). The program creates this supertable in the `test` database before writing any data. Subtables are created automatically by the application based on the received data. If the actual scenario involves multiple supertables, simply modify the code for automatic table creation in the write task.
- Java
Program Listing
| Class Name | Function Description |
| --- | --- |
| FastWriteExample | Main program |
| ReadTask | Reads data from the simulated data source, hashes the table name to obtain a queue index, and writes the data to the corresponding queue |
| WriteTask | Retrieves data from the queue, assembles a batch, and writes it to TDengine |
| MockDataSource | Simulates the generation of data for various meters subtables |
| SQLWriter | Handles SQL concatenation, automatic table creation, SQL writing, and SQL length checking |
| StmtWriter | Implements batch writing via parameter binding (not yet completed) |
| DataBaseMonitor | Monitors the write speed and prints the current write speed to the console every 10 seconds |
Below is the complete code for each class and a more detailed function description.
FastWriteExample
The main program is responsible for:
- Creating message queues
- Starting write threads
- Starting read threads
- Monitoring write speed every 10 seconds
The main program exposes 4 command-line parameters, which can be adjusted at each start of the program for testing and tuning:
- The number of read threads. Default is 1.
- The number of write threads. Default is 3.
- The total number of simulated tables. Default is 1,000. This will be evenly distributed among the read threads. If the total number of tables is large, table creation will take longer, and the initial monitoring of write speed may be slower.
- The maximum number of records to write in a single batch. Default is 3,000.
The queue capacity (`taskQueueCapacity`) is also a performance-related parameter that can be adjusted by modifying the program. Generally speaking, the larger the queue capacity, the lower the probability of blocking on enqueue and the greater the queue's throughput, although memory usage also increases; as a rough, illustrative estimate, at the default capacity of 1,000,000 entries, a queue of ~60-character lines can occupy on the order of 100 MB once Java string overhead is included. The default value in the example program has been set sufficiently high.
```java
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
final static int taskQueueCapacity = 1000000;
final static List<BlockingQueue<String>> taskQueues = new ArrayList<>();
final static List<ReadTask> readTasks = new ArrayList<>();
final static List<WriteTask> writeTasks = new ArrayList<>();
final static DataBaseMonitor databaseMonitor = new DataBaseMonitor();
public static void stopAll() {
logger.info("shutting down");
readTasks.forEach(task -> task.stop());
writeTasks.forEach(task -> task.stop());
databaseMonitor.close();
}
public static void main(String[] args) throws InterruptedException, SQLException {
int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1;
int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 3;
int tableCount = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
int maxBatchSize = args.length > 3 ? Integer.parseInt(args[3]) : 3000;
logger.info("readTaskCount={}, writeTaskCount={} tableCount={} maxBatchSize={}",
readTaskCount, writeTaskCount, tableCount, maxBatchSize);
databaseMonitor.init().prepareDatabase();
// Create task queues and write tasks, and start the write threads.
for (int i = 0; i < writeTaskCount; ++i) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(taskQueueCapacity);
taskQueues.add(queue);
WriteTask task = new WriteTask(queue, maxBatchSize);
Thread t = new Thread(task);
t.setName("WriteThread-" + i);
t.start();
}
// create reading tasks and start reading threads
int tableCountPerTask = tableCount / readTaskCount;
for (int i = 0; i < readTaskCount; ++i) {
ReadTask task = new ReadTask(i, taskQueues, tableCountPerTask);
Thread t = new Thread(task);
t.setName("ReadThread-" + i);
t.start();
}
Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll));
long lastCount = 0;
while (true) {
Thread.sleep(10000);
long numberOfTable = databaseMonitor.getTableCount();
long count = databaseMonitor.count();
logger.info("numberOfTable={} count={} speed={}", numberOfTable, count, (count - lastCount) / 10);
lastCount = count;
}
}
}
```
ReadTask
The read task is responsible for reading data from the data source. Each read task is associated with a simulated data source, and each simulated data source generates a bounded amount of data for its own group of tables; different simulated data sources generate data for different tables.
The read task uses a blocking method to write to the message queue. This means that once the queue is full, the write operation will block.
```java
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
class ReadTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
private final int taskId;
private final List<BlockingQueue<String>> taskQueues;
private final int queueCount;
private final int tableCount;
private boolean active = true;
public ReadTask(int readTaskId, List<BlockingQueue<String>> queues, int tableCount) {
this.taskId = readTaskId;
this.taskQueues = queues;
this.queueCount = queues.size();
this.tableCount = tableCount;
}
/**
* Assign data received to different queues.
* Here we use the suffix number in table name.
* You are expected to define your own rule in practice.
*
* @param line record received
* @return which queue to use
*/
public int getQueueId(String line) {
String tbName = line.substring(0, line.indexOf(',')); // For example: tb1_101
String suffixNumber = tbName.split("_")[1];
return Integer.parseInt(suffixNumber) % this.queueCount;
}
@Override
public void run() {
logger.info("started");
Iterator<String> it = new MockDataSource("tb" + this.taskId, tableCount);
try {
while (it.hasNext() && active) {
String line = it.next();
int queueId = getQueueId(line);
taskQueues.get(queueId).put(line);
}
} catch (Exception e) {
logger.error("Read Task Error", e);
}
}
public void stop() {
logger.info("stop");
this.active = false;
}
}
```
WriteTask
```java
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
class WriteTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(WriteTask.class);
private final int maxBatchSize;
// the queue from which this write task gets raw data.
private final BlockingQueue<String> queue;
// A flag indicating whether to continue running.
private boolean active = true;
public WriteTask(BlockingQueue<String> taskQueue, int maxBatchSize) {
this.queue = taskQueue;
this.maxBatchSize = maxBatchSize;
}
@Override
public void run() {
logger.info("started");
String line = null; // the line most recently taken from the queue
SQLWriter writer = new SQLWriter(maxBatchSize);
try {
writer.init();
while (active) {
line = queue.poll();
if (line != null) {
// parse raw data and buffer the data.
writer.processLine(line);
} else if (writer.hasBufferedValues()) {
// write data immediately if no more data in the queue
writer.flush();
} else {
// sleep a while to avoid high CPU usage when the queue is empty and nothing is buffered
Thread.sleep(100);
}
}
if (writer.hasBufferedValues()) {
writer.flush();
}
} catch (Exception e) {
String msg = String.format("line=%s, bufferedCount=%s", line, writer.getBufferedCount());
logger.error(msg, e);
} finally {
writer.close();
}
}
public void stop() {
logger.info("stop");
this.active = false;
}
}
```
MockDataSource
```java
package com.taos.example.highvolume;
import java.util.Iterator;
/**
* Generate test data
*/
class MockDataSource implements Iterator {
private String tbNamePrefix;
private int tableCount;
private long maxRowsPerTable = 1000000000L;
// 100 milliseconds between two neighbouring rows.
long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
private int currentRow = 0;
private int currentTbId = -1;
// mock values
String[] location = {"California.LosAngeles", "California.SanDiego", "California.SanJose", "California.Campbell", "California.SanFrancisco"};
float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
int[] voltage = {119, 116, 111, 113, 118};
float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};
public MockDataSource(String tbNamePrefix, int tableCount) {
this.tbNamePrefix = tbNamePrefix;
this.tableCount = tableCount;
}
@Override
public boolean hasNext() {
currentTbId += 1;
if (currentTbId == tableCount) {
currentTbId = 0;
currentRow += 1;
}
return currentRow < maxRowsPerTable;
}
@Override
public String next() {
long ts = startMs + 100 * currentRow;
int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
sb.append(ts).append(','); // ts
sb.append(current[currentRow % 5]).append(','); // current
sb.append(voltage[currentRow % 5]).append(','); // voltage
sb.append(phase[currentRow % 5]).append(','); // phase
sb.append(location[currentRow % 5]).append(','); // location
sb.append(groupId); // groupID
return sb.toString();
}
}
```
SQLWriter
The `SQLWriter` class encapsulates the logic of SQL concatenation and data writing. Note that none of the tables are created in advance; they are created in batch, using the supertable as a template, when a table-not-found exception occurs, and the INSERT statement is then re-executed. For other exceptions, the SQL statement being executed at the time is simply logged; you can also log more clues for error diagnosis and fault recovery.
```java
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;
/**
* A helper class encapsulating the logic of writing using SQL.
* <p>
* The main interfaces are two methods:
* <ol>
* <li>{@link SQLWriter#processLine}, which receives raw lines from WriteTask and groups them by table name.</li>
* <li>{@link SQLWriter#flush}, which assembles the INSERT statement and executes it.</li>
* </ol>
* <p>
* A technique worth mentioning: we create tables as needed when a "table does not exist" error occurs, instead of creating them automatically using the "INSERT INTO tb USING stb" syntax.
* This ensures that checking table existence is a one-time-only operation.
* </p>
*/
public class SQLWriter {
final static Logger logger = LoggerFactory.getLogger(SQLWriter.class);
private Connection conn;
private Statement stmt;
/**
* current number of buffered records
*/
private int bufferedCount = 0;
/**
* Maximum number of buffered records.
* Flush action will be triggered if bufferedCount reaches this value.
*/
private int maxBatchSize;
/**
* Maximum SQL length.
*/
private int maxSQLLength = 800_000;
/**
* Map from table name to column values. For example:
* "tb001" -> "(1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2)"
*/
private Map<String, String> tbValues = new HashMap<>();
/**
* Map from table name to tag values, in the same order as in the supertable definition.
* Used for creating table.
*/
private Map<String, String> tbTags = new HashMap<>();
public SQLWriter(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}
/**
* Get Database Connection
*
* @return Connection
* @throws SQLException
*/
private static Connection getConnection() throws SQLException {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
if (jdbcURL == null || jdbcURL.isEmpty()) {
jdbcURL = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
}
return DriverManager.getConnection(jdbcURL);
}
/**
* Create Connection and Statement
*
* @throws SQLException
*/
public void init() throws SQLException {
conn = getConnection();
stmt = conn.createStatement();
stmt.execute("use test");
}
/**
* Convert raw data to SQL fragments, group them by table name and cache them in a HashMap.
* Trigger writing when the number of buffered records reaches maxBatchSize.
*
* @param line raw data taken from the task queue, in format: tbName,ts,current,voltage,phase,location,groupId
*/
public void processLine(String line) throws SQLException {
bufferedCount += 1;
int firstComma = line.indexOf(',');
String tbName = line.substring(0, firstComma);
int lastComma = line.lastIndexOf(',');
int secondLastComma = line.lastIndexOf(',', lastComma - 1);
String value = "(" + line.substring(firstComma + 1, secondLastComma) + ") ";
if (tbValues.containsKey(tbName)) {
tbValues.put(tbName, tbValues.get(tbName) + value);
} else {
tbValues.put(tbName, value);
}
if (!tbTags.containsKey(tbName)) {
String location = line.substring(secondLastComma + 1, lastComma);
String groupId = line.substring(lastComma + 1);
String tagValues = "('" + location + "'," + groupId + ')';
tbTags.put(tbName, tagValues);
}
if (bufferedCount == maxBatchSize) {
flush();
}
}
/**
* Assemble INSERT statement using buffered SQL fragments in Map {@link SQLWriter#tbValues} and execute it.
* In case of a "table does not exist" exception, create all tables referenced in the SQL and retry it.
*/
public void flush() throws SQLException {
StringBuilder sb = new StringBuilder("INSERT INTO ");
for (Map.Entry<String, String> entry : tbValues.entrySet()) {
String tableName = entry.getKey();
String values = entry.getValue();
String q = tableName + " values " + values + " ";
if (sb.length() + q.length() > maxSQLLength) {
executeSQL(sb.toString());
logger.warn("increase maxSQLLength or decrease maxBatchSize to gain better performance");
sb = new StringBuilder("INSERT INTO ");
}
sb.append(q);
}
executeSQL(sb.toString());
tbValues.clear();
bufferedCount = 0;
}
private void executeSQL(String sql) throws SQLException {
try {
stmt.executeUpdate(sql);
} catch (SQLException e) {
// convert to error code defined in taoserror.h
int errorCode = e.getErrorCode() & 0xffff;
if (errorCode == 0x2603) {
// Table does not exist
createTables();
executeSQL(sql);
} else {
logger.error("Execute SQL: {}", sql);
throw e;
}
} catch (Throwable throwable) {
logger.error("Execute SQL: {}", sql);
throw throwable;
}
}
/**
* Create tables in batch using syntax:
* <p>
* CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
* </p>
*/
private void createTables() throws SQLException {
StringBuilder sb = new StringBuilder("CREATE TABLE ");
for (String tbName : tbValues.keySet()) {
String tagValues = tbTags.get(tbName);
sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" ");
}
String sql = sb.toString();
try {
stmt.executeUpdate(sql);
} catch (Throwable throwable) {
logger.error("Execute SQL: {}", sql);
throw throwable;
}
}
public boolean hasBufferedValues() {
return bufferedCount > 0;
}
public int getBufferedCount() {
return bufferedCount;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
// ignore errors on close
}
try {
conn.close();
} catch (SQLException e) {
// ignore errors on close
}
}
}
```
DataBaseMonitor
```java
package com.taos.example.highvolume;
import java.sql.*;
/**
* Prepare target database.
* Count total records in database periodically so that we can estimate the writing speed.
*/
public class DataBaseMonitor {
private Connection conn;
private Statement stmt;
public DataBaseMonitor init() throws SQLException {
if (conn == null) {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
if (jdbcURL == null || jdbcURL.isEmpty()) {
jdbcURL = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
}
conn = DriverManager.getConnection(jdbcURL);
stmt = conn.createStatement();
}
return this;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
// ignore errors on close
}
try {
conn.close();
} catch (SQLException e) {
// ignore errors on close
}
}
public void prepareDatabase() throws SQLException {
stmt.execute("DROP DATABASE IF EXISTS test");
stmt.execute("CREATE DATABASE test");
stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
}
public long count() throws SQLException {
try (ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters")) {
result.next();
return result.getLong(1);
}
}
public long getTableCount() throws SQLException {
try (ResultSet result = stmt.executeQuery("select count(*) from information_schema.ins_tables where db_name = 'test';")) {
result.next();
return result.getLong(1);
}
}
}
```
Execution Steps
Running the Java Example Program
Before running the program, configure the environment variable `TDENGINE_JDBC_URL`. If the TDengine Server is deployed locally and the username, password, and port are all default values, configure it as follows:

```text
TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
Running the Example Program in a Local IDE
1. Clone the TDengine repository:

   ```shell
   git clone git@github.com:taosdata/TDengine.git --depth 1
   ```

2. Open the `docs/examples/java` directory in the IDE.

3. Configure the environment variable `TDENGINE_JDBC_URL` in the development environment. If a global environment variable `TDENGINE_JDBC_URL` has already been set, you can skip this step.

4. Run the class `com.taos.example.highvolume.FastWriteExample`.
Running the Example Program on a Remote Server
To run the example program on a server, follow these steps:
1. Package the example code. Execute the following in the `TDengine/docs/examples/java` directory:

   ```shell
   mvn package
   ```

2. Create an `examples` directory on the remote server:

   ```shell
   mkdir -p examples/java
   ```

3. Copy the dependencies to the specified directory on the server:

   - Copy the dependency packages (only needed once):

     ```shell
     scp -r ./target/lib <user>@<host>:~/examples/java
     ```

   - Copy the jar file of this program (needed every time the code is updated):

     ```shell
     scp -r ./target/javaexample-1.0.jar <user>@<host>:~/examples/java
     ```

4. Configure the environment variable. Edit `~/.bash_profile` or `~/.bashrc` and add, for example, the following content:

   ```shell
   export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
   ```

   The above uses the default JDBC URL for a locally deployed TDengine Server; modify it according to your actual situation.

5. Start the example program with the Java command, using this template:

   ```shell
   java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample <read_thread_count> <write_thread_count> <total_table_count> <max_batch_size>
   ```

6. Terminate the test program. The program does not end on its own; once a stable write speed has been reached under the current configuration, press CTRL + C to terminate it. Below is a log from an actual run on a machine with a 16-core CPU, 64 GB of RAM, and SSDs:
```text
root@vm85$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 2 12
18:56:35.896 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=2, writeTaskCount=12 tableCount=1000 maxBatchSize=3000
18:56:36.011 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.015 [WriteThread-0] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.021 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.022 [WriteThread-1] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.031 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.032 [WriteThread-2] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.041 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.042 [WriteThread-3] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.093 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.094 [WriteThread-4] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.099 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.100 [WriteThread-5] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.100 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.101 [WriteThread-6] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.103 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.104 [WriteThread-7] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.105 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.107 [WriteThread-8] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.108 [WriteThread-9] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.109 [WriteThread-9] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.156 [WriteThread-10] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.157 [WriteThread-11] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.158 [WriteThread-10] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.158 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started
18:56:36.158 [ReadThread-1] INFO com.taos.example.highvolume.ReadTask - started
18:56:36.158 [WriteThread-11] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:46.369 [main] INFO c.t.e.highvolume.FastWriteExample - count=18554448 speed=1855444
18:56:56.946 [main] INFO c.t.e.highvolume.FastWriteExample - count=39059660 speed=2050521
18:57:07.322 [main] INFO c.t.e.highvolume.FastWriteExample - count=59403604 speed=2034394
18:57:18.032 [main] INFO c.t.e.highvolume.FastWriteExample - count=80262938 speed=2085933
18:57:28.432 [main] INFO c.t.e.highvolume.FastWriteExample - count=101139906 speed=2087696
18:57:38.921 [main] INFO c.t.e.highvolume.FastWriteExample - count=121807202 speed=2066729
18:57:49.375 [main] INFO c.t.e.highvolume.FastWriteExample - count=142952417 speed=2114521
18:58:00.689 [main] INFO c.t.e.highvolume.FastWriteExample - count=163650306 speed=2069788
18:58:11.646 [main] INFO c.t.e.highvolume.FastWriteExample - count=185019808 speed=2136950
```
When using the Python connector to connect to TDengine from multiple processes, there is a limitation: connections cannot be established in the parent process; all connections must be created in the child processes. If a connection is established in the parent process first, connections created afterwards in the child processes will block. This is a known issue.