TDengine Flink Connector
Apache Flink is an open-source distributed stream and batch processing framework supported by the Apache Software Foundation. It can be used in many big data scenarios, such as stream processing, batch processing, complex event processing, real-time data warehouse construction, and providing real-time data support for machine learning. Flink also offers a rich set of connectors and tools that interface with many different types of data sources for reading and writing. During data processing, Flink provides a series of reliable fault-tolerance mechanisms that keep tasks running stably and continuously even when unexpected failures occur.
With TDengine's Flink connector, Apache Flink can integrate seamlessly with the TDengine database. It enables efficient and stable reading of massive volumes of data from TDengine, on top of which comprehensive and in-depth analysis and processing can be performed. This taps into the full value of the data, provides robust data support and a scientific basis for enterprise decision-making, significantly improves the efficiency and quality of data processing, and enhances an enterprise's competitiveness and capacity for innovation in the digital era.
Note: This feature is only available in TDengine Enterprise Edition.
Prerequisites
Prepare the following environment:
- A TDengine cluster has been deployed and is running normally (both Enterprise and Community Editions work)
- taosAdapter is running normally
- Apache Flink v1.19.0 or above is installed. For installation, refer to the official Apache Flink documentation
Supported platforms
The Flink connector supports all platforms that can run Flink 1.19 and above.
Version History
Flink Connector Version | Major Changes | TDengine TSDB-Enterprise Version |
---|---|---|
2.1.4 | Upgraded the JDBC driver to version 3.7.3. For details, see the JDBC version history. | - |
2.1.3 | Added exception information output for data conversion. | - |
2.1.2 | Added backtick filtering for written fields. | - |
2.1.1 | Fixed data binding failure for the same table in Stmt. | - |
2.1.0 | Fixed writing of varchar types from different data sources. | - |
2.0.2 | The Table Sink supports RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, and RowKind.DELETE. | - |
2.0.1 | Sink supports writing types from RowData implementations. | - |
2.0.0 | 1. Sink supports custom data structure serialization and writing to TDengine. 2. Supports writing to the TDengine database using Table SQL. | 3.3.5.1 and higher |
1.0.0 | Supports the Sink function for writing data from other sources to TDengine. | 3.3.2.0 and higher |
Exception and error codes
After a task fails, check the Flink task execution log to confirm the reason for the failure, then refer to the following table; a small sketch for surfacing these error codes appears after it:
Error Code | Description | Suggested Actions |
---|---|---|
0xa000 | connection param error | Connector parameter error. |
0xa010 | database name configuration error | Database name configuration error. |
0xa011 | table name configuration error | Table name configuration error. |
0xa013 | value.deserializer parameter not set | No deserialization method was set. |
0xa014 | list of column names set incorrectly | The list of column names for the target table was not set. |
0x2301 | connection already closed | The connection has been closed. Check the connection status, or create a new connection to execute the relevant statements. |
0x2302 | this operation is NOT supported currently | The current interface is not supported; consider switching to another connection method. |
0x2303 | invalid variables | The parameter is invalid. Check the corresponding interface specification and adjust the parameter type and size. |
0x2304 | statement is closed | The statement has already been closed. Check whether the statement was closed and then reused, and whether the connection is working properly. |
0x2305 | resultSet is closed | The ResultSet has been released. Check whether the ResultSet was released and then used again. |
0x230d | parameter index out of range | The parameter is out of range. Check the valid range of the parameter. |
0x230e | connection already closed | The connection has been closed. Check whether the connection was closed and then used again, and whether the connection is working properly. |
0x230f | unknown SQL type in TDengine | Check the data types supported by TDengine. |
0x2315 | unknown taos type in TDengine | Check whether the correct TDengine data type was specified when converting a TDengine data type to a JDBC data type. |
0x2319 | user is required | Username information is missing when creating the connection. |
0x231a | password is required | Password information is missing when creating the connection. |
0x231d | can't create connection with server within | Increase the connection timeout via the httpConnectTimeout parameter, or check the connection to taosAdapter. |
0x231e | failed to complete the task within the specified time | Increase the execution timeout via the messageWaitTimeout parameter, or check the connection to taosAdapter. |
0x2352 | unsupported encoding | An unsupported character encoding was specified under the native connection. |
0x2353 | internal error of database, please see taoslog for more details | An error occurred while executing prepareStatement on the native connection. Check taoslog to locate the problem. |
0x2354 | connection is NULL | The connection was already closed when a command was executed on the native connection. Check the connection to TDengine. |
0x2355 | result set is NULL | The result set obtained on the native connection is abnormal. Check the connection status and retry. |
0x2356 | invalid num of fields | The metadata of the result set obtained on the native connection does not match. |
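When a task fails, the error code can usually be read from the SQLException raised by the underlying JDBC driver. The following is a generic JDBC sketch (not a connector-specific API; the URL and SQL are placeholders) showing how to surface the code so it can be matched against the table above:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class ErrorCodeDemo {
    // Execute a statement and print the driver error code on failure.
    static void runWithDiagnostics(String url, String sql) {
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement()) {
            stmt.execute(sql);
        } catch (SQLException e) {
            // Match the printed code (e.g. 0x2301) against the table above.
            System.out.printf("error code: 0x%x, message: %s%n", e.getErrorCode(), e.getMessage());
        }
    }
}
```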
Data type mapping
TDengine currently supports timestamp, numeric, character, and boolean types. The corresponding conversions to Flink RowData types are as follows (a short accessor sketch follows the table):
TDengine DataType | Flink RowDataType |
---|---|
TIMESTAMP | TimestampData |
INT | Integer |
BIGINT | Long |
FLOAT | Float |
DOUBLE | Double |
SMALLINT | Short |
TINYINT | Byte |
BOOL | Boolean |
VARCHAR | StringData |
BINARY | StringData |
NCHAR | StringData |
JSON | StringData |
VARBINARY | byte[] |
GEOMETRY | byte[] |
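As a quick illustration of these mappings, the sketch below reads the columns of a meters row (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT, location VARCHAR) with the matching RowData accessors. The column order is assumed to follow the SELECT list, and the timestamp precision of 0 mirrors the examples later in this document:

```java
import org.apache.flink.table.data.RowData;

public class RowDataAccessDemo {
    // Format one row using the accessor that corresponds to each mapped type.
    static String formatRow(RowData row) {
        return "ts: " + row.getTimestamp(0, 0)      // TIMESTAMP -> TimestampData
            + ", current: " + row.getFloat(1)       // FLOAT     -> Float
            + ", voltage: " + row.getInt(2)         // INT       -> Integer
            + ", phase: " + row.getFloat(3)         // FLOAT     -> Float
            + ", location: " + row.getString(4);    // VARCHAR   -> StringData
    }
}
```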
Instructions for use
Flink Semantic Selection Instructions
The connector uses At-Least-Once semantics, for the following reasons:
- TDengine currently does not support transactions and cannot perform frequent checkpoint operations and complex transaction coordination.
- Because TDengine uses timestamps as primary keys, downstream operators can filter out duplicate data to avoid duplicate calculations.
- At-Least-Once ensures high data processing performance and low data latency. It is configured as follows:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
If you use Maven to manage your project, simply add the following dependency to pom.xml:
<dependency>
<groupId>com.taosdata.flink</groupId>
<artifactId>flink-connector-tdengine</artifactId>
<version>2.1.4</version>
</dependency>
Connection Parameters
The parameters for establishing a connection include the URL and Properties. The URL specification format is:
jdbc:TAOS-WS://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&timezone={timezone}]
Parameter description (an example URL follows this list):
- user: TDengine login username; the default value is 'root'.
- password: User login password; the default value is 'taosdata'.
- database_name: Database name.
- timezone: Time zone.
- httpConnectTimeout: Connection timeout, in milliseconds; the default value is 60000.
- messageWaitTimeout: Message timeout, in milliseconds; the default value is 60000.
- useSSL: Whether SSL is used for the connection.
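For example, a complete WebSocket URL for a hypothetical taosAdapter instance on localhost serving the power database would look like this (host, port, database, credentials, and time zone are placeholders):

```
jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata&timezone=UTC-8
```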
Source
The Source retrieves data from the TDengine database, converts it into a format and type that Flink can handle internally, and reads and distributes it in parallel, providing efficient input for subsequent data processing. By setting the parallelism of the data source, multiple threads read data in parallel, improving read efficiency and throughput and making full use of cluster resources for large-scale data processing.
Source Properties
The configuration parameters in Properties are as follows (a condensed configuration sketch follows the list):
- TDengineConfigParams.PROPERTY_KEY_USER: TDengine login username; the default value is 'root'.
- TDengineConfigParams.PROPERTY_KEY_PASSWORD: User login password; the default value is 'taosdata'.
- TDengineConfigParams.VALUE_DESERIALIZER: Deserialization method for the result set received by the downstream operator. If the received result set type is Flink's RowData, it only needs to be set to RowData. You can also inherit TDengineRecordDeserialization and implement the convert and getProducedType methods to customize deserialization based on the SQL ResultSet.
- TDengineConfigParams.TD_BATCH_MODE: This parameter is used to push data to downstream operators in batches. If set to true, when creating the TDengineSource object, the data type must be specified as the template (generic) form of the SourceRecords type.
- TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: Message timeout, in milliseconds; the default value is 60000.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: Whether compression is enabled during transmission. true: enabled; false: not enabled. The default is false.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: Whether automatic reconnection is enabled. true: enabled; false: not enabled. The default is false.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: Automatic reconnection retry interval, in milliseconds; the default value is 2000. It takes effect only when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: Number of automatic reconnection retries; the default value is 3. It takes effect only when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true.
- TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: Whether to disable SSL certificate validation. true: disabled; false: not disabled. The default is false.
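As a condensed configuration sketch, the snippet below assembles a Properties object with batch mode and automatic reconnection enabled, using the parameter keys listed above; the JDBC URL is a placeholder, and the values mirror the full examples later in this section:

```java
Properties connProps = new Properties();
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_USER, "root");
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_PASSWORD, "taosdata");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
// With TD_BATCH_MODE enabled, the source element type must be the
// SourceRecords<RowData> template form rather than plain RowData.
connProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true");
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS, "2000");
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT, "3");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
```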
Split by time
Users can split the SQL query into multiple subtasks by time by specifying the start time, end time, split interval, and time field name. The system splits the query by the configured interval (left-closed, right-open) and fetches the data in parallel.
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
.setTimestampSplitInfo(new TimestampSplitInfo(
"2024-12-19 16:12:48.000",
"2024-12-19 19:12:48.000",
"ts",
Duration.ofHours(1),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
ZoneId.of("Asia/Shanghai")));
Splitting by Super Table TAG
Users can split the query SQL into multiple query conditions based on the TAG fields of the supertable; the system splits these into one subtask per query condition, fetching the data in parallel.
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, current, voltage, phase, groupid, location from meters where voltage > 100")
.setTagList(Arrays.asList("groupid >100 and location = 'Shanghai'",
"groupid >50 and groupid < 100 and location = 'Guangzhou'",
"groupid >0 and groupid < 50 and location = 'Beijing'"))
.setSplitType(SplitType.SPLIT_TYPE_TAG);
Classify by table
Sharding is supported by specifying multiple supertables or regular tables with the same table structure. The system splits the query into one task per table and then fetches the data in parallel.
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSelect("ts, current, voltage, phase, groupid, location")
.setTableList(Arrays.asList("d1001", "d1002"))
.setOther("order by ts limit 100")
.setSplitType(SplitType.SPLIT_TYPE_TABLE);
Use Source connector
Example where the query result is of the RowData type:
RowData Source
static void testSource() throws Exception {
Properties connProps = new Properties();
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
.setTimestampSplitInfo(new TimestampSplitInfo(
"2024-12-19 16:12:48.000",
"2024-12-19 19:12:48.000",
"ts",
Duration.ofHours(1),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
ZoneId.of("Asia/Shanghai")));
TDengineSource<RowData> source = new TDengineSource<>(connProps, sql, RowData.class);
DataStreamSource<RowData> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<RowData, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTimestamp(0, 0) +
", current: " + rowData.getFloat(1) +
", voltage: " + rowData.getInt(2) +
", phase: " + rowData.getFloat(3) +
", location: " + rowData.getString(4).toString());
sb.append("\n");
return sb.toString();
});
resultStream.print();
env.execute("tdengine flink source");
}
Example of batch query results:
Batch Source
void testBatchSource() throws Exception {
Properties connProps = new Properties();
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
connProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
Class<SourceRecords<RowData>> typeClass = (Class<SourceRecords<RowData>>) (Class<?>) SourceRecords.class;
SourceSplitSql sql = new SourceSplitSql("select ts, `current`, voltage, phase, tbname from meters");
TDengineSource<SourceRecords<RowData>> source = new TDengineSource<>(connProps, sql, typeClass);
DataStreamSource<SourceRecords<RowData>> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<SourceRecords<RowData>, String>) records -> {
StringBuilder sb = new StringBuilder();
Iterator<RowData> iterator = records.iterator();
while (iterator.hasNext()) {
GenericRowData row = (GenericRowData) iterator.next();
sb.append("ts: " + row.getTimestamp(0, 0) +
", current: " + row.getFloat(1) +
", voltage: " + row.getInt(2) +
", phase: " + row.getFloat(3) +
", location: " + rowData.getString(4).toString());
sb.append("\n");
totalVoltage.addAndGet(row.getInt(2));
}
return sb.toString();
});
resultStream.print();
env.execute("flink tdengine source");
}
Example where the query result is a custom data type:
Custom Type Source
void testCustomTypeSource() throws Exception {
System.out.println("testTDengineSourceByTimeSplit start!");
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultSourceDeserialization");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
// split by time
.setTimestampSplitInfo(new TimestampSplitInfo(
"2024-12-19 16:12:48.000",
"2024-12-19 19:12:48.000",
"ts",
Duration.ofHours(1),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
ZoneId.of("Asia/Shanghai")));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
TDengineSource<ResultBean> source = new TDengineSource<>(connProps, splitSql, ResultBean.class);
DataStreamSource<ResultBean> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<ResultBean, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTs() +
", current: " + rowData.getCurrent() +
", voltage: " + rowData.getVoltage() +
", phase: " + rowData.getPhase() +
", groupid: " + rowData.getGroupid() +
", location" + rowData.getLocation() +
", tbname: " + rowData.getTbname());
sb.append("\n");
totalVoltage.addAndGet(rowData.getVoltage());
return sb.toString();
});
resultStream.print();
env.execute("flink tdengine source");
}
- ResultBean is a custom inner class used to define the data type of the Source query results; a sketch of a plausible ResultBean follows this list.
- ResultSourceDeserialization is a custom inner class that inherits TDengineRecordDeserialization and implements the convert and getProducedType methods.
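For reference, the following is a plausible shape for such a ResultBean, with field names and types inferred from the query columns above; this is a sketch, not the connector's actual class:

```java
import java.sql.Timestamp;

public class ResultBean {
    private Timestamp ts;
    private float current;
    private int voltage;
    private float phase;
    private int groupid;
    private String location;
    private String tbname;

    public Timestamp getTs() { return ts; }
    public void setTs(Timestamp ts) { this.ts = ts; }
    public float getCurrent() { return current; }
    public void setCurrent(float current) { this.current = current; }
    public int getVoltage() { return voltage; }
    public void setVoltage(int voltage) { this.voltage = voltage; }
    public float getPhase() { return phase; }
    public void setPhase(float phase) { this.phase = phase; }
    public int getGroupid() { return groupid; }
    public void setGroupid(int groupid) { this.groupid = groupid; }
    public String getLocation() { return location; }
    public void setLocation(String location) { this.location = location; }
    public String getTbname() { return tbname; }
    public void setTbname(String tbname) { this.tbname = tbname; }
}
```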
CDC Data Subscription
Flink CDC is mainly used to provide data subscription functionality. It monitors real-time changes to data in the TDengine database and transmits these changes as data streams to Flink for processing, while ensuring data consistency and integrity.
Parameter Description
- TDengineCdcParams.BOOTSTRAP_SERVERS: ip:port of the TDengine server; when using a WebSocket connection, this is the ip:port where taosAdapter is located.
- TDengineCdcParams.CONNECT_USER: TDengine login username; the default value is 'root'.
- TDengineCdcParams.CONNECT_PASS: User login password; the default value is 'taosdata'.
- TDengineCdcParams.POLL_INTERVAL_MS: Data pull interval; the default is 500 ms.
- TDengineCdcParams.VALUE_DESERIALIZER: Result set deserialization method. If the received result set type is Flink's RowData, simply set it to 'RowData'. You can also inherit com.taosdata.jdbc.tmq.ReferenceDeserializer, specify the result set bean, and implement deserialization; alternatively, inherit com.taosdata.jdbc.tmq.Deserializer and customize deserialization based on the SQL ResultSet.
- TDengineCdcParams.TMQ_BATCH_MODE: This parameter is used to push data to downstream operators in batches. If set to true, when creating the TDengineCdcSource object, the data type must be specified as the template (generic) form of the ConsumerRecords type.
- TDengineCdcParams.GROUP_ID: Consumer group ID; consumers in the same group share consumption progress. Maximum length: 192.
- TDengineCdcParams.AUTO_OFFSET_RESET: Initial position of the consumer group subscription (earliest: subscribe from the beginning; latest: subscribe from the latest data; the default is latest).
- TDengineCdcParams.ENABLE_AUTO_COMMIT: Whether to enable automatic commit of consumption offsets. true: commit automatically; false: commit at checkpoint time. The default is false.
Note: In automatic commit mode, the reader commits data as soon as it is fetched, regardless of whether downstream operators have processed it correctly. This carries a risk of data loss and is mainly suited to efficient stateless operators or scenarios with low data consistency requirements.
- TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS: Interval for automatically committing consumption offsets, in milliseconds; the default is 5000. This parameter takes effect when ENABLE_AUTO_COMMIT is set to true.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: Whether compression is enabled during transmission. true: enabled; false: not enabled. The default is false.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: Whether automatic reconnection is enabled. true: enabled; false: not enabled. The default is false.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: Automatic reconnection retry interval, in milliseconds; the default value is 2000. It takes effect only when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: Number of automatic reconnection retries; the default value is 3. It takes effect only when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true.
- TDengineCdcParams.TMQ_SESSION_TIMEOUT_MS: Timeout after the consumer heartbeat is lost; rebalance logic is then triggered and, upon success, the consumer is removed (supported from version 3.3.3.0). The default is 12000; range [6000, 1800000].
- TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MS: The longest interval between consumer polls; if exceeded, the consumer is considered offline, rebalance logic is triggered, and the consumer is removed upon success (supported from version 3.3.3.0). The default is 300000; range [1000, INT32_MAX].
Use CDC connector
The CDC connector creates consumers according to the parallelism set by the user, so set the parallelism appropriately for the available resources. Example where the subscription result is of the RowData type:
CDC Source
void testTDengineCdc() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(100, AT_LEAST_ONCE);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.ENABLE_AUTO_COMMIT, "true");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
TDengineCdcSource<RowData> tdengineSource = new TDengineCdcSource<>("topic_meters", config, RowData.class);
DataStreamSource<RowData> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<RowData, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("tsxx: " + rowData.getTimestamp(0, 0) +
", current: " + rowData.getFloat(1) +
", voltage: " + rowData.getInt(2) +
", phase: " + rowData.getFloat(3) +
", location: " + rowData.getString(4).toString());
sb.append("\n");
totalVoltage.addAndGet(rowData.getInt(2));
return sb.toString();
});
resultStream.print();
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(5000L);
// A task submitted from the Flink UI cannot be cancelled here and needs to be stopped on the UI page.
jobClient.cancel().get();
}
Example of batch subscription results:
CDC Batch Source
void testTDengineCdcBatch() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
config.setProperty(TDengineCdcParams.TMQ_BATCH_MODE, "true");
Class<ConsumerRecords<RowData>> typeClass = (Class<ConsumerRecords<RowData>>) (Class<?>) ConsumerRecords.class;
TDengineCdcSource<ConsumerRecords<RowData>> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass);
DataStreamSource<ConsumerRecords<RowData>> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<ConsumerRecords<RowData>, String>) records -> {
Iterator<ConsumerRecord<RowData>> iterator = records.iterator();
StringBuilder sb = new StringBuilder();
while (iterator.hasNext()) {
GenericRowData row = (GenericRowData) iterator.next().value();
sb.append("tsxx: " + row.getTimestamp(0, 0) +
", current: " + row.getFloat(1) +
", voltage: " + row.getInt(2) +
", phase: " + row.getFloat(3) +
", location: " + rowData.getString(4).toString());
sb.append("\n");
totalVoltage.addAndGet(row.getInt(2));
}
return sb.toString();
});
resultStream.print();
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(5000L);
jobClient.cancel().get();
}
Example where the subscription result is a custom data type:
CDC Custom Type
static void testCustomTypeCdc() throws Exception {
System.out.println("testCustomTypeTDengineCdc start!");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(100, AT_LEAST_ONCE);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(4);
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultDeserializer");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
TDengineCdcSource<ResultBean> tdengineSource = new TDengineCdcSource<>("topic_meters", config, ResultBean.class);
DataStreamSource<ResultBean> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<ResultBean, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTs() +
", current: " + rowData.getCurrent() +
", voltage: " + rowData.getVoltage() +
", phase: " + rowData.getPhase() +
", groupid: " + rowData.getGroupid() +
", location" + rowData.getLocation() +
", tbname: " + rowData.getTbname());
sb.append("\n");
totalVoltage.addAndGet(rowData.getVoltage());
return sb.toString();
});
resultStream.print();
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(5000L);
jobClient.cancel().get();
}
- ResultBean is a custom inner class whose field names and data types correspond one-to-one with the column names and data types, so that the deserialization class specified by the value.deserializer property can deserialize objects of the ResultBean type; a sketch of such a deserializer follows.
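Per the VALUE_DESERIALIZER description above, such a ResultDeserializer can simply extend com.taosdata.jdbc.tmq.ReferenceDeserializer with the result bean as its type parameter. A minimal sketch, assuming the base class maps columns to bean fields by reflection as in the TDengine JDBC data subscription examples:

```java
import com.taosdata.jdbc.tmq.ReferenceDeserializer;

// Binds subscription records to the ResultBean type; column-to-field mapping
// is handled by ReferenceDeserializer (assumption based on the JDBC TMQ examples).
public class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
```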
Table SQL
Using Table SQL, extract data from multiple source databases (such as TDengine, MySQL, Oracle, etc.), perform custom operator operations (such as data cleaning, format conversion, and joining data from different tables), and then load the processed results into a target data source (such as TDengine, MySQL, etc.).
Source connector
Parameter configuration instructions:
Parameter Name | Type | Parameter Description |
---|---|---|
connector | string | connector identifier, set tdengine-connector |
td.jdbc.url | string | url of the connection |
td.jdbc.mode | string | connector type: source, sink |
table.name | string | original or target table name |
scan.query | string | SQL statement to retrieve data |
sink.db.name | string | target database name |
sink.supertable.name | string | name of the supertable |
sink.batch.size | integer | batch size written |
sink.table.name | string | the table name of a sub table or a normal table |
Usage example:
Write the subtable data of the meters supertable in the power database into the corresponding subtables of the sink_meters supertable in the power_sink database.
Table Source
static void testTableToSink() throws Exception {
System.out.println("testTableToSink start!");
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
String tdengineSourceTableDDL = "CREATE TABLE `meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARCHAR(255)," +
" groupid INT," +
" tbname VARCHAR(255)" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata'," +
" 'td.jdbc.mode' = 'source'," +
" 'table-name' = 'meters'," +
" 'scan.query' = 'SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`'" +
")";
String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARCHAR(255)," +
" groupid INT," +
" tbname VARCHAR(255)" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.mode' = 'sink'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," +
" 'sink.db.name' = 'power_sink'," +
" 'sink.supertable.name' = 'sink_meters'" +
")";
tableEnv.executeSql(tdengineSourceTableDDL);
tableEnv.executeSql(tdengineSinkTableDDL);
tableEnv.executeSql("INSERT INTO sink_meters SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`");
}
Table CDC connector
Parameter configuration instructions:
Parameter Name | Type | Parameter Description |
---|---|---|
connector | string | connector identifier, set tdengine-connector |
user | string | username, default root |
password | string | password, default taosdata |
bootstrap.servers | string | server address |
topic | string | subscribe to topic |
td.jdbc.mode | string | connector type: cdc, sink |
group.id | string | consumption group ID, sharing consumption progress within the same consumption group |
auto.offset.reset | string | initial position of the consumer group subscription (earliest: subscribe from the beginning; latest: subscribe from the latest data); the default is latest |
poll.interval_ms | integer | data pull interval; the default is 500ms |
sink.db.name | string | target database name |
sink.supertable.name | string | name of the supertable |
sink.batch.size | integer | batch size written |
sink.table.name | string | the table name of a sub table or a normal table |
Usage example:
Subscribe to the subtable data of the meters supertable in the power database and write it into the corresponding subtables of the sink_meters supertable in the power_sink database.
Table CDC
static void testCdcTableToSink() throws Exception {
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
String tdengineSourceTableDDL = "CREATE TABLE `meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARCHAR(255)," +
" groupid INT," +
" tbname VARCHAR(255)" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'bootstrap.servers' = 'localhost:6041'," +
" 'td.jdbc.mode' = 'cdc'," +
" 'group.id' = 'group_22'," +
" 'auto.offset.reset' = 'earliest'," +
" 'enable.auto.commit' = 'false'," +
" 'topic' = 'topic_meters'" +
")";
String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARCHAR(255)," +
" groupid INT," +
" tbname VARCHAR(255)" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.mode' = 'sink'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," +
" 'sink.db.name' = 'power_sink'," +
" 'sink.supertable.name' = 'sink_meters'" +
")";
tableEnv.executeSql(tdengineSourceTableDDL);
tableEnv.executeSql(tdengineSinkTableDDL);
TableResult tableResult = tableEnv.executeSql("INSERT INTO sink_meters SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`");
Thread.sleep(5000L);
tableResult.getJobClient().get().cancel().get();
}