Configuring operators

q interface

The .qsp.use API modifies the behavior of an operator by configuring it with optional parameters. Most operators have a set of required values defined in their function signature. Optional parameters are provided to an operator using a dictionary parameter to .qsp.use.

.qsp.use opts

Parameters:

name  type        description                    default
opts  dictionary  Custom configuration options.  Required

Returns:

A dictionary of custom configuration options.

Operator Options

Many operators have options which are not positional arguments. Instead, they are included as a dictionary as the last argument, with .qsp.use marking it as a special parameter. .qsp.use can also be used to pass in positional arguments.

Examples:

In the following expressions, sort is an option that must be passed in using the .qsp.use dictionary. period and timeColumn can be passed as positional arguments or as options. As such, the following expressions are equivalent.

.qsp.window.tumbling[00:00:01; `time; .qsp.use ``sort!11b]
.qsp.window.tumbling[.qsp.use (!) . flip (
    (`period;       00:00:01);
    (`timeColumn;   `time);
    (`sort;         1b))];
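
As a rough mental model of this equivalence, the sketch below merges positional arguments and named options into a single configuration. It is plain Python and not the Stream Processor's implementation: resolve_config is a hypothetical helper, and only the parameter names period, timeColumn and sort come from the example above.

```python
def resolve_config(positional_names, *args, **options):
    """Merge positional arguments and named options into one configuration.

    Hypothetical helper for illustration only: it models the idea that a
    value may arrive positionally or by name, but not both.
    """
    if len(args) > len(positional_names):
        raise TypeError("too many positional arguments")
    # Pair each positional value with the name it stands for.
    config = dict(zip(positional_names, args))
    for key, value in options.items():
        if key in config:
            raise TypeError(f"{key!r} given both positionally and as an option")
        config[key] = value
    return config


# Names that may be given positionally in this sketch.
NAMES = ("period", "timeColumn")

# period and timeColumn given positionally, sort given as an option.
mixed = resolve_config(NAMES, "00:00:01", "time", sort=True)

# Everything given by name.
by_name = resolve_config(NAMES, period="00:00:01", timeColumn="time", sort=True)

print(mixed == by_name)
```

Both calls produce the same dictionary, which is the sense in which the two q expressions above are equivalent.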

Common Options

There are three options common to many operators. name can be used by all operators, and state and params can be used by any operator referencing them in their options list.

state sets the initial state for stateful operators. This state can be retrieved or modified using .qsp.get and .qsp.set.
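
The idea of an initial state that a function reads and updates on every batch can be sketched in plain Python. This is an illustrative model only: the StatefulMap class and its get and set methods are hypothetical stand-ins for a stateful operator, .qsp.get and .qsp.set, and the assumption that set returns the stored value is made for this sketch.

```python
class StatefulMap:
    """Toy stand-in for a stateful map operator (illustrative only)."""

    def __init__(self, fn, state):
        self.fn = fn
        self.state = state  # plays the role of the `state` option

    def get(self):
        """Return the current state."""
        return self.state

    def set(self, value):
        """Store a new state and return it (assumed for this sketch)."""
        self.state = value
        return value

    def process(self, batch):
        """Apply the user function to one batch, passing the operator itself."""
        return self.fn(self, batch)


def aggregate(op, prices):
    # Add this batch's sum to the running total and emit the new total.
    return op.set(op.get() + sum(prices))


agg = StatefulMap(aggregate, 0.0)
totals = [agg.process(batch) for batch in ([1.0, 2.0], [3.5], [4.0, 0.5])]
print(totals)
```

Each batch sees the total left behind by the previous one, so the emitted values grow as a running sum.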

params is a symbol or list of symbols specifying which parameters are passed to an operator's main function, and in what order. These can be any of operator, metadata, or data, and can also include operator-specific parameters where that operator's documentation lists them. For operators where params defaults to `data, including a state option changes the default params to `operator`metadata`data. Note that metadata cannot currently be used as the sole parameter: if it is, a data parameter is implicitly added as the second parameter.
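
The rules in the paragraph above can be summarized in a small Python model. It is a sketch under stated assumptions, not the library's code: resolve_params and call_with_params are hypothetical helpers, the operator is assumed to default to data, and operator-specific parameters are left out.

```python
def resolve_params(params=None, has_state=False):
    """Return the ordered argument names passed to an operator's function.

    Illustrative model for an operator whose params default is data.
    """
    if params is None:
        # A state option switches the default to operator, metadata, data.
        return ["operator", "metadata", "data"] if has_state else ["data"]
    names = [params] if isinstance(params, str) else list(params)
    if names == ["metadata"]:
        # metadata cannot be the sole parameter, so data is added implicitly.
        names.append("data")
    return names


def call_with_params(fn, names, operator, metadata, data):
    """Call fn with the selected values, in the requested order."""
    available = {"operator": operator, "metadata": metadata, "data": data}
    return fn(*(available[name] for name in names))


def add_window_start(md, rows):
    # Attach the window start held in the metadata to every row.
    return [dict(row, start=md["window"]) for row in rows]


result = call_with_params(
    add_window_start,
    resolve_params(["metadata", "data"]),
    operator="op-handle",
    metadata={"window": "2021.10.02D11:54:09"},
    data=[{"price": 51.1}],
)
print(result)
```

Here the function receives the metadata first and the data second because that is the order given in params.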

name is common to all operators and can be used to set the id of an operator.

Create a pipeline that creates a publish callback, and sends data to a stateful map:

// Create some example data
data: ([] timestamp: .z.p + 00:00:00.1 * til 30; price: 30?100)

// The metadata for a node can be retrieved with the `params` option:
addWindowStart: {[md; data] update start: md`window from data }

// Take a running total of the window and maintain the state
aggregate: {[op; md; data] .qsp.set[op; md] .qsp.get[op; md] + sum data`price };

// The metadata added by the window operator can be accessed by the map
// by specifying the `params` field, adding the `md` (metadata) argument
// to the user defined function.
stream: .qsp.read.fromCallback[`publish]
    .qsp.window.tumbling[00:00:01; `timestamp]
    .qsp.map[addWindowStart; .qsp.use``params!(::; `metadata`data)];

// Runs the stream with a split to print the intermediate results as well as an
// aggregated result.
.qsp.run (
    stream .qsp.write.toConsole[];
    stream
        // When `state` is set, the operator becomes stateful, and its function is automatically
        // provided the operator and metadata as arguments.
        .qsp.map[aggregate; .qsp.use `name`state!(`MyAggregator; 0f)]
        .qsp.write.toConsole["Running total: "]
        )

publish data
                            | timestamp                     price    start
-----------------------------| --------------------------------------------------------------------
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209300 51.14424 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209301 48.9294  2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209302 78.87772 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209303 40.96236 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209304 76.77878 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209305 16.01048 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209306 99.31063 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209307 11.61313 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209308 37.77495 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209309 97.89015 2021.10.02D11:54:09.614209300
Running total: 2021.10.02D11:54:10.623336100 | 559.2918

Pull Reader Trigger Options

A pull reader is a reader that pulls data, as opposed to a streaming reader, which has data pushed to it. You can pull from a kdb+ Insights Database, a SQL database, or an HTTP(S) endpoint. By default, these readers pull once and then finish.

This section explains the concept of reader triggering which applies to the pull readers.

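The pull readers include those for the kdb+ Insights Database, SQL databases, and HTTP(S) endpoints, as well as the expression reader used in the example below.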

With the trigger option you can transform a reader so that the read is triggered by an API call or on a timer, in which case the reader does not finish. This removes the restriction that the reader can only pull once. For example, suppose you have a long-running pipeline that reads from a stream using a Kafka reader, and you want that stream joined to more static reference data pulled from a database. If this pipeline runs for multiple days or longer, the reference data could become outdated and need updating. In such cases you can trigger another read from the database to refresh the data.

The trigger option is available to all the pull readers. The trigger can take the following values:

  • once - Triggered once on startup and finishes. This is the default behavior if a trigger is not supplied.
  • api - Triggered only by an API call, and can be triggered repeatedly. When triggered, the reader does not finish automatically. q, Python and REST APIs are available.
  • (`timer; period; startAt) or (`timer; period) (startAt defaults to now) - Triggered periodically on a timer. The first trigger occurs at startAt if one is provided; otherwise it occurs immediately. The reader can also be triggered by an API call. With this option the reader does not finish automatically.
    • period must be a timespan in q or a datetime.timedelta in Python. startAt must be a timestamp or time type in q, and either a datetime.datetime or datetime.time in Python.
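
To make the timer semantics concrete, the following plain-Python sketch lists when such a trigger would fire. It is not how the Stream Processor schedules reads: first_trigger and trigger_times are hypothetical helpers, and the handling of a time-of-day startAt (next occurrence at or after the current time) is an assumption made for this sketch.

```python
from datetime import datetime, time, timedelta


def first_trigger(now, start_at=None):
    """Return when a timer trigger first fires.

    Assumption for this sketch: a time-of-day start_at means the next
    occurrence of that time at or after `now`. With no start_at the
    first trigger is immediate.
    """
    if start_at is None:
        return now
    if isinstance(start_at, datetime):
        return start_at
    candidate = datetime.combine(now.date(), start_at)
    return candidate if candidate >= now else candidate + timedelta(days=1)


def trigger_times(now, period, start_at=None, count=3):
    """List the first `count` scheduled trigger times."""
    first = first_trigger(now, start_at)
    return [first + i * period for i in range(count)]


now = datetime(2021, 10, 2, 11, 54, 9)

# A daily trigger starting at midnight.
for t in trigger_times(now, timedelta(days=1), time(0, 0, 0)):
    print(t)
```

With a one-day period and a midnight start, the schedule begins at the next midnight and repeats every 24 hours. Omitting start_at makes the first trigger fall at the current time.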

Example

The following example triggers the expression reader to run at midnight daily:

period: 1D;
startAt: 00:00:00.000;

.qsp.run
    .qsp.read.fromExpr["([] sym:10?`KX`FD; price:10?100.0)"; .qsp.use `trigger`name!((`timer; period; startAt); `expr)]
    .qsp.write.toVariable[`.test.cache]

Additionally, if you want to make sure the reader also runs on startup, you can call the q .qsp.triggerRead API from the .qsp.onStart event handler in global code.

.qsp.onStart[{[] .qsp.triggerRead `expr}]

The equivalent example in Python:

>>> import pykx as kx
>>> from datetime import datetime, time, timedelta
>>> from kxi import sp
>>> from kxi.sp.read import TriggerMode

>>> expr = '([] sym:10?`KX`FD; price:10?100.0)'
>>> sp.run(sp.read.from_expr(expr, name='myExpr', trigger=[TriggerMode.timer, timedelta(days=1), time(0, 0, 0)])
        | sp.write.to_variable('.test.cache'))

If you want to make sure the reader also runs on startup, you can call the Python sp.trigger_read API from the sp.lifecycle.on_start event handler in global code.

>>> def trigger_start():
        sp.trigger_read('myExpr')

>>> sp.lifecycle.on_start(trigger_start)

Python interface

There is no equivalent to the .qsp.use function in Python, because Python functions natively support both positional and optional keyword parameters.

For example, in the case of reading from a database, the Python function sp.read.from_database is defined with optional parameters which allow us to configure the operator:

@Reader
def from_database(sql: Optional[CharString] = None,
                uda: Optional[List[Union[CharString, DictSpec]]] = None,
                table: Optional[CharString] = None,
                start_ts: Optional[TimestampSpec] = None,
                end_ts: Optional[TimestampSpec] = None,
                filter: Optional[List[List[CharString]]] = None,
                group_by: Optional[List[CharString]] = None,
                agg: Optional[List[CharString]] = None,
                fill: Optional[CharString] = None,
                temporality: Optional[CharString] = None,
                sort_cols: Optional[List[CharString]] = None,
                labels: Optional[DictSpec] = None,
                retries: Optional[Union[kx.LongAtom, int]] = None,
                retry_wait: Optional[Timedelta] = None) -> Reader

Examples:

Setting the UDA parameter

>>> from kxi import sp
>>> import pykx as kx

# explicitly type the UDA params to avoid issues on the server side
>>> args=['.example.multiplierAPI', {'table': kx.SymbolAtom('weather'),
        'column': kx.CharVector('airtemp'), 'multiplier': kx.FloatAtom(10.0)}]
>>> sp.run(sp.read.from_database(uda=args)
        | sp.write.to_variable('out'))
>>> kx.q('out')
original_col multiplied_col
---------------------------
46.256        462.56
46.7232       467.232
47.63167      476.3167
..