kxi.sp.window

Stream Processor windows.

timer

@Window
def timer(period: Union[timedelta, np.timedelta64],
          *,
          count_trigger: int = 2**63 - 1,
          skip_empty_windows: bool = False,
          accept_dictionaries: bool = True) -> Window

Aggregate the stream into windows based on processing time.

Arguments:

  • period - The frequency at which windows should fire.
  • count_trigger - The number of buffered records at which the buffer will be flushed automatically.
  • skip_empty_windows - True to only emit non-empty windows.
  • accept_dictionaries - If batches will never be dictionaries, this can be False to increase performance.

Returns:

A timer window, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> from datetime import timedelta
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.window.timer(period=timedelta(seconds=5), count_trigger=3)
        | sp.write.to_console())

>>> # Publish 3 records in a row (reaching count_trigger=3)
>>> kx.q('publish', pd.DataFrame({'x': [1]}))
>>> kx.q('publish', pd.DataFrame({'x': [2]}))
>>> kx.q('publish', pd.DataFrame({'x': [3]}))
Console output: all 3 records are emitted immediately once count_trigger=3 is reached, then empty tables every 5 seconds
                             | x
-----------------------------| -
2023.10.11D13:23:18.819022773| 1
2023.10.11D13:23:18.819022773| 2
2023.10.11D13:23:18.819022773| 3
| x
| -
...
>>> # Publish 2 records in a row (below count_trigger)
>>> kx.q('publish', pd.DataFrame({'x': [4]}))
>>> kx.q('publish', pd.DataFrame({'x': [5]}))
Console output: records 4 and 5 are emitted when the 5-second timer fires (count_trigger not reached), then empty tables every 5 seconds
                             | x
-----------------------------| -
2023.10.11D13:24:30.116093087| 4
2023.10.11D13:24:30.116093087| 5
| x
| -
...
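
The empty windows shown above are a consequence of firing on processing time: the timer emits on every period even when nothing was buffered. A minimal sketch of suppressing them with skip_empty_windows (output not shown):

>>> sp.teardown()
>>> sp.run(sp.read.from_callback('publish')
        | sp.window.timer(period=timedelta(seconds=5),
                skip_empty_windows=True)  # only non-empty windows are emitted
        | sp.write.to_console())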

sliding

@Window
def sliding(period: Timedelta,
            duration: Timedelta,
            time_column: Optional[Union[str, kx.SymbolAtom]] = None,
            *,
            lateness: Timedelta = (0, 's'),
            passthrough: bool = False,
            sort: bool = False,
            count_trigger: int = 2**63 - 1,
            time_assigner: Optional[Union[str, Callable]] = None,
            skip_empty_windows: bool = False,
            accept_dictionaries: bool = True) -> Window

Aggregate the stream into potentially overlapping windows based on event time.

Arguments:

  • period - The frequency at which windows should fire.
  • duration - The length of a window.
  • time_column - Name of the column containing the event timestamps. Mutually exclusive with the time_assigner argument.
  • lateness - The time delay before emitting a window to allow late events to arrive.
  • passthrough - Whether to send late events through the pipeline with the next batch rather than dropping them.
  • sort - Whether to sort the window in ascending time order.
  • count_trigger - The number of buffered records at which the buffer will be flushed automatically.
  • time_assigner - A function which will be called with the data (or the parameters specified by the params keyword argument) which should return a list of timestamps with a value for each record in the data. Mutually exclusive with the time_column argument.
  • skip_empty_windows - True to only emit non-empty windows. This can increase performance on sparse historical data.
  • accept_dictionaries - If batches will never be dictionaries, this can be False to increase performance.

Returns:

A sliding window, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import datetime
>>> import pandas as pd
>>> import pykx as kx
>>> import random

>>> df = pd.DataFrame({
        'time': pd.date_range(end=datetime.datetime.now(), periods=30, freq='S'),
        'data': [random.uniform(0, 1) for _ in range(30)]
        })

>>> sp.run(sp.read.from_callback('publish')
        | sp.window.sliding(period=datetime.timedelta(seconds=1),
                duration=datetime.timedelta(seconds=30), time_column="time")
        | sp.write.to_console())

>>> kx.q('publish', df)
Console output: 30 windows in total; the first holds all 30 entries, the second 29, and so on. The final windows of 3, 2, and 1 entries are shown below.
...
                             | time                          data
-----------------------------| ---------------------------------------
2023.10.11D13:54:06.933491844| 2023.10.11D13:44:07.330559000 0.4034322
2023.10.11D13:54:06.933491844| 2023.10.11D13:44:08.330559000 0.4260979
2023.10.11D13:54:06.933491844| 2023.10.11D13:44:09.330559000 0.570041
                             | time                          data
-----------------------------| ---------------------------------------
2023.10.11D13:54:06.933550232| 2023.10.11D13:44:08.330559000 0.4260979
2023.10.11D13:54:06.933550232| 2023.10.11D13:44:09.330559000 0.570041
                             | time                          data
-----------------------------| --------------------------------------
2023.10.11D13:54:06.933634069| 2023.10.11D13:44:09.330559000 0.570041
>>> sp.teardown()
>>> sp.run(sp.read.from_callback('publish')
        | sp.window.sliding(period=datetime.timedelta(seconds=5),
                duration=datetime.timedelta(seconds=10), time_column="time")
        | sp.write.to_console())

>>> kx.q('publish', df)
Console output: 7 overlapping windows, each beginning 5 seconds after the previous one and holding up to 10 seconds of data
...
                             | time                          data
-----------------------------| ----------------------------------------
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:35.040720000 0.3428878
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:36.040720000 0.592038
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:37.040720000 0.7430601
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:38.040720000 0.6335474
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:39.040720000 0.08378837
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:40.040720000 0.9767207
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:41.040720000 0.2440352
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:42.040720000 0.3866914
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:43.040720000 0.9789462
2023.10.11D14:27:52.225268783| 2023.10.11D13:58:44.040720000 0.5799388
                             | time                          data
-----------------------------| ---------------------------------------
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:40.040720000 0.9767207
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:41.040720000 0.2440352
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:42.040720000 0.3866914
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:43.040720000 0.9789462
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:44.040720000 0.5799388
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:45.040720000 0.9282572
2023.10.11D14:27:57.230844829| 2023.10.11D13:58:46.040720000 0.8006013
                             | time                          data
-----------------------------| ---------------------------------------
2023.10.11D14:27:57.230957348| 2023.10.11D13:58:45.040720000 0.9282572
2023.10.11D14:27:57.230957348| 2023.10.11D13:58:46.040720000 0.8006013
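
Instead of naming a time_column, a time_assigner can supply the event timestamps, as described in the arguments above. A minimal sketch, assuming the assigner is called with the batch data and returns one timestamp per record (the lambda is illustrative):

>>> sp.teardown()
>>> sp.run(sp.read.from_callback('publish')
        | sp.window.sliding(period=datetime.timedelta(seconds=5),
                duration=datetime.timedelta(seconds=10),
                time_assigner=lambda data: data['time'])  # one timestamp per record
        | sp.write.to_console())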

tumbling

@Window
def tumbling(period: Timedelta,
             time_column: Optional[Union[str, kx.SymbolAtom]] = None,
             *,
             lateness: Timedelta = (0, 's'),
             passthrough: bool = False,
             sort: bool = False,
             count_trigger: int = 2**63 - 1,
             time_assigner: Optional[Union[str, Callable]] = None,
             skip_empty_windows: bool = False,
             accept_dictionaries: bool = True) -> Window

Aggregate the stream into non-overlapping windows based on event time.

Arguments:

  • period - The frequency at which windows should fire.
  • time_column - Name of the column containing the event timestamps. Mutually exclusive with the time_assigner argument.
  • lateness - The time delay before emitting a window to allow late events to arrive.
  • passthrough - True to send late events through the pipeline with the next batch rather than dropping them.
  • sort - True to sort the window in ascending time order.
  • count_trigger - The number of buffered records at which the buffer will be flushed automatically. Note, this does not specify the size of the partial windows emitted, only the threshold at which the entire buffer will be flushed.
  • time_assigner - A function which will be called with the data (or the parameters specified by the params keyword argument) which should return a list of timestamps with a value for each record in the data. Mutually exclusive with the time_column argument.
  • skip_empty_windows - True to only emit non-empty windows. This can increase performance on sparse historical data.
  • accept_dictionaries - If batches will never be dictionaries, this can be False to increase performance.

Returns:

A tumbling operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import datetime
>>> import pandas as pd
>>> import pykx as kx
>>> import random

>>> df = pd.DataFrame({
        'time': pd.date_range(end=datetime.datetime.now(), periods=30, freq='S'),
        'data': [random.uniform(0, 1) for _ in range(30)]
        })

>>> sp.run(sp.read.from_callback('publish')
        | sp.window.tumbling(period=datetime.timedelta(seconds=4), time_column="time")
        | sp.write.to_console())

>>> kx.q('publish', df)
Console output: the 30 seconds of data are emitted as non-overlapping 4-second windows
                             | time                          data
-----------------------------| ---------------------------------------
2023.10.11D16:28:26.552460624| 2023.10.11D16:27:55.250466000 0.4693634
                             | time                          data
-----------------------------| ---------------------------------------
2023.10.11D16:28:26.552539018| 2023.10.11D16:27:56.250466000 0.4031693
2023.10.11D16:28:26.552539018| 2023.10.11D16:27:57.250466000 0.1346816
2023.10.11D16:28:26.552539018| 2023.10.11D16:27:58.250466000 0.4040443
2023.10.11D16:28:26.552539018| 2023.10.11D16:27:59.250466000 0.2443313
                             | time                          data
-----------------------------| ---------------------------------------
2023.10.11D16:28:26.552712430| 2023.10.11D16:28:00.250466000 0.1977503
2023.10.11D16:28:26.552712430| 2023.10.11D16:28:01.250466000 0.3117616
2023.10.11D16:28:26.552712430| 2023.10.11D16:28:02.250466000 0.5984409
2023.10.11D16:28:26.552712430| 2023.10.11D16:28:03.250466000 0.518701
...
                             | time                          data
-----------------------------| ---------------------------------------
2023.10.11D16:28:33.497442545| 2023.10.11D16:28:24.250466000 0.7036805
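
Out-of-order data can be accommodated with the lateness and sort arguments. A minimal sketch, assuming lateness accepts a timedelta like the other Timedelta parameters (values are illustrative):

>>> sp.teardown()
>>> sp.run(sp.read.from_callback('publish')
        | sp.window.tumbling(period=datetime.timedelta(seconds=4),
                time_column="time",
                lateness=datetime.timedelta(seconds=2),  # hold each window 2s for late events
                sort=True)                               # emit rows in ascending time order
        | sp.write.to_console())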

count

@Window
def count(size: int,
          frequency: Optional[int] = None,
          accept_dictionaries: bool = True) -> Window

Split the stream into evenly sized windows.

Arguments:

  • size - The exact number of records to include in each window.
  • frequency - The number of records between the starts of consecutive windows. If this is less than size, the windows will overlap. If None, it defaults to the size argument.
  • accept_dictionaries - If batches will never be dictionaries, this can be False to increase performance.

Returns:

A count operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import datetime
>>> import pandas as pd
>>> import pykx as kx
>>> import random

>>> df = pd.DataFrame({
        'time': pd.date_range(end=datetime.datetime.now(), periods=30, freq='S'),
        'data': [random.uniform(0, 1) for _ in range(30)]
        })

>>> sp.run(sp.read.from_callback('publish')
        | sp.window.count(size=3, frequency=2)
        | sp.write.to_console())

>>> kx.q('publish', df)
Console output: 14 windows of 3 entries each covering the previous 30 seconds' data, with consecutive windows overlapping by 1 entry (size - frequency)
                             | time                          data
-----------------------------| ---------------------------------------
2023.10.11D16:42:02.301313466| 2023.10.11D16:41:30.315908000 0.1778936
2023.10.11D16:42:02.301313466| 2023.10.11D16:41:31.315908000 0.3942574
2023.10.11D16:42:02.301313466| 2023.10.11D16:41:32.315908000 0.1997905
                             | time                          data
-----------------------------| ---------------------------------------
2023.10.11D16:42:02.301390098| 2023.10.11D16:41:32.315908000 0.1997905
2023.10.11D16:42:02.301390098| 2023.10.11D16:41:33.315908000 0.3745958
2023.10.11D16:42:02.301390098| 2023.10.11D16:41:34.315908000 0.1060404
                             | time                          data
-----------------------------| ---------------------------------------
2023.10.11D16:42:02.301437123| 2023.10.11D16:41:34.315908000 0.1060404
2023.10.11D16:42:02.301437123| 2023.10.11D16:41:35.315908000 0.9795624
2023.10.11D16:42:02.301437123| 2023.10.11D16:41:36.315908000 0.8657293
...
                             | time                          data
-----------------------------| ----------------------------------------
2023.10.11D16:42:02.301886505| 2023.10.11D16:41:56.315908000 0.9722341
2023.10.11D16:42:02.301886505| 2023.10.11D16:41:57.315908000 0.09027353
2023.10.11D16:42:02.301886505| 2023.10.11D16:41:58.315908000 0.3506035
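
When frequency is omitted it defaults to size, so consecutive windows share no records. A minimal sketch of non-overlapping count windows (output not shown):

>>> sp.teardown()
>>> sp.run(sp.read.from_callback('publish')
        | sp.window.count(size=5)  # frequency defaults to size=5: no overlap
        | sp.write.to_console())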

global_window

@Window
def global_window(trigger: OperatorFunction,
                  *,
                  mixed_schemas: bool = False,
                  accept_dictionaries: bool = True,
                  state: Any = None) -> Window

Aggregate the stream using a custom trigger.

Note: This window breaks the naming conventions of kxi.sp. Defining or using a module attribute named global would be a syntax error in Python, so this function is named global_window as a workaround.

Arguments:

  • trigger - A function that splits the stream (see below).
  • mixed_schemas - True to support batches being tables with different schemas.
  • accept_dictionaries - If batches will never be dictionaries, this can be False to increase performance.
  • state - The initial state.

Returns:

A global window, which can be joined to other operators or pipelines.

The trigger function is passed the following parameters:

  • the operator's id
  • the buffered records
  • an offset of where the current batch starts
  • the current batch's metadata
  • the current batch's data

As batches are ingested, the trigger function is applied to each batch and the data is buffered. What is emitted depends on the trigger function's return value:

  • If the trigger function returns an empty list or generic null, the incoming batch will be buffered and nothing will be emitted.
  • If the trigger function returns numbers, the buffer will be split on those indices, with each index being the start of a new window.

  • Note - Last data batch: The last list remains in the buffer. It can be emitted by returning the count of the buffer as the last index. To map indices in the current batch to indices in the buffer, add the offset parameter to the indices.

The buffered records cannot be modified from the trigger function.
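
For example, a trigger can flush the entire buffer whenever it reaches a threshold by returning the buffer's count as the split index, per the rules above. A minimal sketch (the threshold and function name are illustrative):

>>> def flush_at_10(id, buf, offset, md, data):
        n = offset + len(data)         # total records buffered, including this batch
        return [n] if n >= 10 else []  # splitting at the count emits the whole buffer
>>> sp.run(sp.read.from_callback('publish')
        | sp.window.global_window(flush_at_10)
        | sp.write.to_console())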

Batches with mixed schemas are only supported when using the mixed_schemas option.

  • Note - Caveat when using mixed_schemas: When this is set, the buffer passed to the trigger will be a list of batches rather than a single table. The indices returned by the trigger function still work as though the buffer were a single list.

    >>> from kxi import sp
    >>> from datetime import timedelta
    >>> import pandas as pd
    >>> import pykx as kx
    
    >>> # Return a split index at each record whose status is 'new'
    >>> def trig(id, buf, offset, md, data):
            return [i + offset for i in range(len(data)) if data['status'][i] == 'new']
    
    >>> sp.run(sp.read.from_callback('publish')
            | sp.window.global_window(trig)
            | sp.write.to_console())
    >>> df1 = pd.DataFrame({'val': [12],'status': ['running']})
    >>> df2 = pd.DataFrame({'val': [14,16],'status': ['running', 'running']})
    >>> df3 = pd.DataFrame({'val': [18,20,22],'status': ['running', 'running', 'new']})
    >>> df4 = pd.DataFrame({'val': [24,26,44, 55, 66, 77],
            'status': ['running', 'running', 'new', 'running', 'new', 'running']})
    >>> kx.q('publish', df1) # Buffers one record
    >>> kx.q('publish', df2) # Buffers two more records
    >>> kx.q('publish', df3) # Emits a window, buffering the record with the status 'new'
    
    Console output: emits all records buffered before the 'new' record; [22], ['new'] remains in the buffer
                                 | val status
    -----------------------------| -----------
    2023.10.17D17:38:28.883736096| 12  running
    2023.10.17D17:38:28.883736096| 14  running
    2023.10.17D17:38:28.883736096| 16  running
    2023.10.17D17:38:28.883736096| 18  running
    2023.10.17D17:38:28.883736096| 20  running
    
    >>> kx.q('publish', df4)
    
    Console output: emits 2 windows; [66, 77], ['new', 'running'] remain in the buffer
                                 | val status
    -----------------------------| -----------
    2023.10.17D17:38:51.211669106| 22  new
    2023.10.17D17:38:51.211669106| 24  running
    2023.10.17D17:38:51.211669106| 26  running
                                 | val status
    -----------------------------| -----------
    2023.10.17D17:38:51.211736552| 44  new
    2023.10.17D17:38:51.211736552| 55  running
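
    The same indexing logic applies with mixed_schemas=True, even though the buffer argument becomes a list of batches. A minimal sketch, following the note above (the trigger and threshold are illustrative):

    >>> sp.teardown()
    >>> def trig_mixed(id, buf, offset, md, data):
            # buf is a list of batches here, but the returned indices still
            # address the buffered records as one flat sequence
            n = offset + len(data)
            return [n] if n >= 5 else []
    >>> sp.run(sp.read.from_callback('publish')
            | sp.window.global_window(trig_mixed, mixed_schemas=True)
            | sp.write.to_console())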