Stateful operators
Buffered Messages
Buffer messages until over 1000 have been accumulated, then emit them in a single batch
Operators can be passed the `state` option to maintain state. This state is specific to that operator.
It can hold arbitrary values, and will be recovered if a node restarts.
// Pipeline: read from the `publish callback, buffer messages in operator state,
// multiply each emitted batch by 10, and print the result to the console.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; metadata; data]
      // Capture a buffer containing the previous state and new message
      state: .qsp.get[op;metadata] , data;
      $[1000 < count state;
        // If we've received more than 1000 messages, reset the state and emit the buffer
        [.qsp.set[op; metadata; ()]; .qsp.push[op;metadata;state]];
        // Otherwise, set the new state to include the new message
        .qsp.set[op; metadata; state]]
      };
    .qsp.use``state!(::;()) ] // Set the initial state to ()
  .qsp.map[{10*x}]
  .qsp.write.toConsole[]

// Buffered data accumulates across multiple calls to publish
publish 500?1f
publish 500?1f
// By pushing the buffer size over 1000, this causes all records to be emitted
publish 1?1f

// This closes the pipeline, allowing a new pipeline to be run
.qsp.teardown[]
For operators such as map, which only take a data parameter by default, setting the state flag will cause op and metadata to also be passed to the operator's function. These are required to get and set the state.
from random import random
from kxi import sp
import pykx as kx
def buffer(op, md, data):
    """Accumulate incoming records and release them as a single batch.

    The records in ``data`` are appended to the list held in this operator's
    state. Once that list holds more than 1000 records it is returned as one
    batch and the stored state is reset to an empty list; until then an empty
    list is returned and the grown buffer is written back to the state.

    Args:
        op: Operator argument, forwarded to ``sp.state.get``/``sp.state.set``.
        md: Metadata argument, forwarded to ``sp.state.get``/``sp.state.set``.
        data: The records carried by the incoming message.

    Returns:
        The whole buffer once it exceeds 1000 records, otherwise ``[]``.
    """
    buffered = sp.state.get(op, md)
    buffered.extend(data)
    if 1000 < len(buffered):
        # Threshold exceeded: clear the stored buffer and emit everything.
        sp.state.set(op, md, [], pickle=True)
        return buffered
    # The state is not updated until `set` is called, even when it is a mutable object. This is
    # because the state is stored as pickled bytes to protect it during conversions to/from q.
    sp.state.set(op, md, buffered, pickle=True)
    return []
# Pipeline: read from the 'publish' callback, buffer messages in operator state
# (initial state is an empty list), multiply by 10, and write to the console.
sp.run(sp.read.from_callback('publish')
    | sp.map(buffer, state=[], pickle=True)
    | sp.map(lambda x: 10 * x)
    | sp.write.to_console())

# Buffered data accumulates across multiple calls to publish
kx.q('publish', [random() for _ in range(500)])
kx.q('publish', [random() for _ in range(500)])
# By pushing the buffer size over 1000, this causes all records to be emitted
kx.q('publish', [random()])
# Example output: 2021.10.12D18:13:53.203632235 | 1.780839 3.017723 7.85033 5.347096..

# This closes the pipeline, allowing a new pipeline to be run
sp.teardown()
For operators such as map, which only take a data
parameter by default, setting the state
argument will cause the operator's function to also be passed an sp.State
object, which is required to get and set the state.
Running Average
Calculates the running average by tracking the sum and count in the state
// Pipeline: read from the `publish callback, keep a running sum and count in
// the map operator's state, and print the average of all records seen so far.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[
    {[op; metadata; data]
      state: .qsp.get[op;metadata];
      // Fold the new batch into the running totals
      state[`sum]+: sum data;
      state[`count]+: count data;
      // Persist the updated totals before emitting the average
      .qsp.set[op; metadata; state];
      state[`sum]%state`count
      };
    .qsp.use enlist[`state]!enlist `sum`count!(0f; 0) // Set the initial state
    ]
  .qsp.write.toConsole[]

// This can be run repeatedly to calculate the average
// of all records seen thus far
publish 10?1f

.qsp.teardown[]
from random import random
from kxi import sp
import pykx as kx
def running_average(op, md, data):
    """Return the average of every record seen so far, including ``data``.

    The operator's state is a mapping with ``'sum'`` and ``'count'`` entries.
    Each call adds the total and the length of ``data`` to those entries,
    writes the mapping back to the state, and returns ``sum / count``.

    Args:
        op: Operator argument, forwarded to ``sp.state.get``/``sp.state.set``.
        md: Metadata argument, forwarded to ``sp.state.get``/``sp.state.set``.
        data: The numeric records carried by the incoming message.

    Returns:
        The running average over all records accumulated in the state.
    """
    s = sp.state.get(op, md)
    s['sum'] += sum(data)
    s['count'] += len(data)
    # Persist the totals explicitly; mutating `s` alone does not update the state.
    sp.state.set(op, md, s, pickle=True)
    # NOTE: raises ZeroDivisionError if the count is still 0 (e.g. an empty first batch).
    return s['sum'] / s['count']
# Assemble the pipeline first, then start it: callback reader -> stateful
# running-average map (sum and count both start at 0) -> console writer.
average_pipeline = (
    sp.read.from_callback('publish')
    | sp.map(running_average, state={'sum': 0, 'count': 0}, pickle=True)
    | sp.write.to_console()
)
sp.run(average_pipeline)

# Each time a batch like this is published, the average printed to the
# console moves closer to 0.5.
sample = [random() for _ in range(10)]
kx.q('publish', sample)

sp.teardown()