Stream Processing

Raw time-series data is often cleaned and preprocessed before being permanently stored in a database. Stream processing components like Kafka, Flink, and Spark are often deployed alongside a time-series database to handle these operations, increasing system complexity and maintenance costs.

Because stream processing is built into TDengine, you no longer need to rely on this middleware. TDengine offers a unified platform for writing, preprocessing, permanent storage, complex analysis, and real-time computation and alerting, and you can perform all of these tasks in SQL.

Create a Stream

CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
stream_options: {
TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
WATERMARK time
}

The subquery is a subset of standard SELECT query syntax:

subquery: SELECT [DISTINCT] select_list
from_clause
[WHERE condition]
[PARTITION BY tag_list]
[window_clause]

Session windows, state windows, and sliding windows are supported. When you configure a session or state window for a supertable, you must use PARTITION BY TBNAME.

window_clause: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
}

SESSION indicates a session window, and tol_val indicates the maximum time gap allowed between rows in the same session. If the interval between two consecutive rows is within tol_val, the rows belong to the same session window; otherwise, a new session window is started automatically.
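As an illustration of the session window clause, the following sketch creates a stream that groups each table's rows into sessions. The stream name session_vol_s, the target supertable session_vol, the timestamp column name ts, and the 10-second tolerance are assumptions chosen for the example; only the meters supertable and its voltage column come from this page.

```sql
-- Hypothetical names: session_vol_s (stream), session_vol (target supertable).
-- Assumes the timestamp column of meters is named ts.
-- Rows in the same table that are at most 10 seconds apart fall into one session.
-- PARTITION BY tbname is required for session windows on a supertable.
CREATE STREAM IF NOT EXISTS session_vol_s INTO session_vol AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname SESSION(ts, 10s);
```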

For example, the following SQL statement creates a stream and automatically creates a supertable named avg_vol. The stream uses a 1-minute time window that slides forward in 30-second steps to calculate the row count and average voltage for each table in the meters supertable.

CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);

Delete a Stream

DROP STREAM [IF EXISTS] stream_name

This statement deletes the stream processing service only. The data generated by the stream is retained.
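For instance, assuming the avg_vol_s stream and avg_vol supertable from the earlier example exist, a minimal sketch of dropping the stream and then reading the retained results might look like this:

```sql
-- Remove the stream processing service only.
DROP STREAM IF EXISTS avg_vol_s;

-- The results already written by the stream are expected to remain in avg_vol.
SELECT * FROM avg_vol LIMIT 10;
```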

View Streams

SHOW STREAMS;

Trigger Stream Processing

When you create a stream, you can use the TRIGGER parameter to specify triggering conditions for it.

For non-windowed processing, triggering occurs in real time. For windowed processing, there are three methods of triggering:

  1. AT_ONCE: triggers on write

  2. WINDOW_CLOSE: triggers when the window closes. This is determined by the event time. You can use WINDOW_CLOSE together with watermark. For more information, see Stream Processing Strategy for Out-of-Order Data.

  3. MAX_DELAY: triggers when the window closes. If the window has not closed but the time elapsed exceeds MAX_DELAY, stream processing is also triggered.

Because window closing is determined by the event time, a delayed or interrupted event stream prevents the event time from advancing. As a result, the latest results may not be available.

For this reason, MAX_DELAY is provided as a way to ensure that processing occurs even if the window does not close. If a write occurs and processing has not been triggered by the time MAX_DELAY expires, processing is triggered at that point.
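The following sketch shows how the WINDOW_CLOSE and MAX_DELAY triggers might be written using the CREATE STREAM syntax given earlier on this page. The stream names, target supertable names, and the 5-second delay are hypothetical values chosen for illustration.

```sql
-- Hypothetical names: avg_vol_wc_s / avg_vol_wc.
-- Results are pushed only when a 1-minute window closes.
CREATE STREAM IF NOT EXISTS avg_vol_wc_s TRIGGER WINDOW_CLOSE INTO avg_vol_wc AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m);

-- Hypothetical names: avg_vol_md_s / avg_vol_md.
-- Results are pushed when a window closes, or once 5 seconds have elapsed
-- without the window closing.
CREATE STREAM IF NOT EXISTS avg_vol_md_s TRIGGER MAX_DELAY 5s INTO avg_vol_md AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m);
```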

Stream Processing Strategy for Out-of-Order Data

When you create a stream, you can specify a watermark in the stream_options parameter.

The watermark is used to specify the tolerance for out-of-order data. The default value is 0.

T = latest event time - watermark

Each time a batch of data arrives at the system, T is recalculated using the preceding formula, and every window whose closing time is earlier than T is closed. If the triggering method is WINDOW_CLOSE or MAX_DELAY, the aggregate result for the window is pushed at that point.
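To make the formula concrete, here is a sketch of a stream with a 10-second watermark, followed by a worked example in comments. The stream name, target supertable name, watermark value, and timestamps are all illustrative assumptions.

```sql
-- Hypothetical names: avg_vol_wm_s / avg_vol_wm.
CREATE STREAM IF NOT EXISTS avg_vol_wm_s TRIGGER WINDOW_CLOSE WATERMARK 10s INTO avg_vol_wm AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m);

-- Worked example for the window [12:00:00, 12:01:00), closing time 12:01:00:
--   latest event time 12:01:05  ->  T = 12:01:05 - 10s = 12:00:55
--     12:01:00 is not less than T, so the window stays open and a late row
--     stamped 12:00:58 is still included in it.
--   latest event time 12:01:15  ->  T = 12:01:15 - 10s = 12:01:05
--     12:01:00 is less than T, so the window is closed and its result is pushed.
```

A larger watermark tolerates more disorder but delays results by the same amount, so the value is a trade-off between completeness and latency.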

Stream Processing Strategy for Expired Data

The data in expired windows is tagged as expired. TDengine stream processing provides two methods for handling such data:

  1. Drop the data. This is the default and often only handling method for most stream processing engines.

  2. Recalculate the data. In this method, all data in the window is reobtained from the database and recalculated. The latest results are then returned.

In both of these methods, configuring the watermark is essential for obtaining accurate results (if expired data is dropped) and avoiding repeated triggers that affect system performance (if expired data is recalculated).