State
.qsp.get
Get cached state of an operator
.qsp.get[op;md]
Parameters:
name | q type | description |
---|---|---|
op | symbol or dict | The name of the operator, or the operator dictionary. |
md | .qsp.message.metadata or generic null (::) | The metadata for specialized data storage and retrieval, or null. |
Returns cached state for the specified operator. If md
contains a key, returns the state
specifically associated with that key. Otherwise, returns unkeyed state of the operator.
Example:
.qsp.set[`map;::;0 1]
.qsp.set[`map;``key!(::;`key1);0 1 2 3 4]
.qsp.get[`map;``key!(::;`key1)] // => 0 1 2 3 4
.qsp.get[`map;``key!(::;`key2)] // => ()
.qsp.get[`map;::] // => 0 1
Simple stateful operator example:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.map[{[op;md;data]
previousSum: .qsp.get[op;md];
newSum: previousSum + data;
.qsp.set[op;md;newSum]
}; .qsp.use ``state!(::;0)]
.qsp.write.toConsole[];
publish 1;
publish 2;
publish 3;
2022.03.09D21:08:02.752793709 | 1
2022.03.09D21:08:04.152717183 | 3
2022.03.09D21:08:04.819407879 | 6
.qsp.set
Sets state of an operator
.qsp.set[op;md;data]
Parameters:
name | q type | description |
---|---|---|
op | symbol or dict | The name of the operator, or the operator dictionary. |
md | .qsp.message.metadata or generic null (::) | The metadata for specialized data storage and retrieval, or null. |
data | any | The new state. |
Passing metadata allows data to be stored on a per-key basis, which allows for dynamic rebalancing of state across workers.
Stores state for the operator and returns the data added to the state.
If no operator configuration is provided, state is retrieved from global storage.
Stores state for the specified operator and returns the data added to the state. If md
contains a key, updates the state specifically associated with that key. Otherwise, updates
the unkeyed state of the operator.
Keyed State
Message metadata can contain a key
field. The value of this field represents the key of the events contained in that message. Messages with different keys are processed independently of one another, and to help achieve this, operator state is separated by key, as demonstrated in the example below.
.qsp.run
.qsp.read.fromCallback[`publish; .qsp.use ``key!(::;`y)]
.qsp.map[{[op;md;data]
previousSum: .qsp.get[op;md];
newSum: previousSum + data`x;
.qsp.set[op;md;newSum]
}; .qsp.use ``state!(::;0)]
.qsp.write.toConsole[]
// Publishing data with key=0
publish `x`y!1 0;
publish `x`y!2 0;
publish `x`y!3 0;
// Publishing data with key=1
publish `x`y!1 1;
publish `x`y!2 1;
publish `x`y!3 1;
// Publishing data with key=0 again
publish `x`y!4 0;
2022.03.10D16:28:02.220001229 | 1
2022.03.10D16:28:03.968131241 | 3
2022.03.10D16:28:04.569659787 | 6
2022.03.10D16:28:05.533655577 | 1
2022.03.10D16:28:06.051010015 | 3
2022.03.10D16:28:06.523569938 | 6
2022.03.10D16:28:08.076519264 | 10
In the above example, the .qsp.map
is a stateful operator which outputs the sum of all numbers it has received so far. Since .qsp.read.fromCallback
uses its key
option, data pushed to the pipeline using publish
will have the value of its y
field extracted. This value will be the key of the data. You can see above that when publishing to key 0, the map outputs 1, 3, then 6, but when publishing to key 1, the sum resets and also outputs 1, 3, then 6. This is because the state of the map operator is split between the 2 keys, 0 and 1. Finally, you can see that upon publishing to key 0 again, it continues from where it left off, outputting 10.