To produce value from a constant flow of data, it is necessary to filter, split, and transform this flow into streams, which can then be written to a time-series database (TSDB) or pushed to applications. With TDengine 3.0, you can create streams that define rules for transforming data arriving from a source and then writing it to the sink.
The following SQL statement shows an example of a stream that writes the output of scalar functions to a new supertable.
CREATE STREAM power_stream INTO power_stream_output_stb AS SELECT ts, concat_ws(".", location, TBNAME) AS meter_location, current * voltage * cos(phase) AS active_power, current * voltage * sin(phase) AS reactive_power FROM meters WHERE voltage > 100 PARTITION BY TBNAME
The name of the stream is defined by CREATE STREAM, followed by the sink that defines the stream, and immediately after INTO is a supertable that is automatically created at the same time as the stream is created. Immediately after this is a SELECT query that defines the transform of the stream. Here is a scalar function calculation with filtering.
The create stream statement ends with a PARTITION statement: PARTITION BY TBNAME, which indicates how the stream is partitioned, with data from different partitions being written to different sub-tables in the supertable.
- With a sliding window, aggregating the streams of all sub-tables in the supertable
CREATE STREAM current_stream TRIGGER AT_ONCE INTO current_stream_output_stb AS SELECT _wstart as start, _wend as end, max(current) as max_current FROM meters WHERE voltage <= 220 INTEVAL (5s) SLIDING (1s);
Let’s look at the second example, which in this case is a sliding window aggregation. After creating the stream, the trigger mode for the stream is specified via TRIGGER AT_ONCE. AT_ONCE mode means that when data is written, the computation is triggered immediately and the result is pushed without waiting for the window to close.
- The subtables in the supertable are aggregated individually in a session window
CREATE STREAM current_stream TRIGGER WINDOW_CLOSE WATERMARK 60s INTO current_stream_output_stb AS SELECT _wstart as start, _wend as end, avg(current) as avg_current FROM meters SESSION(ts, 30s) PARTITION BY TBNAME
The third example is the session window. The stream supports a session window and a state window in addition to a tumble window and a sliding window (hop window), which are defined exactly the same as the normal queries in TDengine. Here again, we introduce the PARTITION BY TBNAME clause, which indicates that each sub-table computes the session window independently and writes the result to the destination table.
As we can see from the specific implementations of the three triggers above, there are several important elements to creating a stream.
- The source table of the stream and its transformations: this is a SELECT query
- Destination table for the stream: this is an automatically created supertable
- Partitioning of streams: sub-tables are automatically created based on different PARTITION and written to the corresponding sub-tables.
Defining a stream is defining a data transformation, and the transformation of the data with ETL is done internally in TDengine.
In the example above, the live stream is pushed into TDengine, but not directly to the application. This is because the new paradigm brings a new solution, but it also brings the additional overhead of application transformation. If the application needs to process the live streams pushed by TDengine, it will incur a higher cost; instead it can keep the old Query get results paradigm, for example, replacing complex and time-consuming queries with simple queries on stream aggregated results, maximizing the convenience of streams.
However, there are times when applications still need to fetch data with low latency, such as when implementing monitoring alarms and anomaly detection, in which case the streams need to actually reach the application, and TDengine provides data subscription capabilities to meet this need.
From a stream perspective, we can also reinterpret the data subscription feature provided by TDengine. The purpose of data subscription is to push written data to consumers as streams, which need to be persisted and read on demand due to the limited processing power of the application, which corresponds to the feature in Kafka that messages can be stacked infinitely.
The data stream that drops the disk in TDengine is the WAL, and we materialize the writes, i.e., persistently drop the disk down, which is the storage in the message queue. From this point on, we transform WAL into a real storage engine, providing flexible and configurable deletion and file switching policies, and build indexes, and then interface to the query engine.
Data subscriptions use the CREATE TOPIC syntax to generate streams of data from the WAL, providing a message queue-like interface to both user-created tables and SINK tables of the stream. In concrete terms, scalar functions, and filtering can extract data from the WAL and transform it so that the transformation actually produces a live stream that is then pushed to the application.
Stream engine implementation
After explaining what TDengine’s stream engine is and the relationship between data subscriptions and streams, let’s take a look at how TDengine’s stream processing is implemented. Of course, there are a lot of details and a lot of content, so due to time constraints, I will only pick the 3 most important parts, which I think are the most interesting parts to explain, that is, “event-driven”, “incremental computation” and “Chaotic Processing”.
As we have already mentioned, one of the biggest differences between stream computing and continuous query is that stream computing can support event-driven, i.e., every arrival of data triggers the computation, a feature that allows us to compute scalar functions for data cleaning and preprocessing; and to provide AT_ONCE trigger mode for window aggregation, eliminating the need to wait for windows to close, thus supports both session and state windows. The bearer of event-driven execution is Stream Task, so let’s look at how Stream Task is deployed.
The first example is an aggregation of scalar functions with partition by tbname, where each vnode from the source DB is aggregated individually and distributed to the target DB, where the Stream Task is responsible for writing the data to the corresponding sub-table.
As shown in the diagram, streams can span DBs, and different DBs represent different data preservation lifecycles, and the three Stream Tasks of Source DB represent the three vnodes deployed in them. The data can be aggregated in the vnode of the Source DB without going through the aggregation node, and then sent to the target DB through the Stream Task.
The second example of supertable aggregation is a distributed aggregation that aggregates all the sub-tables in the supertable together. It requires the deployment of an aggregation Stream Task to aggregate the data from each vnode. In the concrete implementation, the data is aggregated at the source vnode at the first level, the data from the first level aggregation is pushed to the second level node for the second level aggregation, and the aggregated results are pushed to the target DB on demand based on the trigger pattern.
So what do each of these two levels of aggregation mean? We’ll cover this in more detail in the Incremental Computation section. But before that, let’s zoom in and take a look at the exact structure inside Stream Task.
At execution time, the streaming framework distributes the data in the Output Queue to the downstream Stream Task and notifies the stream’s execution scheduler to dispatch the idle stream threads to trigger the computation.
Inside the Stream Task, the concrete computation is performed by a series of stateful stream operators. When a stream is created, the SQL is parsed into a syntax tree, and the planner splits the syntax tree into multiple pipelines, each of which is a series of concatenated stream operators. As we can see, the biggest difference in the plan level is that the Exchange Operator is removed, all pipelines are made standalone, and data is exchanged between pipelines in push mode. This not only minimizes blocking during stream execution, but also reduces inefficient execution scheduling, since streams no longer need to first schedule the parent Stream Task to pull data from the child Stream Task when an event arrives.
Stream operators, on the other hand, are stateful operators that store the back end in a Stream Task with a stream of state, and overflow to the hard disk when the state data in memory becomes too large.
Stateful incremental calculation
The stream computation can be divided into many kinds depending on the nature of the function, such as invertible, holistic, etc. Here we only discuss how the computation is implemented for incremental.
There is no need to introduce a complex series of mathematical formulas or algebraic structures first; just the simplest example of calculating an average to demonstrate the process of incremental computation.
For the graph on the left, data 1, 2, and 3, the average is computed as 2. When new data arrives, suppose it is 4. Then 4 cannot be computed incrementally with result 2. To compute incrementally, then we need to extract a state vector that records the Sum and Count of the data. the state vector is maintained in the state store of the operator, and when new data arrives, the new data is directly mapped into a vector in the state space, which defines the synthesis operation that eventually yields a new state vector. When the final result is needed, the final result is computed from the state vector, as shown above for 10 / 4 = 2.5.
Let’s abstract the above process by defining “map the original data into the state space” as Lift, “combine the vectors in the state space” as Combine, and “extract the result from the state vector” as Lower. The memory occupied by the state vector is constant and will be released when the data is aggregated, so the memory occupation is no longer positively related to the amount of data, but only to the open window data, thus enabling high throughput aggregation in large windows. This enables high throughput aggregation with large windows without causing memory spikes.
At this point, we are able to understand exactly what the two-level aggregation of the distribution we talked about earlier is.
- Batch inserted data will first perform Lift and Combine operations on the vnode where the data was inserted
- For aggregation across multiple vnodes, the aggregation stream task is deployed on a randomly selected vnode, combining the state of the first level of aggregation again
- Execute lower on demand depending on the trigger mode
- Two-level incremental aggregation reduces the amount of data transfer and spreads CPU-intensive computation across nodes
In order to achieve correctness in various scenarios such as disorder, the streaming computation in TDengine 3.0 adopts a processing model based on event time, and Watermark is the upper bound for disorder tolerance, so to understand the processing of disorder data, we first need to understand Watermark.
In the above diagram, the vertical axis represents the wall clock, i.e. the real time. The horizontal axis represents the data arriving in TDengine at the corresponding T1, T2, and T3 times. The blue dots indicate the latest inserted data, and Watermark is the time axis along which to go in the past, subtracting the Watermark time from the last event time to get the time T = latest event time – watermark. All windows with end times earlier than T are closed. These windows are beyond the upper bound of disorder tolerance, and we consider them safe to close without further data insertion.
The trigger mode is WINDOW_CLOSE, MAX_DELAY the data will be pushed at this time. In contrast, in AT_ONCE mode, window closure is not related to result pushing, but only to memory release, since memory is finite and the data stream is unbounded. Thus, for WINDOW CLOSE or MAX DELAY trigger modes, Watermark’s choice is a trade-off between real-time and correctness of results. in cases where data may be disordered, closing the window early means that data is pushed before all results are aggregated, and real-time is often sacrificed to get more correctness, which is to delay the closing of the window according to Watermark.
As for the AT ONCE trigger mode, since there is no longer the problem of constant data pushing, the more important function of Watermark is to keep the window opening and closing in a dynamic balance, making it possible to “use limited memory to deal with the unbounded data flow and the constant addition of windows”. In terms of actual state storage, TDengine 3.0 has already implemented two levels of memory and hard disk, and anything that exceeds the memory can be overflowed to the hard disk.
Even if a Watermark is defined, what about data that is still out of order and exceeds the Watermark? We provide two strategies, directly discard or pull from TSDB (Time-Series Database) and recalculate, corresponding to IGNORE EXPIRED 1 and IGNORE EXPIRED 0. However, pulling from TSDB and recalculating is only applicable to a small amount of disorder, as it brings a reduction in processing speed.
While I’ve made performance metrics the subtitle of today’s talk, it’s the cornerstone of any new application scenario. But again, performance metrics are not a particular thing to look at: the last thing we want our users to care about is performance, because our performance can meet the needs of most scenarios. To that end, we want to verify that TDengine 3.0 can still achieve millisecond latency on an average machine with a million rows of data written per second.
We’ll be releasing benchmarks as they become available and easier for users to use and verify.
Our performance tests will primarily go to verify the following areas.
- Testing the impact of streams on write performance: write latency vs. throughput for the case with and without streams
- Test stream result latency at large write throughput
- Validate several major scenarios such as scalar functions, individual aggregation per sub-table, and multiple vgroup super-table aggregation, respectively
The next optimizations for the TDengine 3.0 streaming engine will be divided into the following areas.
- More comprehensive SQL support: Join / Fill / Group by / Subquery etc.
- Better stream state management so that users no longer need to care about Watermark in AT ONCE mode
- More flexible partitioning mechanism: partition by column / expressions
- Multi-aggregation nodes; independently deployed, store-and-calculate separate streaming computing nodes: SNODE
- Configurable checkpoint
- Benchmark refinement, end-to-end latency metrics, P99 latency metrics
While these follow-on efforts are listed, it is really the users and community developers of TDengine who will determine the development of the streaming compute processing engine. We hope that everyone will actually use TDengine 3.0’s streaming engine and contribute code to it in the open source community, and we will listen to more real-time feedback from our customers and the community.