Operators

An operator is a first-class building block in the stream processor API. Operators are strung together in a user’s program and provide the core functionality for the Stream Processor.

.qsp.accumulate

Aggregates a stream into an accumulator

.qsp.accumulate[fn; initial]
.qsp.accumulate[fn; initial; output]

Parameters:

name type description default
fn function An aggregator which takes the metadata, data, and the accumulator, and returns an updated accumulator. Required
initial any The initial state of the accumulator. Required
output function A function to transform the output before emitting it. It gets passed the value of the accumulator. ::

For all common arguments, refer to configuring operators

Aggregates a stream into an accumulator, which is updated and then emitted for each incoming batch. The value being emitted is passed to the output function, which can modify it before passing it to the next operator. If the accumulator is a dictionary, it may be necessary to enlist the result in the output function so the next operator receives a table.

If no output function is specified, the accumulator will be emitted. If the accumulator is a dictionary, an output function like {enlist x} can be used to emit tables.
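
The per-batch update can be pictured as a fold over the stream. The following is a minimal sketch in plain q, not the library's implementation: accumulateModel is a hypothetical helper standing in for .qsp.accumulate, the stream is modelled as a list of batches, and the metadata argument is assumed to be an empty dictionary.

```q
// Hypothetical stand-in for .qsp.accumulate (illustration only):
// scan fn over the batches, then apply output to every intermediate accumulator.
accumulateModel: {[fn; initial; output; batches]
  output each {[fn; acc; data] fn[()!(); data; acc]}[fn]\[initial; batches]
  }

// An aggregator that counts how often each status has been seen
fn: {[md; data; acc] acc + count each group data `status}
batches: (([] status: `a`a`b); ([] status: `b`c`c))

// One emitted value per batch; the last is a one-row table with a=2, b=2, c=2
res: accumulateModel[fn; `a`b`c!0 0 0; {enlist x}; batches]
```

Scan is used rather than over because the operator emits after every batch, so each intermediate accumulator is a result in its own right.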

This pipeline calculates the running averages for each column.

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.accumulate[{[md; data; acc]
        acc[`total] +: count data;
        acc[`sum] +: sum data;
        acc
        };
        `total`sum!(0; ());
        // The enlist is needed to turn the dictionary into a table
        {enlist x[`sum] % x`total}]
    .qsp.write.toConsole[]

publish each 10 cut ([] x: 100?1f; y: 100?1f)

This pipeline emits the number of times each status has been seen.

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.accumulate[{[md; data; acc] acc + count each group data `status};
        `a`b`c!0 0 0;
        // The enlist is needed to turn the dictionary into a table
        {enlist x}]
    .qsp.write.toConsole[]

publish each 10 cut ([] status: 100?`a`b`c)

.qsp.apply

Apply a function to incoming batches in the stream

.qsp.apply[fn]
.qsp.apply[fn; .qsp.use (!) . flip (
    (`onFinish; onFinish);
    (`state   ; state);
    (`params  ; params))]

Parameters:

name type description default
fn function A function applied to incoming data (see below). Required

options:

name type description default
onFinish function A function run when the pipeline finishes or is torn down, to flush the buffer. It is passed the operator and metadata. None
state any The initial state. ::
params symbol or symbol[] The arguments to pass to fn. `operator`metadata`data

For all common arguments, refer to configuring operators

fn is a ternary function, which is applied to incoming batches in the stream.

The arguments of fn are:

operator
metadata
data

Since apply is most often used with state, the operator and metadata arguments are implicitly added to the user-defined function.

Unlike other operators, apply is asynchronous, and data returned by it does not immediately flow through the rest of the pipeline. Instead, the operator must use .qsp.push to push data through the pipeline when ready.

This is often useful when combined with the state API for implementing custom window operations, running asynchronous external tasks (see task registration), and similar behaviors.

When apply is used to buffer data, the onFinish option should be used to flush the buffer when the pipeline finishes. It will be called once for every key in the state including the default key (empty symbol), with the metadata dictionary passed to it containing only the key.

Buffer events in memory before running an analytic:

.qsp.run
  .qsp.read.fromCallback[`publish]
  // Buffer the stream to 10000 events before pushing downstream
  .qsp.apply[
    {[op; md; data]
      $[10000 <= count state: .qsp.get[op; md] , data;
        // If we've hit 10000 events, clear the state and push the buffer
        [.qsp.set[op; md; ()]; .qsp.push[op; md; state]];
        // Otherwise, update the buffer
        .qsp.set[op; md; state]
        ]
      };
    .qsp.use (!) . flip (
        // Set the default state to empty
        (`state;     ());
        (`onFinish;  {[op; md] .qsp.push[op; md] .qsp.get[op; md]}))
    ]

  // Run an analytic on the buffered stream
  .qsp.map[{ 10*x }]
  // Convert to the (tableName; tableData) format expected by RT
  .qsp.map[{ (`customAnalytic; value flip x) }]
  .qsp.write.toVariable[`output]

publish each 50 cut ([] data: 20000?1f)

Register an asynchronous task for true async:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; md; data]
      // Register a task which represents the unfinished async kurl GET request
      tid: .qsp.registerTask[op];
      .kurl.async (
        data `url;
        "GET";
        ``callback!(
            ::;
            {[op;md;data;tid;r]
              .qsp.finishTask[op;tid]; //   Mark the task as finished
              data[`response]: r 1;    //   Add GET request response to data
              .qsp.push[op;md;data];   //   Push enriched data to the next operator
            }[op;md;data;tid]
        )
      );
    }
  ]
  .qsp.write.toConsole[];

publish each ([] url: ("https://www.google.ca"; "https://www.example.com"))

.qsp.filter

Filter elements out of a batch, or a batch from a stream

.qsp.filter[fn]
.qsp.filter[fn; .qsp.use (!) . flip (
    (`dropEmptyBatches; dropEmptyBatches);
    (`allowPartials   ; allowPartials);
    (`state           ; state);
    (`params          ; params))]

Parameters:

name type description default
fn function A unary function returning a boolean atom or vector, which either filters elements out of a batch or filters an entire batch out of the stream. Required

options:

name type description default
dropEmptyBatches boolean 1b to only emit non-empty batches. 0b
allowPartials boolean 1b indicates this filter operation can handle partial batches of data. See below for more details on partial batches. 1b
state any The initial state. ::
params symbol or symbol[] The arguments to pass to fn. `data

For all common arguments, refer to configuring operators

The fn function will be called on each batch in the stream. If the result is a boolean

  • vector, only records that it flags in the batch progress through the stream
  • atom, all records in the batch progress or are discarded according to the flag
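
The two cases can be sketched in plain q. This is an illustrative model, not the library's code: filterModel is a hypothetical helper, and a discarded batch is represented here by an empty table.

```q
// Hypothetical model of applying a filter result (illustration only).
// A discarded batch is represented here by an empty table.
filterModel: {[fn; batch]
  r: fn batch;
  $[0 > type r;
    $[r; batch; 0 # batch];   // boolean atom: keep or discard the whole batch
    batch where r]            // boolean vector: keep only the flagged records
  }

trades: ([] sym: `ABC`XYZ`ABC; price: 45 72 50f)

// Vector result: keeps the XYZ row and the second ABC row
byRecord: filterModel[{ 48 < x `price }; trades]
// Atom result: keeps the entire batch because XYZ is present
byBatch: filterModel[{ `XYZ in x `sym }; trades]
```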

Partial data batches

Some source nodes can push partial sets of data through a pipeline to reduce overall batch sizes. For example, a file reader might break the file down into smaller chunks and push each chunk through the pipeline. If the current operator requires an entire batch of data (e.g. an entire file) then set allowPartials to 0b to force the batch to be buffered to completion before running this operator.

Filter for a single table:

.qsp.run
  .qsp.read.fromStream[":tp:5000"]
  // Values come in as (tableName; tableValue), so select the desired
  // table name
  .qsp.filter[{ `trade ~ first x }]
  // After the filter, there are only tuples corresponding to trades
  // in the stream, so the `tableName` field can be discarded
  .qsp.map[{ x 1 }]
  .qsp.write.toConsole[]

.qsp.map

Apply a function to data passing through the operator

.qsp.map[fn]
.qsp.map[fn; .qsp.use (!) . flip (
    (`allowPartials; allowPartials);
    (`state        ; state);
    (`params       ; params))]

Parameters:

name type description default
fn function A unary function that is applied to the data and returns the result. Required

options:

name type description default
allowPartials boolean 1b indicates this map operation can handle partial batches of data. See below for more details on partial batches. 1b
state any The initial state. ::
params symbol or symbol[] The arguments to pass to fn. `data

For all common arguments, refer to configuring operators

Partial data batches

Some source nodes can push partial sets of data through a pipeline to reduce overall batch sizes. For example, a file reader might break the file down into smaller chunks and push each chunk through the pipeline. If the current operator requires an entire batch of data (e.g. an entire file) then set allowPartials to 0b to force the batch to be buffered to completion before running this operator.
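
To see why an operator might need the complete batch, consider an average. The sketch below is plain q with the chunking done by hand rather than by a reader; chunks, fn, perChunk and whole are illustrative names, not part of the API.

```q
// A batch of four records arriving as two partial chunks
chunks: 2 cut ([] price: 10 20 30 60f)

// An operator function whose result depends on seeing every record
fn: {select avgPrice: avg price from x}

// Run on each partial as it arrives: one result per chunk (15 and 45)
perChunk: fn each chunks

// Buffer to completion first, then run once on the whole batch (30)
whole: fn raze chunks
```

Following the .qsp.use pattern used for the other options in this section, the setting would presumably be written as .qsp.use``allowPartials!(::; 0b).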

A basic map:

.qsp.run
  .qsp.read.fromCallback[`trade]
  .qsp.map[{ update price * size from x }]
  .qsp.write.toConsole[]

trade ([] price: 10?200f; size: 10?1000)

A stateful map:

.qsp.run
  .qsp.read.fromCallback[`trade]
  // *Note* - using `state` implicitly adds the `operator` and `metadata`
  // arguments required for .qsp.* state APIs.
  .qsp.map[
    {[op;md;data]
      // Retrieve the previous state from the last batch
      previous: .qsp.get[op;md];
      // Calculate size * price for each symbol
      v: select sp:price * size by sym from data;
      // Set the state to include the current batch calculation
      .qsp.set[op; md; previous , v];
      // Send the difference between the current and last batch to
      // any forward operators in the pipeline (here, console writer)
      select from v - previous where sym in key[v]`sym
    };
    .qsp.use``state!(::; ([sym:0#`] sp:0#0f))
  ]
  .qsp.write.toConsole[]

trade each 1 cut ([]
    sym: `ABC`ABC`XYZ`XYZ`ABC`XYZ;
    price: 200 203 53 52 190 55;
    size: 100 100 200 400 100 150)

Retrieving metadata within a map:

.qsp.run
  .qsp.read.fromCallback[`publish]
  // Group events into logical groups every 5 seconds of event time
  // based on the 'time' column in the data to indicate which
  // window an event should belong
  .qsp.window.tumbling[00:00:05; `time]
  .qsp.map[
    {[md;x]
      // Add the start of the window to the batched event data
      update start: md`window from x
    };
    .qsp.use``params!(::; 1#`metadata)
  ]
  .qsp.write.toConsole[]

publish ([] time: 0p + 00:00:01 * 0 3 5 7 9 11; data: til 6)

.qsp.merge

Merge two data streams

.qsp.merge[stream; function]
.qsp.merge[stream; function; .qsp.use (!) . flip (
    (`flush  ; flush);
    (`trigger; trigger);
    (`concat ; concat);
    (`state  ; state);
    (`params ; params))]

Parameters:

name type description default
stream pipeline A separate pipeline to merge. Required
function function A function of the two streams to combine both into one. Required

options:

name type description default
flush symbol Indicates which side of the merge operation to flush data from. left
trigger function or symbol This defines when the merge function should be run. It can be a custom function run on the buffered data, or a symbol to use a predefined trigger. both
concat function A function to update the buffer with incoming data. Append incoming data
state any The initial state. ::
params symbol or symbol[] The arguments to pass to fn. `left`right

For all common arguments, refer to configuring operators

Merges stream into the current stream using the merge function.

Merge can be used to join and enrich streams with other streams, with static data, or to union two streams into a single stream. The stream to which .qsp.merge is appended is the left stream, and the stream included as a parameter is the right stream.

The merge function is passed the buffered values for the two streams, to merge them into an output batch. It must return the pair (metadata; data).

The flush option indicates which side of the merge operation to flush data from. Data is flushed after a merge has been performed, meaning that once the merge is complete, the indicated stream is cleared for the next merge operation. Data that is not flushed in the join will be buffered indefinitely. This value can be one of the following symbols:

  • left - Only flush data from the left stream
  • right - Only flush data from the right stream
  • both - Flush both streams
  • none - Don't flush any data

The trigger option can be a function or expression that evaluates to a boolean, indicating if the merge function should be run. It is passed two arguments: the buffers for the left and right streams. These are each pairs of (metadata; data). trigger can also be one of the following symbols:

  • immediate - Emit on any input
  • both - Emit when both streams have buffered data
  • left - Emit when the left stream has buffered data
  • right - Emit when the right stream has buffered data
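
The predefined flush and trigger symbols can be read as simple rules over the two buffers. The following plain q sketch models those rules under that reading; triggerModel and flushModel are hypothetical helpers, and the buffers are reduced to their data for brevity.

```q
// Hypothetical model of the predefined trigger symbols (illustration only).
// left and right are the buffered data for each side.
triggerModel: {[trigger; left; right]
  l: 0 < count left;
  r: 0 < count right;
  $[trigger ~ `immediate; l or r;
    trigger ~ `both;      l and r;
    trigger ~ `left;      l;
    trigger ~ `right;     r;
    '"unknown trigger"]
  }

// Hypothetical model of flush: clear the indicated side(s) after a merge
flushModel: {[flush; left; right]
  ($[flush in `left`both;  0 # left;  left];
   $[flush in `right`both; 0 # right; right])
  }

quotes: ([] sym: `ABC`XYZ; price: 45 72)
noTrades: ([] sym: `symbol$(); size: `long$())

// No merge yet under `both, because the left side has nothing buffered
runBoth: triggerModel[`both; noTrades; quotes]
// A merge would run under `right, because the right side has data
runRight: triggerModel[`right; noTrades; quotes]
// After a merge with flush `left, only the right buffer is retained
buffers: flushModel[`left; quotes; quotes]
```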

The concat option is a function applied to an incoming stream to concatenate data from the same stream. It is passed two arguments: the previously cached state, and the incoming message, both as (metadata; data) tuples.
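
As an illustration of the shape such a function takes, here is a sketch of a custom concat that caps the buffer size. keepLast is a hypothetical name, and the choice to carry the incoming metadata forward is an assumption rather than documented behavior.

```q
// Hypothetical custom concat (illustration only): append the incoming data to the
// cached data, keep only the last n records, and carry the incoming metadata forward.
// cached and incoming are (metadata; data) pairs.
keepLast: {[n; cached; incoming]
  (incoming 0; neg[n] sublist cached[1] , incoming 1)
  }

cached:   (()!(); ([] sym: `ABC`XYZ`ABC; price: 45 72 46))
incoming: (()!(); ([] sym: `XYZ`ABC; price: 73 47))

// Buffer capped at three records: the oldest two are dropped
buffer: keepLast[3; cached; incoming]
```

Projected as keepLast[1000], such a function has the two-argument shape the concat option describes. What the cached value looks like before the first message arrives is not stated here, so a real concat function may need to handle an empty initial buffer.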

The params list can be any subset of left, right, leftMeta, rightMeta, metadata, and operator, to specify what is passed to the merge function.

Join last quote on each trade:

// Create a data flow for quote updates
quotes: .qsp.read.fromCallback[`updQuote]
  // A stateful map to hold the last seen quote for each symbol
  .qsp.map[
    {[o;m;z]
      // Update the state with the last value for each symbol from the batch
      // The '.qsp.set' returns the data set, forwarding it on to downstream
      // operators.
      .qsp.set[o;m] .qsp.get[o;m] upsert select price by sym from z
    };
    .qsp.use``state!(::; ()) ]

// Create a data stream for trade updates
quoteAsOfTrade: .qsp.read.fromCallback[`updTrade]
  // Left join the last quote for each sym onto each new trade in the stream
  // Since the updates from the quotes stream are keyed, the buffer will be updated
  // with the latest data, and doesn't need to be re-keyed before joining with `lj`.
  .qsp.merge[quotes; lj]
  .qsp.write.toConsole[]

.qsp.run quoteAsOfTrade

updQuote ([sym: `ABC`XYZ] time: 2001.01.01D12:00:00; price: 45 72)
updTrade ([] sym: `ABC`XYZ; size: 1000 5000)
                             | sym size price
-----------------------------| --------------
2022.03.03D19:40:06.743993600| ABC 1000 45
2022.03.03D19:40:06.743993600| XYZ 5000 72

.qsp.reduce

Aggregate partial windows

.qsp.reduce[fn; initial]
.qsp.reduce[fn; initial; output]

Parameters:

name type description default
fn function An aggregator which takes the metadata, data, and the accumulator for a given window, and returns an updated accumulator. Required
initial any The initial state of the accumulator. Required
output function A function to transform the output before emitting it. It gets passed the value of the accumulator. ::

For all common arguments, refer to configuring operators

Aggregates partial windows.

A window may include more records than can fit in memory. As such, it may be necessary to reduce the buffered records into a smaller, aggregated value at regular intervals. If the window operator uses the countTrigger option, a partial window will be emitted when the number of buffered records exceeds the countTrigger. Partial windows will also be emitted when a stream goes idle. These partial windows can be aggregated using .qsp.reduce. When the window is complete, .qsp.reduce will emit the result of reducing the partial windows.

The reduce operator runs a function on each incoming batch to update the accumulator for that window. Each window has a separate accumulator. For partial windows, such as those emitted by countTrigger or idle streams, the accumulator will be updated but not emitted. When a window is closed, such as when the high-water mark passes the end of the window, the accumulator will be updated for that final batch, and then it will be emitted.
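
The life of a single window can be sketched in plain q. This is a model under simplifying assumptions, not the library's implementation: reduceWindow is a hypothetical helper, the partial batches of one window are given as a list, and the metadata is an empty dictionary.

```q
// Hypothetical model of .qsp.reduce for a single window (illustration only):
// every partial updates the accumulator, and output is applied once at the end.
reduceWindow: {[fn; initial; output; md; partials]
  acc: {[fn; md; acc; data] fn[md; data; acc]}[fn; md]/[initial; partials];
  output acc
  }

// An aggregator tracking the record count and the running sum of val
fn: {[md; data; acc]
  acc[`total] +: count data;
  acc[`sum] +: sum data `val;
  acc
  }

// One window delivered as two partial batches
partials: 2 cut ([] val: 1 2 3 4f)

// Evaluates to 2.5, the average over all four records
average: reduceWindow[fn; `total`sum!(0; 0f); {x[`sum] % x `total}; ()!(); partials]
```

Over is used here rather than scan because nothing is emitted for the intermediate partials; only the final accumulator reaches the output function.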

If no output function is specified, the accumulator will be emitted. If the accumulator is a dictionary, an output function like {enlist x} can be used to emit tables.

Any partial windows will be emitted on teardown.

This pipeline calculates the average of the val column. There are 1000 records per window, but the reduction is run whenever the buffer size reaches 100 records.

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.window.tumbling[00:00:01; `time; .qsp.use ``countTrigger!0 100]
    .qsp.reduce[{[md; data; acc]
        acc[`total] +: count data;
        acc[`sum] +: sum data `val;
        acc[`window]: md `window;
        acc
        };
        `total`sum`window!(0; 0f; ::);
        {enlist `startTime`average!(x`window; x[`sum] % x`total)}]
    .qsp.write.toConsole[]

publish each 10 cut ([] time: .z.p + 00:00:00.001 * til 10000; val: 10000?1f)
// Set a new high-water mark to close out earlier windows
publish ([] time: .z.p + 00:00:10; val: 1?1f)

.qsp.split

Split the current stream

.qsp.split[]

For all common arguments, refer to configuring operators

The split operator allows a single stream to be split into arbitrarily many separate streams for running separate analytics or processing.

Split operators can be added explicitly, or are created implicitly when the same operator appears as a parent of more than one stream once all streams given to .qsp.run are resolved into a single DAG before running.

Explicit split operator:

streamA: .qsp.read.fromCallback[`publish]
  .qsp.map[{x + 1}]
  .qsp.split[]

streamB: streamA .qsp.map[{10 * x}] .qsp.write.toConsole[]
streamC: streamA .qsp.map[{x - 1}] .qsp.write.toVariable[`output]

.qsp.run (streamB; streamC)

publish ([] data: til 10)

Implicit split operator:

streamA: .qsp.read.fromCallback[`publish]
  .qsp.map[{x + 1}]

streamB: streamA .qsp.map[{10 * x}] .qsp.write.toConsole[]
streamC: streamA .qsp.map[{x % 2}] .qsp.write.toVariable[`output]

.qsp.run (streamB; streamC)

publish ([] data: til 10)

.qsp.sql

Perform an SQL query

.qsp.sql[query;schema]

Parameters:

name type description default
query string An SQL query to perform on table data in the stream. Required
schema table, dictionary or :: The schema of the incoming data. Either an empty table representing the schema of the data table, a dictionary of column names and their type character, or null to infer the schema. ::

For all common arguments, refer to configuring operators

Queries must

If data in the stream is not table data, an error will be signaled. Choosing to pass a schema will allow the query to be precompiled enabling faster processing on streaming data.
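
The table and dictionary forms of the schema carry the same information. As a minimal sketch in plain q, the dictionary form can be derived from an empty table with meta; tbl and schemaDict are illustrative names.

```q
// Empty-table form of a schema with a date column and a float column
tbl: ([] date: `date$(); price: `float$())

// Dictionary form: column names mapped to q type characters,
// derived here from the empty table with meta ("d" for date, "f" for float)
schemaDict: exec c!t from meta tbl
```

Going by the parameter description above, either value would be passed as the second argument of .qsp.sql.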

Queries run in a local worker process

SQL queries are not distributed across workers, and are currently run on each worker's substream.

Select average price by date:

// Generate some random data with dates and prices
n: 100
t: ([] date: n?.z.d-til 3; price: n?1000f)

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.sql["select date, avg(price) from $1 group by date"; 0#t]
  .qsp.write.toConsole[]

publish t
                             | date       price
-----------------------------| -------------------
2021.07.26D18:28:29.377228300| 2021.07.24 509.2982
2021.07.26D18:28:29.377228300| 2021.07.25 455.0621
2021.07.26D18:28:29.377228300| 2021.07.26 507.9869

Select using q date parameter:

// Generate some random data with dates and prices
n: 100
t: ([] date: n?.z.d-til 3; price: n?1000f)

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.sql["select avg(price) from $1 where date = $2"; 0#t;
    .qsp.use``args!(::;enlist .z.d)]
  .qsp.write.toConsole[]

publish t
                             | price
-----------------------------| --------
2021.07.26D18:31:20.854529500| 489.4781

Select the average price over 2-second windows:

schema: ([] time:`timestamp$(); price: `float$());

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.window.tumbling[00:00:02; `time]
  .qsp.sql["select min(time) as start_time, max(time) as end_time, avg(price) from $1"; schema]
  .qsp.write.toConsole[]

publish `time xasc ([] time: 100?.z.p+0D00:00:01*til 20; price: 100?1000f)
                             | start_time                    end_time                      price
-----------------------------| --------------------------------------------------------------------
2021.07.27D10:27:38.432116100| 2021.07.27D10:27:38.409337200 2021.07.27D10:27:39.409337200 407.0665
                             | start_time                    end_time                      price
-----------------------------| --------------------------------------------------------------------
2021.07.27D10:27:38.446046200| 2021.07.27D10:27:40.409337200 2021.07.27D10:27:41.409337200 528.9845
                             | start_time                    end_time                      price
-----------------------------| ------------------------------------------------------------------
2021.07.27D10:27:38.462265300| 2021.07.27D10:27:42.409337200 2021.07.27D10:27:43.409337200 387.83
 ..

.qsp.union

Unite two streams

.qsp.union[stream]
.qsp.union[stream; .qsp.use (!) . flip (
     (`flush  ; flush);
     (`trigger; trigger))]

Parameters:

name type description default
stream pipeline The stream to unite with the current stream. Required

options:

name type description default
flush symbol Indicates which side of the merge operation to flush data from. left
trigger function or symbol This defines when the merge function should be run. It can be a custom function run on the buffered data, or a symbol to use a predefined trigger. immediate

For all common arguments, refer to configuring operators

Unite stream and the current stream into a single stream.

This is similar to a join, with the difference that elements from both sides of the union are left as-is, resulting in a single stream.

To enable .qsp.union

Unions are non-deterministic, as it is unknown in which order the sources will be read from. Set KXI_ALLOW_NONDETERMINISM to "true" to enable this operator.

Simple union of two streams:

streamA: .qsp.read.fromCallback[`callbackA] .qsp.map[{ enlist(`a;x) }]
streamB: .qsp.read.fromCallback[`callbackB] .qsp.map[{ enlist(`b;x) }]
streamC: streamA .qsp.union[streamB] .qsp.write.toConsole[]

.qsp.run streamC

callbackA 1; callbackB 1; callbackA 1
2021.06.28D21:40:07.939960300 | `a 1
2021.06.28D21:40:08.204401700 | `b 1
2021.06.28D21:40:08.910553600 | `a 1