
Operators

An operator is a first-class building block in the Stream Processor API. Operators are chained together in a user's pipeline and provide the core functionality of the Stream Processor.

.qsp.apply

Applies a function to incoming batches in the stream. Unlike other operators, apply is asynchronous: data returned by the function does not automatically flow through the rest of the pipeline. Instead, the function must call .qsp.push to push data downstream when it is ready.

This is often useful when combined with the state API for implementing custom window operations, running asynchronous external tasks (see task registration), and similar behaviors.

Since apply is most often used with state, the operator and metadata arguments are implicitly added to the user-defined function.

Example: Buffer events in-memory before running an analytic

 .qsp.run
    .qsp.read.fromCallback[`publish]
    // Buffer the stream to 10000 events before pushing downstream
    .qsp.apply[{[op; md; data]
        $[10000 <= count state: .qsp.get[op;md] , data;
            // If we've hit 10000 events, clear the state and push the buffer
            [.qsp.set[op; md; ()]; .qsp.push[op;md;state]];
            // Otherwise, update the buffer
            .qsp.set[op; md; state]]
        }; .qsp.use``state!(::;())]    // Set the default state to empty
    // Run an analytic on the buffered stream
    .qsp.map[{ 10*x }]
    // Convert to the (tableName; tableData) format expected by RT
    .qsp.map[{ (`customAnalytic; value flip x) }]
    .qsp.write.toRT[":tp:5000"]
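The buffering logic above can be modelled in plain q, without a running Stream Processor, to see exactly when a batch is flushed. This is a sketch only: `buf`, `pushed`, `fakePush` and `onBatch` are hypothetical stand-ins for the operator state, `.qsp.push` and the apply function, and the threshold is a parameter so it can be lowered to 3 for the demonstration.

```q
/ Hypothetical stand-ins, not part of the .qsp API:
/ buf models the operator state, pushed collects what .qsp.push would send
buf:();
pushed:();
fakePush:{pushed::pushed,enlist x};

/ Model of the apply function: append the batch to the buffer and,
/ once n or more events are buffered, clear the buffer and push it
onBatch:{[n;data]
  $[n <= count s:buf,data;
    [buf::(); fakePush s];
    buf::s];
  }

/ Four batches with a threshold of 3: the 2nd and 4th calls trigger a push
onBatch[3] each (1 2;3 4;enlist 5;6 7 8);
```

In the real operator the return value of the apply function is not forwarded; only the explicit push sends data on, which is what `pushed` records here.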

Example: Register a task to track an asynchronous request using apply

 .qsp.run
     .qsp.read.fromCallback[`publish]
     .qsp.apply[{[op; md; data]
         tid: .qsp.registerTask[op]; // Register a task which represents the unfinished async kurl GET request
         .kurl.async ("https://www.google.ca";"GET";``callback!(::;{[op;md;data;tid;r] // GET request with a callback
             .qsp.finishTask[op;tid]; // Mark the task as finished
             data[`response]: r 1;   // Add GET request response to data
             .qsp.push[op;md;data];   // Push enriched data to the next operator
             }[op;md;data;tid]));
         }]
     .qsp.write.toConsole[];

.qsp.filter

Either filter individual records out of a batch, or filter an entire batch out of a stream.

The function argument is called on each batch in the stream, and its return value determines the behavior:

  • if the return is a list of booleans, only the records in the batch corresponding to 1b values in the return progress through the stream
  • if the return is a scalar boolean, the batch as a whole progresses (1b) or is discarded (0b)

Signature:

    .qsp.filter[f]

        - f - Function to apply to incoming data (any -> (boolean | boolean[]))
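The two return-value behaviors described above can be modelled in plain q. The sketch below is an assumption about the semantics, not the Stream Processor's implementation: `applyFilter` is a hypothetical helper, and it returns `()` to stand for a batch that is dropped and never sent downstream.

```q
/ Hypothetical helper modelling .qsp.filter semantics (not the SP implementation)
applyFilter:{[f;batch]
  r:f batch;
  / Scalar boolean: keep (1b) or drop (0b) the whole batch;
  / boolean list: keep only the records that line up with 1b
  $[-1h=type r; $[r; batch; ()]; batch where r]
  }

t:([] sym:`a`b`a; px:1 2 3f);
/ Boolean list: keeps rows 0 and 2
kept:applyFilter[{`a=x`sym}; t];
/ Scalar boolean on a (tableName; tableData) tuple
tradeIn:applyFilter[{`trade~first x}; (`trade; t)];
quoteIn:applyFilter[{`trade~first x}; (`quote; t)];
```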

Example: Filter for a single table

 .qsp.run
     .qsp.read.fromRT[":tp:5000"]
     // Values come in as (tableName; tableValue), so select the desired
     // table name
     .qsp.filter[{ `trade ~ first x }]
     // After the filter, there are only tuples corresponding to trades
     // in the stream, so the `tableName` field can be discarded
     .qsp.map[{ x 1 }]
     .qsp.write.toConsole[]

.qsp.map

Applies a function to data passing through the operator. The return of the function is passed to the next operator in the pipeline.

Signature:

    .qsp.map[f]

        - f - Function to apply to incoming data

Example: A basic map

 .qsp.run
     .qsp.read.fromCallback[`trade]
     .qsp.map[{ update price * size from x }]
     .qsp.write.toConsole[]
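In the basic map, `update price * size from x` gives no column name, so q names the result after the leftmost column in the expression and overwrites `price` in place. The plain-q snippet below, using a made-up `trade` batch, shows that effect and the alternative of naming a new column (`notional` is a hypothetical name chosen for illustration).

```q
/ A made-up trade batch
trade:([] sym:`a`b; price:1 2f; size:10 20);
/ No column name given: the result is named after price, so price is overwritten
r1:update price * size from trade;
/ Explicit name: price is kept and a new notional column is added
r2:update notional:price * size from trade;
```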

Example: A stateful map

 .qsp.run
     .qsp.read.fromCallback[`trade]
     // *Note* - using `state` implicitly adds the `operator` and `metadata`
     // arguments required for .qsp.* state APIs.
     .qsp.map[{[op;md;trade]
          // Retrieve the previous state from the last batch
          previous: .qsp.get[op;md];
          // Calculate the total size * price for each symbol
          v: select sp:sum price * size by sym from trade;
          // Set the new state to the current batch calculation
          .qsp.set[op; md; v];
          // Send the difference between the current and last batch to
          // any forward operators in the pipeline (here, console writer)
          v - previous
          }; .qsp.use``state!(::; ([sym:0#`] sp:0#0f))]
     .qsp.write.toConsole[]
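The stateful map above can be modelled in plain q to see what reaches the console for two consecutive batches. This sketch assumes a `trade` schema with `sym`, `price` and `size` columns, sums `price * size` per symbol so each key holds a single value, and uses a hypothetical global `prev` in place of `.qsp.get` and `.qsp.set`. Subtracting keyed tables aligns rows by key, so symbols absent from the previous state keep their current value.

```q
/ Hypothetical stand-in for the operator state (same empty default as .qsp.use above)
prev:([sym:0#`] sp:0#0f);

/ Model of the stateful map body
diffMap:{[trade]
  / Total price * size per symbol, one row per sym
  v:select sp:sum price * size by sym from trade;
  / Keyed tables subtract by key; keys absent from prev keep their value
  d:v - prev;
  / Replace the state with the current batch calculation
  prev::v;
  d}

r1:diffMap ([] sym:`a`b`a; price:1 2 3f; size:10 10 10);
r2:diffMap ([] sym:`a`b; price:5 2f; size:10 20);
```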

Example: Retrieving metadata within a map

 .qsp.run
     .qsp.read.fromCallback[`trade]
     // Group events into logical groups every 5 seconds of event time
     // based on the 'timestamp' column in the data to indicate which
     // window an event should belong
     .qsp.window.tumbling[00:00:05; {y`timestamp}]
     .qsp.map[{[md;x]
          // Add the start of the window to the batched event data
          update start: md`window from x
          }; .qsp.use``params!(::; 1#`metadata)]
     .qsp.write.toConsole[]
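The body of this map is ordinary qSQL: assigning an atom in `update` broadcasts it to every row of the batch. The plain-q sketch below uses a hand-built `md` dictionary with a `window` key as a hypothetical stand-in for the metadata the pipeline would supply, and the timestamps are made up.

```q
/ Hypothetical metadata: in a pipeline, md is supplied by the Stream Processor
md:enlist[`window]!enlist 2021.06.28D21:40:05.000000000;
/ Same body as the map above
addStart:{[md;x] update start:md`window from x}
batch:([] timestamp:2021.06.28D21:40:06.000000000 2021.06.28D21:40:08.000000000; price:1 2f);
r:addStart[md; batch];
```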

.qsp.merge

Merge two data streams using an arbitrary merge function. Merge can be used to join and enrich streams with other streams, with static data, or to union two streams into a single stream.

Signature:

    .qsp.merge[stream; f]

        - stream - a separate pipeline to merge
        - f      - a function taking the data from both streams and combining it into one

Example: Join last quote on each trade

 // Create a data flow for quote updates
 quotes: .qsp.read.fromCallback[`updQuote]
     // A stateful map to hold the last seen quote for each symbol
     .qsp.map[{[o;m;x]
         // Update the state with the last value for each symbol from the batch
         // The '.qsp.set' returns the data set, forwarding it on to downstream
         // operators.
         .qsp.set[o;m] .qsp.get[o;m] upsert select by sym from x
          }; .qsp.use``state!(::; ())]

 // Create a data stream for trade updates
 quoteAsOfTrade: .qsp.read.fromCallback[`updTrade]
     // Left join the last quote for each sym onto each new trade in the stream
     // Since the updates from the `quotes` stream are keyed, the buffer will be updated
     // with the latest data, and doesn't need to be re-keyed before joining with `lj`.
     .qsp.merge[quotes; lj]
     .qsp.write.toConsole[]

 .qsp.run quoteAsOfTrade
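The join performed by this merge can be reproduced in plain q to check the shape of the result. In this sketch the quote and trade schemas (`bid`, `ask`, `price`, `size`) are assumptions, and a global `lastQuote` table with an explicit keyed schema is a hypothetical stand-in for the stateful map's state; each trade batch is then left-joined with `lj` against the latest quotes.

```q
/ Hypothetical stand-in for the quote state, keyed by sym
lastQuote:([sym:0#`] bid:0#0f; ask:0#0f);
/ Model of the stateful quote map: keep the last quote per symbol
onQuote:{[x] lastQuote::lastQuote upsert select by sym from x}

onQuote ([] sym:`a`b`a; bid:1 2 3f; ask:1.5 2.5 3.5);
trades:([] sym:`a`b; price:3.2 2.1; size:100 200);
r1:trades lj lastQuote;

/ A later quote for `a only; `b keeps its previous quote
onQuote ([] sym:enlist `a; bid:enlist 4f; ask:enlist 4.5);
r2:trades lj lastQuote;
```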

.qsp.split

Split operators allow a single stream to be split into arbitrarily many separate streams for running separate analytics or processing.

Split operators can be added explicitly, or are added implicitly when the same operator appears as a parent more than once while the streams given to .qsp.run are resolved into a single DAG before running.

Signature:

    .qsp.split[]

Example: Explicit split operator

 streamA: .qsp.read.fromCallback[`read;`publish]
     .qsp.map[{x + 1}]
     .qsp.split[]

 streamB: streamA .qsp.map[{10 * x}];
 streamC: streamA .qsp.map[{x - 1}];

 .qsp.run (streamB; streamC)

Example: Implicit split operator


 streamA: .qsp.read.fromCallback[`read; `publish]
    .qsp.map[{x + 1}];

 streamB: streamA .qsp.map[{10 * x}];
 streamC: streamA .qsp.map[{x - 1}];

 .qsp.run (streamB; streamC)

.qsp.union

Union two streams into a single stream. This is similar to a join, except that elements from both sides of the union are left as-is rather than combined, so the resulting stream carries the batches of both inputs.

Signature:

    streamA .qsp.union[streamB]

        - streamA - the first pipeline
        - streamB - the second pipeline

Example: Simple union of two streams

 streamA: .qsp.read.fromCallback[`callbackA] .qsp.map[{ enlist(`a;x) }]
 streamB: .qsp.read.fromCallback[`callbackB] .qsp.map[{ enlist(`b;x) }]
 streamC: streamA .qsp.union[streamB] .qsp.write.toConsole[]

 .qsp.run streamC;

 callbackA 1; callbackB 1; callbackA 1
 // 2021.06.28D21:40:07.939960300 | `a 1
 // 2021.06.28D21:40:08.204401700 | `b 1
 // 2021.06.28D21:40:08.910553600 | `a 1