General

Python interface for the KX Stream Processor.

This Python package, kxi, provides modules for interacting with KX Insights. This module, sp, leverages PyKX to provide a rich Python interface to the KX Stream Processor.

Regardless of whether the Python interface or the q interface is used, the core streaming engine of the Stream Processor still uses kdb+, and benefits from its high performance.

All operators return a Pipeline instance. Pipelines can be chained together using the pipe operator | (an overload of Python's bitwise-or operator).

Examples:

>>> sp.map(transform) | sp.map(analytic)
<kx.sp.Pipeline>
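The chaining behaviour relies on Python operator overloading. The following is a toy sketch of the pattern, not the actual kx.sp.Pipeline implementation; the class and operator names here are illustrative only:

```python
# Toy illustration of chaining pipelines with the | operator.
# NOT the real kx.sp.Pipeline; it only sketches the __or__ pattern.
class Pipeline:
    def __init__(self, ops):
        self.ops = list(ops)  # operators, connected in series

    def __or__(self, other):
        # Produce a NEW pipeline; neither operand is modified in-place.
        if isinstance(other, Pipeline):
            return Pipeline(self.ops + other.ops)
        return NotImplemented

    def __repr__(self):
        return f"<Pipeline {' | '.join(self.ops)}>"

left = Pipeline(['map:transform'])
right = Pipeline(['map:analytic'])
combined = left | right
print(combined)    # <Pipeline map:transform | map:analytic>
print(left.ops)    # unchanged: ['map:transform']
```

Because `__or__` returns a fresh object, chaining never mutates either operand, which mirrors the immutability of real sp pipelines described later in this document.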

All operators can be supplied with their documented parameters, but also can be given a few extra parameters, also called the common operator configuration. This common configuration is made up of the following parameters:

  • name (Optional[str]): A name/identifier for the operator. If it is None, then a unique identifier will be generated based on what kind of operator it is (e.g. map, to_console, timer, etc.), what its configuration is, and a sequential discriminator. This name/identifier can be used for various Stream Processor functions that require you to identify the operator.
  • params (List[OperatorParams]): The parameters that will be provided by the SP to the function you provide to the operator, when applicable. The valid options are members of the OperatorParams enum, which can be specified as strings or enum member objects. Defaults to ['data'], unless an initial state is provided, in which case the default is ['operator', 'metadata', 'data'], as the operator and metadata dictionaries are required to use the state API, and it is assumed that you will want to use the state API if you set the initial state.
  • state (Any): The initial state for the operator. Refer to the sp.state documentation.
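The defaulting rule for params can be sketched as a small helper. This function is hypothetical, written only to illustrate the rule stated above; it is not part of the kxi.sp API:

```python
# Hypothetical sketch of how the default `params` list is chosen.
# Not part of kxi.sp; it only illustrates the documented rule.
def default_params(params=None, state=None):
    if params is not None:
        return list(params)  # an explicit params list always wins
    if state is not None:
        # The state API needs the operator and metadata dictionaries,
        # so setting an initial state changes the default.
        return ['operator', 'metadata', 'data']
    return ['data']  # plain default

print(default_params())                    # ['data']
print(default_params(state={'count': 0}))  # ['operator', 'metadata', 'data']
```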

kxi.sp.push

Publish data to all downstream operators in the pipeline.

Parameters:

  • operator (types.OperatorSpecifier, required): The operator, specified as a configuration dictionary or name.
  • metadata (types.Metadata, required): Metadata for the message being emitted from the operator.
  • data (Any, required): The data to be pushed to the downstream pipeline.
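Conceptually, push forwards a message to every operator downstream of the given one. The mechanics can be sketched with a toy dispatcher; this is purely illustrative (the operator names and dictionary layout are invented), as the real kxi.sp.push dispatches inside the q streaming engine:

```python
# Toy sketch of the push mechanic: publish data to all downstream operators.
# Purely illustrative; not the kxi.sp implementation.
downstream = {
    'decode': ['transform'],
    'transform': ['to_console'],
    'to_console': [],
}
received = []  # record of (operator, data) deliveries, for demonstration

def apply_operator(name, metadata, data):
    received.append((name, data))
    push(name, metadata, data)  # forward to everything downstream

def push(operator, metadata, data):
    for child in downstream[operator]:
        apply_operator(child, metadata, data)

push('decode', {'offset': 0}, [1, 2, 3])
print(received)  # [('transform', [1, 2, 3]), ('to_console', [1, 2, 3])]
```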

kxi.sp.teardown

Tear down all running pipelines.

kxi.sp.config_path

Returns the location of user-mounted configurations.

Parameters:

  • obj (Union[str, bytes], default ''): A string name of the configuration object to get the path of.

Returns:

  • Path: The location of user-mounted configurations.
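A plain-Python sketch of the lookup this performs. The base directory below is an assumption chosen for illustration; the actual mount point is determined by the KX Insights deployment:

```python
from pathlib import Path

# Hypothetical sketch of config_path; the real mount location is
# environment-specific, not hard-coded like this.
CONFIG_BASE = Path('/config')  # assumed mount point, illustration only

def config_path(obj=''):
    if isinstance(obj, bytes):
        obj = obj.decode()  # accept bytes names as well as str
    return CONFIG_BASE / obj if obj else CONFIG_BASE

print(config_path())              # /config
print(config_path('kafka.yaml'))  # /config/kafka.yaml
```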

kxi.sp.run

Run pipelines.

Examples:

Running a pipeline that reads from a nullary function, then writes to the console:

>>> sp.run(sp.read.from_expr(lambda: range(10)) | sp.write.to_console(timestamp='none'))
0 1 2 3 4 5 6 7 8 9

Running multiple pipelines:

>>> pipeline_a = sp.read.from_expr(lambda: range(10)) | sp.write.to_console('A) ', timestamp='none')
>>> pipeline_b = sp.read.from_expr(lambda: range(20)) | sp.write.to_console('B) ', timestamp='none')
>>> pipeline_c = sp.read.from_expr(lambda: range(30)) | sp.write.to_console('C) ', timestamp='none')
>>> sp.run(pipeline_a, pipeline_b, pipeline_c)
A) 0 1 2 3 4 5 6 7 8 9
B) 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
C) 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29

Argument unpacking can be used to run a list/tuple of pipelines:

>>> from random import sample
>>> source = sp.read.from_expr(lambda: sample(range(20, 50, 2), 10))
>>> pipelines = (
...     source | sp.map(lambda x: x * 2) | sp.write.to_console('double  ) ', timestamp='none'),
...     source | sp.write.to_console('original) ', timestamp='none'),
...     source | sp.map(lambda x: x // 2) | sp.write.to_console('half    ) ', timestamp='none'),
... )
>>> sp.run(*pipelines)
double  ) 68 72 84 52 44 88 96 80 56 48
original) 34 36 42 26 22 44 48 40 28 24
half    ) 17 18 21 13 11 22 24 20 14 12

kxi.sp.Pipeline

Thin Python wrapper around a q Stream Processor pipeline.

This class provides special handling for the | operator. When given a Pipeline instance on either side, it produces a new Pipeline instance composed of the operators from the pipeline on the left connected in series with those from the pipeline on the right.

The | operator does not modify a pipeline definition in-place. Pipelines are immutable.

See also: splitting a pipeline, merging two pipelines with a joining function, and unifying two pipelines.

When the right argument of | is a list of pipelines, and the left argument is a single pipeline, the result is a list where each element is a pipeline with the left argument joined to one of the elements from the list on the right.

When the left argument of | is a list of pipelines, and the right argument is a single pipeline, the result is a single pipeline obtained by taking the union of every pipeline from the list with the pipeline on the right.
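These list semantics can be sketched with a toy class (illustrative only, not the real kx.sp.Pipeline): pipeline | [a, b] fans out into a list of pipelines, while [a, b] | pipeline collapses into one. Note the real union would not duplicate a shared upstream operator; this toy simply concatenates, to keep the sketch short:

```python
# Toy sketch of the list semantics of | described above.
# NOT the real kx.sp.Pipeline; it only models fan-out and union.
class Pipeline:
    def __init__(self, ops):
        self.ops = list(ops)

    def __or__(self, other):
        if isinstance(other, Pipeline):
            return Pipeline(self.ops + other.ops)
        if isinstance(other, (list, tuple)):
            # pipeline | [a, b] -> [pipeline | a, pipeline | b]  (fan-out)
            return [self | p for p in other]
        return NotImplemented

    def __ror__(self, other):
        if isinstance(other, (list, tuple)):
            # [a, b] | pipeline -> one pipeline combining every element
            # with the right-hand pipeline (real sp unions, this toy concatenates)
            combined = [op for p in other for op in p.ops]
            return Pipeline(combined + self.ops)
        return NotImplemented

reader = Pipeline(['read'])
maps = [Pipeline(['double']), Pipeline(['square'])]
writer = Pipeline(['write'])

fanned = reader | maps
print([p.ops for p in fanned])  # [['read', 'double'], ['read', 'square']]
joined = fanned | writer
print(joined.ops)  # ['read', 'double', 'read', 'square', 'write']
```

Since the built-in list type defines no `__or__` for Pipeline operands, Python falls back to the pipeline's `__ror__` when a list appears on the left, which is how the union case is reached.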

Examples:

Join a reader to a map operation, then join that pipeline to a writer, and run the resulting pipeline:

>>> sp.run(sp.read.from_expr(lambda: range(10, 30, 2))
...     | sp.map(lambda x: x*x)
...     | sp.write.to_console(timestamp='none'))
100 144 196 256 324 400 484 576 676 784

Join a reader to multiple map operations, and then have them all output to the same writer:

>>> reader = sp.read.from_expr(lambda: range(10))
>>> maps = (sp.map(lambda x: x), sp.map(lambda x: x ** 2), sp.map(lambda x: x ** 3))
>>> writer = sp.write.to_console(timestamp='none')
>>> sp.run(reader | maps | writer)
0 1 2 3 4 5 6 7 8 9
0 1 4 9 16 25 36 49 64 81
0 1 8 27 64 125 216 343 512 729

kxi.sp.Pipeline.as_dot: str (writable cached property)

Provides the graph structure of the pipeline in the DOT format.

kxi.sp.Pipeline.validate

Validate the structure of the pipeline graph.

Exceptions:

  • pykx.QError: Raised if the pipeline is not valid; the error message explains why.