Windows

Stream Processor windows.

kxi.sp.window.CountWindow (Window)

kxi.sp.window.GlobalWindow (Window)

kxi.sp.window.SlidingWindow (Window)

kxi.sp.window.TimerWindow (Window)

kxi.sp.window.TumblingWindow (Window)

kxi.sp.window.Window (Operator)

kxi.sp.window.timer

Aggregate the stream into windows by processing time.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| period | Union[datetime.timedelta, numpy.timedelta64] | The frequency at which windows should fire. | required |
| count_trigger | int | The number of buffered records at which the buffer will be flushed automatically. | 9223372036854775807 |
| skip_empty_windows | bool | True to only emit non-empty windows. | False |

Returns:

| Type | Description |
| --- | --- |
| Window | A timer window, which can be joined to other operators or pipelines. |
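
To make the interaction between the timer, count_trigger, and skip_empty_windows concrete, here is a minimal pure-Python sketch. It does not use kxi.sp: the class TimerWindowSketch and its add and tick methods are hypothetical names invented for illustration, and the sketch assumes the buffer is flushed as soon as the number of buffered records reaches count_trigger.

```python
from typing import Any, List

MAX_LONG = 2**63 - 1  # 9223372036854775807, the documented count_trigger default


class TimerWindowSketch:
    """Toy model of a processing-time window (not the kxi.sp implementation)."""

    def __init__(self, count_trigger: int = MAX_LONG, skip_empty_windows: bool = False):
        self.count_trigger = count_trigger
        self.skip_empty_windows = skip_empty_windows
        self.buffer: List[Any] = []
        self.emitted: List[List[Any]] = []

    def add(self, records: List[Any]) -> None:
        """Buffer a batch; flush early once count_trigger records are buffered."""
        self.buffer.extend(records)
        if len(self.buffer) >= self.count_trigger:
            self._flush()

    def tick(self) -> None:
        """Simulate the timer firing after one period has elapsed."""
        if self.buffer or not self.skip_empty_windows:
            self._flush()

    def _flush(self) -> None:
        # Emit the buffered records as one window and start a fresh buffer.
        self.emitted.append(self.buffer)
        self.buffer = []


window = TimerWindowSketch(count_trigger=3, skip_empty_windows=True)
window.add([1, 2])
window.tick()          # emits [1, 2]
window.tick()          # buffer is empty, so nothing is emitted
window.add([3, 4, 5])  # reaches count_trigger, so the buffer is flushed immediately
print(window.emitted)  # [[1, 2], [3, 4, 5]]
```

With the default skip_empty_windows=False, the second tick in this model would emit an empty window instead of being skipped.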

kxi.sp.window.sliding

Aggregate the stream into potentially overlapping windows based on event time.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| period | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | The frequency at which windows should fire. | required |
| duration | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | The length of a window. | required |
| time_column | Union[str, pykx.wrappers.SymbolAtom] | Name of the column containing the event timestamps. Mutually exclusive with the time_assigner argument. | None |
| lateness | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | The time delay before emitting a window to allow late events to arrive. | (0, 's') |
| passthrough | bool | Whether to send late events through the pipeline with the next batch rather than dropping them. | False |
| sort | bool | Whether to sort the window in ascending time order. | False |
| count_trigger | int | The number of buffered records at which the buffer will be flushed automatically. | 9223372036854775807 |
| time_assigner | Union[str, Callable] | A function which will be called with the data (or the parameters specified by the params keyword argument) which should return a list of timestamps with a value for each record in the data. Mutually exclusive with the time_column argument. | None |
| skip_empty_windows | bool | True to only emit non-empty windows. This can increase performance on sparse historical data. | False |

Returns:

| Type | Description |
| --- | --- |
| Window | A sliding window, which can be joined to other operators or pipelines. |
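
The relationship between period and duration can be illustrated without kxi.sp. The sketch below computes which windows a single event timestamp falls into, assuming that window start times are aligned to multiples of period counted from the Unix epoch. That alignment, and the helper name sliding_window_starts, are assumptions made for illustration rather than documented behaviour.

```python
from datetime import datetime, timedelta
from typing import List

EPOCH = datetime(1970, 1, 1)  # assumed alignment origin for window starts


def sliding_window_starts(ts: datetime, period: timedelta, duration: timedelta) -> List[datetime]:
    """Return the start of every window [start, start + duration) containing ts."""
    if period <= timedelta(0):
        raise ValueError("period must be positive")
    # Latest window start at or before ts.
    start = EPOCH + ((ts - EPOCH) // period) * period
    starts = []
    # Walk backwards one period at a time while the window still covers ts.
    while start + duration > ts:
        starts.append(start)
        start -= period
    return sorted(starts)


event_time = datetime(2024, 1, 1, 0, 0, 7)
starts = sliding_window_starts(event_time, timedelta(seconds=5), timedelta(seconds=10))
for s in starts:
    print(s.time())  # 00:00:00, then 00:00:05
```

Because duration (10 s) is twice the period (5 s) here, every event lands in two overlapping windows. With duration equal to period, each event would land in exactly one window.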

kxi.sp.window.tumbling

Aggregate the stream into non-overlapping windows based on event time.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| period | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | The frequency at which windows should fire. | required |
| time_column | Union[str, pykx.wrappers.SymbolAtom] | Name of the column containing the event timestamps. Mutually exclusive with the time_assigner argument. | None |
| lateness | Union[datetime.timedelta, numpy.timedelta64, pykx.wrappers.TimespanAtom, kxi.sp.types.TimedeltaSpec] | The time delay before emitting a window to allow late events to arrive. | (0, 's') |
| passthrough | bool | True to send late events through the pipeline with the next batch rather than dropping them. | False |
| sort | bool | True to sort the window in ascending time order. | False |
| count_trigger | int | The number of buffered records at which the buffer will be flushed automatically. | 9223372036854775807 |
| time_assigner | Union[str, Callable] | A function which will be called with the data (or the parameters specified by the params keyword argument) which should return a list of timestamps with a value for each record in the data. Mutually exclusive with the time_column argument. | None |
| skip_empty_windows | bool | True to only emit non-empty windows. This can increase performance on sparse historical data. | False |

Returns:

| Type | Description |
| --- | --- |
| Window | A tumbling window, which can be joined to other operators or pipelines. |
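
As an illustration of event-time bucketing with a time_assigner-style function, the following pure-Python sketch groups records into non-overlapping windows. It does not call kxi.sp. The helper tumbling_windows, the record layout, and the choice to align window boundaries to the Unix epoch are all assumptions for the sake of the example.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Callable, Dict, List

EPOCH = datetime(1970, 1, 1)  # assumed alignment origin for window boundaries


def tumbling_windows(
    batch: List[dict],
    period: timedelta,
    time_assigner: Callable[[List[dict]], List[datetime]],
) -> Dict[datetime, List[dict]]:
    """Group records into non-overlapping windows keyed by window start time."""
    # The assigner receives the whole batch and returns one timestamp per record.
    timestamps = time_assigner(batch)
    buckets = defaultdict(list)
    for record, ts in zip(batch, timestamps):
        start = EPOCH + ((ts - EPOCH) // period) * period
        buckets[start].append(record)
    return dict(sorted(buckets.items()))


batch = [
    {"time": datetime(2024, 1, 1, 0, 0, 1), "val": 1},
    {"time": datetime(2024, 1, 1, 0, 0, 4), "val": 2},
    {"time": datetime(2024, 1, 1, 0, 0, 5), "val": 3},
    {"time": datetime(2024, 1, 1, 0, 0, 11), "val": 4},
]

windows = tumbling_windows(
    batch,
    period=timedelta(seconds=5),
    time_assigner=lambda data: [row["time"] for row in data],
)
for start, rows in windows.items():
    print(start.time(), [row["val"] for row in rows])
```

Each record lands in exactly one window, which is what distinguishes this from the sliding case. Lateness handling, passthrough, and sorting are deliberately left out of the sketch.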

kxi.sp.window.count

Split the stream into evenly sized windows.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| size | int | The exact number of records to include in each window. | required |
| frequency | Optional[int] | The number of records between the starts of consecutive windows. If this is less than size, the windows will overlap. If None, it defaults to the size argument. | None |

Returns:

| Type | Description |
| --- | --- |
| Window | A count window, which can be joined to other operators or pipelines. |
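
The effect of size and frequency can be sketched in plain Python. The helper count_windows below is hypothetical and only models the windowing arithmetic described above. It returns complete windows only, on the assumption that trailing records which do not yet fill a window would stay buffered.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def count_windows(records: Sequence[T], size: int, frequency: Optional[int] = None) -> List[List[T]]:
    """Return every complete window of exactly `size` records."""
    # frequency defaults to size, which gives non-overlapping windows.
    step = size if frequency is None else frequency
    if size <= 0 or step <= 0:
        raise ValueError("size and frequency must be positive")
    windows = []
    start = 0
    while start + size <= len(records):
        windows.append(list(records[start:start + size]))
        start += step
    return windows


records = list(range(7))
non_overlapping = count_windows(records, size=3)
overlapping = count_windows(records, size=3, frequency=2)
print(non_overlapping)  # [[0, 1, 2], [3, 4, 5]]
print(overlapping)      # [[0, 1, 2], [2, 3, 4], [4, 5, 6]]
```

In the non-overlapping case the seventh record does not fill a window and is not emitted. With frequency=2, consecutive windows share one record.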

kxi.sp.window.global_window

Aggregate the stream using a custom trigger.

This window breaks the naming conventions of kxi.sp

Because global is a reserved keyword in Python, defining or using a module attribute named global would be a syntax error, so this function is named global_window instead.

Parameters:

trigger: A function that splits the stream (see below).

Returns:

A global window, which can be joined to other operators or pipelines.

The trigger function is passed the following parameters:

- the operator's id
- the buffered records
- the offset in the buffer at which the current batch starts
- the current batch's metadata
- the current batch's data

As batches are ingested, the trigger function is applied to each batch and the data is buffered. What happens to the buffer depends on the output of the trigger function:

- If the trigger function returns an empty list or generic null, the incoming batch will be buffered and nothing will be emitted.
- If the trigger function returns numbers, the buffer will be split on those indices, with each index being the start of a new window.

Last data batch

After the buffer is split, the last list remains in the buffer. To emit it as well, return the count of the buffer as the last index. To map indices in the current batch to indices in the buffer, add the offset parameter to them.

The buffered records cannot be modified from the trigger function.
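
The trigger contract described above can be modelled in plain Python. This sketch does not use kxi.sp and rests on several assumptions: the helper apply_trigger is hypothetical, the buffer passed to the trigger is taken to include the current batch (with offset marking where it starts), and records before the first returned index are treated as one window starting at index 0.

```python
from typing import Any, Callable, List, Optional, Tuple

Trigger = Callable[[str, List[Any], int, dict, List[Any]], Optional[List[int]]]


def apply_trigger(buffer: List[Any], batch: List[Any], trigger: Trigger) -> Tuple[List[List[Any]], List[Any]]:
    """Return (emitted windows, remaining buffer) after ingesting one batch."""
    offset = len(buffer)       # where the current batch starts in the buffer
    buffer = buffer + batch    # the trigger sees, but never mutates, the buffer
    indices = trigger("window-op", buffer, offset, {}, batch)
    if not indices:            # empty list or None: keep buffering, emit nothing
        return [], buffer
    # Assumption: an implicit window start at 0 precedes the returned indices.
    bounds = [0] + sorted(indices)
    pieces = [buffer[a:b] for a, b in zip(bounds, bounds[1:])]
    emitted = [piece for piece in pieces if piece]
    remaining = buffer[bounds[-1]:]  # the last list stays in the buffer
    return emitted, remaining


def split_on_zero(op_id, buffer, offset, metadata, data):
    """Start a new window at every 0 in the current batch."""
    # Batch-relative positions are mapped to buffer positions by adding offset.
    return [offset + i for i, value in enumerate(data) if value == 0]


def flush_all(op_id, buffer, offset, metadata, data):
    """Emit everything by returning the count of the buffer as the last index."""
    return [len(buffer)]


emitted1, buf = apply_trigger([], [1, 2], split_on_zero)
emitted2, buf = apply_trigger(buf, [3, 0, 4, 0, 5], split_on_zero)
emitted3, buf = apply_trigger(buf, [6], flush_all)
print(emitted1, emitted2, emitted3, buf)
```

In this model the first batch is only buffered, the second emits two windows and leaves the list starting at the last 0 in the buffer, and the third flushes the remainder.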

Batches with mixed schemas are only supported when using the mixed_schemas option.

Caveat when using mixed_schemas

When mixed_schemas is set, the buffer passed to the trigger will be a list of batches, rather than a single table. The indices returned by the trigger function will still work as though the buffer were a single list.