Configuring operators
q interface
The .qsp.use API modifies the behavior of an operator by configuring it with optional parameters. Most operators have a set of required values defined in their function signature. Optional parameters are provided to an operator using a dictionary parameter to .qsp.use.
.qsp.use opts
Parameters:
name | type       | description                   | default
---- | ---------- | ----------------------------- | --------
opts | dictionary | Custom configuration options. | Required
Returns:
A dictionary of custom configuration options.
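For example, a minimal sketch of building an options dictionary and passing it to an operator (the sort option used here is described in the next section):
// wrap a dictionary of options with .qsp.use and pass it as the operator's last argument
opts: .qsp.use enlist[`sort]!enlist 1b
.qsp.window.tumbling[00:00:01; `time; opts]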
Operator Options
Many operators have options which are not positional arguments. Instead, they are included as a dictionary as the last argument, with .qsp.use marking it as a special parameter. .qsp.use can also be used to pass in positional arguments.
Examples:
In the following expressions, sort is an option which must be passed in using the .qsp.use dictionary. period and timeColumn can be positional arguments, or passed in as options. As such, the following expressions are equivalent.
.qsp.window.tumbling[00:00:01; `time; .qsp.use ``sort!11b]
.qsp.window.tumbling[.qsp.use (!) . flip (
  (`period; 00:00:01);
  (`timeColumn; `time);
  (`sort; 1b))];
Common Options
There are three options common to many operators. name can be used by all operators, while state and params can be used by any operator that references them in its options list.
state sets the initial state for stateful operators. This state can be retrieved or modified using .qsp.get and .qsp.set.
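For example, a minimal sketch of a stateful map that keeps a running count of records (the callback name publish and the function countRecords are illustrative):
// with the state option set, the function also receives the operator and metadata,
// which are used to read and update the running count
countRecords: {[op; md; data]
  .qsp.set[op; md; .qsp.get[op; md] + count data];
  data }

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[countRecords; .qsp.use enlist[`state]!enlist 0]
  .qsp.write.toConsole[]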
params is a symbol or list of symbols specifying which parameters are passed to an operator's main function, as well as their order. These can be any of operator, metadata, or data, and can also include operator-specific parameters, if they are specified in that operator's documentation.
For operators where params defaults to `data, including a state option will change the default params to `operator`metadata`data.
Note that metadata cannot currently be used as the sole parameter. A second data parameter will be implicitly added.
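As a sketch of the note above (the function here is illustrative), specifying metadata as the sole parameter still results in the function being called with both the metadata and the data:
// params set to metadata alone: a data parameter is implicitly added,
// so the function must still accept two arguments
.qsp.map[{[md; data] data}; .qsp.use enlist[`params]!enlist `metadata]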
name is common to all operators and can be used to set the ID of an operator.
Create a pipeline that registers a publish callback and sends data to a stateful map:
// Create some example data
data: ([] timestamp: .z.p + 00:00:00.1 * til 30; price: 30?100)
// The metadata for a node can be retrieved with the `params` option:
addWindowStart: {[md; data] update start: md`window from data }
// Take a running total of the window and maintain the state
aggregate: {[op; md; data] .qsp.set[op; md] .qsp.get[op; md] + sum data`price };
// The metadata added by the window operator can be accessed by the map
// by specifying the `params` field, adding the `md` (metadata) argument
// to the user defined function.
stream: .qsp.read.fromCallback[`publish]
  .qsp.window.tumbling[00:00:01; `timestamp]
  .qsp.map[addWindowStart; .qsp.use``params!(::; `metadata`data)];
// Runs the stream with a split to print the intermediate results as well as an
// aggregated result.
.qsp.run (
  stream .qsp.write.toConsole[];
  stream
    // When `state` is set, the operator becomes stateful, and its function is
    // automatically provided the operator and metadata as arguments.
    .qsp.map[aggregate; .qsp.use `name`state!(`MyAggregator; 0f)]
    .qsp.write.toConsole["Running total: "]
  )
publish data
                             | timestamp                     price    start
-----------------------------| --------------------------------------------------------------------
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209300 51.14424 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209301 48.9294  2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209302 78.87772 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209303 40.96236 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209304 76.77878 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209305 16.01048 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209306 99.31063 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209307 11.61313 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209308 37.77495 2021.10.02D11:54:09.614209300
2021.10.02D11:54:10.622299200| 2021.10.02D11:54:09.614209309 97.89015 2021.10.02D11:54:09.614209300
Running total: 2021.10.02D11:54:10.623336100 | 559.2918
Pull Reader Trigger Options
A pull reader is a reader which pulls data, as opposed to having data pushed to it like streaming readers. You can pull from a kdb+ Insights Database, a SQL database, or an HTTP(S) endpoint. By default, these readers pull once and are then finished.
This section explains the concept of reader triggering, which applies to the pull readers: the database, SQL, and HTTP readers.
With the trigger option you can transform a reader so that the read can be triggered by an API call or on a timer, in which case the reader does not finish. This removes the restriction that the reader can only pull once. For example, a long-running pipeline that reads from a stream using a Kafka reader may need to be joined to more static reference data pulled from a database. If this pipeline runs for multiple days, or longer, the reference data can become outdated and need to be refreshed. In such cases you can trigger another read from the database to refresh the data.
The trigger option is available to all the pull readers. The trigger can take the following values:
- once - Triggered once on startup and then finishes. This is the default behavior if a trigger is not supplied.
- api - Triggered only by an API call, and can be triggered repeatedly. When triggered, the reader does not finish automatically. q, Python, and REST APIs are available. A sketch of this mode is shown after this list.
- (`timer; period; startAt) or (`timer; period) (startAt defaults to now) - Triggered periodically on a timer. The reader is first triggered at startAt if one is provided, otherwise it is triggered immediately. It can also be triggered by an API call. When this is used, the reader does not finish automatically. period must be a timespan in q or a datetime.timedelta in Python. startAt must be a timestamp or time type in q, and either a datetime.datetime or datetime.time in Python.
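As a sketch of the api mode (the reader name refData, the expression, and the target variable are illustrative), the read below is only performed when .qsp.triggerRead is called:
// a reader in api trigger mode; the read is performed on demand
.qsp.run
  .qsp.read.fromExpr["([] sym:10?`KX`FD; price:10?100.0)"; .qsp.use `trigger`name!(`api; `refData)]
  .qsp.write.toVariable[`.refData.cache]

// later, refresh the data by triggering the reader by name
.qsp.triggerRead `refData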
Example
The following example triggers the expression reader to run at midnight daily:
period: 1D;
startAt: 00:00:00.000;
.qsp.run
  .qsp.read.fromExpr["([] sym:10?`KX`FD; price:10?100.0)"; .qsp.use `trigger`name!((`timer; period; startAt); `expr)]
  .qsp.write.toVariable[`.test.cache]
Additionally, if you want to make sure the reader also runs on startup, you can use the q .qsp.triggerRead API together with the .qsp.onStart event handler in global code.
.qsp.onStart[{[] .qsp.triggerRead `expr}]
>>> import pykx as kx
>>> from datetime import datetime, time, timedelta
>>> from kxi import sp
>>> from kxi.sp.read import TriggerMode
>>> expr = '([] sym:10?`KX`FD; price:10?100.0)'
>>> sp.run(sp.read.from_expr(expr, name='myExpr', trigger=[TriggerMode.timer, timedelta(days=1), time(0, 0, 0)])
| sp.write.to_variable('.test.cache'))
If you want to make sure the reader also runs on startup, you can use the Python sp.trigger_read API with the sp.lifecycle.on_start event handler in global code.
>>> def trigger_start():
...     sp.trigger_read('myExpr')
>>> sp.lifecycle.on_start(trigger_start)
Python interface
There is no equivalent to the .qsp.use function in Python as Python allows positional and optional parameters.
For example, in the case of reading from a database, the Python function sp.read.from_database is defined with optional parameters which allow us to configure the operator:
@Reader
def from_database(sql: Optional[CharString] = None,
                  uda: Optional[List[Union[CharString, DictSpec]]] = None,
                  table: Optional[CharString] = None,
                  start_ts: Optional[TimestampSpec] = None,
                  end_ts: Optional[TimestampSpec] = None,
                  filter: Optional[List[List[CharString]]] = None,
                  group_by: Optional[List[CharString]] = None,
                  agg: Optional[List[CharString]] = None,
                  fill: Optional[CharString] = None,
                  temporality: Optional[CharString] = None,
                  sort_cols: Optional[List[CharString]] = None,
                  labels: Optional[DictSpec] = None,
                  retries: Optional[Union[kx.LongAtom, int]] = None,
                  retry_wait: Optional[Timedelta] = None) -> Reader
Examples:
Setting the UDA parameter
>>> from kxi import sp
>>> import pykx as kx
# explicitly type the UDA params to avoid issues on the server side
>>> args=['.example.multiplierAPI', {'table': kx.SymbolAtom('weather'),
'column': kx.CharVector('airtemp'), 'multiplier': kx.FloatAtom(10.0)}]
>>> sp.run(sp.read.from_database(uda=args)
| sp.write.to_variable('out'))
>>> kx.q('out')
original_col multiplied_col
---------------------------
46.256 462.56
46.7232 467.232
47.63167 476.3167
..