Windows
Aggregates streams based on event or system time.
Count Window
Splits the stream into evenly sized windows, measured in number of records.
item | description |
---|---|
Size | The number of records in each window |
Frequency | The number of records between the starts of consecutive windows |
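The interaction between Size and Frequency can be modeled outside the product. The following is a minimal Python sketch of the semantics only, not the Stream Processor API: `count_windows` is a hypothetical helper, and it assumes an incomplete trailing window stays buffered rather than being emitted.

```python
def count_windows(records, size, frequency):
    """Split records into windows of `size` records.

    A new window starts every `frequency` records. Assumption: a trailing
    window with fewer than `size` records is not emitted.
    """
    windows = []
    start = 0
    while start + size <= len(records):
        windows.append(records[start:start + size])
        start += frequency
    return windows


records = list(range(10))

# Frequency equal to Size: back-to-back windows with no shared records.
tumbling = count_windows(records, size=4, frequency=4)

# Frequency smaller than Size: consecutive windows share records.
overlapping = count_windows(records, size=4, frequency=2)
```

In this model, a Frequency smaller than the Size makes consecutive windows share records, while a Frequency equal to the Size produces back-to-back windows.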
Global Window
Global windows aggregate the stream until the custom trigger fires.
item | description |
---|---|
Trigger | A function that returns the indices to split the buffer on |
Mixed Schemas | Set to true to allow batches to have different schemas |
Sliding
Aggregates the stream using event time over a given period and duration. Windows can overlap, depending on the Period and Duration settings. A window is triggered when a timestamp is encountered past the end of that window. After that, subsequent events with a timestamp within that window are discarded, unless Passthrough is enabled.
item | description |
---|---|
Editor | Define Time Assigner Function; a function to extract a vector of event times from a batch. It must return a list of timestamps, e.g. "{x`time}" |
Period | Time between window start times (time period; e.g. 0D00:00:05). |
Duration | How much data should be contained in each window (time period; e.g. 0D00:00:10). |
Lateness | How far past the end of a window the watermark goes before the window is finalized and emitted. Defaults to 0D00:00:00. A window is triggered when an incoming timestamp exceeds the sum of the window's start time, duration, and lateness. |
Snap | When enabled, snaps the window start time to the nearest round number. |
Mixed Schemas | When enabled, allows batches to have different schemas (or allow non-table batches). Throughput is higher if batches are tables rather than a list of tuples, and higher again if all tables share the same schema. |
Passthrough | When enabled, will send late events through the pipeline with the next batch rather than dropping them. |
Sort | Sort records in ascending order by timestamp when enabled. |
Example: a Period of 5 seconds and a Duration of 10 seconds will trigger a window every 5 seconds, containing the previous 10 seconds of data.
Period and Duration settings
If the Period and Duration time settings are equivalent, then the result is tumbling windows. If the Duration is longer than the Period, the result is overlapping windows. If the Period is longer than the Duration, the result is hopping windows.
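The three cases can be checked with a small model. This Python sketch is illustrative only and is not the product's implementation: `window_kind` and `windows_containing` are hypothetical helpers, times are plain seconds, windows are assumed to start at 0, Period, 2 × Period and so on, and each window is assumed to cover the half-open interval from its start up to, but not including, start + Duration.

```python
def window_kind(period, duration):
    """Classify the window layout from Period and Duration (in seconds)."""
    if duration == period:
        return "tumbling"
    if duration > period:
        return "overlapping"
    return "hopping"


def windows_containing(t, period, duration):
    """Return the start time of every window that contains event time t.

    Assumes windows start at 0, period, 2*period, ... and that each one
    covers the half-open interval [start, start + duration).
    """
    starts = []
    start = 0
    while start <= t:
        if t < start + duration:
            starts.append(start)
        start += period
    return starts


# Period 5, Duration 10: an event at t=12 belongs to two windows.
overlapping_hits = windows_containing(12, period=5, duration=10)

# Period 5, Duration 5: the same event belongs to exactly one window.
tumbling_hits = windows_containing(12, period=5, duration=5)

# Period 10, Duration 5: an event at t=7 falls in the gap between windows.
hopping_hits = windows_containing(7, period=10, duration=5)
```

Under these assumptions, a Period longer than the Duration leaves gaps between windows, so some events belong to no window at all.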
Timer
Timer windows aggregate the stream by processing time, with much less overhead than other windowing operations. Any data in the buffer is emitted each period. Because event time is ignored, data is never dropped for being late, and timer windows work on streams that do not have event times. Due to variance in when the timer fires, window durations may not be exactly equal, unlike sliding and tumbling windows.
item | description |
---|---|
Period | How often the window should fire (time period; e.g. 0D00:00:05). |
Mixed Schemas | When enabled, allows batches to have different schemas (or allow non-table batches). Throughput is higher if batches are tables rather than a list of tuples, and higher again if all tables share the same schema. |
Tumbling
Aggregates the stream using non-overlapping windows based on event time. A window is triggered when a timestamp is encountered past the end of that window. Any records with timestamps prior to the start of the current window are discarded.
item | description |
---|---|
Editor | Define Time Assigner Function; a function to extract a vector of event times from a batch. It must return a list of timestamps, e.g. "{x`time}". |
Period | The time between window start times (time period; e.g. 0D00:00:05). |
Lateness | How far past the end of a window the watermark goes before the window is finalized and emitted. Defaults to 0D00:00:00. A window is triggered when an incoming timestamp exceeds the sum of the window's start time, duration, and lateness. |
Snap | When enabled, snaps the window start time to the nearest round number. |
Mixed Schemas | When enabled, allows batches to have different schemas (or allow non-table batches). |
Passthrough | When enabled, will send late events through the pipeline with the next batch rather than dropping them. |
Sort | Sort records in ascending order by timestamp when enabled. |
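The Lateness rule is simple arithmetic on timestamps. The sketch below restates it in Python as an illustration only: `window_is_triggered` is a hypothetical name, not part of the product. For a tumbling window, the duration is the same as the Period.

```python
from datetime import datetime, timedelta


def window_is_triggered(timestamp, window_start, duration, lateness=timedelta(0)):
    """Return True once timestamp exceeds start + duration + lateness."""
    return timestamp > window_start + duration + lateness


start = datetime(2024, 1, 1, 12, 0, 0)
duration = timedelta(seconds=5)

# An event stamped 6 seconds after the window start.
event = start + timedelta(seconds=6)

# With the default lateness of zero, the window ending at +5s is triggered.
no_lateness = window_is_triggered(event, start, duration)

# With 2 seconds of lateness, the threshold moves to +7s, so the window stays open.
with_lateness = window_is_triggered(event, start, duration, timedelta(seconds=2))
```

A larger Lateness keeps a window open longer and so admits more out-of-order records, but it delays the window's output by the same amount.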
Global
Aggregates the stream until the custom trigger fires.
item | description |
---|---|
Trigger | A function that returns the indices to split the buffer on. |
Initial State | The initial state that is passed to the trigger function. |
Mixed Schemas | When enabled, allows batches to have different schemas (or allow non-table batches). Throughput is higher if batches are tables rather than a list of tuples, and higher again if all tables share the same schema. |
The trigger is a function that takes the following parameters:
- State
- A function to persist state
- The buffered records
- An offset of where the current batch starts
- The current batch's metadata
- The current batch's data
As batches are ingested, the trigger function will be applied to the batch, and data will be buffered. However, the buffering behavior will depend on the output of the trigger function:
- If the trigger function returns an empty list, the incoming batch will be buffered and nothing will be emitted.
- If the trigger function returns numbers, the buffer will be split on those indices, with each index being the start of a new window.
Last data batch
The last list will remain in the buffer. This last list can be emitted by returning the count of the buffer as the last index. To map indices in the current batch to indices in the buffer, add the offset parameter to the indices.
The buffered records cannot be modified from the trigger function.
Batches with mixed schemas are only supported when using the Mixed Schemas option.
Caveat when using Mixed Schemas
When this is set, the buffer passed to the trigger will be a list of batches, rather than a single table. The indices returned by the trigger function will still work as though the buffer were a single list.
The trigger function is passed state, which is initially defined by setting the initialState parameter, and can be modified by passing the new value to the persist function.
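The buffering and trigger behavior described above can be modeled in a few lines of Python. This is a sketch of the semantics under stated assumptions, not the Stream Processor implementation: `GlobalWindow` and `split_on_marker` are hypothetical names, records before the first returned index are assumed to form the first emitted window, empty windows are assumed to be skipped, and the trigger receives a read-only copy of the buffer.

```python
class GlobalWindow:
    """Toy model of a global window driven by a custom trigger."""

    def __init__(self, trigger, initial_state=None):
        self.trigger = trigger
        self.state = initial_state
        self.buffer = []

    def _persist(self, new_state):
        # The trigger calls this to replace the stored state.
        self.state = new_state

    def ingest(self, data, metadata=None):
        offset = len(self.buffer)          # where the current batch starts
        self.buffer.extend(data)
        # A tuple copy means the trigger cannot modify the buffered records.
        indices = self.trigger(self.state, self._persist, tuple(self.buffer),
                               offset, metadata, data)
        if not indices:
            return []                      # empty result: keep buffering
        bounds = [0] + list(indices)       # assumption: first window starts at 0
        windows = [self.buffer[a:b] for a, b in zip(bounds, bounds[1:]) if a < b]
        self.buffer = self.buffer[bounds[-1]:]   # the last list stays buffered
        return windows


def split_on_marker(state, persist, buffer, offset, metadata, data):
    """Start a new window at every "START" record in the current batch."""
    hits = [i for i, record in enumerate(data) if record == "START"]
    persist(state + len(hits))             # state counts the markers seen so far
    return [offset + i for i in hits]      # batch indices mapped to buffer indices


gw = GlobalWindow(split_on_marker, initial_state=0)
out1 = gw.ingest(["a", "b"])               # no marker: nothing is emitted
out2 = gw.ingest(["c", "START", "d"])      # emits the records before the marker
out3 = gw.ingest(["START", "e"])           # emits the previous window

# Returning the count of the buffer as the last index flushes everything.
flusher = GlobalWindow(lambda state, persist, buffer, offset, metadata, data: [len(buffer)])
flushed = flusher.ingest([1, 2, 3])
```

In this model the trigger only inspects the current batch and adds the offset to each hit, which keeps the work per batch proportional to the batch size rather than to the size of the buffer.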
Time Assigner Function
A time assigner function takes the message’s data as its argument and is expected to return a list of timestamps, with one value for every record in data.
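As an illustration of that contract, the following Python sketch mirrors the q example "{x`time}" on a column-oriented batch. It is a model only: `assign_times`, the `time` and `price` columns, and the dictionary-of-columns layout are assumptions made for the example, not a product API.

```python
from datetime import datetime


def assign_times(data):
    """Return one event timestamp per record in a column-oriented batch."""
    times = list(data["time"])
    # The contract requires exactly one timestamp for every record.
    record_count = len(next(iter(data.values())))
    if len(times) != record_count:
        raise ValueError("expected one timestamp per record")
    return times


batch = {
    "time": [datetime(2024, 1, 1, 12, 0, 0), datetime(2024, 1, 1, 12, 0, 3)],
    "price": [101.5, 102.0],
}
event_times = assign_times(batch)
```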