
State

This page explains the methods and concepts involved in managing the cached state of a pipeline operator.

Get

Gets the cached state of an operator.

.qsp.get[op;md]

Parameters:

| name | type | description | default |
|---|---|---|---|
| op | symbol or dict | The name of the operator, or the operator dictionary. | Required |
| md | .qsp.message.metadata or generic null (::) | The metadata for specialized data storage and retrieval, or null. | (::) |

Returns:

Cached state for the specified operator. If md contains a key, returns the state specifically associated with that key. Otherwise, returns the unkeyed state of the operator.

Examples:

.qsp.set[`map;::;0 1]
.qsp.set[`map;``key!(::;`key1);0 1 2 3 4]

.qsp.get[`map;``key!(::;`key1)] => 0 1 2 3 4
.qsp.get[`map;``key!(::;`key2)] => ()
.qsp.get[`map;::]               => 0 1
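The lookup rules above can be modeled in plain Python. The sketch below is a hypothetical in-memory stand-in, not the q or kxi API: the `OperatorState` class is invented for illustration and assumes one unkeyed slot plus one slot per key. It mirrors the q examples, including the empty result for a key that was never set.

```python
from typing import Any, Dict, Hashable, Optional


class OperatorState:
    """Hypothetical model: one unkeyed slot plus one slot per message key."""

    def __init__(self) -> None:
        self.unkeyed: Any = None
        self.keyed: Dict[Hashable, Any] = {}

    def set(self, md: Optional[dict], data: Any) -> Any:
        # A 'key' entry in the metadata selects the per-key slot.
        if md and "key" in md:
            self.keyed[md["key"]] = data
        else:
            self.unkeyed = data
        return data

    def get(self, md: Optional[dict]) -> Any:
        if md and "key" in md:
            # Keys that were never set yield an empty list, like the () result in q.
            return self.keyed.get(md["key"], [])
        return self.unkeyed


state = OperatorState()
state.set(None, [0, 1])
state.set({"key": "key1"}, [0, 1, 2, 3, 4])

print(state.get({"key": "key1"}))  # [0, 1, 2, 3, 4]
print(state.get({"key": "key2"}))  # []
print(state.get(None))             # [0, 1]
```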

Simple stateful operator example:

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.map[{[op;md;data]
        previousSum: .qsp.get[op;md];
        newSum: previousSum + data;
        .qsp.set[op;md;newSum]
        }; .qsp.use ``state!(::;0)]
    .qsp.write.toConsole[];

publish 1;
publish 2;
publish 3;
2022.03.09D21:08:02.752793709 | 1
2022.03.09D21:08:04.152717183 | 3
2022.03.09D21:08:04.819407879 | 6
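The get, combine, set cycle in this operator can be made explicit with a plain-Python model of the same running sum. It assumes a dict stands in for the operator's cached state and that the initial value plays the role of the `state` option; `run_stateful_map` and `running_sum` are hypothetical names, not Stream Processor functions. Because .qsp.set returns the data it stored and is the last expression of the map function, the new sum is also what the map emits.

```python
from typing import Callable, Dict, List


def run_stateful_map(
    fn: Callable[[Dict[str, int], int], int], initial: int, messages: List[int]
) -> List[int]:
    # Hypothetical stand-in for the operator's cache, seeded like ``state!(::;0).
    cache = {"state": initial}
    return [fn(cache, data) for data in messages]


def running_sum(cache: Dict[str, int], data: int) -> int:
    previous_sum = cache["state"]   # plays the role of .qsp.get[op;md]
    new_sum = previous_sum + data
    cache["state"] = new_sum        # plays the role of .qsp.set[op;md;newSum]
    return new_sum                  # the stored value is what the map emits


print(run_stateful_map(running_sum, 0, [1, 2, 3]))  # [1, 3, 6]
```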

Python:

sp.state.get(operator, metadata)

Parameters:

| name | type | description | default |
|---|---|---|---|
| operator | Operator | The operator, specified as a configuration dictionary or name. | Required |
| metadata | Metadata | The metadata dictionary, provided for specialized data retrieval. | None |
| unpickle | Unpickle | An Unpickle enum value representing whether the retrieved data should be unpickled. Can be provided as a boolean. Defaults to Unpickle.auto, which unpickles the data only if it appears to be bytes in a pickled format. | Unpickle.auto |

For all common arguments, refer to configuring operators.
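The exact check that Unpickle.auto performs is not described on this page. One plausible heuristic, sketched here purely as an assumption: treat a value as pickled when it is a bytes object that starts with the pickle PROTO opcode (0x80, written by protocol 2 and later) and loads without error. `maybe_unpickle` is a hypothetical helper, not part of kxi.

```python
import pickle
from typing import Any


def maybe_unpickle(value: Any) -> Any:
    """Hypothetical 'auto' mode: unpickle only bytes that look pickled."""
    # Pickle protocols 2+ begin with the PROTO opcode 0x80 followed by the protocol number.
    if isinstance(value, (bytes, bytearray)) and len(value) >= 2 and value[0] == 0x80:
        try:
            return pickle.loads(value)
        except Exception:
            # Looked like a pickle but was not one; return the raw bytes unchanged.
            return value
    return value


print(maybe_unpickle(pickle.dumps({"count": 3})))  # {'count': 3}
print(maybe_unpickle(b"plain bytes"))               # b'plain bytes'
print(maybe_unpickle(42))                           # 42
```

Only unpickle data from trusted sources: pickle.loads can execute arbitrary code embedded in its input.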

Returns:

The data that was stored in the operator's state.

Examples:

>>> from kxi import sp
>>> import pykx as kx

>>> def map_func(op, md, data):
...     highest = max(max(data), sp.state.get(op, md))
...     sp.state.set(op, md, highest)
...     return highest
...
>>> sp.run(sp.read.from_callback('publish')
...     | sp.map(map_func, state=float('-inf'), name='map')
...     | sp.write.to_variable('out'))
>>> kx.q('publish', range(10))
>>> kx.q('publish', range(5))
>>> kx.q('out')
9 9

Keyed State

Message metadata can contain a key field, whose value is the key of the events contained in that message. Messages with different keys are processed independently of one another; to support this, operator state is separated by key, as demonstrated in the example below.

.qsp.run
    .qsp.read.fromCallback[`publish; .qsp.use ``key!(::;`y)]
    .qsp.map[{[op;md;data]
        previousSum: .qsp.get[op;md];
        newSum: previousSum + data`x;
        .qsp.set[op;md;newSum]
        }; .qsp.use ``state!(::;0)]
    .qsp.write.toConsole[]

// Publishing data with key=0
publish `x`y!1 0;
publish `x`y!2 0;
publish `x`y!3 0;

// Publishing data with key=1
publish `x`y!1 1;
publish `x`y!2 1;
publish `x`y!3 1;

// Publishing data with key=0 again
publish `x`y!4 0;
2022.03.10D16:28:02.220001229 | 1
2022.03.10D16:28:03.968131241 | 3
2022.03.10D16:28:04.569659787 | 6
2022.03.10D16:28:05.533655577 | 1
2022.03.10D16:28:06.051010015 | 3
2022.03.10D16:28:06.523569938 | 6
2022.03.10D16:28:08.076519264 | 10

In the above example, .qsp.map is a stateful operator that outputs the sum of all numbers it has received so far. Because the key option of .qsp.read.fromCallback is set to `y, the value of the y field of each record pushed with publish is extracted and used as the key of that message. When publishing with key 0, the map outputs 1, 3, then 6; when publishing with key 1, the sum starts again from the initial state and also outputs 1, 3, then 6. This is because the state of the map operator is split between the two keys, 0 and 1. Finally, publishing with key 0 again continues from where that key left off, outputting 10.
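The per-key behaviour in this example can be reproduced with a plain-Python model that keeps one running total per key. It assumes, as described above, that the value of the y field becomes the key and that every key starts from the initial state 0; `keyed_running_sum` is a hypothetical helper, not a Stream Processor function.

```python
from collections import defaultdict
from typing import Dict, List


def keyed_running_sum(records: List[Dict[str, int]], key_field: str = "y") -> List[int]:
    # One running total per key; unseen keys start at 0, like the `state` option.
    totals: Dict[int, int] = defaultdict(int)
    outputs = []
    for record in records:
        key = record[key_field]      # the value of y becomes the message key
        totals[key] += record["x"]   # only this key's state is updated
        outputs.append(totals[key])
    return outputs


published = [
    {"x": 1, "y": 0}, {"x": 2, "y": 0}, {"x": 3, "y": 0},
    {"x": 1, "y": 1}, {"x": 2, "y": 1}, {"x": 3, "y": 1},
    {"x": 4, "y": 0},
]
print(keyed_running_sum(published))  # [1, 3, 6, 1, 3, 6, 10]
```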

Python: Coming soon.

Set

Sets the cached state of an operator.

.qsp.set[op;md;data]

Parameters:

| name | type | description | default |
|---|---|---|---|
| op | symbol or dict | The name of the operator, or the operator dictionary. | Required |
| md | .qsp.message.metadata or generic null (::) | The metadata for specialized data storage and retrieval, or null. | (::) |
| data | any | The new state. | Required |

Passing metadata allows data to be stored on a per-key basis, which enables dynamic rebalancing of state across workers.
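Why per-key storage helps with rebalancing can be illustrated with a toy model: when state is held per key, each key's entry can be relocated as a unit without touching any other key. The assignment rule below (a CRC32 hash of the key modulo the worker count) and the `owner` and `rebalance` helpers are hypothetical, chosen only for illustration; this page does not describe how the Stream Processor assigns keys to workers.

```python
import zlib
from typing import Dict, Hashable, List


def owner(key: Hashable, n_workers: int) -> int:
    # Hypothetical assignment rule: stable hash of the key modulo worker count.
    return zlib.crc32(repr(key).encode()) % n_workers


def rebalance(workers: List[Dict[Hashable, int]], n_new: int) -> List[Dict[Hashable, int]]:
    """Move each key's state to the worker that owns it after resizing."""
    new_workers: List[Dict[Hashable, int]] = [{} for _ in range(n_new)]
    for shard in workers:
        for key, value in shard.items():
            # Each key's entry moves as a unit; no other key's state is involved.
            new_workers[owner(key, n_new)][key] = value
    return new_workers


old: List[Dict[Hashable, int]] = [{}, {}]
for key, total in {0: 10, 1: 6, 2: 4, 3: 7}.items():
    old[owner(key, 2)][key] = total

new = rebalance(old, 3)
merged = {k: v for shard in new for k, v in shard.items()}
print(merged == {0: 10, 1: 6, 2: 4, 3: 7})  # True: no state lost
```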

Stores state for the specified operator and returns the data added to the state. If no operator configuration is provided, state is retrieved from global storage. If md contains a key, updates the state specifically associated with that key. Otherwise, updates the unkeyed state of the operator.

Returns:

The data that was stored in the operator's state.

Examples:

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.map[{[op;md;data]
        previousSum: .qsp.get[op;md];
        newSum: previousSum + data;
        .qsp.set[op;md;newSum]
        }; .qsp.use ``state!(::;0)]
    .qsp.write.toConsole[];

publish 1;
publish 2;
publish 3;
2022.03.09D21:08:02.752793709 | 1
2022.03.09D21:08:04.152717183 | 3
2022.03.09D21:08:04.819407879 | 6

Python:

sp.state.set(operator, metadata, data)

Parameters:

| name | type | description | default |
|---|---|---|---|
| operator | Operator | The operator, specified as a configuration dictionary or name. | Required |
| metadata | Metadata | The metadata dictionary, provided for specialized data storage and retrieval. | None |
| data | Any | The data to be stored. By default the data will be converted to a pykx.K object using the default conversion logic. To store the object as-is, set pickle=True. | Required |
| pickle | Pickle | A Pickle enum value representing whether the data should be pickled or not. Can be provided as a boolean. | Pickle.false |
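The effect of pickle=True can be sketched without pykx: the object is serialized to bytes unchanged, so it comes back as the same Python type. `encode_state` is a hypothetical helper; its pass-through branch merely stands in for pykx's default conversion to a pykx.K object, which is not reproduced here.

```python
import pickle
from collections import Counter
from typing import Any


def encode_state(data: Any, use_pickle: bool = False) -> Any:
    # Hypothetical: with use_pickle=True the object is serialized to bytes as-is;
    # otherwise it is passed through, standing in for pykx's default conversion.
    return pickle.dumps(data) if use_pickle else data


counts = Counter("abracadabra")
stored = encode_state(counts, use_pickle=True)
print(type(stored).__name__)           # bytes
print(pickle.loads(stored) == counts)  # True
```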

Returns:

The data that was stored in the operator's state.

Examples:

>>> from kxi import sp
>>> import pykx as kx

>>> def map_func(op, md, data):
...     highest = max(max(data), sp.state.get(op, md))
...     sp.state.set(op, md, highest)
...     return highest
...
>>> sp.run(sp.read.from_callback('publish')
...     | sp.map(map_func, state=float('-inf'), name='map')
...     | sp.write.to_variable('out'))
>>> kx.q('publish', range(10))
>>> kx.q('publish', range(5))
>>> kx.q('out')
9 9