kxi.sp.op

Stream Processor operators.

An operator is a first-class building block in the stream processor API. Operators are strung together in a user's program, and provide the core functionality for the Stream Processor.

Note: "operator" terminology In the KX Stream Processor, the term "operator" can refer to either one of the basic operators like map, or filter, but can also refer to any component of a pipeline. In other words: all readers/writers/decoders/etc. are operators, but not all operators are readers/writers/decoders/etc.

Note: kxi.sp.op aliases in kxi.sp. All operators in this module are available through aliases in kxi.sp for convenience. For example, you can access kxi.sp.op.map as kxi.sp.map.

OperatorParams Objects

class OperatorParams(AutoNameEnum)

Specifies a parameter that will be passed to the function you provide to an operator.

operator

Provide the operator's dictionary, which may be required by other SP functions.

metadata

Provide the message's metadata, which may be required by other SP functions.

data

Provide the message's data.
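
For illustration, the members can be inspected like any Python Enum; a minimal sketch:

>>> from kxi.sp.op import OperatorParams
>>> [p.name for p in OperatorParams]
['operator', 'metadata', 'data']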

Operator Objects

class Operator()

Stream Processor operator interface.

An operator is a first-class building block in the stream processor API. Operators can be strung together to form a kxi.sp.Pipeline instance, which can then be run.

Pipeline and operator objects can be joined together using the | operator. Operators can also be joined to lists/tuples of pipelines or operators.
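
For example, a reader, an operator, and a writer can be composed with |; a minimal sketch using components shown elsewhere in this document:

>>> from kxi import sp
>>> pipeline = (sp.read.from_callback('publish')
        | sp.map(lambda x: x)
        | sp.write.to_console())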

See Also:

kxi.sp.Pipeline

Attributes:

  • id str - The unique ID of this operator.

as_pipeline

@property
def as_pipeline()

A new pipeline that only contains this operator.
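
A minimal sketch (as_pipeline is a property, so no call is needed):

>>> from kxi import sp
>>> pipeline = sp.map(lambda x: x).as_pipeline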

accumulate

@Operator
def accumulate(function: OperatorFunction,
               initial: Any = None,
               output: OperatorFunction = None) -> Operator

Aggregates a stream into an accumulator.

The accumulator is updated, and then emitted, for each incoming batch. By default, kxi.sp.accumulate emits the accumulator itself, but this value can first be transformed with the output function. If the accumulator is a dictionary, it may be necessary to enlist it in the output function (for example, with pykx.q.enlist) so that the next operator receives a table; see the sketch after the example below.

Arguments:

  • function - An aggregator which takes the metadata, data, and the accumulator, and returns an updated accumulator.
  • initial - The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with pykx.K.
  • output - An optional function to transform output before emitting it. It gets passed the value of the accumulator.

Returns:

An accumulate operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx
>>> def accumulator(md, data, acc):
        return acc + sum(data)

>>> def output(data):
        return data * 10

>>> sp.run(sp.read.from_callback('publish')
        | sp.accumulate(accumulator, 100, output)
        | sp.write.to_variable('out'))

>>> kx.q('publish',[1,2,3])
>>> kx.q('publish',[4,5,6])
>>> kx.q('out')
1060 1210
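
As mentioned above, a dictionary accumulator can be enlisted by the output function so the next operator receives a table. A minimal sketch, assuming the same 'publish' callback setup (the total/count fields are illustrative):

>>> def totals(md, data, acc):
        return {'total': acc['total'] + sum(data),
                'count': acc['count'] + len(data)}

>>> sp.run(sp.read.from_callback('publish')
        | sp.accumulate(totals, {'total': 0, 'count': 0}, kx.q.enlist)
        | sp.write.to_variable('out'))
>>> kx.q('publish', [1, 2, 3])
>>> kx.q('out')
total count
-----------
6     3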

apply

@Operator
def apply(
    function: OperatorFunction,
    *,
    on_finish: Optional[OperatorFunction] = None,
    state: Any = None,
    params: Optional[dict] = kx.q('()!()')) -> Operator

Apply a function to incoming batches in the stream.

Unlike other operators, apply is asynchronous. To return data from it, you must use sp.push to push data through the pipeline when it is ready. Apply is commonly used with the state API to implement custom windowing, or with the task API.

When apply is used to buffer data, the on_finish option should be used to flush the buffer when the pipeline finishes. It is called once for every key in the state, including the default key, with the metadata dictionary passed to it containing only the key.

Arguments:

  • function - A function that will be called with three arguments: operator, metadata, and data. Operator and metadata can be used to get/set the state of the operator, among other uses.
  • on_finish - A function run when the pipeline finishes or is torn down, to flush the buffer. It is passed the operator and metadata.
  • state - The initial state.
  • params - The arguments to pass to function.

Returns:

An apply operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx
>>> from itertools import groupby

>>> def split_list(lst):
        return [list(group) for key, group in groupby(lst, lambda x: x == 1) if not key]

>>> data = [1, 2, 3, 1, 4, 5, 6, 1, 7, 8, 9]

>>> sp.run(sp.read.from_callback('publish')
        | sp.apply(lambda op, md, data: [sp.push(op, md, sublist)
            for sublist in split_list(data)])
        | sp.map(lambda x: [x])
        | sp.write.to_variable('out'))
>>> kx.q('publish', data)
>>> kx.q('out')
2 3
4 5 6
7 8 9
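
As noted above, when apply buffers data, on_finish should flush the buffer when the pipeline finishes. A hedged sketch of that pattern, assuming the state API is exposed as sp.state.get and sp.state.set:

>>> def buffer(op, md, data):
        buf = list(sp.state.get(op, md)) + list(data)
        if len(buf) >= 4:
            sp.push(op, md, buf)  # emit once the buffer is large enough
            buf = []
        sp.state.set(op, md, buf)

>>> def flush(op, md):
        sp.push(op, md, sp.state.get(op, md))  # flush whatever remains

>>> sp.run(sp.read.from_callback('publish')
        | sp.apply(buffer, on_finish=flush, state=[])
        | sp.write.to_variable('out'))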

filter

@Operator
def filter(
    function: OperatorFunction,
    dropEmptyBatches: bool = False,
    allowPartials: bool = True,
    state: Any = None,
    params: Optional[dict] = kx.q('()!()')) -> Operator

Filter elements out of a batch, or a batch from a stream.

Arguments:

  • function - A predicate function which will be applied to the data passing through the operator. If the result is a boolean list, only the records it flags progress through the stream. If the result is a single boolean value, either all records in the batch progress through the stream, or none do.
  • dropEmptyBatches - True to emit only non-empty batches.
  • allowPartials - True indicates this filter operation can handle partial batches of data. See below for more details on partial batches.
  • state - The initial state.
  • params - The arguments to pass to function.

Returns:

A filter operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> def example_filter(data):
        return [i for i in 'sale' == data['transaction']]

>>> sp.run(sp.read.from_callback('publish')
                | sp.op.filter(example_filter)
                | sp.write.to_variable('out'))

>>> data = pd.DataFrame({
        'transaction': ['sale', 'purchase', 'sale', 'sale', 'return', 'purchase'],
        'price': [1, 2, 3, 3, 3, 4]
        })

>>> kx.q('publish', data)
>>> kx.q('out')
transaction price
-----------------
sale        1
sale        3
sale        3
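
As noted in the arguments above, returning a single boolean keeps or drops the whole batch. A minimal sketch, using a hypothetical 'publish_batch' callback:

>>> sp.run(sp.read.from_callback('publish_batch')
        | sp.op.filter(lambda data: len(data) >= 3)
        | sp.write.to_variable('out2'))
>>> kx.q('publish_batch', [1, 2])
>>> kx.q('publish_batch', [3, 4, 5])
>>> kx.q('out2')
3 4 5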

key_by

@Operator
def key_by(field: Union[str, int, Callable],
           allowPartials: bool = True) -> Operator

(Beta Feature) Add a key to a stream to partition data.

Notes:

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

Arguments:

  • field - The method of keying the stream. A string designates a symbol column on which to partition the data; alternatively, an integer index or a callable can be supplied.
  • allowPartials - True indicates this operation can handle partial batches of data. See below for more details on partial batches.

Returns:

A key_by operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
          | sp.op.key_by('transaction')
          | sp.write.to_variable('out'))

>>> data = pd.DataFrame({
          'transaction': ['sale', 'purchase', 'sale', 'sale', 'return', 'purchase'],
          'price': [1, 2, 3, 3, 3, 4]
      })

>>> kx.q('publish', data)
>>> kx.q('out')
transaction price
-----------------
sale        1
sale        3
sale        3
purchase    2
purchase    4
return      3

map

@Operator
def map(function: OperatorFunction, allowPartials: bool = True) -> Operator

Apply a function to data passing through the operator.

The function provided will be called on every message that passes through the pipeline stream. The map operation can modify the data by performing aggregations, type changes, transformations, etc.

Arguments:

  • function - The function that will be applied to the data that passes through the operator. Can be provided either as a Python function, or as a string of q code that defines a q function.
  • allowPartials - True indicates this operation can handle partial batches of data. See below for more details on partial batches.

Returns:

A map operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd

>>> def times_ten(data):
        data['x'] *= 10
        return data

>>> sp.run(sp.read.from_callback('publish')
        | sp.map(times_ten)
        | sp.write.to_variable('out'))
>>> data = pd.DataFrame({
        'x': np.random.randn(5),
        'y': np.random.randn(5)
        })
>>> kx.q('publish', data)
>>> kx.q('out')
x         y
---------------------
-4.91861  -0.03564342
2.311394  0.8991633
-1.670712 0.433386
22.39179  -0.8260305
9.908324  -0.3064973
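
As noted in the arguments above, the function can also be provided as a string of q code. A minimal sketch:

>>> sp.run(sp.read.from_callback('publish')
        | sp.map('{10*x}')  # a q lambda applied to each batch
        | sp.write.to_variable('out'))
>>> kx.q('publish', [1, 2, 3])
>>> kx.q('out')
10 20 30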

merge

@Operator
def merge(
    stream: Pipeline,
    function: OperatorFunction,
    flush: Optional[str] = None,
    trigger: Union[str, Callable] = None,
    concat: Optional[OperatorFunction] = None,
    state: Any = None,
    params: Optional[dict] = kx.q('()!()')) -> Operator

Merge two streams using a function to combine them.

Merge can be used to join and enrich streams with other streams, with static data, or to union two streams into a single stream. See also: union

Arguments:

  • stream - The separate pipeline that will be merged with.
  • function - A function that takes two arguments, the data from each stream, and returns the data from both streams combined into one.
  • flush - Indicates which side of the merge operation to flush data from.
  • trigger - This defines when the merge function should be run. It can be a custom function run on the buffered data, or a symbol to use a predefined trigger.
  • concat - A function to update the buffer with incoming data.
  • state - The initial state.
  • params - The arguments to pass to function.

Returns:

A merge operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd

>>> df1 = pd.DataFrame({'symbol': ['AAPL', 'GOOGL'], 'price': [150, 2800]})
>>> df1.set_index('symbol', inplace=True)
>>> df2 = pd.DataFrame({'symbol': ['AAPL', 'GOOGL'], 'size': [100, 200]})

>>> sp.run(sp.read.from_callback('publish')
        | sp.merge(sp.read.from_callback('publish2'),
            lambda x, y: pd.merge(x.pd(), y.pd(), on='symbol',how='left'))
        | sp.write.to_variable('out'))
>>> kx.q('publish', df2)
>>> kx.q('publish2', df1)
>>> kx.q('out')
symbol size price
-----------------
AAPL   100  150
GOOGL  200  2800

parallel

@Operator
def parallel(
    functions: List[OperatorFunction],
    *,
    merge: Optional[OperatorFunction] = None,
    params: Optional[dict] = kx.q('()!()')
) -> Operator

(Beta Feature) Applies multiple functions in parallel to a stream.

This core operator runs multiple functions in parallel, allowing several aggregations to be computed over the same data at once.

Note: Beta Features. To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

Note: State is not supported. Multiple states cannot be updated within the same global variable; therefore, state is not supported within parallel execution.

Arguments:

  • functions - A list of functions applied to incoming data.
  • merge - A function run on the outputs of the functions passed in.
  • params - The arguments to pass to the functions.

Returns:

A parallel operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd

>>> def avg(data):
        return sum(data)/len(data)
>>> sp.run(sp.read.from_callback('publish')
        | sp.parallel([sum,avg])
        | sp.map(lambda x:[x]) # gets a list of batches, instead of appending all together
        | sp.write.to_variable('out'))
>>> data = [1,2,3]
>>> kx.q('publish', data)
>>> kx.q('out')
6 2f

reduce

@Operator
def reduce(function: OperatorFunction,
           initial: Any = None,
           output: OperatorFunction = None) -> Operator

Aggregates partial windows.

A window may include more records than can fit in memory. As such, it may be necessary to reduce the buffered records into a smaller, aggregated value at regular intervals. If the window operator uses the count_trigger option, a partial window will be emitted when the number of buffered records exceeds the count_trigger. Partial windows will also be emitted when a stream goes idle. These partial windows can be aggregated using this operator. When the window is complete, this operator will emit the result of reducing the partial windows.

The reduce operator runs a function on each incoming batch to update the accumulator for that window. Each window has a separate accumulator.

For partial windows, such as those emitted by count_trigger or idle streams, the accumulator will be updated but not emitted.

When a window is closed, such as when the high-water mark passes the end of the window, the accumulator will be updated for that final batch, and then it will be emitted.

By default, kxi.sp.reduce emits the accumulator, but this value can be transformed with the output function. If the accumulator is a dictionary, an output function like pykx.q.enlist could be used to emit tables.

Arguments:

  • function - An aggregator which takes the metadata, data, and the accumulator for a given window, and returns an updated accumulator.
  • initial - The initial state of the accumulator. The accumulation is performed in q, and as such this initial value must be convertible to q. This conversion can be performed manually with pykx.K.
  • output - An optional function to transform output before emitting it. It gets passed the value of the accumulator.

Returns:

A reduce operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> import random
>>> import datetime

>>> def accumulator(md, data, acc):
        total_length = len(data)
        min_time = min(data['time'])
        sum_val = sum(data['val'])
        return {
            'start_time': min_time,
            'avg_val': sum_val / total_length
            }


>>> def output(accumulator_result):
        return pd.DataFrame([accumulator_result])


>>> vals = [random.uniform(0, 1) for _ in range(10000)]
>>> current_datetime = datetime.datetime.now()
>>> time_values = [current_datetime + datetime.timedelta(seconds=i/10) for i in range(10000)]
>>> df = pd.DataFrame({'time': time_values, 'val': vals})

>>> empty_df = pd.DataFrame(columns=df.columns)

>>> sp.run(sp.read.from_callback('publish')
        | sp.window.tumbling(datetime.timedelta(seconds=1),
            time_assigner=lambda x: x['time'], count_trigger=5)
        | sp.reduce(accumulator, empty_df, output)
        | sp.write.to_variable('out'))
>>> kx.q('publish', df)
>>> kx.q('out')
avg_val   start_time
---------------------------------------
0.4525153 2023.10.06D20:37:26.985194000
0.5691399 2023.10.06D20:37:27.085194000
0.5124086 2023.10.06D20:37:28.085194000
0.6062494 2023.10.06D20:37:29.085194000
0.3942734 2023.10.06D20:37:30.085194000
0.4774162 2023.10.06D20:37:31.085194000
...

rolling

@Operator
def rolling(n: int, fn: OperatorFunction) -> Operator

(Beta Feature) Applies a moving-window function to a stream.

This operator is equivalent to sp.map, but for moving-window functions such as moving averages, or those comparing a vector to a shifted version of itself, such as a difference function.

This operator does not emit a moving window; that can be done with kxi.sp.window.count. Rather, it maintains a buffer of the last n records, which is prepended to each incoming batch. The results of the function on these prepended elements are dropped, as their values were already emitted in an earlier batch.

Notes:

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

Warnings:

Functions that aggregate data down to a constant number of data points (for example, sum) will not work with the rolling operator, because it relies on having individual data points in the buffer to produce correct output.

Arguments:

  • n - The size of the buffer.
  • fn - A function that takes a vector.

Returns:

A rolling operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import numpy as np
>>> import pykx as kx

>>> def moving_average(x, w):
       return np.convolve(x, np.ones(w), 'valid') / w

>>> sp.run(sp.read.from_callback('publish')
       | sp.rolling(2, lambda x: moving_average(x,2))
       | sp.write.to_variable('out'))
>>> kx.q('publish', [1,2,3])
>>> kx.q('publish', [4,5,6])
>>> kx.q('publish', [7,8,9])
>>> kx.q('out')
1.5 2.5 3.5 4.5 5.5 6.5 7.5 8.5
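
A difference function, mentioned above as a shifted-comparison use, fits the same pattern. A minimal sketch, assuming numpy interop on the buffered vector and a fresh 'publish_diff' callback:

>>> sp.run(sp.read.from_callback('publish_diff')
       | sp.rolling(1, np.diff)  # each value minus the previous one
       | sp.write.to_variable('out2'))
>>> kx.q('publish_diff', [1, 3, 6])
>>> kx.q('out2')
2 3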

split

@Operator
def split() -> Operator

Split the current stream.

The split operator allows a single stream to be split into arbitrarily many separate streams for running separate analytics or processing.

Split operators can be added explicitly, but are added implicitly if the same operator appears as a parent multiple times when resolving the streams given to sp.run.

Returns:

A split operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd

>>> def addOne(data):
       data['x'] += 1
       return data


>>> def addFive(data):
       data['x'] += 5
       return data


>>> def timesTen(data):
       data['x'] *= 10
       return data


>>> streamA=( sp.read.from_callback('publish')
       | sp.map(addOne)
       | sp.split())

>>> streamB=( streamA
       | sp.map(addFive)
       | sp.write.to_console())
>>> streamC=( streamA
       | sp.map(timesTen)
       | sp.write.to_variable('out'))

>>> sp.run(streamB, streamC)
>>> data = pd.DataFrame({
       'x': np.random.randn(5),
       'y': np.random.randn(5)
    })
>>> kx.q('publish', data)
>>> kx.q('out')
x        y
--------------------
7.807491 1.565018
8.19516  2.387283
15.90946 -0.03481929
12.44505 -0.5897372
2.520515 -0.06977756

sql

@Operator
def sql(query: CharString, schema: Optional[kx.Table] = None) -> Operator

Perform an SQL query on tabular data in the stream.

The query must operate on tabular data in the stream, referenced as $1 (as in the example below). If data in the stream is not table data, an error will be raised. Providing a schema allows the query to be precompiled, enabling faster processing on streaming data.

Note: Queries run in a local worker process. SQL queries are not distributed across workers, and are currently run on each worker's substream.

Arguments:

  • query - An SQL query, which will be performed on tabular data in the stream.
  • schema - An empty table with the same schema as the data.

Returns:

An sql operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd

>>> df = pd.DataFrame({'x': range(10), 'y': range(10)})

>>> sp.run(sp.read.from_callback('publish')
        | sp.sql('SELECT * FROM $1 ORDER BY x')
        | sp.write.to_variable('out'))
>>> kx.q('publish', df)
>>> kx.q('out')
x y
---
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
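As noted above, supplying a schema lets the query be precompiled. A minimal sketch, reusing the table shape from the example above (the schema table here is illustrative):

>>> schema = kx.q('([] x: 0#0; y: 0#0)')  # empty table matching the stream's data
>>> sp.run(sp.read.from_callback('publish')
        | sp.sql('SELECT * FROM $1 WHERE x > 3', schema=schema)
        | sp.write.to_variable('out'))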

udf

def udf(
    name: CharString,
    package: CharString,
    version: CharString = None,
    params: Optional[dict] = kx.q('()!()')
) -> Operator

Retrieve a user-defined function (UDF) from a package directory defined by 'KX_PACKAGE_PATH'.

Arguments:

  • name - Name of the UDF to be retrieved.
  • package - Name of the package from which to retrieve the UDF.
  • version - Version of the package from which to retrieve the UDF.
  • params - Additional optional input to the UDF, passed as the final argument to the function.

Returns:

A function which is to be used as a UDF within a merge, filter, or map operator.

Examples:

Retrieve a UDF providing all necessary information

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> # Set path for package retrieval
>>> kx.q.setenv('KX_PACKAGE_PATH', b'/tmp/packages')
>>> # Retrieve the UDF function (in this example the UDF adds 10 to input)
>>> udf = sp.udf('map_udf', 'test', '1.0.0', {'param':10})
>>> # Use the UDF within SP operator
>>> sp.run(sp.read.from_callback('publish')
        | sp.map(udf)
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.random.rand(5),
        'x1': np.random.rand(5)
    })
>>> kx.q('publish', data)
x        x1
-----------------
10.49318 10.39275
10.57852 10.51709
10.08389 10.51598
10.19599 10.40666
10.37564 10.17808

Retrieve a UDF using the latest package version and default parameters

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> # Set path for package retrieval
>>> kx.q.setenv('KX_PACKAGE_PATH', b'/tmp/packages')
>>> # Retrieve the UDF function
>>> udf = sp.udf('filter_udf', 'test')
>>> # Use the UDF within SP operator (this UDF filters data for x<0.5)
>>> sp.run(sp.read.from_callback('publish')
        | sp.filter(udf)
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.random.rand(5),
        'x1': np.random.rand(5)
    })
>>> kx.q('publish', data)
x          x1
--------------------
0.07347808 0.7263142
0.3159526  0.9216436
0.3410485  0.1809536

union

@Operator
def union(stream: Pipeline,
          flush: Optional[str] = None,
          trigger: Union[str, Callable] = None) -> Operator

Unite two streams.

Like merge, but elements from both sides of the union are left as-is.

Arguments:

  • stream - The separate pipeline to be united with this stream.
  • flush - Indicates which side of the union operation to flush data from.
  • trigger - Defines when the union operation should be run. It can be a custom function run on the buffered data, or a symbol to use a predefined trigger.

Returns:

A union operator, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
    | sp.union(sp.read.from_callback('publish2'))
    | sp.write.to_variable('out'))
>>> kx.q('publish', range(0, 10))
>>> kx.q('publish2', range(10, 20))
>>> kx.q('out')
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19