State
This page explains the methods and concepts involved in managing the cached state of a pipeline operator.
Get
Gets cached state of an operator.
.qsp.get[op;md]
Parameters:
name | type | description | default |
---|---|---|---|
op | symbol or dict | The name of the operator, or the operator dictionary. | Required |
md | .qsp.message.metadata or generic null (::) | The metadata for specialized data storage and retrieval, or null. | (::) |
Returns:
Cached state for the specified operator. If md contains a key, returns the state associated with that key; otherwise, returns the unkeyed state of the operator.
Examples:
.qsp.set[`map;::;0 1]
.qsp.set[`map;``key!(::;`key1);0 1 2 3 4]
.qsp.get[`map;``key!(::;`key1)] => 0 1 2 3 4
.qsp.get[`map;``key!(::;`key2)] => ()
.qsp.get[`map;::] => 0 1
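The keyed/unkeyed split in these examples can be modelled with a nested Python dictionary. This is a hypothetical sketch, not the kxi API: `StateStore`, `register`, and the use of `None` as the unkeyed slot are illustrative names. It also assumes that a key which was never set falls back to the operator's initial state, with an empty tuple standing in for q's empty list `()`.

```python
from typing import Any, Dict, Hashable, Optional

class StateStore:
    """Hypothetical in-memory model of cached operator state (not the kxi API)."""

    def __init__(self) -> None:
        # operator name -> {key -> state}; the slot None holds unkeyed state
        self._state: Dict[str, Dict[Optional[Hashable], Any]] = {}
        # operator name -> initial state returned for keys that were never set
        self._initial: Dict[str, Any] = {}

    def register(self, op: str, initial: Any = ()) -> None:
        """Declare an operator and its initial state (() stands in for q's empty list)."""
        self._initial[op] = initial
        self._state.setdefault(op, {})

    def get(self, op: str, md: Optional[dict] = None) -> Any:
        key = (md or {}).get("key")  # no key in md -> unkeyed slot
        return self._state.get(op, {}).get(key, self._initial.get(op, ()))

    def set(self, op: str, md: Optional[dict], data: Any) -> Any:
        key = (md or {}).get("key")
        self._state.setdefault(op, {})[key] = data
        return data  # hand back the stored value, as .qsp.set does


store = StateStore()
store.register("map")
store.set("map", None, [0, 1])                      # .qsp.set[`map;::;0 1]
store.set("map", {"key": "key1"}, [0, 1, 2, 3, 4])  # state for `key1
print(store.get("map", {"key": "key1"}))  # [0, 1, 2, 3, 4]
print(store.get("map", {"key": "key2"}))  # ()
print(store.get("map", None))             # [0, 1]
```

Keeping unkeyed state in its own slot means setting a keyed value never overwrites it, which is why the last call still returns 0 1.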
Simple stateful operator example:
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{[op;md;data]
    previousSum: .qsp.get[op;md];
    newSum: previousSum + data;
    .qsp.set[op;md;newSum]
    }; .qsp.use ``state!(::;0)]
  .qsp.write.toConsole[];
publish 1;
publish 2;
publish 3;
2022.03.09D21:08:02.752793709 | 1
2022.03.09D21:08:04.152717183 | 3
2022.03.09D21:08:04.819407879 | 6
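To see why the console prints 1, 3, then 6, the same get-add-set cycle can be traced in plain Python. This is a simulation, not Stream Processor code: a module-level variable stands in for the cached state, and `map_body` and `publish` are hypothetical stand-ins for the q lambda and the callback.

```python
from typing import List

state = 0  # initial state, as configured by .qsp.use ``state!(::;0)
outputs: List[int] = []

def map_body(data: int) -> int:
    """Mirror of the q lambda: read the state, add the new value, write it back."""
    global state
    previous_sum = state           # .qsp.get[op;md]
    new_sum = previous_sum + data
    state = new_sum                # .qsp.set[op;md;newSum]
    return new_sum                 # .qsp.set returns the stored value, so map emits it

def publish(data: int) -> None:
    """Hypothetical stand-in for the callback: run the map and record its output."""
    outputs.append(map_body(data))

for value in (1, 2, 3):
    publish(value)

print(outputs)  # [1, 3, 6]
```

Because .qsp.set returns the data it stored, making it the last expression of the lambda both updates the state and emits the new sum.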
sp.state.get(operator, metadata, unpickle=Unpickle.auto)
Parameters:
name | type | description | default |
---|---|---|---|
operator | Operator | The operator, specified as a configuration dictionary or name. | Required |
metadata | Metadata | The metadata dictionary, provided for specialized data retrieval. | None |
unpickle | Unpickle | An Unpickle enum value representing whether the retrieved data should be unpickled. Can be provided as a boolean. Defaults to Unpickle.auto, which unpickles the data only if it appears to be bytes in a pickled format. | Unpickle.auto |
For all common arguments, refer to configuring operators.
Returns:
The data that was stored in the operator's state.
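The Unpickle.auto default unpickles a value only when it looks like pickled bytes. The exact check kxi performs is not described on this page, so the sketch below assumes a simple heuristic: bytes that begin with the PROTO opcode 0x80, which pickle protocol 2 and later write first. `looks_pickled` and `maybe_unpickle` are hypothetical helpers, and a payload that only superficially looks pickled is returned unchanged.

```python
import pickle
from typing import Any

PROTO = b"\x80"  # first byte written by pickle protocol 2 and later

def looks_pickled(value: Any) -> bool:
    """Heuristic guess at whether value is a pickle payload (assumption)."""
    return isinstance(value, (bytes, bytearray)) and value[:1] == PROTO

def maybe_unpickle(value: Any) -> Any:
    """Hypothetical model of Unpickle.auto: unpickle only likely pickle bytes."""
    if not looks_pickled(value):
        return value
    try:
        return pickle.loads(value)
    except Exception:
        # Bytes that merely start with 0x80 are handed back as-is
        return value

payload = pickle.dumps({"high": 9.0})
print(maybe_unpickle(payload))         # {'high': 9.0}
print(maybe_unpickle(b"plain bytes"))  # b'plain bytes'
print(maybe_unpickle(42))              # 42
```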
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> def map_func(op, md, data):
...     highest = max(max(data), sp.state.get(op, md))
...     sp.state.set(op, md, highest)
...     return highest
...
>>> sp.run(sp.read.from_callback('publish')
...     | sp.map(map_func, state=float('-inf'), name='map')
...     | sp.write.to_variable('out'))
>>> kx.q('publish', range(10))
>>> kx.q('publish', range(5))
>>> kx.q('out')
9 9
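The map above keeps the highest value seen so far. Tracing it without the Stream Processor shows why out holds 9 9: the first batch raises the state from -inf to 9, and the second batch's maximum (4) does not exceed it. This is a plain-Python simulation; `run_batches` is hypothetical, and it assumes to_variable collects one output per published batch, as the printed 9 9 suggests.

```python
from typing import Iterable, List

def run_batches(batches: Iterable[Iterable[int]]) -> List[float]:
    """Hypothetical stand-in for the pipeline: one map call per published batch."""
    state = float("-inf")                # state=float('-inf') in sp.map
    out: List[float] = []
    for data in batches:
        highest = max(max(data), state)  # compare with sp.state.get(op, md)
        state = highest                  # sp.state.set(op, md, highest)
        out.append(highest)              # the returned value is what map emits
    return out

print(run_batches([range(10), range(5)]))  # [9, 9]
```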
Keyed State
Message metadata can contain a key field. The value of this field is the key of the events contained in that message. Messages with different keys are processed independently of one another; to support this, operator state is kept separately for each key, as the example below demonstrates.
.qsp.run
  .qsp.read.fromCallback[`publish; .qsp.use ``key!(::;`y)]
  .qsp.map[{[op;md;data]
    previousSum: .qsp.get[op;md];
    newSum: previousSum + data`x;
    .qsp.set[op;md;newSum]
    }; .qsp.use ``state!(::;0)]
  .qsp.write.toConsole[]
// Publishing data with key=0
publish `x`y!1 0;
publish `x`y!2 0;
publish `x`y!3 0;
// Publishing data with key=1
publish `x`y!1 1;
publish `x`y!2 1;
publish `x`y!3 1;
// Publishing data with key=0 again
publish `x`y!4 0;
2022.03.10D16:28:02.220001229 | 1
2022.03.10D16:28:03.968131241 | 3
2022.03.10D16:28:04.569659787 | 6
2022.03.10D16:28:05.533655577 | 1
2022.03.10D16:28:06.051010015 | 3
2022.03.10D16:28:06.523569938 | 6
2022.03.10D16:28:08.076519264 | 10
In the above example, .qsp.map is a stateful operator that outputs the sum of all numbers it has received so far. Because .qsp.read.fromCallback is configured with the key option, the value of the y field is extracted from each record pushed to the pipeline with publish, and that value becomes the key of the data. When publishing to key 0, the map outputs 1, 3, then 6. When publishing to key 1, the sum starts from the initial state of 0 rather than continuing from key 0, so it also outputs 1, 3, then 6. This is because the state of the map operator is kept separately for the two keys, 0 and 1. Finally, publishing to key 0 again continues from where that key left off, outputting 10.
A Python version of this example is coming soon.
Set
Sets cached state of an operator.
.qsp.set[op;md;data]
Parameters:
name | type | description | default |
---|---|---|---|
op | symbol or dict | The name of the operator, or the operator dictionary. | Required |
md | .qsp.message.metadata or generic null (::) | The metadata for specialized data storage and retrieval, or null. | (::) |
data | any | The new state. | Required |
Passing metadata allows data to be stored on a per-key basis, which enables dynamic rebalancing of state across workers.
Stores state for the specified operator and returns the data added to the state. If md contains a key, updates the state associated with that key; otherwise, updates the unkeyed state of the operator.
If no operator configuration is provided, state is held in global storage.
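Per-key storage is what makes rebalancing practical: each key's state is self-contained, so it can move to another worker without being split or merged. The sketch below illustrates only that idea. The CRC32-modulo placement, `assign_worker`, and `rebalance` are hypothetical; the Stream Processor's actual placement strategy is not described on this page.

```python
import zlib
from typing import Any, Dict, Hashable, List

def assign_worker(key: Hashable, n_workers: int) -> int:
    """Deterministic key -> worker mapping (hypothetical placement scheme)."""
    return zlib.crc32(str(key).encode("utf-8")) % n_workers

def rebalance(keyed_state: Dict[Hashable, Any], n_workers: int) -> List[Dict[Hashable, Any]]:
    """Distribute per-key state over n_workers; each key's entry moves as a whole."""
    workers: List[Dict[Hashable, Any]] = [{} for _ in range(n_workers)]
    for key, value in keyed_state.items():
        workers[assign_worker(key, n_workers)][key] = value
    return workers

# Sample per-key sums; keys 0 and 1 match the keyed example's final state
state = {0: 10, 1: 6, "sensorA": 42}

two = rebalance(state, 2)    # scale to two workers
three = rebalance(state, 3)  # scale to three workers

# Whatever the worker count, every key's state lands intact on exactly one worker
merged = {k: v for worker in three for k, v in worker.items()}
print(merged == state)  # True
```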
Returns:
The data that was stored in the operator's state.
Examples:
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{[op;md;data]
    previousSum: .qsp.get[op;md];
    newSum: previousSum + data;
    .qsp.set[op;md;newSum]
    }; .qsp.use ``state!(::;0)]
  .qsp.write.toConsole[];
publish 1;
publish 2;
publish 3;
2022.03.09D21:08:02.752793709 | 1
2022.03.09D21:08:04.152717183 | 3
2022.03.09D21:08:04.819407879 | 6
sp.state.set(operator, metadata, data, pickle=Pickle.false)
Parameters:
name | type | description | default |
---|---|---|---|
operator | Operator | The operator, specified as a configuration dictionary or name. | Required |
metadata | Metadata | The metadata dictionary, provided for specialized data storage. | None |
data | Any | The data to be stored. By default, the data is converted to a pykx.K object using the default conversion logic. To store the object as-is, set pickle=True. | Required |
pickle | Pickle | A Pickle enum value representing whether the data should be pickled. Can be provided as a boolean. | Pickle.false |
Returns:
The data that was stored in the operator's state.
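Setting pickle=True stores a Python object as-is instead of converting it to a pykx.K object. The sketch below models only the pickling step with the standard library. `store_state` and `load_state` are hypothetical helpers, and the default (pickle=False) branch, which in kxi converts the data with PyKX, is left as a plain pass-through rather than imitating that conversion.

```python
import pickle
from collections import deque
from typing import Any, Dict, Hashable, Optional, Tuple

# key -> (was_pickled, payload); a hypothetical stand-in for operator state
_store: Dict[Optional[Hashable], Tuple[bool, Any]] = {}

def store_state(key: Optional[Hashable], data: Any, pickle_data: bool = False) -> Any:
    """Model of the pickle flag: pickle=True keeps the Python object as pickle bytes.

    The pickle=False branch stores data unchanged here; kxi would instead
    convert it to a pykx.K object, which this sketch does not attempt.
    """
    payload = pickle.dumps(data) if pickle_data else data
    _store[key] = (pickle_data, payload)
    return data  # like sp.state.set, return the data that was stored

def load_state(key: Optional[Hashable]) -> Any:
    was_pickled, payload = _store[key]
    return pickle.loads(payload) if was_pickled else payload

window = deque([1, 2, 3], maxlen=3)
store_state("recent", window, pickle_data=True)
restored = load_state("recent")
print(restored)         # deque([1, 2, 3], maxlen=3)
print(restored.maxlen)  # 3
```

Recording whether each entry was pickled, rather than guessing from the bytes, keeps the round trip unambiguous in this model.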
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> def map_func(op, md, data):
...     highest = max(max(data), sp.state.get(op, md))
...     sp.state.set(op, md, highest)
...     return highest
...
>>> sp.run(sp.read.from_callback('publish')
...     | sp.map(map_func, state=float('-inf'), name='map')
...     | sp.write.to_variable('out'))
>>> kx.q('publish', range(10))
>>> kx.q('publish', range(5))
>>> kx.q('out')
9 9