Stateful operators

Buffered Messages

Buffer messages until more than 1000 have been accumulated, then emit them in a single batch

Operators can be passed the state option to maintain state. This state is specific to that operator: it can hold arbitrary values, and will be recovered if a node restarts.

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; metadata; data]
      // Append the incoming data to the buffer held in the state
      state: .qsp.get[op;metadata] , data;
      $[1000 < count state;
        // If more than 1000 messages have accumulated, clear the state and emit the buffer
        [.qsp.set[op; metadata; ()]; .qsp.push[op;metadata;state]];
        // Otherwise, update the state to include the new data
        .qsp.set[op; metadata; state]]
    };
    .qsp.use``state!(::;()) ]  // Set the initial state to ()
  .qsp.map[{10*x}]
  .qsp.write.toConsole[]

// Buffered data accumulates across multiple calls to publish
publish 500?1f
publish 500?1f

// Pushing the buffer size over 1000 causes all buffered records to be emitted
publish 1?1f

// This closes the pipeline, allowing a new pipeline to be run
.qsp.teardown[]

For operators such as map, which by default take only a data parameter, setting the state option causes op and metadata to also be passed in. These are required to get and set the state.
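
For illustration, here is a minimal sketch of such a stateful map: a record counter that is not part of the examples on this page. Because the state option is set through .qsp.use, the function receives op and metadata in addition to data.

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[
    {[op; metadata; data]
      // Because the state option is set, op and metadata are passed in alongside data
      n: .qsp.get[op; metadata] + count data;
      .qsp.set[op; metadata; n];
      // Emit the running record count downstream
      n
    };
    .qsp.use enlist[`state]!enlist 0]  // Set the initial count to 0
  .qsp.write.toConsole[]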

from random import random

from kxi import sp
import pykx as kx


def buffer(op, md, data):
    buffered = sp.state.get(op, md)
    buffered.extend(data)
    if 1000 < len(buffered):
        sp.state.set(op, md, [], pickle=True)
        return buffered
    # The state is not updated until `set` is called, even when it is a mutable object. This is
    # because the state is stored as pickled bytes to protect it during conversions to/from q.
    sp.state.set(op, md, buffered, pickle=True)
    return []

sp.run(sp.read.from_callback('publish')
    | sp.map(buffer, state=[], pickle=True) # Set the initial state to an empty list
    | sp.map(lambda x: 10 * x)
    | sp.write.to_console())

kx.q('publish', [random() for _ in range(500)])
kx.q('publish', [random() for _ in range(500)]) # Buffered data accumulates across multiple calls to publish
kx.q('publish', [random()]) # Pushing the buffer size over 1000 causes all buffered records to be emitted
# Example output: 2021.10.12D18:13:53.203632235 | 1.780839 3.017723 7.85033 5.347096..

For operators such as map, which only take a data parameter by default, setting the state argument will cause the operator and metadata to also be passed in. These are required to get and set the state with sp.state.get and sp.state.set.

Running Average

Calculate the running average by tracking the sum and count in the state

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[
    {[op; metadata; data]
      state: .qsp.get[op;metadata];
      state[`sum]+: sum data;
      state[`count]+: count data;
      .qsp.set[op; metadata; state];
      state[`sum]%state`count
    };
    .qsp.use enlist[`state]!enlist `sum`count!(0f; 0)  / set initial state
  ]
  .qsp.write.toConsole[]

// This can be run repeatedly to calculate the average
// of all records seen thus far
publish 10?1f

.qsp.teardown[]

from random import random

from kxi import sp
import pykx as kx


def running_average(op, md, data):
    s = sp.state.get(op, md)
    s['sum'] += sum(data)
    s['count'] += len(data)
    sp.state.set(op, md, s, pickle=True)
    return s['sum'] / s['count']

sp.run(sp.read.from_callback('publish')
    | sp.map(running_average, state={'sum': 0, 'count': 0}, pickle=True) # Set the initial state
    | sp.write.to_console())

# Repeated runs of the following line will cause the output (printed to
# console) to approach 0.5.
kx.q('publish', [random() for _ in range(10)])

sp.teardown()