
Functions

Functions for transforming data in a streaming process pipeline.

  • Click a node to edit it. When a node is clicked, the property panel to the right becomes editable.
  • Right-click a node to rename, remove (delete) or duplicate it. A duplicate copies the details of the original node, including any properties defined on it, and its name gets a numeric -n suffix.

Accumulate


Aggregates a stream into an accumulator for each incoming batch.

item           description
Initial State  The initial state of the accumulator.

The aggregator function takes the metadata (md), data (data) and accumulator (acc) and returns an updated accumulator.

Example:

{[md; data; acc]
    // combine data with acc and return the updated accumulator
    acc
    }

If the accumulator is a dictionary, it may be necessary to enlist the result in an output function (Transform Output) so the next operator receives a table; for example, {enlist x}.
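As a minimal sketch, assuming each batch is a table with sym and size columns, the accumulator below keeps a running total of size per sym in a dictionary. The names init and accFn are hypothetical, and the function is called directly on two sample batches in plain q rather than through the Stream Processor.

```q
// Hypothetical Initial State: an empty dictionary mapping symbol -> long.
init:(`symbol$())!`long$()

// Accumulator function: add each batch's size totals per sym to acc.
// md is unused; data is assumed to be a table with sym and size columns.
accFn:{[md; data; acc] acc + exec sum size by sym from data}

// Simulate two incoming batches.
acc:accFn[()!(); ([] sym:`a`b`a; size:10 20 5); init]
acc:accFn[()!(); ([] sym:enlist `b; size:enlist 7); acc]
show acc

// Transform Output: enlist the dictionary so the next operator receives a table.
show enlist acc
```

Adding two dictionaries in q sums values for matching keys and keeps the rest, which is why a plain + is enough to fold each batch into the accumulator.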

Apply

Applies a function to incoming data. This is useful for asynchronous operations: data returned by the function does not immediately flow through the rest of the pipeline. Instead, the function must call .qsp.push to push data through the pipeline when it is ready.

Example:

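// @param op {any} the current operator
// @param md {dictionary} metadata for the batch
// @param data {any} data from the previous node
{[op; md; data]
    // the return value is not forwarded; push data on when ready
    .qsp.push[op; md; data]
    }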

OnFinish

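A function to flush any buffered data when the pipeline completes or is torn down. The function is passed the operator and the metadata.

// @param op {any} the current operator
// @param md {dictionary} metadata
{[op; md]
    // flush any buffered data here
    }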
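The buffering pattern that Apply and OnFinish are meant for can be sketched in plain q, assuming integer records. pushStub is a hypothetical stand-in for .qsp.push so the code runs outside the Stream Processor; bufSize, buf, applyFn and onFinishFn are also illustrative names, not part of any API.

```q
// Hypothetical stand-in for .qsp.push so this sketch runs in plain q.
// Inside the Stream Processor the functions would call .qsp.push[op; md; data].
out:()
pushStub:{[op; md; data] `out set out,enlist data;}

// Hypothetical buffering threshold and buffer of incoming records.
bufSize:3
buf:`long$()

// Apply function: buffer records; push the buffer once it holds bufSize or more.
applyFn:{[op; md; data]
  `buf set buf,data;
  if[bufSize<=count buf; pushStub[op; md; buf]; `buf set `long$()];
  }

// OnFinish function: flush any records still buffered when the pipeline ends.
onFinishFn:{[op; md]
  if[count buf; pushStub[op; md; buf]; `buf set `long$()];
  }

applyFn[`op; ()!(); 1 2]        // below the threshold: nothing pushed yet
applyFn[`op; ()!(); 3 4]        // buffer now holds 4 records, so it is pushed
applyFn[`op; ()!(); enlist 5]   // buffered, below the threshold
onFinishFn[`op; ()!()]          // flushes the remaining record
show out
```

Without the OnFinish step, the last record would stay in buf when the pipeline stops.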

Filter

Filter out elements from a batch, or batches from a stream. The filter is a unary function returning a boolean atom or vector.

Example:

// @param data {any} data from the previous node
// @return {boolean | boolean[]} records to keep
{ `trade ~ first x }
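A minimal sketch, assuming batches are tables with a price column: the filter returns one boolean per row, and the last line imitates in plain q what keeping only the flagged rows looks like. filterFn and the sample batch are hypothetical.

```q
// Hypothetical Filter function: keep only rows whose price is above 15.
// It returns a boolean vector with one flag per row of the batch.
filterFn:{ 15 < x`price }

batch:([] sym:`AAPL`MSFT`IBM; price:10 20 30f)
show filterFn batch               // 011b

// Keep the rows flagged 1b.
show batch where filterFn batch
```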

Map

Maps data from one value to another while preserving its shape. The provided function is given the current batch of data as its first parameter and is expected to return the transformed data.

Example:

// @param data {any} data from the previous node
// @return {any} data to pass to the next node
{ update price * size from x }
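A minimal sketch, assuming batches are tables with price and size columns; mapFn and the sample batch are hypothetical. This version names the new column explicitly, and the output has the same number of rows as the input.

```q
// Hypothetical Map function: add a notional column (price * size) to each batch.
mapFn:{ update notional:price*size from x }

batch:([] sym:`AAPL`MSFT; price:10 20f; size:100 50)
show mapFn batch
```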

Merge

Merge can be used to join and enrich streams with other streams, with static data, or to union two streams into a single stream.

Merge node function properties.

Example:

// `lj` can merge the streams when the right stream is a keyed table whose key column(s) also appear in the left stream.
lj

item     description
Flush    Indicates which side of the merge to flush data from; select between Left, Right, Both or None.
Trigger  Indicates when the current join data should be emitted; select between Immediately when either stream gets data, When left stream gets data, When right stream gets data, or When both have data.
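A minimal sketch of the lj case in plain q, assuming the left stream carries trades and the right stream a reference table keyed on sym; both tables and the name mergeFn are hypothetical.

```q
// Hypothetical left stream batch: trades.
trades:([] sym:`AAPL`XOM`AAPL; price:10 20 11f)
// Hypothetical right stream batch: reference data keyed on sym.
refdata:([sym:`AAPL`XOM] sector:`tech`energy)

// With lj as the merge function, each trade is enriched with its sector.
mergeFn:lj
show mergeFn[trades; refdata]
```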

Reduce Window

If the number of buffered records exceeds what can be held in memory, it may be necessary to reduce them into a smaller, aggregated value at regular intervals.

item           description
Initial State  The initial state of the accumulator.

{[md; data; acc]
    // combine data with acc and return the updated accumulator
    acc
    }

If the accumulator is a dictionary, it may be necessary to enlist the result in an output function (Transform Output) so the next operator receives a table; for example, {enlist x}.
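A minimal sketch of a reducer, assuming each buffered batch is a table with a price column. Rather than holding every record, the accumulator keeps only a count and a price total; init and reduceFn are hypothetical names.

```q
// Hypothetical Initial State: a running record count and price total.
init:`n`total!(0;0f)

// Reducer: fold each batch into the accumulator, keeping only
// two numbers in memory instead of every record.
reduceFn:{[md; data; acc] acc + `n`total!(count data; sum data`price)}

acc:reduceFn[()!(); ([] price:10 20f); init]
acc:reduceFn[()!(); ([] price:enlist 30f); acc]

// Output function: enlist the dictionary so the next operator receives a table.
show enlist acc
```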

Split

Splits a stream into multiple streams for running separate analytics or processing.

// @param data {any} data from the previous node
// @return {any} data to pass to the next node
{[data]
    data
    }

SQL

Perform an SQL query [query]

Where

  • query is an SQL query to perform on table data in the stream.

Queries must

Example:

select date, avg(price) from $1 group by date
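For readers more familiar with q, the example query corresponds roughly to the qSQL below, with a hypothetical in-memory batch standing in for $1. This only illustrates what the query computes; it is not how the SQL node executes queries.

```q
// Hypothetical batch with date and price columns.
batch:([] date:2024.01.01 2024.01.01 2024.01.02; price:10 20 30f)

// Rough qSQL equivalent of: select date, avg(price) from $1 group by date
show select avg price by date from batch
```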

Union

Unifies multiple streams into a single stream. This is similar to a join, except that elements from both sides of the union are left as-is, resulting in a single stream.


item     description
Flush    Indicates which side of the union to flush data from; select between Left, Right, Both or None.
Trigger  Indicates when the current join data should be emitted; select between Immediately when either stream gets data, When left stream gets data, When right stream gets data, or When both have data.