TDengine Java Client Library
taos-jdbcdriver is the official Java client library for TDengine. Java developers can use it to develop applications that access data in TDengine. taos-jdbcdriver implements standard JDBC driver interfaces.
TDengine's JDBC driver implementation is as consistent as possible with relational database drivers. Still, the use scenarios and technical characteristics of TDengine differ from those of relational databases, so taos-jdbcdriver also differs from traditional JDBC drivers in some respects. Keep the following points in mind:
- TDengine does not currently support delete operations for individual data records.
- Transactional operations are not currently supported.
Connection types
taos-jdbcdriver mainly provides three connection types, among which we recommend the WebSocket connection.
- Native connection, which connects to TDengine instances natively through the TDengine client driver (taosc), supporting data writing, querying, subscriptions, schemaless writing, and the bind interface.
- REST connection, which is implemented through taosAdapter. Some features, such as schemaless writing and subscriptions, are not supported.
- WebSocket connection, which is implemented through taosAdapter. The set of features implemented by the WebSocket connection differs slightly from those implemented by the native connection.
For a detailed introduction of the connection types, please refer to: Establish Connection
Compatibility with JDBC and JRE Versions
- JDBC: implements JDBC 4.2. Some features, such as schemaless writing and data subscription, are provided separately.
- JRE: supports JRE 8 or higher.
Supported platforms
Native connections are supported on the same platforms as the TDengine client driver. REST connections are supported on all platforms that can run Java.
Recent update logs
taos-jdbcdriver version | major changes | TDengine version |
---|---|---|
3.3.2 | 1. Optimized websocket prepareStatement performance; 2. Improved mybatis support | - |
3.3.0 | 1. Optimized data transmission performance under Websocket connection; 2. SSL validation skipping is supported but disabled by default | 3.3.2.0 or later |
3.2.11 | Fixed the result set closing bug when using a native connection. | - |
3.2.10 | 1. Automatic compression/decompression for data transmission, disabled by default; 2. Automatic reconnection for WebSocket with configurable parameters, disabled by default; 3. Added a new method for schemaless writing to the connection class; 4. Optimized data fetching performance on native connections; 5. Fixed some known issues; 6. The metadata API can return the list of supported functions | - |
3.2.9 | Fixed websocket prepareStatement closing bug. | - |
3.2.8 | Improved autocommit; fixed the commit offset bug on WebSocket connections; WebSocket prepareStatement uses one connection; metadata supports views. | - |
3.2.7 | Support VARBINARY and GEOMETRY types, and add time zone support for native connections. Support WebSocket auto reconnection | 3.2.0.0 or later |
3.2.5 | Subscription adds the committed() and assignment() methods | 3.1.0.3 or later |
3.2.4 | Subscription adds the enable.auto.commit parameter and the unsubscribe() method in the WebSocket connection | - |
3.2.3 | Fixed resultSet data parsing failure in some cases | - |
3.2.2 | Subscription adds the seek function | 3.0.5.0 or later |
3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later |
3.2.0 | This version has been deprecated | - |
3.1.0 | JDBC REST connection supports subscription over WebSocket | - |
3.0.1 - 3.0.4 | Fixed the issue of result set data sometimes being parsed incorrectly. 3.0.1 is compiled with JDK 11; use another version in a JDK 8 environment | - |
3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later |
2.0.42 | Fix wasNull interface return value in WebSocket connection | - |
2.0.41 | Fix decode method of username and password in REST connection | - |
2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - |
2.0.38 | JDBC REST connections add bulk pull function | - |
2.0.37 | Support json tags | - |
2.0.36 | Support schemaless writing | - |
Handling exceptions
After an error is reported, the error message and error code can be obtained through SQLException.
try (Statement statement = connection.createStatement();
     // executeQuery
     ResultSet tempResultSet = statement.executeQuery(sql)) {
    // print result
    printResult(tempResultSet);
} catch (Exception ex) {
    // please refer to the JDBC specifications for detailed exceptions info
    System.out.printf("Failed to execute statement, %sErrMessage: %s%n",
            ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
            ex.getMessage());
    // Print stack trace for context in examples. Use logging in production.
    ex.printStackTrace();
    throw ex;
}
There are four types of error codes that the JDBC client library can report:
- Error codes of the JDBC driver itself (between 0x2301 and 0x2350)
- Error codes of the native connection method (between 0x2351 and 0x2360)
- Error codes of the consumer method (between 0x2371 and 0x2380)
- Error codes of other TDengine function modules
The specific error codes are listed in the following table.
Error Code | Description | Suggested Actions |
---|---|---|
0x2301 | connection already closed | The connection has been closed, check the connection status, or recreate the connection to execute the relevant instructions. |
0x2302 | this operation is NOT supported currently! | The current connection type does not support this interface. Use another connection type. |
0x2303 | invalid variables | The parameter is invalid. Check the interface specification and adjust the parameter type and size. |
0x2304 | statement is closed | The statement is closed. Check whether the statement is being used after it was closed, or whether the connection is normal. |
0x2305 | resultSet is closed | The result set has been released. Check whether the result set is being used after it was released. |
0x2306 | Batch is empty! | Add parameters to the prepared statement before executing the batch. |
0x2307 | Can not issue data manipulation statements with executeQuery() | Update operations should use executeUpdate(), not executeQuery(). |
0x2308 | Can not issue SELECT via executeUpdate() | Query operations should use executeQuery(), not executeUpdate(). |
0x230d | parameter index out of range | The parameter is out of bounds. Check the proper range of the parameter. |
0x230e | connection already closed | The connection has been closed. Please check whether the connection is closed and used again, or whether the connection is normal. |
0x230f | unknown sql type in tdengine | Check the data type supported by TDengine. |
0x2310 | can't register JDBC-JNI driver | The native driver cannot be registered. Please check whether the url is correct. |
0x2312 | url is not set | Check whether the REST connection url is correct. |
0x2314 | numeric value out of range | Check that the correct interface is used for the numeric types in the obtained result set. |
0x2315 | unknown taos type in tdengine | Check whether the correct TDengine data type is specified when converting the TDengine data type to the JDBC data type. |
0x2317 | wrong request type was used in the REST connection. | |
0x2318 | data transmission exception occurred during the REST connection | Check the network status and try again. |
0x2319 | user is required | The user name information is missing when creating the connection |
0x231a | password is required | Password information is missing when creating a connection |
0x231c | httpEntity is null, sql: | Execution exception occurred during the REST connection |
0x231d | can't create connection with server within | Increase the connection time by adding the httpConnectTimeout parameter, or check the connection to the taos adapter. |
0x231e | failed to complete the task within the specified time | Increase the execution time by adding the messageWaitTimeout parameter, or check the connection to the taos adapter. |
0x2350 | unknown error | Unknown exception. Please report it to the developers on GitHub. |
0x2352 | Unsupported encoding | An unsupported character encoding set is specified under the native Connection. |
0x2353 | internal error of database, please see taoslog for more details | An error occurs when the prepare statement is executed on the native connection. Check the taos log to locate the fault. |
0x2354 | JNI connection is NULL | When the command is executed, the native Connection is closed. Check the connection to TDengine. |
0x2355 | JNI result set is NULL | The result set is abnormal. Please check the connection status and try again. |
0x2356 | invalid num of fields | The meta information of the result set obtained by the native connection does not match. |
0x2357 | empty sql string | Fill in the correct SQL for execution. |
0x2359 | JNI alloc memory failed, please see taoslog for more details | Memory allocation for the native connection failed. Check the taos log to locate the problem. |
0x2371 | consumer properties must not be null! | The parameter is empty when you create a subscription. Please fill in the correct parameter. |
0x2372 | configs contain empty key, failed to set consumer property | The parameter key contains a null value. Please enter the correct parameter. |
0x2373 | failed to set consumer property, | The parameter value contains a null value. Please enter the correct parameter. |
0x2375 | topic reference has been destroyed | The topic reference is released during the creation of the data subscription. Check the connection to TDengine. |
0x2376 | failed to set consumer topic, topic name is empty | During data subscription creation, the subscription topic name is empty. Check that the specified topic name is correct. |
0x2377 | consumer reference has been destroyed | The subscription data transfer channel has been closed. Please check the connection to TDengine. |
0x2378 | consumer create error | Failed to create a data subscription. Check the taos log according to the error message to locate the fault. |
0x2379 | seek offset must not be a negative number | The seek interface parameter cannot be negative. Use the correct parameter. |
0x237a | vGroup not found in result set | The subscription is not bound to the vGroup because of the rebalance mechanism. |
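The error code ranges listed above can be checked programmatically when handling an SQLException. The following is a minimal sketch using only the JDK: the class name `TaosErrorCategory` and its methods are hypothetical helpers, not part of taos-jdbcdriver, and the sketch assumes that `getErrorCode()` returns the code in the same numeric form as shown in the table.

```java
import java.sql.SQLException;

/** Hypothetical helper: maps an error code to the category ranges listed in this document. */
final class TaosErrorCategory {
    private TaosErrorCategory() {}

    static String categorize(int errorCode) {
        if (errorCode >= 0x2301 && errorCode <= 0x2350) return "JDBC driver";
        if (errorCode >= 0x2351 && errorCode <= 0x2360) return "native connection";
        if (errorCode >= 0x2371 && errorCode <= 0x2380) return "consumer";
        // Anything outside the three ranges is treated as coming from another TDengine module.
        return "other TDengine module";
    }

    /** Formats the code in hexadecimal so it can be looked up in the table directly. */
    static String describe(SQLException ex) {
        int code = ex.getErrorCode();
        return String.format("ErrCode: 0x%04x (%s), ErrMessage: %s",
                code, categorize(code), ex.getMessage());
    }

    public static void main(String[] args) {
        // Simulated exception; in real code it would come from a failed JDBC call.
        SQLException ex = new SQLException("statement is closed", null, 0x2304);
        System.out.println(describe(ex));
    }
}
```

Printing the code in hexadecimal avoids having to convert the decimal value returned by `getErrorCode()` by hand before looking it up.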
TDengine DataType vs. Java DataType
TDengine currently supports timestamp, numeric, character, and Boolean types. The corresponding Java types are as follows:
TDengine DataType | JDBCType |
---|---|
TIMESTAMP | java.sql.Timestamp |
INT | java.lang.Integer |
BIGINT | java.lang.Long |
FLOAT | java.lang.Float |
DOUBLE | java.lang.Double |
SMALLINT | java.lang.Short |
TINYINT | java.lang.Byte |
BOOL | java.lang.Boolean |
BINARY | byte[] |
NCHAR | java.lang.String |
JSON | java.lang.String |
VARBINARY | byte[] |
GEOMETRY | byte[] |
Note: Only TAG supports the JSON type.
For historical reasons, BINARY data in TDengine is not truly binary data and is no longer recommended. Use the VARBINARY type instead.
GEOMETRY data is binary data in little-endian byte order that complies with the WKB specification. For detailed information, please refer to Data Type.
For the WKB specification, please refer to Well Known Binary (WKB).
For the Java connector, the jts library can be used to easily create GEOMETRY type objects, serialize them, and write them to TDengine. For an example, see Geometry example.
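To make the little-endian WKB layout concrete, the sketch below encodes a 2D point by hand with the JDK's ByteBuffer. It is only an illustration of the byte layout (1-byte order flag, 4-byte geometry type, two 8-byte doubles); the class `WkbPointSketch` is hypothetical, and in practice a library such as jts would normally produce these bytes.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper showing the WKB layout of a 2D point in little-endian byte order. */
final class WkbPointSketch {
    private WkbPointSketch() {}

    static byte[] encodePoint(double x, double y) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 8 + 8).order(ByteOrder.LITTLE_ENDIAN);
        buf.put((byte) 1);  // byte-order flag: 1 = little endian
        buf.putInt(1);      // WKB geometry type: 1 = Point
        buf.putDouble(x);
        buf.putDouble(y);
        return buf.array();
    }

    static double[] decodePoint(byte[] wkb) {
        ByteBuffer buf = ByteBuffer.wrap(wkb).order(ByteOrder.LITTLE_ENDIAN);
        if (buf.get() != 1) {
            throw new IllegalArgumentException("expected little-endian WKB");
        }
        if (buf.getInt() != 1) {
            throw new IllegalArgumentException("expected WKB Point");
        }
        return new double[] {buf.getDouble(), buf.getDouble()};
    }

    public static void main(String[] args) {
        byte[] wkb = encodePoint(1.0, 2.0);
        double[] xy = decodePoint(wkb);
        System.out.println(wkb.length + " bytes, x=" + xy[0] + ", y=" + xy[1]);
    }
}
```

The resulting byte[] matches the Java type that the table above lists for GEOMETRY columns.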
Installation Steps
Pre-installation preparation
Before using the Java client library to connect to the database, make sure the following requirements are met:
- Java 1.8 or later runtime environment and Maven 3.6 or later are installed.
- The TDengine client driver is installed (required for native connections, not required for REST connections). Please refer to Install Client Driver.
Install the client library
- Install via Maven
- Build from source code
taos-jdbcdriver has been published on the Sonatype Repository and synchronized to other major repositories.
Add the following dependency to the pom.xml file of your Maven project:
<dependency>
    <groupId>com.taosdata.jdbc</groupId>
    <artifactId>taos-jdbcdriver</artifactId>
    <version>3.3.2</version>
</dependency>
You can build the Java client library from source code after cloning the taos-connector-jdbc repository:
git clone https://github.com/taosdata/taos-connector-jdbc.git
cd taos-connector-jdbc
mvn clean install -Dmaven.test.skip=true
After you have compiled taos-jdbcdriver, the taos-jdbcdriver-3.2.*-dist.jar file is created in the target directory. The compiled JAR file is automatically stored in your local Maven repository.
Establishing a connection
TDengine's JDBC URL specification format is:
jdbc:[TAOS|TAOS-RS]://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&charset={charset}|&cfgdir={config_dir}|&locale={locale}|&timezone={timezone}]
For establishing connections, native connections differ slightly from REST connections.
Note: adding batchfetch to the REST connection and setting it to true will enable the WebSocket connection.
- native connection
- REST connection
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://taosdemo.com:6030/power?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(jdbcUrl);
In the above example, TSDBDriver, which uses a JDBC native connection, establishes a connection to the host taosdemo.com on port 6030 (the default port for TDengine) and to a database named power. In this URL, the user name (user) is specified as root and the password (password) as taosdata.
Note: With JDBC native connections, taos-jdbcdriver relies on the client driver (libtaos.so on Linux; taos.dll on Windows; libtaos.dylib on macOS).
The configuration parameters in the URL are as follows:
- user: TDengine login user name. The default value is 'root'.
- password: User login password. The default value is 'taosdata'.
- cfgdir: Client configuration file directory path. The default is '/etc/taos' on Linux, 'C:/TDengine/cfg' on Windows, and '/etc/taos' on macOS.
- charset: The character set used by the client. The default value is the system character set.
- locale: Client locale. The default is the system's current locale.
- timezone: The time zone used by the client. The default value is the system's current time zone.
- batchfetch: true: pulls result sets in batches when executing queries; false: pulls result sets row by row. The default value is true. Pulling data in batches can improve query performance when the query data volume is large.
- batchErrorIgnore: true: when executing executeBatch on a Statement, if one SQL statement fails, the following statements continue to be executed; false: no statements after the failed one are executed. The default value is false.
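The URL format and parameters described above can be assembled with plain string handling. The sketch below is a hypothetical helper (`TaosJdbcUrlSketch` is not part of taos-jdbcdriver) that joins the scheme, host, port, database, and query parameters. It assumes the parameter values need no escaping and inserts them verbatim.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that assembles a JDBC URL in the format shown above. */
final class TaosJdbcUrlSketch {
    private TaosJdbcUrlSketch() {}

    static String buildUrl(String scheme, String host, int port, String database,
                           Map<String, String> params) {
        StringBuilder url = new StringBuilder("jdbc:")
                .append(scheme).append("://")
                .append(host).append(':').append(port)
                .append('/').append(database);
        if (!params.isEmpty()) {
            // Parameters are joined as ?key=value&key=value, in map iteration order.
            StringJoiner query = new StringJoiner("&", "?", "");
            params.forEach((key, value) -> query.add(key + "=" + value));
            url.append(query);
        }
        return url.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("user", "root");
        params.put("password", "taosdata");
        System.out.println(buildUrl("TAOS", "taosdemo.com", 6030, "power", params));
    }
}
```

A LinkedHashMap keeps the parameters in insertion order, which makes the resulting URL predictable.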
**Connect using the TDengine client driver configuration file**
When you use a JDBC native connection to connect to a TDengine cluster, you can use the TDengine client driver configuration file to specify parameters such as firstEp and secondEp of the cluster, as shown below:
- Do not specify hostname and port in Java applications.
public Connection getConn() throws Exception {
    Class.forName("com.taosdata.jdbc.TSDBDriver");
    String jdbcUrl = "jdbc:TAOS://:/power?user=root&password=taosdata";
    Properties connProps = new Properties();
    connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
    connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
    connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
    Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
    return conn;
}
- Specify the firstEp and the secondEp in the configuration file taos.cfg.
# first fully qualified domain name (FQDN) for TDengine system
firstEp cluster_node1:6030
# second fully qualified domain name (FQDN) for TDengine system, for cluster only
secondEp cluster_node2:6030
# default system charset
# charset UTF-8
# system locale
# locale en_US.UTF-8
In the above example, JDBC uses the client's configuration file to establish a connection to the host cluster_node1 on port 6030 and to a database named power. When the firstEp node in the cluster fails, JDBC attempts to connect to the cluster using secondEp.
In TDengine, as long as one of the firstEp and secondEp nodes is valid, the connection to the cluster can be established normally.
The configuration file here refers to the configuration file on the machine where the application that calls the JDBC client library runs. The default path is /etc/taos/taos.cfg on Linux, C:/TDengine/cfg/taos.cfg on Windows, and /etc/taos/taos.cfg on macOS.
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/power?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(jdbcUrl);
In the above example, a RestfulDriver with a JDBC REST connection is used to establish a connection to a database named power on the host taosdemo.com, port 6041. The URL specifies the user name as root and the password as taosdata.
There is no dependency on the client driver when using a JDBC REST connection. Compared to a JDBC native connection, only the following are required:
- driverClass specified as "com.taosdata.jdbc.rs.RestfulDriver".
- jdbcUrl starting with "jdbc:TAOS-RS://".
- use 6041 as the connection port.
The configuration parameters in the URL are as follows:
- user: TDengine login user name. The default value is 'root'.
- password: User login password. The default value is 'taosdata'.
- batchfetch: true: pulls result sets in batches when executing queries; false: pulls result sets row by row. The default value is false. When batchfetch is false, data is transferred over HTTP. When batchfetch is true, taos-jdbcdriver and TDengine transfer data over a WebSocket connection. Compared with HTTP, WebSocket enables the JDBC REST connection to support queries over large data volumes and improves query performance.
- charset: The character set used to parse strings. This parameter is valid only when batchfetch is set to true.
- batchErrorIgnore: true: when executing executeBatch on a Statement, if one SQL statement fails, the following statements continue to be executed; false: no statements after the failed one are executed. The default value is false.
- httpConnectTimeout: REST connection timeout in milliseconds. The default value is 60000.
- httpSocketTimeout: Socket timeout in milliseconds. The default value is 60000. It only takes effect when batchfetch is false.
- messageWaitTimeout: Message transmission timeout in milliseconds. The default value is 60000. It only takes effect when batchfetch is true.
- useSSL: Whether to connect securely using SSL. true: use an SSL connection; false: do not use an SSL connection.
- httpPoolSize: Number of concurrent REST requests. The default value is 20.
Note: Some configuration items (e.g., locale, timezone) do not work in the REST connection.
- Unlike the native connection method, the REST interface is stateless. When using the JDBC REST connection, you need to specify the database name of the table and super table in SQL. For example:
INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW, 10.30000, 219, 0.31000);
- If the dbname is specified in the URL, the JDBC REST connection uses /rest/sql/dbname as the default URL for RESTful requests. In this case, it is not necessary to specify the dbname in SQL. For example, if the URL is jdbc:TAOS-RS://127.0.0.1:6041/power, then the following SQL can be executed: INSERT INTO d1001 USING meters TAGS(2,'California.SanFrancisco') VALUES (NOW, 10.30000, 219, 0.31000);
Specify the URL and Properties to get the connection
In addition to getting the connection from the specified URL, you can use Properties to specify parameters when the connection is established.
Note:
- The client parameters set in the application are process-level. To update client parameters, you need to restart the application, because client parameters are global and take effect only the first time they are set in the application.
- The following sample code is based on taos-jdbcdriver-3.1.0 or later.
public Connection getConn() throws Exception {
    Class.forName("com.taosdata.jdbc.TSDBDriver");
    String jdbcUrl = "jdbc:TAOS://taosdemo.com:6030/power?user=root&password=taosdata";
    Properties connProps = new Properties();
    connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
    connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
    connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
    connProps.setProperty("debugFlag", "135");
    Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
    return conn;
}
public Connection getRestConn() throws Exception {
    Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
    String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/power?user=root&password=taosdata";
    Properties connProps = new Properties();
    connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
    Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
    return conn;
}
In the above examples, a connection is established to the host taosdemo.com on port 6030 (native) or 6041 (REST) and to a database named power. The URL specifies the user name as root and the password as taosdata, and connProps specifies the character set, locale, time zone, and whether to enable batch fetching.
The configuration parameters in properties are as follows.
- TSDBDriver.PROPERTY_KEY_USER: TDengine login user name. The default value is 'root'.
- TSDBDriver.PROPERTY_KEY_PASSWORD: User login password. The default value is 'taosdata'.
- TSDBDriver.PROPERTY_KEY_BATCH_LOAD: true: pulls the result set in batches when executing queries; false: pulls the result set row by row. The default value is false.
- TSDBDriver.PROPERTY_KEY_BATCH_ERROR_IGNORE: true: when executing executeBatch on a Statement, if one SQL statement fails, the following statements continue to be executed; false: no statements after the failed one are executed. The default value is false.
- TSDBDriver.PROPERTY_KEY_CONFIG_DIR: Only takes effect when using a JDBC native connection. Client configuration file directory path. The default value is /etc/taos on Linux, C:/TDengine/cfg on Windows, and /etc/taos on macOS.
- TSDBDriver.PROPERTY_KEY_CHARSET: The character set used by the client. The default value is the system character set.
- TSDBDriver.PROPERTY_KEY_LOCALE: Only takes effect when using a JDBC native connection. Client locale. The default value is the system's current locale.
- TSDBDriver.PROPERTY_KEY_TIME_ZONE: Only takes effect when using a JDBC native connection. The time zone used by the client. The default value is the system's current time zone. For historical reasons, only some specifications of the POSIX standard are supported, such as UTC-8 (representing the Shanghai time zone in China), GMT-7, and Europe/Paris.
- TSDBDriver.HTTP_CONNECT_TIMEOUT: REST connection timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection.
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection and batchfetch is false.
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: message transmission timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection and batchfetch is true.
- TSDBDriver.PROPERTY_KEY_USE_SSL: Whether to connect securely using SSL. true: use an SSL connection; false: do not use an SSL connection. It only takes effect when using a JDBC REST connection.
- TSDBDriver.HTTP_POOL_SIZE: size of REST concurrent requests. The default value is 20.
- TSDBDriver.PROPERTY_KEY_ENABLE_COMPRESSION: Whether to enable compression during transmission. It only takes effect when using REST/Websocket connections. true: enabled, false: disabled. The default is false.
- TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: Whether to enable automatic reconnection. It only takes effect when using Websocket connections. true: enabled, false: disabled. The default is false.
Note: Automatic reconnection is only effective for simple SQL statement execution, schemaless writing, and data subscription. It is not effective for parameter binding. Automatic reconnection applies only to the database specified by parameters when the connection is established; it does not apply after a later use db statement switches databases.
- TSDBDriver.PROPERTY_KEY_RECONNECT_INTERVAL_MS: The interval for automatic reconnection retries, in milliseconds, with a default value of 2000. It only takes effect when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true.
- TSDBDriver.PROPERTY_KEY_RECONNECT_RETRY_COUNT: The number of automatic reconnection retries, with a default value of 3. It only takes effect when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true.
- TSDBDriver.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: Whether to disable SSL certificate validation. It only takes effect when using WebSocket connections. true: validation is disabled; false: validation is performed. The default is false.
For JDBC native connections, you can specify other parameters, such as log level, SQL length, etc., by specifying URL and Properties. For more detailed configuration, please refer to Client Configuration.
Priority of configuration parameters
If the configuration parameters are duplicated in the URL, Properties, or client configuration file, the priority of the parameters, from highest to lowest, is as follows:
- JDBC URL parameters, as described above
- Properties connProps
- The configuration file taos.cfg of the TDengine client driver, when using a native connection
For example, if you specify the password as taosdata in the URL and as taosdemo in the Properties at the same time, JDBC uses the password in the URL to establish the connection.
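The precedence described above amounts to "first source that defines the key wins". The sketch below illustrates that rule with plain JDK collections. `ConfigPrioritySketch` and its `resolve` method are hypothetical and do not reflect how taos-jdbcdriver implements the lookup internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical illustration of the URL > Properties > taos.cfg precedence. */
final class ConfigPrioritySketch {
    private ConfigPrioritySketch() {}

    static String resolve(String key, Map<String, String> urlParams,
                          Properties props, Map<String, String> cfgFile) {
        // 1. JDBC URL parameters have the highest priority.
        if (urlParams.containsKey(key)) {
            return urlParams.get(key);
        }
        // 2. Properties passed to DriverManager.getConnection come next.
        String fromProps = props.getProperty(key);
        if (fromProps != null) {
            return fromProps;
        }
        // 3. The client configuration file is consulted last (null if absent).
        return cfgFile.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> urlParams = new HashMap<>();
        urlParams.put("password", "taosdata");
        Properties props = new Properties();
        props.setProperty("password", "taosdemo");
        // The URL value wins over the Properties value.
        System.out.println(resolve("password", urlParams, props, new HashMap<>()));
    }
}
```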
Usage examples
Create database and tables
// create database
stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
// use database
stmt.executeUpdate("USE power");
// create table
stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
Note: If you do not use USE power to specify the database, all subsequent operations on the table need to add the database name as a prefix, such as power.meters.
Insert data
// insert data
String insertQuery = "INSERT INTO " +
        "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
        "VALUES " +
        "(NOW + 1a, 10.30000, 219, 0.31000) " +
        "(NOW + 2a, 12.60000, 218, 0.33000) " +
        "(NOW + 3a, 12.30000, 221, 0.31000) " +
        "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
        "VALUES " +
        "(NOW + 1a, 10.30000, 218, 0.25000) ";
int affectedRows = stmt.executeUpdate(insertQuery);
System.out.println("insert " + affectedRows + " rows.");
NOW is an internal function. The default is the current time of the client's computer. NOW + 1s represents the current time of the client plus 1 second. The letter after the number represents the unit of time: a (milliseconds), s (seconds), m (minutes), h (hours), d (days), w (weeks), n (months), y (years).
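As an aid for reading offsets such as 1a or 3s, the sketch below maps each unit suffix to a java.time unit and applies it to a timestamp on the client side. It only illustrates the unit letters. TDengine evaluates NOW expressions itself, and the calendar handling of months and years here follows java.time, which is an assumption and may differ from TDengine's. `TimeOffsetSketch` is a hypothetical helper.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper that interprets offsets such as "1a" or "3s" using java.time. */
final class TimeOffsetSketch {
    private TimeOffsetSketch() {}

    /** Maps a unit suffix from the list above to the closest java.time unit. */
    static ChronoUnit unitOf(char suffix) {
        switch (suffix) {
            case 'a': return ChronoUnit.MILLIS;
            case 's': return ChronoUnit.SECONDS;
            case 'm': return ChronoUnit.MINUTES;
            case 'h': return ChronoUnit.HOURS;
            case 'd': return ChronoUnit.DAYS;
            case 'w': return ChronoUnit.WEEKS;
            case 'n': return ChronoUnit.MONTHS;
            case 'y': return ChronoUnit.YEARS;
            default: throw new IllegalArgumentException("unknown time unit: " + suffix);
        }
    }

    /** Adds an offset written as a number followed by a unit suffix, e.g. "3a". */
    static LocalDateTime plus(LocalDateTime base, String offset) {
        long amount = Long.parseLong(offset.substring(0, offset.length() - 1));
        char suffix = offset.charAt(offset.length() - 1);
        return base.plus(amount, unitOf(suffix));
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        System.out.println("NOW      = " + now);
        System.out.println("NOW + 1s = " + plus(now, "1s"));
    }
}
```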
Querying data
// query data
ResultSet resultSet = stmt.executeQuery("SELECT * FROM meters");
Timestamp ts;
float current;
String location;
while (resultSet.next()) {
    ts = resultSet.getTimestamp(1);
    current = resultSet.getFloat(2);
    location = resultSet.getString("location");
    System.out.printf("%s, %f, %s\n", ts, current, location);
}
Querying works the same way as in a relational database. When using column indexes to get the contents of the returned fields, indexes start from 1. However, we recommend using the field names to get the values of the fields in the result set.
Execute SQL with reqId
The reqId is very similar to TraceID in distributed tracing systems. In a distributed system, a request may need to pass through multiple services or modules to be completed. The reqId is used to identify and associate all related operations of this request, allowing us to track and understand the complete execution path of the request. The primary uses of reqId are:
- Request Tracing: By associating the same reqId with all related operations of a request, we can trace the complete path of the request within the system.
- Performance Analysis: By analyzing a request's reqId, we can understand the processing time of the request across various services or modules, thereby identifying performance bottlenecks.
- Fault Diagnosis: When a request fails, we can identify the location of the issue by examining the reqId associated with that request.
If the user does not set a reqId, the client library generates one randomly, but setting it explicitly is still recommended because it can then be associated more closely with the user's request.
AbstractStatement aStmt = (AbstractStatement) connection.createStatement();
aStmt.execute("CREATE DATABASE IF NOT EXISTS power", 1L);
aStmt.executeUpdate("USE power", 2L);
try (ResultSet rs = aStmt.executeQuery("SELECT * FROM meters limit 1", 3L)) {
    while (rs.next()) {
        Timestamp timestamp = rs.getTimestamp(1);
        System.out.println("timestamp = " + timestamp);
    }
}
aStmt.close();
Writing data via parameter binding
TDengine has significantly improved the bind APIs to support data writing (INSERT) scenarios. Writing data in this way avoids the resource consumption of SQL syntax parsing, resulting in significant write performance improvements in many cases.
Note:
- JDBC REST connections do not currently support the bind interface.
- The following sample code is based on taos-jdbcdriver-3.2.1 or later.
- Call the setString method for binary type data and the setNString method for nchar type data.
- When specifying the database together with the table name, do not use db.? in prepareStatement. Use ? directly and specify the database in setTableName, for example: prepareStatement.setTableName("db.t1").
- native connection
- WebSocket connection
import com.taosdata.jdbc.TSDBPreparedStatement;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Random;

public class ParameterBindingBasicDemo {
    // modify host to your own
    private static final String host = "127.0.0.1";
    private static final Random random = new Random(System.currentTimeMillis());
    private static final int numOfSubTable = 10, numOfRow = 10;

    public static void main(String[] args) throws SQLException {
        String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
        try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
            init(conn);
            String sql = "INSERT INTO ? USING power.meters TAGS(?,?) VALUES (?,?,?,?)";
            try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
                for (int i = 1; i <= numOfSubTable; i++) {
                    // set table name
                    pstmt.setTableName("d_bind_" + i);
                    // set tags
                    pstmt.setTagInt(0, i);
                    pstmt.setTagString(1, "location_" + i);
                    // set column ts
                    ArrayList<Long> tsList = new ArrayList<>();
                    long current = System.currentTimeMillis();
                    for (int j = 0; j < numOfRow; j++)
                        tsList.add(current + j);
                    pstmt.setTimestamp(0, tsList);
                    // set column current
                    ArrayList<Float> currentList = new ArrayList<>();
                    for (int j = 0; j < numOfRow; j++)
                        currentList.add(random.nextFloat() * 30);
                    pstmt.setFloat(1, currentList);
                    // set column voltage
                    ArrayList<Integer> voltageList = new ArrayList<>();
                    for (int j = 0; j < numOfRow; j++)
                        voltageList.add(random.nextInt(300));
                    pstmt.setInt(2, voltageList);
                    // set column phase
                    ArrayList<Float> phaseList = new ArrayList<>();
                    for (int j = 0; j < numOfRow; j++)
                        phaseList.add(random.nextFloat());
                    pstmt.setFloat(3, phaseList);
                    // add the bound columns of this sub-table to the batch
                    pstmt.columnDataAddBatch();
                }
                // execute the whole batch
                pstmt.columnDataExecuteBatch();
                // you can check exeResult here
                System.out.println("Successfully inserted " + (numOfSubTable * numOfRow) + " rows to power.meters.");
            }
        } catch (Exception ex) {
            // please refer to the JDBC specifications for detailed exceptions info
            System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n",
                    ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
                    ex.getMessage());
            // Print stack trace for context in examples. Use logging in production.
            ex.printStackTrace();
            throw ex;
        }
    }

    private static void init(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE DATABASE IF NOT EXISTS power");
            stmt.execute("USE power");
            stmt.execute("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
        }
    }
}
This is the Detailed Example
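import com.taosdata.jdbc.ws.TSWSPreparedStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.Random;
public class WSParameterBindingBasicDemo {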
// modify host to your own
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://" + host + ":6041/?batchfetch=true";
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
String sql = "INSERT INTO ? USING power.meters TAGS(?,?) VALUES (?,?,?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("d_bind_" + i);
// set tags
pstmt.setTagInt(0, i);
pstmt.setTagString(1, "location_" + i);
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setFloat(2, random.nextFloat() * 30);
pstmt.setInt(3, random.nextInt(300));
pstmt.setFloat(4, random.nextFloat());
pstmt.addBatch();
}
int [] exeResult = pstmt.executeBatch();
// you can check exeResult here
System.out.println("Successfully inserted " + exeResult.length + " rows to power.meters.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert to table meters using stmt, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
stmt.execute("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
}
}
This is the Detailed Example
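The two demos differ in how data is laid out: the native demo binds one list per column, while the WebSocket demo binds one row at a time. As a minimal sketch, assuming readings arrive as row objects, the helper below transposes them into per-column lists. RowsToColumnsSketch, Reading, and Columns are hypothetical names used for illustration only; they are not part of taos-jdbcdriver.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RowsToColumnsSketch {
    // One row of the meters super table used in the demos (hypothetical holder class).
    static final class Reading {
        final long ts;
        final float current;
        final int voltage;
        final float phase;

        Reading(long ts, float current, int voltage, float phase) {
            this.ts = ts;
            this.current = current;
            this.voltage = voltage;
            this.phase = phase;
        }
    }

    // One list per column, in the same order as the VALUES placeholders.
    static final class Columns {
        final ArrayList<Long> ts = new ArrayList<>();
        final ArrayList<Float> current = new ArrayList<>();
        final ArrayList<Integer> voltage = new ArrayList<>();
        final ArrayList<Float> phase = new ArrayList<>();
    }

    // Transpose row-oriented readings into column-oriented lists.
    static Columns toColumns(List<Reading> rows) {
        Columns columns = new Columns();
        for (Reading row : rows) {
            columns.ts.add(row.ts);
            columns.current.add(row.current);
            columns.voltage.add(row.voltage);
            columns.phase.add(row.phase);
        }
        return columns;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Columns columns = toColumns(Arrays.asList(
                new Reading(now, 10.3f, 219, 0.31f),
                new Reading(now + 1, 10.5f, 220, 0.32f)));
        System.out.println("rows per column: " + columns.ts.size());
    }
}
```

With a native connection, the resulting lists could then be passed to setTimestamp(0, ...), setFloat(1, ...), setInt(2, ...), and setFloat(3, ...) as in the native demo above.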
The methods to set VALUES columns:
public void setInt(int columnIndex, ArrayList<Integer> list) throws SQLException
public void setFloat(int columnIndex, ArrayList<Float> list) throws SQLException
public void setTimestamp(int columnIndex, ArrayList<Long> list) throws SQLException
public void setLong(int columnIndex, ArrayList<Long> list) throws SQLException
public void setDouble(int columnIndex, ArrayList<Double> list) throws SQLException
public void setBoolean(int columnIndex, ArrayList<Boolean> list) throws SQLException
public void setByte(int columnIndex, ArrayList<Byte> list) throws SQLException
public void setShort(int columnIndex, ArrayList<Short> list) throws SQLException
public void setString(int columnIndex, ArrayList<String> list, int size) throws SQLException
public void setNString(int columnIndex, ArrayList<String> list, int size) throws SQLException
public void setVarbinary(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
public void setGeometry(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
Note: for String and byte[] data, the size parameter must be set to the width of the corresponding column as declared in the table definition.
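As a hedged illustration of the note above, the helper below checks that a list of strings fits a declared column width before it is bound. It assumes that the width of a BINARY column counts UTF-8 bytes; ColumnWidthCheck and its methods are hypothetical names, not part of taos-jdbcdriver.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

final class ColumnWidthCheck {
    // Largest UTF-8 encoded length among the values; null entries count as 0.
    static int maxUtf8Bytes(List<String> values) {
        int max = 0;
        for (String value : values) {
            if (value != null) {
                max = Math.max(max, value.getBytes(StandardCharsets.UTF_8).length);
            }
        }
        return max;
    }

    // True when every value fits the declared width (assumed to be measured in bytes).
    static boolean fits(List<String> values, int declaredWidth) {
        return maxUtf8Bytes(values) <= declaredWidth;
    }

    public static void main(String[] args) {
        List<String> locations = Arrays.asList("location_1", "location_10");
        // 24 mirrors the BINARY(24) location tag used in the demos.
        System.out.println("fits BINARY(24): " + fits(locations, 24));
    }
}
```

A check like this can run before calling setString(columnIndex, list, size), so that oversized values are caught in application code rather than at write time.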
The methods to set TAGS values:
public void setTagNull(int index, int type)
public void setTagBoolean(int index, boolean value)
public void setTagInt(int index, int value)
public void setTagByte(int index, byte value)
public void setTagShort(int index, short value)
public void setTagLong(int index, long value)
public void setTagTimestamp(int index, long value)
public void setTagFloat(int index, float value)
public void setTagDouble(int index, double value)
public void setTagString(int index, String value)
public void setTagNString(int index, String value)
public void setTagJson(int index, String value)
public void setTagVarbinary(int index, byte[] value)
public void setTagGeometry(int index, byte[] value)
Schemaless Writing
TDengine supports schemaless writing. It is compatible with InfluxDB's Line Protocol, OpenTSDB's telnet line protocol, and OpenTSDB's JSON format protocol. For more information, see Schemaless Writing.
- native connection
- WebSocket connection
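import com.taosdata.jdbc.AbstractConnection;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class SchemalessJniTest {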
private static final String host = "127.0.0.1";
private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
private static final String telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
private static final String jsonDemo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
public static void main(String[] args) throws SQLException {
final String jdbcUrl = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata";
try (Connection connection = DriverManager.getConnection(jdbcUrl)) {
init(connection);
AbstractConnection conn = connection.unwrap(AbstractConnection.class);
conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
System.out.println("Inserted data with schemaless successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data with schemaless, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection connection) throws SQLException {
try (Statement stmt = connection.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
}
}
}
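import com.taosdata.jdbc.AbstractConnection;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class SchemalessWsTest {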
private static final String host = "127.0.0.1";
private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
private static final String telnetDemo = "metric_telnet 1707095283260 4 host=host0 interface=eth0";
private static final String jsonDemo = "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
public static void main(String[] args) throws SQLException {
final String url = "jdbc:TAOS-RS://" + host + ":6041?user=root&password=taosdata&batchfetch=true";
try(Connection connection = DriverManager.getConnection(url)){
init(connection);
AbstractConnection conn = connection.unwrap(AbstractConnection.class);
conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS);
System.out.println("Inserted data with schemaless successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data with schemaless, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
private static void init(Connection connection) throws SQLException {
try (Statement stmt = connection.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
}
}
}
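The lineDemo string in the examples above is written by hand. As a minimal sketch, a line in the same layout (measurement, tags, fields, timestamp) could be assembled as shown below. LineProtocolSketch and buildLine are hypothetical names; the sketch assumes that field values already carry their type suffix (for example 219i32) and that no key or value contains spaces or commas, so it performs no escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class LineProtocolSketch {
    // Builds "measurement,tag=value,... field=value,... timestamp".
    // Insertion order of the maps is preserved, so LinkedHashMap is recommended.
    static String buildLine(String measurement, Map<String, String> tags,
                            Map<String, String> fields, long timestampMillis) {
        StringBuilder line = new StringBuilder(measurement);
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            line.append(',').append(tag.getKey()).append('=').append(tag.getValue());
        }
        StringJoiner fieldPart = new StringJoiner(",");
        for (Map.Entry<String, String> field : fields.entrySet()) {
            fieldPart.add(field.getKey() + "=" + field.getValue());
        }
        return line.append(' ').append(fieldPart.toString())
                .append(' ').append(timestampMillis).toString();
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("groupid", "2");
        tags.put("location", "California.SanFrancisco");
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("current", "10.3000002f64");
        fields.put("voltage", "219i32");
        fields.put("phase", "0.31f64");
        System.out.println(buildLine("meters", tags, fields, 1626006833639L));
    }
}
```

The returned string can be passed as the first argument of conn.write together with SchemalessProtocolType.LINE and SchemalessTimestampType.MILLI_SECONDS, as in the examples above.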
Schemaless with reqId
The reqId parameter can be used for request link tracing.
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS, 1L);
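The call above passes a fixed reqId of 1L. For tracing it usually helps if each request has a distinct id. The sketch below shows one possible way to produce them, assuming that ids only need to be unique within a single client process; ReqIdSketch and nextReqId are hypothetical names, not part of taos-jdbcdriver.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ReqIdSketch {
    // Process-wide counter; AtomicLong keeps increments safe across threads.
    private static final AtomicLong NEXT = new AtomicLong(1L);

    // Returns a new id on every call, increasing by one each time.
    static long nextReqId() {
        return NEXT.getAndIncrement();
    }

    public static void main(String[] args) {
        long first = nextReqId();
        long second = nextReqId();
        System.out.println("request ids: " + first + ", " + second);
    }
}
```

The generated value would take the place of the literal 1L in the write call, and the same value can be logged on the client side to correlate a request with its trace.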
Data Subscription
The TDengine Java client library supports subscription functionality with the following application API.
Create a Topic
Connection connection = DriverManager.getConnection(url, properties);
Statement statement = connection.createStatement();
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
The preceding example creates a topic named topic_meters from the SQL statement SELECT ts, current, voltage, phase, groupid, location FROM meters.
Note: The query statement of a topic can only be a SELECT statement. Only the original data should be queried, and data can only be queried in temporal order.
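The SELECT-only rule can be checked in application code before the statement is sent. The following is a minimal sketch with a deliberately naive prefix check; TopicSqlSketch and createTopicSql are hypothetical names, and the check does not validate the other restrictions mentioned in the note.

```java
import java.util.Locale;

final class TopicSqlSketch {
    // Builds a CREATE TOPIC statement and rejects queries that do not start with SELECT.
    static String createTopicSql(String topicName, String query) {
        String trimmed = query.trim();
        if (!trimmed.toUpperCase(Locale.ROOT).startsWith("SELECT ")) {
            throw new IllegalArgumentException("topic query must be a SELECT statement");
        }
        return "CREATE TOPIC IF NOT EXISTS " + topicName + " AS " + trimmed;
    }

    public static void main(String[] args) {
        System.out.println(createTopicSql("topic_meters",
                "SELECT ts, current, voltage, phase, groupid, location FROM meters"));
    }
}
```

The returned string can be passed to statement.executeUpdate as in the example above.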
Create a Consumer
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("value.deserializer", "com.taos.example.AbsConsumerLoop$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
this.consumer = new TaosConsumer<>(config);
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to create jni consumer with " + config.getProperty("bootstrap.servers") + " ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
}
- bootstrap.servers: The ip:port where the TDengine server is located, or the ip:port where taosAdapter is located if a WebSocket connection is used.
- enable.auto.commit: Specifies whether to commit automatically.
- group.id: Specifies the group that the consumer is in.
- value.deserializer: To deserialize the results, you can inherit com.taosdata.jdbc.tmq.ReferenceDeserializer and specify the result set bean. You can also inherit com.taosdata.jdbc.tmq.Deserializer and perform custom deserialization based on the SQL result set.
- td.connect.type: Specifies the type of connection to TDengine, jni or WebSocket. The default is jni.
- httpConnectTimeout: WebSocket connection timeout in milliseconds. The default value is 5000 ms. It only takes effect when using the WebSocket type.
- messageWaitTimeout: Socket timeout in milliseconds. The default value is 10000 ms. It only takes effect when using the WebSocket type.
- httpPoolSize: Maximum number of concurrent requests on a connection. It only takes effect when using the WebSocket type.
- TSDBDriver.PROPERTY_KEY_ENABLE_COMPRESSION: Whether to enable compression during transmission. It only takes effect when using Websocket connections. true: enabled, false: disabled. The default is false.
- TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: Whether to enable automatic reconnection. It only takes effect when using Websocket connections. true: enabled, false: disabled. The default is false.
- TSDBDriver.PROPERTY_KEY_RECONNECT_INTERVAL_MS: The interval for automatic reconnection retries, in milliseconds, with a default value of 2000. It only takes effect when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true.
- TSDBDriver.PROPERTY_KEY_RECONNECT_RETRY_COUNT: The number of automatic reconnection retries, with a default value of 3. It only takes effect when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true.
For more information, see Consumer Parameters. Note that the default value of auto.offset.reset in data subscription on the TDengine server has changed since version 3.2.0.0.
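Because the native and WebSocket consumers share most of these settings, the common part can be factored into one helper. The sketch below uses only property keys and values that appear in the examples on this page; ConsumerConfigSketch and baseConfig are hypothetical names, and the value.deserializer entry is left to the caller.

```java
import java.util.Properties;

final class ConsumerConfigSketch {
    // Collects the settings shared by the consumer examples on this page.
    static Properties baseConfig(String connectType, String servers,
                                 String groupId, String clientId) {
        Properties config = new Properties();
        config.setProperty("td.connect.type", connectType);
        config.setProperty("bootstrap.servers", servers);
        config.setProperty("group.id", groupId);
        config.setProperty("client.id", clientId);
        config.setProperty("auto.offset.reset", "latest");
        config.setProperty("enable.auto.commit", "true");
        config.setProperty("auto.commit.interval.ms", "1000");
        return config;
    }

    public static void main(String[] args) {
        // "jni" with port 6030 and "ws" with port 6041 mirror the full samples on this page.
        Properties nativeConfig = baseConfig("jni", "localhost:6030", "group1", "client1");
        Properties wsConfig = baseConfig("ws", "localhost:6041", "group1", "client1");
        System.out.println(nativeConfig.getProperty("bootstrap.servers"));
        System.out.println(wsConfig.getProperty("bootstrap.servers"));
    }
}
```

The returned Properties object would then be completed with value.deserializer and passed to the TaosConsumer constructor.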
Subscribe to consume data
try {
consumer.subscribe(topics);
while (!shutdown.get()) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process your data here
process(bean);
}
}
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data", ex);
} finally {
consumer.close();
shutdownLatch.countDown();
}
The subscribe method takes a list of topics to subscribe to, and it supports subscribing to multiple topics at the same time.
poll retrieves a single message with each execution, and a single message may contain multiple records.
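The polling snippet above refers to shutdown and shutdownLatch without showing how they are declared. The sketch below is one possible arrangement, assuming an AtomicBoolean flag and a CountDownLatch. To stay runnable without a TDengine server it replaces consumer.poll with a generic Supplier and process with a generic Consumer; PollLoopSketch, run, and stop are hypothetical names.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class PollLoopSketch {
    final AtomicBoolean shutdown = new AtomicBoolean(false);
    final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // poller stands in for consumer.poll(...); handler stands in for process(bean).
    // Returns the number of records handled before the loop stopped.
    <T> int run(Supplier<List<T>> poller, Consumer<T> handler) {
        int handled = 0;
        try {
            while (!shutdown.get()) {
                for (T record : poller.get()) {
                    handler.accept(record);
                    handled++;
                }
            }
        } finally {
            // Signal waiting threads that the loop has finished.
            shutdownLatch.countDown();
        }
        return handled;
    }

    // Requests the loop to stop after the current batch.
    void stop() {
        shutdown.set(true);
    }

    public static void main(String[] args) throws InterruptedException {
        PollLoopSketch loop = new PollLoopSketch();
        int[] polls = {0};
        int handled = loop.run(() -> {
            if (++polls[0] == 3) {
                loop.stop();
            }
            return Collections.singletonList("batch-" + polls[0]);
        }, record -> System.out.println("processing " + record));
        loop.shutdownLatch.await();
        System.out.println("handled " + handled + " records");
    }
}
```

In a real consumer, stop would typically be called from another thread or a shutdown hook, which then waits on shutdownLatch until the loop has exited and the consumer has been closed.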
Assignment subscription Offset
// get topicPartition
Set<TopicPartition> assignment() throws SQLException;
// get offset
long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException;
// Overrides the fetch offsets that the consumer will use on the next poll(timeout).
void seek(TopicPartition partition, long offset) throws SQLException;
void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException;
void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;
Example usage is as follows.
String topic = "topic_meters";
Map<TopicPartition, Long> offset = null;
try (TaosConsumer<AbsConsumerLoop.ResultBean> consumer = new TaosConsumer<>(config)) {
consumer.subscribe(Collections.singletonList(topic));
for (int i = 0; i < 10; i++) {
if (i == 3) {
// Saving consumption position
offset = consumer.position(topic);
}
if (i == 5) {
// reset consumption to the previously saved position
for (Map.Entry<TopicPartition, Long> entry : offset.entrySet()) {
consumer.seek(entry.getKey(), entry.getValue());
}
}
ConsumerRecords<AbsConsumerLoop.ResultBean> records = consumer.poll(Duration.ofMillis(500));
// you can handle data here
}
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to execute consumer functions. server: " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to execute consumer functions", ex);
}
Commit offset
If enable.auto.commit is false, offsets can be committed manually.
void commitSync() throws SQLException;
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException;
// async commit is only supported with the jni (native) connection
void commitAsync(OffsetCommitCallback<V> callback) throws SQLException;
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback<V> callback) throws SQLException;
Close subscriptions
// Unsubscribe
consumer.unsubscribe();
// Close consumer
consumer.close();
For more information, see Data Subscription.
Full Sample Code
- native connection
- WebSocket connection
In addition to the native connection, the Java client library also supports subscribing via websocket.
public class ConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopThread = false;
static private String groupId = "group1";
static private String clientId = "client1";
public static TaosConsumer<ResultBean> getConsumer() throws Exception {
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.ConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create native consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JSON.toJSONString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JSON.toJSONString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JSON.toJSONString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public double getPhase() {
return phase;
}
public void setPhase(double phase) {
this.phase = phase;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
public static void prepareData() throws SQLException, InterruptedException {
try {
int i = 0;
while (!stopThread) {
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
i++;
Thread.sleep(1);
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void prepareMeta() throws SQLException {
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create db and table, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void initConnection() throws SQLException {
String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
try {
connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex);
}
try {
statement = connection.createStatement();
} catch (SQLException ex) {
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex);
}
System.out.println("Connection created successfully.");
}
public static void closeConnection() throws SQLException {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex);
}
try {
if (connection != null) {
connection.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex);
}
System.out.println("Connection closed Successfully.");
}
public static void main(String[] args) throws SQLException, InterruptedException {
initConnection();
prepareMeta();
// create a single thread executor
ExecutorService executor = Executors.newSingleThreadExecutor();
// submit a task
executor.submit(() -> {
try {
prepareData();
} catch (SQLException ex) {
System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + ", ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
return;
}
System.out.println("prepareData executed successfully.");
});
try {
TaosConsumer<ResultBean> consumer = getConsumer();
pollExample(consumer);
System.out.println("pollExample executed successfully.");
consumer.unsubscribe();
seekExample(consumer);
System.out.println("seekExample executed successfully.");
consumer.unsubscribe();
commitExample(consumer);
System.out.println("commitExample executed successfully.");
consumer.unsubscribe();
unsubscribeExample(consumer);
System.out.println("unsubscribeExample executed successfully");
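} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
// fall through so that the data-producing thread is stopped and the connection is closed
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
// fall through so that the data-producing thread is stopped and the connection is closed
}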
stopThread = true;
// close the executor, which will make the executor reject new tasks
executor.shutdown();
try {
// wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
System.out.println("Wait executor termination failed.");
}
closeConnection();
System.out.println("program end.");
}
}
public class WsConsumerLoopFull {
static private Connection connection;
static private Statement statement;
static private volatile boolean stopThread = false;
static private String groupId = "group1";
static private String clientId = "client1";
public static TaosConsumer<ResultBean> getConsumer() throws Exception {
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taos.example.WsConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
TaosConsumer<ResultBean> consumer= new TaosConsumer<>(config);
System.out.printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"));
return consumer;
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
config.getProperty("bootstrap.servers"),
config.getProperty("group.id"),
config.getProperty("client.id"),
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void pollExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JSON.toJSONString(bean));
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void seekExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
try {
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("Subscribe topics successfully.");
Set<TopicPartition> assignment = consumer.assignment();
System.out.println("Now assignment: " + JSON.toJSONString(assignment));
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(100));
}
consumer.seekToBeginning(assignment);
System.out.println("Assignment seek to beginning successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to seek offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void commitExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
try {
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// Add your data processing logic here
System.out.println("data: " + JSON.toJSONString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
System.out.println("Commit offset manually successfully.");
}
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to commit offset, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void unsubscribeExample(TaosConsumer<ResultBean> consumer) throws SQLException {
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
try {
// unsubscribe the consumer
consumer.unsubscribe();
System.out.println("Consumer unsubscribed successfully.");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, %sErrMessage: %s%n",
topics.get(0),
groupId,
clientId,
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
finally {
// close the consumer
consumer.close();
System.out.println("Consumer closed successfully.");
}
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public double getPhase() {
return phase;
}
public void setPhase(double phase) {
this.phase = phase;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
public static void prepareData() throws SQLException, InterruptedException {
try {
int i = 0;
while (!stopThread) {
String insertQuery = "INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + " + i + "a, 10.30000, 219, 0.31000) ";
int affectedRows = statement.executeUpdate(insertQuery);
assert affectedRows == 1;
i++;
Thread.sleep(1);
}
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to insert data to power.meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void prepareMeta() throws SQLException {
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create db and table, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
ex.printStackTrace();
throw ex;
}
}
public static void initConnection() throws SQLException {
String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
try {
connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex);
}
try {
statement = connection.createStatement();
} catch (SQLException ex) {
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex);
}
System.out.println("Connection created successfully.");
}
public static void closeConnection() throws SQLException {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex);
}
try {
if (connection != null) {
connection.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex);
}
System.out.println("Connection closed successfully.");
}
public static void main(String[] args) throws SQLException, InterruptedException {
initConnection();
prepareMeta();
// create a single thread executor
ExecutorService executor = Executors.newSingleThreadExecutor();
// submit a task
executor.submit(() -> {
try {
prepareData();
} catch (SQLException ex) {
System.out.println("Failed to prepare data, ErrCode:" + ex.getErrorCode() + ", ErrMessage: " + ex.getMessage());
return;
} catch (Exception ex) {
System.out.println("Failed to prepare data, ErrMessage: " + ex.getMessage());
return;
}
System.out.println("prepareData executed successfully.");
});
try {
TaosConsumer<ResultBean> consumer = getConsumer();
pollExample(consumer);
System.out.println("pollExample executed successfully.");
consumer.unsubscribe();
seekExample(consumer);
System.out.println("seekExample executed successfully.");
consumer.unsubscribe();
commitExample(consumer);
System.out.println("commitExample executed successfully.");
consumer.unsubscribe();
unsubscribeExample(consumer);
System.out.println("unsubscribeExample executed successfully");
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
} catch (Exception ex) {
System.out.println("Failed to poll data from topic_meters, ErrMessage: " + ex.getMessage());
} finally {
// always stop the writer thread, even if consuming failed, so the executor can terminate
stopThread = true;
}
// close the executor, which will make the executor reject new tasks
executor.shutdown();
try {
// wait for the executor to terminate
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace();
System.out.println("Wait executor termination failed.");
}
closeConnection();
System.out.println("program end.");
}
}
Note: Set value.deserializer to the fully qualified class name of your deserializer; the value depends on the package path used in your test environment.
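One way to avoid hard-coding the package path is to derive the value from the class itself. The following is a minimal sketch, not the library's prescribed method: `DeserializerConfigSketch` and its empty `ResultDeserializer` are hypothetical stand-ins for the example class above, and only the `value.deserializer` key comes from this document.

```java
import java.util.Properties;

public class DeserializerConfigSketch {

    // Hypothetical stand-in for the ResultDeserializer nested class of the example above.
    public static class ResultDeserializer {
    }

    // Builds consumer properties with value.deserializer derived from the class object.
    public static Properties consumerProperties() {
        Properties config = new Properties();
        // Class.getName() returns the binary name, which includes the package and
        // uses '$' between an outer class and a nested class.
        config.setProperty("value.deserializer", ResultDeserializer.class.getName());
        return config;
    }

    public static void main(String[] args) {
        System.out.println(consumerProperties().getProperty("value.deserializer"));
    }
}
```

Because the name is computed at run time, moving the class to another package does not require editing the property value.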
Use with connection pool
HikariCP
Example usage is as follows.
public static void main(String[] args) throws Exception {
HikariConfig config = new HikariConfig();
// jdbc properties
config.setJdbcUrl("jdbc:TAOS://127.0.0.1:6030/log");
config.setUsername("root");
config.setPassword("taosdata");
// connection pool configurations
config.setMinimumIdle(10); // minimum number of idle connections
config.setMaximumPoolSize(10); // maximum number of connections in the pool
config.setConnectionTimeout(30000); // maximum time in milliseconds to wait for a connection from the pool
config.setMaxLifetime(0); // maximum lifetime of each connection (0 means no limit)
config.setIdleTimeout(0); // maximum idle time before an idle connection is retired (0 means never)
config.setConnectionTestQuery("SELECT SERVER_VERSION()"); // validation query
HikariDataSource dataSource = new HikariDataSource(config); // create datasource
Connection connection = dataSource.getConnection(); // get connection
Statement statement = connection.createStatement(); // get statement
// query or insert
// ...
statement.close();
connection.close(); // put back to connection pool
dataSource.close();
}
After obtaining a connection through HikariDataSource.getConnection(), call its close() method when you have finished using it. This does not actually close the connection; it only returns it to the connection pool. For more information about using HikariCP, see the official instructions.
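Because closing a pooled connection only returns it to the pool, try-with-resources is a convenient way to make sure that always happens, even when a query throws. The sketch below assumes nothing beyond the standard `javax.sql.DataSource` interface, which `HikariDataSource` implements; the class and method names are illustrative and not part of taos-jdbcdriver.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.DataSource;

public class PooledQuerySketch {

    // Runs one query on a pooled DataSource and returns the first column of the
    // first row as a string, or null when the result set is empty.
    public static String queryFirstValue(DataSource dataSource, String sql) throws SQLException {
        // Resources are closed in reverse order of declaration: result set,
        // statement, then connection (which goes back to the pool).
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(sql)) {
            return resultSet.next() ? resultSet.getString(1) : null;
        }
    }
}
```

With the pool configured as above, a call such as `queryFirstValue(dataSource, "SELECT SERVER_VERSION()")` borrows a connection, runs the query, and returns the connection without any explicit close() calls.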
Druid
Example usage is as follows.
public static void main(String[] args) throws Exception {
String url = "jdbc:TAOS://127.0.0.1:6030/log";
DruidDataSource dataSource = new DruidDataSource();
// jdbc properties
dataSource.setDriverClassName("com.taosdata.jdbc.TSDBDriver");
dataSource.setUrl(url);
dataSource.setUsername("root");
dataSource.setPassword("taosdata");
// pool configurations
dataSource.setInitialSize(10);
dataSource.setMinIdle(10);
dataSource.setMaxActive(10);
dataSource.setMaxWait(30000);
dataSource.setValidationQuery("SELECT SERVER_VERSION()");
Connection connection = dataSource.getConnection(); // get connection
Statement statement = connection.createStatement(); // get statement
// query or insert
// ...
statement.close();
connection.close(); // put back to connection pool
dataSource.close();
}
For more information about using Druid, see the official instructions.
More sample programs
The source code of the sample applications is under TDengine/docs/examples/JDBC:
- JDBCDemo: JDBC sample source code.
- connectionPools: using taos-jdbcdriver in connection pools such as HikariCP, Druid, dbcp, c3p0, etc.
- SpringJdbcTemplate: using taos-jdbcdriver in Spring JdbcTemplate.
- mybatisplus-demo: using taos-jdbcdriver in Spring Boot + MyBatis.
- springbootdemo: using taos-jdbcdriver in Spring Boot.
- consumer-demo: an example of consuming TDengine data; the consumption rate can be controlled by parameters.
Frequently Asked Questions
- Why is there no performance improvement when using Statement's addBatch() and executeBatch() to perform batch data writing/update?
  Cause: In TDengine's JDBC implementation, SQL statements submitted through the addBatch() method are executed sequentially in the order they were added. This does not reduce the number of interactions with the server, so it brings no performance improvement.
  Solution: 1. combine multiple values in a single INSERT statement; 2. use multi-threaded concurrent insertion; 3. use parameter-bound writing.
- java.lang.UnsatisfiedLinkError: no taos in java.library.path
  Cause: The program did not find the native library taos that it depends on.
  Solution: On Windows, copy C:\TDengine\driver\taos.dll to the C:\Windows\System32 directory. On Linux, create a soft link with ln -s /usr/local/taos/driver/libtaos.so.x.x.x.x /usr/lib/libtaos.so. On macOS, the library soft link is /usr/local/lib/libtaos.dylib.
- java.lang.UnsatisfiedLinkError: taos.dll Can't load AMD 64 bit on a IA 32-bit platform
  Cause: Currently, TDengine only supports 64-bit JDK.
  Solution: Reinstall the 64-bit JDK.
- java.lang.NoSuchMethodError: setByteArray
  Cause: taos-jdbcdriver 3.* only supports TDengine 3.0 and later.
  Solution: Use taos-jdbcdriver 2.* with your TDengine 2.* deployment.
- java.lang.NoSuchMethodError: java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer; ... taos-jdbcdriver-3.0.1.jar
  Cause: taos-jdbcdriver 3.0.1 is compiled on JDK 11.
  Solution: Use taos-jdbcdriver 3.0.2.
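For the first FAQ item, combining multiple values in a single INSERT statement can be sketched as follows. This is an illustration under stated assumptions rather than a driver feature: the `MultiRowInsertSketch` class and its `Row` type are hypothetical, the column layout follows the meters example used in this document, the target table is assumed to exist already, and timestamps are assumed to be epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class MultiRowInsertSketch {

    /** One row for the meters schema used in this document: ts, current, voltage, phase. */
    public static final class Row {
        final long tsMillis;
        final double current;
        final int voltage;
        final double phase;

        public Row(long tsMillis, double current, int voltage, double phase) {
            this.tsMillis = tsMillis;
            this.current = current;
            this.voltage = voltage;
            this.phase = phase;
        }
    }

    /** Builds a single INSERT statement that carries every row in one VALUES clause. */
    public static String buildInsert(String table, List<Row> rows) {
        if (rows.isEmpty()) {
            throw new IllegalArgumentException("rows must not be empty");
        }
        StringBuilder sql = new StringBuilder("INSERT INTO ").append(table).append(" VALUES");
        for (Row r : rows) {
            // Locale.ROOT keeps '.' as the decimal separator regardless of the JVM locale.
            sql.append(String.format(Locale.ROOT, " (%d, %.5f, %d, %.5f)",
                    r.tsMillis, r.current, r.voltage, r.phase));
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        long base = System.currentTimeMillis();
        List<Row> rows = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            rows.add(new Row(base + i, 10.3, 219, 0.31));
        }
        // Pass this string to statement.executeUpdate(...) once, instead of
        // adding three separate statements with addBatch().
        System.out.println(buildInsert("power.d1001", rows));
    }
}
```

The whole set of rows then reaches the server in one round trip, which is the effect that addBatch() does not provide.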
For additional troubleshooting, see FAQ.
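When diagnosing the UnsatisfiedLinkError items above, it can help to see which directories the JVM searches and whether any of them contains the native library. The following is a small diagnostic sketch that uses only the Java standard library; the `NativeLibraryCheck` class is hypothetical and not part of taos-jdbcdriver.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class NativeLibraryCheck {

    // Returns the library files found for libName in the directories of searchPath.
    public static List<File> findLibrary(String libName, String searchPath) {
        // Platform-specific file name, for example libtaos.so, taos.dll or libtaos.dylib.
        String fileName = System.mapLibraryName(libName);
        List<File> hits = new ArrayList<>();
        if (searchPath == null || searchPath.isEmpty()) {
            return hits;
        }
        for (String dir : searchPath.split(Pattern.quote(File.pathSeparator))) {
            if (dir.isEmpty()) {
                continue;
            }
            File candidate = new File(dir, fileName);
            if (candidate.isFile()) {
                hits.add(candidate);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        String path = System.getProperty("java.library.path");
        System.out.println("java.library.path = " + path);
        // os.arch helps to spot a 32-bit JVM, which cannot load the 64-bit library.
        System.out.println("os.arch = " + System.getProperty("os.arch"));
        List<File> hits = findLibrary("taos", path);
        if (hits.isEmpty()) {
            System.out.println("No " + System.mapLibraryName("taos") + " found on java.library.path.");
        } else {
            hits.forEach(f -> System.out.println("Found: " + f.getAbsolutePath()));
        }
    }
}
```

If nothing is found, either create the link or copy described in the FAQ, or start the JVM with `-Djava.library.path` pointing at the directory that contains the library.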
API Reference