Skip to content

kxi.sp.state

Stream Processor state interface.

get

def get(operator: Operator,
        metadata: Metadata,
        *,
        unpickle: Unpickle = Unpickle.auto) -> Any

Get the state of an operator.

Arguments:

  • operator - The operator, specified as a configuration dictionary or name.
  • metadata - The metadata dictionary, provided for specialized data retrieval.
  • unpickle - A Unpickle enum value representing whether the retrieved data should be unpickled. Can be provided as a boolean. Defaults to Unpickle.auto, which unpickles the data only if it appears to be bytes in a pickled format.

Returns:

The data that was stored in the operator's state.

>>> from kxi import sp
>>> import pykx as kx

>>> def map_func(op, md, data):
        highest = max(max(data), sp.state.get(op, md))
        sp.state.set(op, md, highest)
        return highest

>>> sp.run(sp.read.from_callback('publish')
        | sp.map(map_func, state=float('-inf'), name='map')
        | sp.write.to_variable('out'))
>>> kx.q('publish', range(10))
>>> kx.q('publish', range(5))
>>> kx.q('out')
9 9

set

def set(operator: Operator,
        metadata: Metadata,
        data: Any,
        *,
        pickle: Pickle = Pickle.false) -> Any

Set the state of an operator.

Arguments:

  • operator - The operator, specified as a configuration dictionary or name.
  • metadata - The metadata dictionary, provided for specialized data storage.
  • data - The data to be stored. By default the data will be converted to a pykx.K object using the default conversion logic. To store the object as-is set pickle=True.
  • pickle - A Pickle enum value representing whether the data should be pickled or not. Can be provided as a boolean.

Returns:

The data that was stored in the operator's state.

>>> from kxi import sp
>>> import pykx as kx

>>> def map_func(op, md, data):
        highest = max(max(data), sp.state.get(op, md))
        sp.state.set(op, md, highest)
        return highest

>>> sp.run(sp.read.from_callback('publish')
        | sp.map(map_func, state=float('-inf'), name='map')
        | sp.write.to_variable('out'))
>>> kx.q('publish', range(10))
>>> kx.q('publish', range(5))
>>> kx.q('out')
9 9