Stream Processing
TDengine includes stream processing as a built-in component. You define real-time stream transformations by using SQL statements. Data written to the source table of the stream is then automatically processed in the specified manner and written to the target supertable based on the specified trigger mode. This provides a lightweight alternative to complex stream processing systems while delivering results in milliseconds even under high-throughput conditions.
Streams can include data filtering, scalar functions (including UDFs), and windowing. The source table of a stream can be a supertable, subtable, or basic table, but the target must be a supertable. You can use the PARTITION BY clause to partition data by table name or tag, and each partition is written to a different subtable in the target supertable.
Streams can aggregate data from supertables distributed across multiple nodes and can handle out-of-order data ingestion. You can specify a tolerance for out-of-order data by using a watermark and decide whether to discard or recompute such data with the IGNORE EXPIRED option.
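For example, a minimal sketch of a stream that computes the per-table average voltage every minute, assuming the power.meters supertable with a voltage column used in the examples below (the stream and target table names are illustrative):
-- Hypothetical example: average voltage per meter over 1-minute windows
CREATE STREAM avg_vol_demo TRIGGER WINDOW_CLOSE INTO avg_vol_demo_output AS SELECT _wstart, AVG(voltage) FROM power.meters PARTITION BY tbname INTERVAL(1m);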
Managing Streams
For information about creating and managing streams, see Manage Streams.
Partitioning in Streams
You can use the PARTITION BY clause with the tbname pseudocolumn, tag columns, regular columns, or expressions to perform partitioned computations in a stream. Each partition has its own independent timeline and time window, and data is aggregated separately and written to different subtables in the target supertable. In a stream without a PARTITION BY clause, all data is written to the same subtable.
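For example, assuming the meters supertable carries a location tag, as in the standard TDengine sample schema (the stream and target names here are illustrative), the following sketch aggregates the data for each location into its own subtable:
-- Hypothetical example: one output subtable per distinct location tag
CREATE STREAM avg_vol_by_loc INTO avg_vol_loc AS SELECT _wstart, AVG(voltage) FROM meters PARTITION BY location INTERVAL(1m);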
A group ID is automatically generated for each partition. By default, the subtables created by a stream are named with this group ID. You can use the SUBTABLE clause to generate a custom name for the subtable of each partition. For example:
CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
This statement creates subtables using the naming convention new-<subtable-name>_<supertable-name>_<group-id>.
Prior to TDengine 3.2.3.0, the supertable name and group ID were not appended to the name defined in the SUBTABLE clause. Therefore, the naming convention in this example would be new-<subtable-name> in earlier versions.
- tname is an alias of tbname for use in expressions within the SUBTABLE clause.
- Subtable names that exceed the table name limit of 192 bytes are truncated.
- If the generated subtable name is not unique within the database, the subtable will fail to be created and data will not be written to it.
Handling Historical Data
By default, a stream processes only data ingested after the stream is created. If you want a stream to process pre-existing data, you can specify the FILL_HISTORY 1 parameter. This parameter enables streams to process data ingested at any time before, during, or after the creation of the stream.
For example, the following SQL statement creates a stream that counts the number of records generated by all smart meters every 10 seconds, including all historical data:
CREATE STREAM IF NOT EXISTS count_history_s FILL_HISTORY 1 INTO count_history AS SELECT COUNT(*) FROM power.meters INTERVAL(10s);
You can also specify a time range. For example, the following SQL statement processes records after January 30, 2020:
CREATE STREAM IF NOT EXISTS count_history_s FILL_HISTORY 1 INTO count_history AS SELECT COUNT(*) FROM power.meters WHERE ts > '2020-01-30' INTERVAL(10s);
The following statement processes records between January 30, 2020 and January 1, 2023. Note that you can specify an end time in the future.
CREATE STREAM IF NOT EXISTS count_history_s FILL_HISTORY 1 INTO count_history AS SELECT COUNT(*) FROM power.meters WHERE ts > '2020-01-30' AND ts < '2023-01-01' INTERVAL(10s);
A stream can process a maximum of 20 million records. Exceeding this limit will cause an error.
Trigger Modes
You use the TRIGGER directive to specify when stream processing occurs for windowed computations:
- AT_ONCE: Triggered immediately upon ingestion.
- WINDOW_CLOSE: Triggered when the window closes, with an optional watermark.
- MAX_DELAY time: Triggered when the specified time elapses or the window closes, whichever is earlier (see the sketch after this list).
The default value is WINDOW_CLOSE.
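For example, the following sketch uses MAX_DELAY so that results are delivered at most 3 seconds after data arrives, even if the 10-second window has not yet closed (the stream and table names are illustrative):
-- Hypothetical example: push results no later than 3s after ingestion
CREATE STREAM max_current_s TRIGGER MAX_DELAY 3s INTO max_current_output AS SELECT _wstart, MAX(current) FROM power.meters INTERVAL(10s);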
Note that non-windowed computations are processed in real time.
Watermark
The time at which a window closes is determined by the event time, which is the primary key (timestamp) of the record ingested. This prevents problems caused by discrepancies between client and server times and addresses challenges such as out-of-order data ingestion.
You can specify a watermark to define the upper threshold of out-of-order data in your stream. The default value is 0, indicating that out-of-order data is not processed.
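For example, the following sketch declares a 5-second watermark, so each window waits an additional 5 seconds of event time for out-of-order data before closing (names are illustrative):
-- Hypothetical example: tolerate up to 5s of out-of-order data
CREATE STREAM avg_vol_w TRIGGER WINDOW_CLOSE WATERMARK 5s INTO avg_vol_w_output AS SELECT _wstart, AVG(voltage) FROM power.meters INTERVAL(1m);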
When data is ingested, the window closure time is calculated as T = latest event time - watermark. All windows whose end time is earlier than T are then closed. This process is described in the following figure.
In the diagram, the vertical axis represents time, while the dots on the horizontal axis represent the received data points.
- At time T1, the 7th data point arrives. The calculated time T falls within the second window, so the second window does not close.
- At time T2, the 6th and 8th data points arrive late. Since the latest event time has not changed, T also remains unchanged, and the out-of-order data in the second window is processed.
- At time T3, the 10th data point arrives, and T moves past the closure time of the second window, which is then closed, allowing the out-of-order data to be correctly processed.
For streams whose trigger mode is WINDOW_CLOSE or MAX_DELAY, window closure triggers computation. However, streams in AT_ONCE mode compute results immediately upon data ingestion, regardless of window closure.
Handling Expired Data
Data that is ingested into a closed window is considered to be expired. You can specify the IGNORE EXPIRED parameter to determine how to handle expired data:
- IGNORE EXPIRED 0: Recalculate the latest results, taking expired data into account.
- IGNORE EXPIRED 1: Ignore expired data.
The default value is IGNORE EXPIRED 1.
Ensure that an appropriate watermark has been set regardless of how you choose to handle expired data.
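For example, the following sketch combines a 10-second watermark with IGNORE EXPIRED 0 so that late records trigger recomputation of the affected windows (names are illustrative):
-- Hypothetical example: recompute closed windows when expired data arrives
CREATE STREAM count_expired_s TRIGGER WINDOW_CLOSE WATERMARK 10s IGNORE EXPIRED 0 INTO count_expired_output AS SELECT _wstart, COUNT(*) FROM power.meters INTERVAL(10s);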
Handling Updated Data
You can specify the IGNORE UPDATE parameter to determine how to handle data that is updated after ingestion:
- IGNORE UPDATE 0: Check for updates and recompute results accordingly.
- IGNORE UPDATE 1: Do not check for updates.
The default value is IGNORE UPDATE 0.
Writing to an Existing Supertable
Generally, the results of stream processing are stored in new supertables. If it is necessary to write results to an existing supertable, ensure that the columns in the supertable correspond exactly to the results of the subquery in your stream.
When writing to an existing supertable, note the following:
- If the data types of the columns in the subquery results do not match those of the target supertable, the system will automatically convert them to the types specified in the supertable. If the length of the resultant data exceeds 4096 bytes, an error will occur.
- If the number and position of the columns in the subquery results do not match those of the target supertable, you must explicitly specify the relationships between columns, as shown in the sketch after this list.
- Multiple streams cannot write to the same target supertable.
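As a sketch of explicit column mapping, assuming an existing supertable named existing_stb with columns ts and cnt (both names are illustrative), you can list the target columns after the supertable name:
-- Hypothetical example: map _wstart to ts and COUNT(*) to cnt in an existing supertable
CREATE STREAM count_into_existing INTO existing_stb (ts, cnt) AS SELECT _wstart, COUNT(*) FROM power.meters INTERVAL(10s);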
Customizing Tag Values for Target Tables
You can specify custom tag values for the subtable corresponding to each partition. The syntax is described as follows:
CREATE STREAM output_tag TRIGGER AT_ONCE INTO output_tag_s TAGS(alias_tag varchar(100)) AS SELECT _wstart, COUNT(*) FROM power.meters PARTITION BY CONCAT("tag-", tbname) AS alias_tag INTERVAL(10s);
In the PARTITION BY clause, an alias alias_tag is defined for CONCAT("tag-", tbname), corresponding to the custom tag name of the supertable output_tag_s. In this example, the tag of the newly created subtables for the stream will have the prefix tag- concatenated with the original table name as the tag value.
When defining custom tag values, note the following:
- If the data types of the defined tags do not match those of the target supertable, the system will automatically convert them to the types specified in the supertable. If the length of the resultant data exceeds 4096 bytes, an error will occur.
- If the number and position of the defined tags do not match those of the target supertable, you must explicitly specify the relationships between the defined tags and the tag columns in the target supertable.