Windows

Windows aggregate a stream into groups of records based on event time, processing (system) time, record count, or a custom trigger.

Count Window

Splits the stream into windows that each contain the same number of records.

item description
Size The number of records in each window.
Frequency The number of records between the starts of consecutive windows.
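To make the relationship between Size and Frequency concrete, here is a minimal Python sketch, not the Stream Processor's implementation. It assumes that a new window starts every Frequency records and that only complete windows are emitted; `count_windows` is a hypothetical helper.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def count_windows(records: Sequence[T], size: int, frequency: int) -> Iterator[List[T]]:
    """Yield each complete window of `size` records, starting a new window every `frequency` records."""
    if size <= 0 or frequency <= 0:
        raise ValueError("size and frequency must be positive")
    start = 0
    # Stop once there are not enough records left to fill a whole window.
    while start + size <= len(records):
        yield list(records[start:start + size])
        start += frequency


# Size 4, Frequency 2: consecutive windows share two records.
print(list(count_windows(range(10), 4, 2)))
```

When Frequency equals Size, the windows do not overlap; a smaller Frequency makes consecutive windows share records.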

Global Window

Global windows aggregate the stream until the custom trigger fires.

item description
Trigger A function that returns the indices to split the buffer on.
Mixed Schemas When enabled, allows batches to have different schemas.

Sliding

Aggregates the stream by event time into windows that each cover a given duration, with a new window starting every period. Windows overlap when the duration is longer than the period. A window is triggered when a timestamp is encountered past the end of that window. After that, subsequent events with a timestamp within that window are discarded.

item description
Editor Defines the time assigner function, which extracts a vector of event times from a batch. It must return a list of timestamps, e.g. "{x`time}".
Period Time between window start times (time period; e.g. 0D00:00:05).
Duration The length of time covered by each window (time period; e.g. 0D00:00:10).
Lateness How far past the end of a window the watermark goes before the window is finalized and emitted. Defaults to 0D00:00:00. A window is triggered when a new timestamp exceeds the sum of the window's start time, duration and lateness.
Snap When enabled, snaps the window start time to the nearest round number.
Mixed Schemas When enabled, allows batches with different schemas, as well as batches that are not tables. Throughput is higher if batches are tables rather than lists of tuples, and higher again if all tables share the same schema.
Passthrough When enabled, sends late events through the pipeline with the next batch rather than dropping them.
Sort When enabled, sorts records in ascending order by timestamp.

For example, a Period of 5 seconds and a Duration of 10 seconds triggers a window every 5 seconds, containing the previous 10 seconds of data.

Period and Duration settings

If the Period and Duration are equal, the result is tumbling windows. If the Duration is longer than the Period, the result is overlapping windows. If the Period is longer than the Duration, the result is hopping windows, with a gap between the end of one window and the start of the next.
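The three cases can be checked with a small Python sketch that lists the windows containing a given timestamp. It assumes timestamps and settings are whole seconds and that windows start at non-negative multiples of the Period (Snap is ignored); `windows_containing` is a hypothetical helper, not part of the product's API.

```python
from typing import List, Tuple


def windows_containing(t: int, period: int, duration: int) -> List[Tuple[int, int]]:
    """Return the [start, end) windows that contain timestamp t.

    Windows are assumed to start at every non-negative multiple of `period`
    and to span `duration` seconds.
    """
    if period <= 0 or duration <= 0:
        raise ValueError("period and duration must be positive")
    start = t - t % period  # latest window start at or before t
    result = []
    # Step back through earlier starts while their windows still reach t.
    while start >= 0 and start + duration > t:
        result.append((start, start + duration))
        start -= period
    return sorted(result)


print(windows_containing(7, 5, 5))   # Period == Duration: tumbling
print(windows_containing(7, 5, 10))  # Duration > Period: overlapping
print(windows_containing(7, 10, 5))  # Period > Duration: hopping
```

With a Period and Duration of 5, timestamp 7 belongs to exactly one window; with a Duration of 10 it belongs to two; with a Period of 10 and a Duration of 5 it belongs to none, because it falls between the end of one window (5) and the start of the next (10).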

Timer

Timer windows aggregate the stream by processing time, with much less overhead than other windowing operations. Any data in the buffer is emitted each period. Because event time is ignored, data is never dropped for being late, and timer windows work on streams that have no event times. Because the timer may not fire at exactly regular intervals, window durations may vary slightly, unlike sliding and tumbling windows, whose durations are exact.

item description
Period How often the window should fire (time period; e.g. 0D00:00:05).
Mixed Schemas When enabled, allows batches with different schemas, as well as batches that are not tables. Throughput is higher if batches are tables rather than lists of tuples, and higher again if all tables share the same schema.
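A minimal Python model of a timer window follows, assuming a processing-time clock that can be injected for testing. Unlike a real timer, which fires on its own schedule, this sketch only checks the clock when a batch arrives, so emission happens at the first check after the deadline. `TimerWindow` and `on_batch` are hypothetical names. The sketch never looks at event times, so nothing is dropped as late.

```python
import time
from typing import Callable, List, Optional


class TimerWindow:
    """Buffers records and flushes them once `period` seconds of processing time pass."""

    def __init__(self, period: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.period = period
        self.clock = clock  # processing-time source; injectable for tests
        self.buffer: List[object] = []
        self.next_fire = clock() + period

    def on_batch(self, batch: List[object]) -> Optional[List[object]]:
        """Add a batch; return everything buffered if the timer is due, else None."""
        self.buffer.extend(batch)
        now = self.clock()
        if now < self.next_fire:
            return None
        emitted, self.buffer = self.buffer, []
        # Schedule the next firing relative to when this one actually happened.
        self.next_fire = now + self.period
        return emitted
```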

Tumbling

Aggregates the stream using non-overlapping windows based on event time. A window is triggered when a timestamp is encountered past the end of that window. Records with timestamps before the start of the current window are discarded.

item description
Editor Defines the time assigner function, which extracts a vector of event times from a batch. It must return a list of timestamps, e.g. "{x`time}".
Period The time between window start times (time period; e.g. 0D00:00:05).
Lateness How far past the end of a window the watermark goes before the window is finalized and emitted. Defaults to 0D00:00:00. A window is triggered when a new timestamp exceeds the sum of the window's start time, the period (which is also the window's duration) and the lateness.
Snap When enabled, snaps the window start time to the nearest round number.
Mixed Schemas When enabled, allows batches with different schemas, as well as batches that are not tables.
Passthrough When enabled, sends late events through the pipeline with the next batch rather than dropping them.
Sort When enabled, sorts records in ascending order by timestamp.
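The event-time behavior described above, including Lateness and the discarding of late records, can be sketched in Python. This is an illustrative model under stated assumptions, not the product's implementation: timestamps are whole seconds, windows start at multiples of the Period (no Snap), a window is emitted when a record's timestamp exceeds its start plus the Period plus the Lateness, and late records are collected in `dropped` rather than forwarded as Passthrough would do. `TumblingWindow` and `on_record` are hypothetical names.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[int, str]  # (event time in whole seconds, payload)


class TumblingWindow:
    """Event-time tumbling windows of `period` seconds with an allowed `lateness`."""

    def __init__(self, period: int, lateness: int = 0) -> None:
        self.period = period
        self.lateness = lateness
        self.open: Dict[int, List[Record]] = {}  # window start -> buffered records
        self.emitted_until: Optional[int] = None  # windows starting before this are closed
        self.dropped: List[Record] = []  # late records (Passthrough would forward these)

    def on_record(self, rec: Record) -> List[Tuple[int, List[Record]]]:
        """Buffer one record and return any (start, records) windows it causes to be emitted."""
        ts = rec[0]
        start = ts - ts % self.period  # start of the window this record belongs to
        if self.emitted_until is not None and start < self.emitted_until:
            self.dropped.append(rec)  # its window has already been emitted
            return []
        self.open.setdefault(start, []).append(rec)
        emitted = []
        # A window closes once a timestamp exceeds start + period + lateness.
        for s in sorted(self.open):
            if ts > s + self.period + self.lateness:
                emitted.append((s, self.open.pop(s)))
                self.emitted_until = s + self.period
        return emitted
```

With a Lateness of 2 seconds, a record at time 4 that arrives after a record at time 6 is still added to the window starting at 0, because that window is only emitted once a timestamp exceeds 7.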

Global

Aggregates the stream until the custom trigger fires.

item description
Trigger A function that returns the indices to split the buffer on.
Initial State The initial state that is passed to the trigger function.
Mixed Schemas When enabled, allows batches with different schemas, as well as batches that are not tables. Throughput is higher if batches are tables rather than lists of tuples, and higher again if all tables share the same schema.

The trigger function takes the following parameters:

• State
• A function to persist state
• The buffered records
• An offset of where the current batch starts
• Current batch's data

As each batch is ingested, the trigger function is applied to it and the batch is added to the buffer. What happens next depends on the trigger function's output:

• If the trigger function returns an empty list, the incoming batch will be buffered and nothing will be emitted.
• If the trigger function returns numbers, the buffer will be split on those indices, with each index being the start of a new window.

Last data batch

After the buffer is split, the last segment (everything from the final index onward) remains in the buffer. To emit it as well, return the count of the buffer as the last index. To map indices in the current batch to indices in the buffer, add the offset parameter to them.

The buffered records cannot be modified from the trigger function.
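The trigger protocol described above can be modeled in a short Python sketch. It is a simplified model, not the Stream Processor's implementation: the buffer is a flat Python list, the buffer passed to the trigger is assumed to already include the current batch (so `offset` is where that batch begins), and `GlobalWindow`, `on_batch` and `flush_trigger` are hypothetical names. The example trigger starts a new window after every "flush" record, converts batch positions to buffer positions by adding the offset, and uses the persist function to count emitted windows.

```python
from typing import Any, Callable, List, Sequence

# Assumed trigger signature: (state, persist, buffer, offset, batch) -> ascending indices
Trigger = Callable[[Any, Callable[[Any], None], Sequence[Any], int, Sequence[Any]], List[int]]


class GlobalWindow:
    """Buffers records and splits the buffer wherever the trigger says a new window starts."""

    def __init__(self, trigger: Trigger, initial_state: Any = None) -> None:
        self.trigger = trigger
        self.state = initial_state
        self.buffer: List[Any] = []

    def _persist(self, new_state: Any) -> None:
        self.state = new_state

    def on_batch(self, batch: Sequence[Any]) -> List[List[Any]]:
        offset = len(self.buffer)  # index in the buffer where this batch starts
        self.buffer.extend(batch)
        # Tuples are passed so the trigger cannot modify the buffer or the batch.
        indices = self.trigger(self.state, self._persist, tuple(self.buffer), offset, tuple(batch))
        windows: List[List[Any]] = []
        start = 0
        for idx in indices:  # each index is the start of a new window
            if idx > start:
                windows.append(self.buffer[start:idx])
            start = idx
        self.buffer = self.buffer[start:]  # the last segment stays buffered
        return windows


def flush_trigger(state, persist, buffer, offset, batch):
    """Example trigger: start a new window after every "flush" record."""
    indices = [offset + i + 1 for i, rec in enumerate(batch) if rec == "flush"]
    persist(state + len(indices))  # keep a running count of emitted windows
    return indices
```

When a batch ends with "flush", the returned index equals the buffer's count, so the final segment is emitted too and the buffer is left empty.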

Batches with mixed schemas are only supported when the Mixed Schemas option is enabled.

Caveat when using Mixed Schemas

When Mixed Schemas is enabled, the buffer passed to the trigger is a list of batches rather than a single table. The indices returned by the trigger function still work as though the buffer were a single list.
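To see how a single-list index can address a buffer that is really a list of batches, here is a Python sketch of the mapping from a flat index to a (batch, row) position. `locate` is a hypothetical helper; the product is assumed to do this bookkeeping internally, and its actual mechanism is not described here.

```python
from bisect import bisect_right
from itertools import accumulate
from typing import Sequence, Tuple


def locate(batches: Sequence[Sequence[object]], flat_index: int) -> Tuple[int, int]:
    """Map an index into the concatenated buffer to (batch number, row within batch)."""
    # Starting position of each batch within the flattened buffer, e.g. [0, 2, 5].
    starts = [0] + list(accumulate(len(b) for b in batches))[:-1]
    total = sum(len(b) for b in batches)
    if not 0 <= flat_index < total:
        raise IndexError(flat_index)
    # bisect_right skips empty batches, which share a start with the next batch.
    batch_no = bisect_right(starts, flat_index) - 1
    return batch_no, flat_index - starts[batch_no]
```

For a buffer of batches with 2, 3 and 1 records, flat index 4 is the third row of the second batch, `(1, 2)`.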

The trigger function is passed the current state, which starts as the value of the initialState parameter (Initial State) and can be updated by passing the new value to the persist function.

Time Assigner Function

A time assigner function takes the message’s data as its argument and must return a list of timestamps containing one value for every record in the data.
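As a Python illustration of this contract, the hypothetical `time_assigner` below reads an event time from each record's `time` field (an assumed field name and record layout), and the hypothetical `assign_times` checks that exactly one timestamp is returned per record.

```python
from datetime import datetime
from typing import Any, Callable, Dict, List

Data = List[Dict[str, Any]]


def time_assigner(data: Data) -> List[datetime]:
    """Return one event time per record, parsed from its ISO 8601 'time' field."""
    return [datetime.fromisoformat(rec["time"]) for rec in data]


def assign_times(data: Data, assigner: Callable[[Data], List[datetime]]) -> List[datetime]:
    """Apply a time assigner and enforce the one-timestamp-per-record contract."""
    times = assigner(data)
    if len(times) != len(data):
        raise ValueError(f"expected {len(data)} timestamps, got {len(times)}")
    return times
```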