Stream Processor Python API

The Stream Processor Python API is provided in the sp module of the kxi package. The Python SDK can be used as part of a Stream Processor deployment or locally for development. See the installation section below for more details on local deployment.

API sections

  • General - general pipeline interaction functions
  • Lifecycle - event hooks and task management
  • Operators - pipeline building blocks
  • Readers - pipeline data sources
  • Decoders - streaming data deserializers
  • Encoders - streaming data serializers
  • State - runtime state management
  • Transform - streaming data transformations
  • Windows - windowing aggregations over data streams
  • Writers - pipeline data sinks
  • Machine Learning - machine learning and analytic functions

Operator syntax

Pipeline operators are designed to be chained together to form a single pipeline. Operators are joined using the pipe (|) syntax. Each operator's required arguments can be provided positionally; optional arguments must be passed by name.

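from kxi import sp
from datetime import timedelta

# Read 'trades', decode JSON, group into 5-second windows on 'time', write out
sp.run(sp.read.from_stream('trades')
    | sp.decode.json()
    | sp.window.tumbling(timedelta(seconds=5), 'time')
    | sp.write.to_stream())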
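
The chaining syntax above relies on Python's | operator, which a class can overload through __or__. The following is a minimal, self-contained sketch of that idea. It is not how kxi.sp is implemented, and the Operator, Pipeline, scale, and keep_above names are hypothetical. It also illustrates the convention of positional required arguments and keyword-only optional arguments.

```python
from typing import Any, Callable, Iterable, List


class Pipeline:
    """An ordered chain of operators (hypothetical, for illustration only)."""

    def __init__(self, operators: List["Operator"]) -> None:
        self.operators = operators

    def __or__(self, other: "Operator") -> "Pipeline":
        # pipeline | operator -> a new, longer pipeline
        return Pipeline(self.operators + [other])

    def run(self, batch: Iterable[Any]) -> List[Any]:
        # Pass the batch through each operator in order.
        data = list(batch)
        for op in self.operators:
            data = op.fn(data)
        return data


class Operator:
    """A single named processing step (hypothetical)."""

    def __init__(self, name: str, fn: Callable[[List[Any]], List[Any]]) -> None:
        self.name = name
        self.fn = fn

    def __or__(self, other: "Operator") -> Pipeline:
        # operator | operator -> a two-step pipeline
        return Pipeline([self, other])


def scale(factor: float, *, offset: float = 0) -> Operator:
    # `factor` is required and may be positional;
    # `offset` is optional and must be passed by name.
    return Operator("scale", lambda xs: [x * factor + offset for x in xs])


def keep_above(threshold: float) -> Operator:
    return Operator("keep_above", lambda xs: [x for x in xs if x > threshold])


pipeline = scale(2, offset=1) | keep_above(4) | scale(10)
result = pipeline.run([1, 2, 3])
print(result)  # [50, 70]
```

In this sketch, calling scale(2, 1) raises a TypeError because offset is keyword-only, which mirrors the rule that optional arguments must be named.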


Installation

Use a virtual environment

Python packages should typically be installed in a virtual environment, which can be created with the venv module from the standard library.
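
As a minimal sketch, a virtual environment can be created from Python itself with the standard library's venv.create function. The create_virtualenv helper name and the .venv directory used in the note below are illustrative choices, not part of kxi.

```python
import venv
from pathlib import Path


def create_virtualenv(env_dir: str, with_pip: bool = True) -> Path:
    """Create a virtual environment at env_dir and return its path."""
    # Roughly equivalent to running `python -m venv <env_dir>`;
    # with_pip=False corresponds to adding the --without-pip flag.
    venv.create(env_dir, with_pip=with_pip)
    return Path(env_dir)
```

For example, create_virtualenv(".venv") builds the environment in the current directory. It is then activated with `source .venv/bin/activate` on Linux and macOS, or `.venv\Scripts\activate` on Windows.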

Ensure you have a recent version of pip:

pip install --upgrade pip

Then install kxi.sp from the KX PyPI server with the following command:

pip install --extra-index-url=https://$KX_PYPI_USER:$ kxi.sp

Here, the environment variables KX_PYPI_USER and KX_PYPI_PASS must be set as appropriate.
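
Because the install command expands these variables in the shell, an unset variable produces a malformed index URL rather than a clear error. The following sketch checks for that up front. The missing_credentials helper is hypothetical and only inspects the two variable names given above.

```python
import os
from typing import List, Mapping, Optional

# Variable names taken from the install instructions above.
REQUIRED_VARS = ("KX_PYPI_USER", "KX_PYPI_PASS")


def missing_credentials(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Demonstration with an explicit mapping instead of the real environment.
print(missing_credentials({"KX_PYPI_USER": "alice"}))  # ['KX_PYPI_PASS']
```

Calling missing_credentials() with no argument checks the current process environment. An empty list means both variables are set.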

Use a local mirror

pip does not give an extra index URL priority over the publicly available PyPI index, so a same-named package published on PyPI may be installed instead of the intended one. For security, consider creating a local mirror of the kxi package.


Python dependencies

kxi.sp depends on PyKX. Refer to the PyKX installation documentation for more details.

kxi.sp depends on the following third-party Python packages:

  • joblib==1.1.0
  • typing-extensions==4.0.1
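
Since these dependencies are pinned to exact versions, it can help to confirm what is installed in the active environment. The following is a sketch using the standard library's importlib.metadata. The PINNED mapping simply restates the list above, and the installed_version and find_mismatches helpers are hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Callable, Dict, Mapping, Optional

# Pins copied from the dependency list above.
PINNED = {"joblib": "1.1.0", "typing-extensions": "4.0.1"}


def installed_version(name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if absent."""
    try:
        return version(name)
    except PackageNotFoundError:
        return None


def find_mismatches(
    pins: Mapping[str, str],
    lookup: Callable[[str], Optional[str]] = installed_version,
) -> Dict[str, Optional[str]]:
    """Map each unsatisfied pin to the version actually found (or None)."""
    mismatches: Dict[str, Optional[str]] = {}
    for name, wanted in pins.items():
        found = lookup(name)
        if found != wanted:
            mismatches[name] = found
    return mismatches


if __name__ == "__main__":
    # An empty dict means every pinned package is at its pinned version.
    print(find_mismatches(PINNED))
```

The lookup parameter exists so the check can be exercised without touching the real environment, for example by passing a dictionary's get method.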