kxi.sp
Python interface for the KX Stream Processor.
This Python package, kxi
, provides Python modules which are libraries for interacting with
KX Insights. This module, sp
, leverages
PyKX to provide a rich Python interface to the KX Stream
Processor.
Regardless of whether the Python interface or the q interface is used, the core streaming engine of the Stream Processor still uses kdb+, and benefits from its high performance.
All operators return a Pipeline
instance. Pipelines can be chained
together using the pipe operator: |
(which is an overload of Python's bitwise-or operator). For
example:
>>> sp.map(transform) | sp.map(analytic)
<kx.sp.Pipeline>
All operators can be supplied with their documented parameters, but also can be given a few extra parameters, also called the common operator configuration. This common configuration is made up of the following parameters:
name
(Optional[str]
): A name/identifier for the operator. If it isNone
, then a unique identifier will be generated based on what kind of operator it is (e.g.map
,to_console
,timer
, etc.), what its configuration is, and a sequential discriminator. This name/identifier can be used for various Stream Processor functions that require you to identify the operator.params
(List[OperatorParams]
): The parameters that will be provided by the SP to the function you provide to the operator, when applicable. The valid options are members of the OperatorParams enum, which can be specified as strings or enum member objects. Defaults to['data']
, unless an initial state is provided, in which case the default is['operator', 'metadata', 'data']
, as the operator and metadata dictionaries are required to use the state API, and it is assumed that you will want to use the state API if you set the initial state.state
(Any
): The initial state for the operator. Refer to thesp.state
documentation.
push
def push(operator: 'types.OperatorSpecifier', metadata: 'types.Metadata',
data: Any) -> None
Publish data to all downstream operators in the pipeline.
Arguments:
operator
- The operator, specified as a configuration dictionary or name.metadata
- Metadata for the message being emitted from the operator.data
- The data to be pushed to the downstream pipeline.
teardown
def teardown() -> None
Tear down all running pipelines.
set_trace
def set_trace(level: int) -> None
Enables or disables trace logging.
Arguments:
level
- The level of tracing to use. The following levels are available:- Disable trace logging.
- Log data that is passed through readers and writers.
- Log data pushed through buffers.
- Log operator inputs.
- Log state operations.
clear_trace
def clear_trace() -> None
Clears trace level logging and resets logging level.
enable_data_tracing
def enable_data_tracing() -> None
Capture data outputs as they flow through a pipeline.
Data tracing captures data that is flowing in the streaming pipeline. This inserts probes that cache the last value from each operator in the pipeline. If a given operator has an error, the error is also captured and where possible, the data is the input to the operator.
Notes:
Adding data capture to a pipeline may have an impact on the pipeline performance. Data tracing should be reserved for debugging purposes and not used in production deployments where possible.
disable_data_tracing
def disable_data_tracing() -> None
Disables data tracing in the current pipeline.
clear_data_trace
def clear_data_trace() -> None
DEPRECATED Use reset_data_trace() instead
reset_data_trace
def reset_data_trace() -> None
Resets the current data cache state.
get_data_trace
def get_data_trace() -> kx.Dictionary
Returns the data from a data trace at the time of invocation.
set_record_counting
def set_record_counting(level: int) -> None
Sets the level for tracking dataflow in a pipeline.
Arguments:
level
- The level of record counting to perform. The following levels are available:- Disable record counting.
- Count records flowing through readers and writers.
- Count records flowing through all operators.
reset_record_counts
def reset_record_counts() -> None
Resets the current record counts cache.
get_record_counts
def get_record_counts() -> kx.Dictionary
Returns information on the amount of dataflow.
config_path
def config_path(obj: Union[str, bytes] = '') -> Path
Returns the location of user mounted configurations.
Arguments:
obj
- A string name of the configuration object to get the path of.
Returns:
The location of user mounted configurations.
Pipeline Objects
class Pipeline()
Thin Python wrapper around a q Stream Processor pipeline.
This class provides special handling for the |
operator. When given a Pipeline
instance on either side, it produces a new Pipeline
instance that is comprised of the
operators from the pipeline on the left connected in series with those from the pipeline on
the right.
The |
operator does not modify a pipeline definition in-place. Pipelines are immutable.
See also: splitting a pipeline, merging two pipelines with a joining function, and unifying two pipelines
When the right argument of |
is a list of pipelines, and the left argument is a single
pipeline, the result is a list where each element is a pipeline with the left argument joined
to one of the elements from the list on the right.
When the left argument of |
is a list of pipelines, and the right argument is a single
pipeline, the result is a single pipeline obtained by taking the union
of every pipeline from the list with the pipeline on the right.
Examples:
Join a reader to a map operation, then join that pipeline to a writer, and run the resulting pipeline:
```python
>>> sp.run(sp.read.from_expr(lambda: range(10, 30, 2))
| sp.map(lambda x: x*x)
| sp.write.to_console(timestamp='none'))
100 144 196 256 324 400 484 576 676 784
```
Join a reader to multiple map operations, and then have them all output to the same writer:
```python
>>> reader = sp.read.from_expr(lambda: range(10))
>>> maps = (sp.map(lambda x: x), sp.map(lambda x: x ** 2), sp.map(lambda x: x ** 3))
>>> writer = sp.write.to_console(timestamp='none')
>>> sp.run(reader | maps | writer)
0 1 2 3 4 5 6 7 8 9
0 1 4 9 16 25 36 49 64 81
0 1 8 27 64 125 216 343 512 729
```
as_dot
@cached_property
def as_dot() -> str
Provides the graph structure of the pipeline in the DOT format.
validate
def validate() -> None
Validate the structure of the pipeline graph.
Raises:
pykx.QError
- If the pipeline is not valid; the error message will explain why.
run
def run(*pipelines: Pipeline) -> None
Run pipelines.
Examples:
Running a pipeline that reads from a nullary function, then writes to the console:
```python
>>> sp.run(sp.read.from_expr(lambda: range(10)) | sp.write.to_console(timestamp='none'))
0 1 2 3 4 5 6 7 8 9
```
Running multiple pipelines:
```python
>>> pipeline_a = sp.read.from_expr(lambda: range(10)) | sp.write.to_console('A) ', timestamp='none')
>>> pipeline_b = sp.read.from_expr(lambda: range(20)) | sp.write.to_console('B) ', timestamp='none')
>>> pipeline_c = sp.read.from_expr(lambda: range(30)) | sp.write.to_console('C) ', timestamp='none')
>>> sp.run(pipeline_a, pipeline_b, pipeline_c)
A) 0 1 2 3 4 5 6 7 8 9
B) 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
C) 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
```
Argument unpacking can be used to run a list/tuple of pipelines:
```python
>>> from random import sample
>>> source = sp.read.from_expr(lambda: sample(range(20, 50, 2), 10))
>>> pipelines = (
... source | sp.map(lambda x: x * 2) | sp.write.to_console('double ) ', timestamp='none'),
... source | sp.write.to_console('original) ', timestamp='none'),
... source | sp.map(lambda x: x // 2) | sp.write.to_console('half ) ', timestamp='none'),
... )
>>> sp.run(*pipelines)
double ) 68 72 84 52 44 88 96 80 56 48
original) 34 36 42 26 22 44 48 40 28 24
half ) 17 18 21 13 11 22 24 20 14 12
```
SPModule Objects
class SPModule(ModuleType)
mode
@cached_property
def mode()
The mode of the Stream Processor.
'local'
if operating independently as a local deployment.'cluster'
if operating as part of a larger cluster with a parent controller.