State

.qsp.get

Gets the cached state of an operator

.qsp.get[op;md]

Parameters:

name | q type | description
op | symbol or dict | The name of the operator, or the operator dictionary.
md | .qsp.message.metadata or generic null (::) | The metadata for specialized data storage and retrieval, or null.

Returns the cached state for the specified operator. If md contains a key, returns the state associated with that key. Otherwise, returns the unkeyed state of the operator.

Example:

.qsp.set[`map;::;0 1]
.qsp.set[`map;``key!(::;`key1);0 1 2 3 4]

.qsp.get[`map;``key!(::;`key1)] // => 0 1 2 3 4
.qsp.get[`map;``key!(::;`key2)] // => ()
.qsp.get[`map;::]               // => 0 1

Simple stateful operator example:

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.map[{[op;md;data]
        previousSum: .qsp.get[op;md];
        newSum: previousSum + data;
        .qsp.set[op;md;newSum]
        }; .qsp.use ``state!(::;0)]
    .qsp.write.toConsole[];

publish 1;
publish 2;
publish 3;

2022.03.09D21:08:02.752793709 | 1
2022.03.09D21:08:04.152717183 | 3
2022.03.09D21:08:04.819407879 | 6

.qsp.set

Sets the state of an operator

.qsp.set[op;md;data]

Parameters:

name | q type | description
op | symbol or dict | The name of the operator, or the operator dictionary.
md | .qsp.message.metadata or generic null (::) | The metadata for specialized data storage and retrieval, or null.
data | any | The new state.

Passing metadata allows data to be stored on a per-key basis, which allows for dynamic rebalancing of state across workers.

Stores state for the specified operator and returns the data added to the state. If md contains a key, updates the state associated with that key. Otherwise, updates the unkeyed state of the operator. If no operator configuration is provided, global storage is used.
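
The stored state does not have to be a single number, and the value returned by .qsp.set does not have to be the operator's output. The following is a minimal sketch of a running mean, assuming that a dictionary is accepted as the initial value of the state option (in line with data being typed as any). The field names total and n are illustrative only and are not part of the API.

```q
.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.map[{[op;md;data]
        // State is a dictionary holding the running total and the message count
        st: .qsp.get[op;md] + `total`n!(data;1);
        // Persist the updated dictionary; the value returned by .qsp.set is not used here
        .qsp.set[op;md;st];
        // Emit the running mean rather than the raw state
        st[`total] % st`n
        }; .qsp.use ``state!(::;`total`n!0 0)]
    .qsp.write.toConsole[];

publish 2;
publish 4;
```

Under these assumptions, the two publishes would be expected to emit the running means 2 and then 3. Keeping the total and the count together in one dictionary means a single .qsp.get and a single .qsp.set per message.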

Keyed State

Message metadata can contain a key field. The value of this field represents the key of the events contained in that message. Messages with different keys are processed independently of one another, and to help achieve this, operator state is separated by key, as demonstrated in the example below.

.qsp.run
    .qsp.read.fromCallback[`publish; .qsp.use ``key!(::;`y)]
    .qsp.map[{[op;md;data]
        previousSum: .qsp.get[op;md];
        newSum: previousSum + data`x;
        .qsp.set[op;md;newSum]
        }; .qsp.use ``state!(::;0)]
    .qsp.write.toConsole[]

// Publishing data with key=0
publish `x`y!1 0;
publish `x`y!2 0;
publish `x`y!3 0;

// Publishing data with key=1
publish `x`y!1 1;
publish `x`y!2 1;
publish `x`y!3 1;

// Publishing data with key=0 again
publish `x`y!4 0;

2022.03.10D16:28:02.220001229 | 1
2022.03.10D16:28:03.968131241 | 3
2022.03.10D16:28:04.569659787 | 6
2022.03.10D16:28:05.533655577 | 1
2022.03.10D16:28:06.051010015 | 3
2022.03.10D16:28:06.523569938 | 6
2022.03.10D16:28:08.076519264 | 10

In the example above, .qsp.map is a stateful operator that outputs the running sum of the x values it has received so far. Because .qsp.read.fromCallback sets its key option to y, the value of the y field is extracted from each message pushed to the pipeline with publish, and that value becomes the key of the data. When publishing to key 0, the map outputs 1, 3, then 6. When publishing to key 1, the sum starts again from the initial state and also outputs 1, 3, then 6. This is because the state of the map operator is stored separately for the two keys, 0 and 1. Finally, publishing to key 0 again continues from where that key left off, outputting 10.
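
The same pattern applies to any per-key aggregate. As a further sketch, the pipeline below tracks the highest price seen for each symbol. The field names sym and price, and the use of negative infinity (-0w) as the initial state, are assumptions made for illustration. Only calls already shown in the examples above are used.

```q
.qsp.run
    .qsp.read.fromCallback[`publish; .qsp.use ``key!(::;`sym)]
    .qsp.map[{[op;md;data]
        // md carries the key taken from the sym field, so each symbol
        // reads and updates only its own stored maximum
        best: .qsp.get[op;md] | data`price;
        .qsp.set[op;md;best]
        }; .qsp.use ``state!(::;-0w)]
    .qsp.write.toConsole[];

publish `sym`price!(`a;10.5);
publish `sym`price!(`b;3.2);
publish `sym`price!(`a;7.0);
```

If keyed state behaves as described above, the third publish would be expected to emit 10.5 again, since the lower price for symbol a does not replace that key's stored maximum and the state for symbol b is never consulted.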