Stateful operators

Operators can maintain state by being passed the state option through .qsp.use. This state is specific to that operator, can hold arbitrary values, and is recovered if a node restarts.

This example buffers messages until more than 1000 have accumulated, then emits them in a single batch.

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.apply[{[op; metadata; data]
        // Capture a buffer containing the previous state and new message
        state: .qsp.get[op;metadata] , data;

        $[1000 < count state;
            // If more than 1000 messages are buffered, clear the state and emit the buffer
            [.qsp.set[op; metadata; ()]; .qsp.push[op;metadata;state]];

            // Otherwise, set the new state to include the new message
            .qsp.set[op; metadata; state]]

        }; .qsp.use``state!(::;())]  // Set the initial state to ()
    .qsp.map[{10*x}]
    .qsp.write.toConsole[];

// Buffered data accumulates across multiple calls to publish
publish 500?1f;
publish 500?1f;

// This pushes the buffer size over 1000, so all 1001 buffered records are emitted as one batch
publish 1?1f;

// This closes the pipeline, allowing a new pipeline to be run
.qsp.teardown[];
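The flush condition can be checked without running a pipeline. The following is a minimal pure-q sketch, assuming a global buf stands in for the operator state managed by .qsp.get and .qsp.set, and a global emitted collects what .qsp.push would send downstream; onMessage, buf and emitted are hypothetical names used only for illustration.

```q
// Hypothetical pure-q model of the buffering operator above:
// buf plays the role of the operator state, emitted records each pushed batch
buf:();
emitted:();
onMessage:{[data]
  s:buf,data;                             // previous buffer plus the new message
  $[1000<count s;
    [buf::(); emitted::emitted,enlist s]; // flush: clear the state, record one batch
    buf::s];                              // otherwise keep accumulating
  }

onMessage 500?1f;
onMessage 500?1f;   // 1000 records buffered: not yet over the threshold
onMessage 1?1f;     // 1001 records buffered: one batch is emitted
```

Because the comparison is 1000 < count, nothing is emitted after the second call; the batch only appears once a 1001st record arrives, matching the publish calls in the pipeline example.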

For operators such as map, which take only a data parameter by default, setting the state option causes the operator to also be passed op and metadata. These are required to get and set the state.

This example calculates the running average of all records received by tracking their sum and count in the state.

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.map[{[op; metadata; data]
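        // Fold the new batch into the running sum and count
        state: .qsp.get[op;metadata];
        state[`sum]+: sum data;
        state[`count]+: count data;
        .qsp.set[op; metadata; state];
        // Emit the average of all records received so far
        state[`sum]%state`count
        }; .qsp.use enlist[`state]!enlist `sum`count!(0f; 0)]  // Set the initial state to a zero sum and count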
    .qsp.write.toConsole[];

// This can be run repeatedly to calculate the average of all records seen thus far
publish 10?1f;

.qsp.teardown[];
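Tracking the sum and count, rather than a stored average, is what keeps the result exact across batches of different sizes. A minimal pure-q sketch of the same update, assuming a hypothetical step function that folds one batch into a sum/count dictionary and three hand-picked batches standing in for three calls to publish:

```q
// Hypothetical pure-q model of the running-average operator above:
// the state is a sum/count dictionary, and step folds one batch into it
init:`sum`count!(0f;0);
step:{[st;data] st+`sum`count!(sum data;count data)}

// Three batches of different sizes, standing in for three publish calls
batches:(1 2 3f;4 5f;enlist 6f);

// Scan keeps every intermediate state, one per batch
states:step\[init;batches];
runningAvg:{x[`sum]%x`count} each states;
runningAvg   / 2 3 3.5
```

The last running average equals avg raze batches, the mean of every record seen, whereas averaging the per-batch means (2, 4.5 and 6) would give a different, size-weighted-incorrect answer.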