Windows
Stream Processor windows.
kxi.sp.window.CountWindow (Window)
kxi.sp.window.GlobalWindow (Window)
kxi.sp.window.SlidingWindow (Window)
kxi.sp.window.TimerWindow (Window)
kxi.sp.window.TumblingWindow (Window)
kxi.sp.window.Window (Operator)
kxi.sp.window.timer
Aggregate the stream into windows by processing time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
period | Union[datetime.timedelta, numpy.timedelta64] | The frequency at which windows should fire. | required |
count_trigger | int | The number of buffered records at which the buffer will be flushed automatically. The default is the largest 64-bit signed integer, so count-based flushing is effectively disabled. | 9223372036854775807 |
skip_empty_windows | bool | True to only emit non-empty windows. | False |
Returns:
Type | Description |
---|---|
Window | A timer window, which can be joined to other operators or pipelines. |
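The following is a minimal, hypothetical model of processing-time windowing, not kxi.sp code. It assumes that arrival times are given explicitly in seconds (a float instead of a timedelta), that the timer fires at every multiple of period regardless of data, and that an early count_trigger flush does not reset the timer schedule. The function name timer_windows and its end_time argument are illustrative only.

```python
from typing import Any, List, Tuple


def timer_windows(
    arrivals: List[Tuple[float, Any]],
    period: float,
    end_time: float,
    count_trigger: int = 2**63 - 1,
    skip_empty_windows: bool = False,
) -> List[List[Any]]:
    """Hypothetical model: group records by processing (arrival) time.

    arrivals: (arrival_time_seconds, record) pairs in arrival order.
    The timer fires at period, 2 * period, ... up to and including end_time.
    """
    windows: List[List[Any]] = []
    buffer: List[Any] = []
    next_fire = period
    for t, record in arrivals:
        # Fire every timer tick that elapsed before this record arrived.
        while next_fire <= t:
            if buffer or not skip_empty_windows:
                windows.append(buffer)
            buffer = []
            next_fire += period
        buffer.append(record)
        # Assumption: reaching count_trigger flushes early without moving the timer.
        if len(buffer) >= count_trigger:
            windows.append(buffer)
            buffer = []
    # Fire the remaining ticks up to end_time.
    while next_fire <= end_time:
        if buffer or not skip_empty_windows:
            windows.append(buffer)
        buffer = []
        next_fire += period
    return windows


arrivals = [(0.2, "a"), (0.5, "b"), (2.3, "c")]
print(timer_windows(arrivals, 1.0, 3.0))                           # [['a', 'b'], [], ['c']]
print(timer_windows(arrivals, 1.0, 3.0, skip_empty_windows=True))  # [['a', 'b'], ['c']]
```

The quiet second between 1.0 and 2.0 produces an empty window unless skip_empty_windows is set, which is the behavior the parameter table describes.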
kxi.sp.window.sliding
Aggregate the stream into potentially overlapping windows based on event time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
period | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | The frequency at which windows should fire. | required |
duration | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | The length of a window. | required |
time_column | Union[str, pykx.wrappers.SymbolAtom] | Name of the column containing the event timestamps. Mutually exclusive with the … | None |
lateness | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | The time delay before emitting a window to allow late events to arrive. | (0, 's') |
passthrough | bool | True to send late events through the pipeline with the next batch rather than dropping them. | False |
sort | bool | True to sort the window in ascending time order. | False |
count_trigger | int | The number of buffered records at which the buffer will be flushed automatically. The default is the largest 64-bit signed integer, so count-based flushing is effectively disabled. | 9223372036854775807 |
time_assigner | Union[str, Callable] | A function which will be called with the data (or the parameters specified by the … | None |
skip_empty_windows | bool | True to only emit non-empty windows. This can increase performance on sparse historical data. | False |
Returns:
Type | Description |
---|---|
Window | A sliding window, which can be joined to other operators or pipelines. |
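To illustrate how period and duration interact, here is a small, hypothetical model of sliding-window assignment (not the kxi.sp implementation). It assumes windows start at integer multiples of period and cover the half-open interval [start, start + duration), and it uses plain integer timestamps instead of timedelta values.

```python
from collections import defaultdict
from typing import Dict, List


def sliding_windows(times: List[int], period: int, duration: int) -> Dict[int, List[int]]:
    """Hypothetical model: map each window start to the event times it contains."""
    windows: Dict[int, List[int]] = defaultdict(list)
    for t in times:
        # Latest window start at or before t (assumes alignment to multiples of period).
        start = (t // period) * period
        # Walk back through earlier starts while their window still covers t.
        while start > t - duration:
            windows[start].append(t)
            start -= period
    return dict(sorted(windows.items()))


print(sliding_windows([7, 8, 10], period=3, duration=6))
# {3: [7, 8], 6: [7, 8, 10], 9: [10]}
```

With duration twice the period, every event lands in two overlapping windows. Setting duration equal to period gives each event exactly one window, which is the non-overlapping (tumbling) case.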
kxi.sp.window.tumbling
Aggregate the stream into non-overlapping windows based on event time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
period | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | The frequency at which windows should fire. | required |
time_column | Union[str, pykx.wrappers.SymbolAtom] | Name of the column containing the event timestamps. Mutually exclusive with the … | None |
lateness | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | The time delay before emitting a window to allow late events to arrive. | (0, 's') |
passthrough | bool | True to send late events through the pipeline with the next batch rather than dropping them. | False |
sort | bool | True to sort the window in ascending time order. | False |
count_trigger | int | The number of buffered records at which the buffer will be flushed automatically. The default is the largest 64-bit signed integer, so count-based flushing is effectively disabled. | 9223372036854775807 |
time_assigner | Union[str, Callable] | A function which will be called with the data (or the parameters specified by the … | None |
skip_empty_windows | bool | True to only emit non-empty windows. This can increase performance on sparse historical data. | False |
Returns:
Type | Description |
---|---|
Window | A tumbling window, which can be joined to other operators or pipelines. |
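The effect of the sort and skip_empty_windows options can be sketched with a hypothetical, all-at-once model of tumbling windows. It is not kxi.sp code: it assumes windows are aligned to multiples of period, uses integer timestamps, and ignores lateness and incremental emission (a real pipeline emits windows as event time advances, not after seeing all data).

```python
from typing import Dict, List, Tuple


def tumbling_windows(
    times: List[int],
    period: int,
    skip_empty_windows: bool = False,
    sort: bool = False,
) -> List[Tuple[int, List[int]]]:
    """Hypothetical model: group event times into [start, start + period) windows."""
    buckets: Dict[int, List[int]] = {}
    for t in times:
        start = (t // period) * period  # the single window this event falls into
        buckets.setdefault(start, []).append(t)
    if not buckets:
        return []
    result = []
    start, last = min(buckets), max(buckets)
    while start <= last:
        records = buckets.get(start, [])
        # Gaps in sparse data produce empty windows unless they are skipped.
        if records or not skip_empty_windows:
            result.append((start, sorted(records) if sort else records))
        start += period
    return result


times = [12, 3, 1, 14]
print(tumbling_windows(times, 5))                           # [(0, [3, 1]), (5, []), (10, [12, 14])]
print(tumbling_windows(times, 5, sort=True))                # [(0, [1, 3]), (5, []), (10, [12, 14])]
print(tumbling_windows(times, 5, skip_empty_windows=True))  # [(0, [3, 1]), (10, [12, 14])]
```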
kxi.sp.window.count
Split the stream into evenly sized windows.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
size | int | The exact number of records to include in each window. | required |
frequency | Optional[int] | The number of records between the starts of consecutive windows. If this is less than … | None |
Returns:
Type | Description |
---|---|
Window | A count window, which can be joined to other operators or pipelines. |
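A hypothetical list-based model of count windows follows; it is not the kxi.sp implementation. It assumes that frequency=None behaves like frequency=size (back-to-back, non-overlapping windows) and that a trailing group smaller than size is not emitted (in a live pipeline those records would stay buffered until more data arrives).

```python
from typing import List, Optional, TypeVar

T = TypeVar("T")


def count_windows(records: List[T], size: int, frequency: Optional[int] = None) -> List[List[T]]:
    """Hypothetical model: windows of exactly `size` records, starting every `frequency` records."""
    step = size if frequency is None else frequency  # assumption: default step equals size
    # Only start positions that leave room for a full window are used.
    return [records[i:i + size] for i in range(0, len(records) - size + 1, step)]


data = list(range(7))
print(count_windows(data, 3))               # [[0, 1, 2], [3, 4, 5]]
print(count_windows(data, 3, frequency=2))  # [[0, 1, 2], [2, 3, 4], [4, 5, 6]]
```

When frequency is smaller than size, consecutive windows share records, as the second call shows.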
kxi.sp.window.global_window
Aggregate the stream using a custom trigger.
This window breaks the naming conventions of kxi.sp
Defining or using a module attribute named global would be a syntax error, because global is a reserved Python keyword, so this function is named global_window as a workaround.
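The naming constraint can be checked with the standard library alone. This sketch only parses the expressions, so the names window and trigger are placeholders that are never evaluated, and the helper parses is hypothetical.

```python
import keyword


def parses(source: str) -> bool:
    """Return True if `source` is a syntactically valid Python expression."""
    try:
        compile(source, "<example>", "eval")
        return True
    except SyntaxError:
        return False


print(keyword.iskeyword("global"))              # True: global is reserved
print(parses("window.global(trigger)"))         # False: a keyword cannot be an attribute name
print(parses("window.global_window(trigger)"))  # True
```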
Parameters:
trigger: A function that splits the stream (see below).
Returns:
A global window, which can be joined to other operators or pipelines.
The trigger function is passed the following parameters:
- the operator's id
- the buffered records
- an offset of where the current batch starts
- the current batch's metadata
- the current batch's data
As batches are ingested, the trigger function is applied to each batch and the data is buffered. What happens to the buffer depends on the trigger function's return value:
- If the trigger function returns an empty list or generic null, the incoming batch will be buffered and nothing will be emitted.
- If the trigger function returns numbers, the buffer will be split on those indices, with each index being the start of a new window.
Last data batch
The records after the last returned index remain in the buffer. To emit them as well, return the count of the buffer as the last index. To map indices in the current batch to indices in the buffer, add the offset parameter to the indices.
The buffered records cannot be modified from the trigger function.
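The buffering rules above can be modeled in plain Python. This is a hypothetical sketch, not kxi.sp code: the buffer and batches are lists, the metadata is an empty dict, the operator id is a placeholder string, and the buffer is handed to the trigger as a tuple to mirror the rule that it cannot be modified. Dropping zero-length segments (for example when an index equals the previous one) is an assumption of the sketch. The trigger split_after_zero is an invented example that closes a window after every 0 in the batch.

```python
from typing import Any, Callable, List, Optional, Sequence, Tuple

Trigger = Callable[[str, Tuple[Any, ...], int, dict, List[Any]], Optional[Sequence[int]]]


def run_global_window(
    batches: List[List[Any]], trigger: Trigger, op_id: str = "global-window"
) -> Tuple[List[List[Any]], List[Any]]:
    """Hypothetical model: return (emitted windows, records left in the buffer)."""
    buffer: List[Any] = []
    emitted: List[List[Any]] = []
    for data in batches:
        offset = len(buffer)          # where the current batch starts in the buffer
        buffer = buffer + list(data)
        md: dict = {}                 # placeholder batch metadata
        # An empty list or None (generic null) means: buffer and emit nothing.
        indices = trigger(op_id, tuple(buffer), offset, md, list(data)) or []
        start = 0
        for idx in indices:
            # Each index starts a new window, so buffer[start:idx] is complete.
            if idx > start:
                emitted.append(buffer[start:idx])
            start = idx
        buffer = buffer[start:]       # the last list stays buffered
    return emitted, buffer


def split_after_zero(op_id, buffer, offset, md, data):
    """Hypothetical trigger: start a new window right after each 0."""
    # i indexes the current batch; adding offset maps it into the buffer.
    return [offset + i + 1 for i, x in enumerate(data) if x == 0]


windows, leftover = run_global_window([[1, 2, 0, 3], [4, 0], [5]], split_after_zero)
print(windows)   # [[1, 2, 0], [3, 4, 0]]
print(leftover)  # [5]
```

A trigger that returns the buffer count, such as `lambda op, buf, off, md, data: [len(buf)]`, emits everything buffered on every batch, which is the "last data batch" case described above.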
Batches with mixed schemas are only supported when using the mixed_schemas option.
Caveat when using mixed_schemas
When this is set, the buffer passed to the trigger will be a list of batches, rather than a single table. The indices returned by the trigger function will still work as though the buffer were a single list.
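To see how a flat index still addresses a buffer that is a list of batches, here is a hypothetical helper (not part of kxi.sp) that maps an index into the concatenated view back to a batch number and a row within that batch. Rows are modeled as dicts with differing keys purely to suggest mixed schemas.

```python
from bisect import bisect_right
from itertools import accumulate
from typing import Sequence, Tuple


def locate(batches: Sequence[Sequence[object]], index: int) -> Tuple[int, int]:
    """Hypothetical helper: map a flat buffer index to (batch number, row in batch)."""
    # starts[k] is the flat index of the first row of batch k; starts[-1] is the total count.
    starts = [0] + list(accumulate(len(b) for b in batches))
    if not 0 <= index < starts[-1]:
        raise IndexError(index)
    batch = bisect_right(starts, index) - 1  # last batch starting at or before index
    return batch, index - starts[batch]


buffer = [[{"x": 1}, {"x": 2}], [{"y": "a"}], [{"x": 3}, {"y": "b"}]]
print(locate(buffer, 2))  # (1, 0)
print(locate(buffer, 4))  # (2, 1)
```

Because empty batches contribute no rows, bisect_right skips past them, so an index never resolves to an empty batch.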