kxi.sp.state
Stream Processor state interface.
get
def get(operator: Operator,
        metadata: Metadata,
        *,
        unpickle: Unpickle = Unpickle.auto) -> Any
Get the state of an operator.
Arguments:
operator
- The operator, specified as a configuration dictionary or name.
metadata
- The metadata dictionary, provided for specialized data retrieval.
unpickle
- An Unpickle enum value representing whether the retrieved data should be unpickled. Can be provided as a boolean. Defaults to Unpickle.auto, which unpickles the data only if it appears to be bytes in a pickled format.
Returns:
The data that was stored in the operator's state.
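The Unpickle.auto behaviour described above ("only if it appears to be bytes in a pickled format") can be pictured with a small standard-library sketch. This is not the kxi implementation: looks_pickled and auto_unpickle are hypothetical names, and the opcode check is an assumed heuristic for illustration only.

```python
import pickle
from typing import Any


def looks_pickled(data: Any) -> bool:
    """Assumed heuristic: bytes that start with PROTO and end with STOP."""
    return (
        isinstance(data, (bytes, bytearray))
        and len(data) >= 3
        and data[0] == 0x80   # PROTO opcode, emitted by pickle protocol 2 and later
        and data[-1] == 0x2E  # STOP opcode ('.'), the last byte of every pickle
    )


def auto_unpickle(data: Any) -> Any:
    """Unpickle only when the payload looks like a pickle; otherwise pass it through."""
    if looks_pickled(data):
        # Only safe for data the application pickled itself.
        return pickle.loads(bytes(data))
    return data


print(auto_unpickle(pickle.dumps({"highest": 9})))  # {'highest': 9}
print(auto_unpickle(9.0))                           # 9.0
print(auto_unpickle(b"raw"))                        # b'raw'
```

Passing a boolean for unpickle, as the argument description allows, would skip a guess like this and force the behaviour one way or the other.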
>>> from kxi import sp
>>> import pykx as kx
>>> def map_func(op, md, data):
...     highest = max(max(data), sp.state.get(op, md))
...     sp.state.set(op, md, highest)
...     return highest
>>> sp.run(sp.read.from_callback('publish')
...     | sp.map(map_func, state=float('-inf'), name='map')
...     | sp.write.to_variable('out'))
>>> kx.q('publish', range(10))
>>> kx.q('publish', range(5))
>>> kx.q('out')
9 9
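To see why the example above yields 9 for both batches, the get/compare/set pattern can be traced in plain Python without a running pipeline. This is only a sketch of the logic: running_highest is a hypothetical helper, and a local variable stands in for the operator state.

```python
from typing import Iterable, List


def running_highest(batches: Iterable[Iterable[float]],
                    initial: float = float("-inf")) -> List[float]:
    """Trace the running-maximum logic of map_func over a sequence of batches."""
    state = initial          # plays the role of state=float('-inf')
    emitted = []
    for batch in batches:
        highest = max(max(batch), state)  # like sp.state.get followed by max
        state = highest                   # like sp.state.set
        emitted.append(highest)           # the value map_func returns
    return emitted


print(running_highest([range(10), range(5)]))  # [9, 9]
```

The second batch peaks at 4, but the stored state is still 9, so 9 is emitted again.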
set
def set(operator: Operator,
        metadata: Metadata,
        data: Any,
        *,
        pickle: Pickle = Pickle.false) -> Any
Set the state of an operator.
Arguments:
operator
- The operator, specified as a configuration dictionary or name.
metadata
- The metadata dictionary, provided for specialized data storage.
data
- The data to be stored. By default the data will be converted to a pykx.K object using the default conversion logic. To store the object as-is, set pickle=True.
pickle
- A Pickle enum value representing whether the data should be pickled or not. Can be provided as a boolean.
Returns:
The data that was stored in the operator's state.
>>> from kxi import sp
>>> import pykx as kx
>>> def map_func(op, md, data):
...     highest = max(max(data), sp.state.get(op, md))
...     sp.state.set(op, md, highest)
...     return highest
>>> sp.run(sp.read.from_callback('publish')
...     | sp.map(map_func, state=float('-inf'), name='map')
...     | sp.write.to_variable('out'))
>>> kx.q('publish', range(10))
>>> kx.q('publish', range(5))
>>> kx.q('out')
9 9
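As a rough illustration of what storing an object "as-is" with pickle=True buys, the standard-library round trip below shows Python-specific container types surviving serialization unchanged. It uses only the pickle module and does not call kxi or pykx. The state dictionary is made-up sample data.

```python
import pickle

# Made-up state containing Python-specific types (set, tuple, None).
state = {"highest": 9, "seen": {1, 2, 3}, "source": ("map", None)}

blob = pickle.dumps(state)     # the byte payload a pickled store would hold
restored = pickle.loads(blob)  # what unpickling on retrieval would give back

print(type(blob).__name__)              # bytes
print(restored == state)                # True
print(type(restored["seen"]).__name__)  # set
```

Pickling suits state that only Python code will read. The default conversion to a pykx.K object is presumably the better fit when the value should remain usable from q.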