
Stream Processor Python API

The Stream Processor Python API is provided in the sp module of the kxi package. The Python SDK can be used as part of a Stream Processor deployment or locally for development. See the installation section below for more details on local deployment.

API Sections

  • General - general pipeline interaction functions.
  • Lifecycle - event hooks and task management.
  • Operators - pipeline building blocks.
  • Readers - pipeline data sources.
  • Decoders - streaming data deserializers.
  • Encoders - streaming data serializers.
  • Schema - schema operations.
  • State - runtime state management.
  • String - streaming data string operators.
  • Transform - streaming data transformations.
  • Windows - windowing aggregations over data streams.
  • Writers - pipeline data sinks.
  • Machine Learning - machine learning and analytic functions.

Operator syntax

Pipeline operators are designed to be chained together to form a single pipeline. Operators are joined with the pipe (|) operator. Each operator has a number of required arguments, which can be passed positionally; any optional arguments must be passed as named (keyword) arguments.

from kxi import sp
from datetime import timedelta

sp.run(sp.read.from_kafka('trades')
    | sp.decode.json()
    | sp.window.tumbling(timedelta(seconds=5), 'time')
    | sp.write.to_stream())
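To illustrate the chaining and argument conventions described above, here is a self-contained sketch of how `|` composition and keyword-only optional arguments can be expressed in plain Python. The `Op` and `Pipeline` classes and the `scale` and `keep_above` operators are hypothetical and are not part of kxi.sp; this is not how the Stream Processor implements its operators.

```python
from __future__ import annotations

from typing import Any, Callable, List


class Op:
    """Hypothetical pipeline stage: a function from one batch of records to the next."""

    def __init__(self, fn: Callable[[List[Any]], List[Any]]) -> None:
        self.fn = fn

    def __or__(self, other: Op) -> Pipeline:
        # `a | b` on two stages starts a new pipeline
        return Pipeline([self, other])


class Pipeline:
    """Hypothetical ordered chain of stages built with `|`."""

    def __init__(self, ops: List[Op]) -> None:
        self.ops = ops

    def __or__(self, other: Op) -> Pipeline:
        # Appending returns a new pipeline; the original is left unchanged
        return Pipeline(self.ops + [other])

    def run(self, batch: List[Any]) -> List[Any]:
        # Feed each stage's output into the next stage
        for op in self.ops:
            batch = op.fn(batch)
        return batch


def scale(factor: float, *, offset: float = 0.0) -> Op:
    # `factor` is required (may be positional); `offset` is optional and keyword-only
    return Op(lambda xs: [x * factor + offset for x in xs])


def keep_above(threshold: float, *, inclusive: bool = False) -> Op:
    # `threshold` is required; `inclusive` must be passed by name
    if inclusive:
        return Op(lambda xs: [x for x in xs if x >= threshold])
    return Op(lambda xs: [x for x in xs if x > threshold])


pipeline = scale(2) | keep_above(5) | scale(1, offset=0.5)
print(pipeline.run([1, 3, 5]))  # [6.5, 10.5]

try:
    scale(2, 0.5)  # optional argument passed positionally
except TypeError as exc:
    print("rejected:", exc)
```

The bare `*` in each signature is what forces optional arguments to be named, which keeps call sites readable as pipelines grow longer.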

Installation

Use a virtual environment

Python packages should typically be installed in a virtual environment. This can be done with the venv package from the standard library.
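A virtual environment is commonly created with `python -m venv .venv` and activated before running the `pip` commands that follow. The sketch below, using only the standard library, is one way to confirm from Python that the current interpreter is running inside a venv-style environment; the `in_virtualenv` helper name is illustrative.

```python
import sys


def in_virtualenv() -> bool:
    """Return True when running inside a venv-style virtual environment."""
    # In a venv, sys.prefix points at the environment while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix


if in_virtualenv():
    print(f"Using virtual environment at {sys.prefix}")
else:
    print("Not in a virtual environment; consider creating one with venv")
```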

Ensure you have a recent version of pip:

pip install --upgrade pip

Then, to install kxi.sp from the KX PyPI server, run the following command:

pip install --extra-index-url=https://$KX_PYPI_USER:$KX_PYPI_PASS@nexus.dl.kx.com/repository/kxi/simple/ kxi.sp

Here, the environment variables KX_PYPI_USER and KX_PYPI_PASS must be set to your KX PyPI username and password.
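If the credentials contain characters that are reserved in URLs, such as `@`, `:` or `/`, embedding them unescaped in the index URL can break it. The following sketch builds the same index URL with percent-encoded credentials and runs the install through the current interpreter's pip. The helper names are illustrative and not part of kxi.sp, and the install step only runs when both environment variables are set.

```python
import os
import subprocess
import sys
from typing import List
from urllib.parse import quote

# Index host and path taken from the pip command above
KX_INDEX_PATH = "nexus.dl.kx.com/repository/kxi/simple/"


def kx_index_url(user: str, password: str) -> str:
    """Build the extra index URL with percent-encoded credentials."""
    # quote() always encodes '@' and ':'; safe="" makes it encode '/' as well
    return f"https://{quote(user, safe='')}:{quote(password, safe='')}@{KX_INDEX_PATH}"


def pip_install_command(index_url: str) -> List[str]:
    """Return the argv for installing kxi.sp with the current interpreter's pip."""
    return [sys.executable, "-m", "pip", "install", f"--extra-index-url={index_url}", "kxi.sp"]


user = os.environ.get("KX_PYPI_USER")
password = os.environ.get("KX_PYPI_PASS")
if user and password:
    # check=True raises CalledProcessError if pip exits with an error
    subprocess.run(pip_install_command(kx_index_url(user, password)), check=True)
```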

Use a local mirror

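By default, pip does not give priority to packages from an extra index URL: it considers candidates from every configured index, including the public PyPI, and installs the best matching version, so a public package with the same name could be installed instead. For security, consider creating a local mirror of the kxi package.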

Dependencies

Python Dependencies

kxi.sp depends on PyKX. Refer to the PyKX installation guide for more details.

kxi.sp depends on the following third-party Python packages:

  • joblib==1.1.0
  • typing-extensions==4.0.1
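Because these dependencies are pinned to exact versions, it can be useful to confirm what is actually installed in the environment. This sketch uses `importlib.metadata` from the standard library (Python 3.8 or later) and compares version strings exactly, so an equivalent version written differently (for example `1.1` versus `1.1.0`) is reported as a mismatch. The `PINNED` dictionary restates the list above, and the `pin_mismatches` helper is illustrative.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Optional

# Exact pins restated from the dependency list above
PINNED: Dict[str, str] = {
    "joblib": "1.1.0",
    "typing-extensions": "4.0.1",
}


def pin_mismatches(pins: Dict[str, str]) -> Dict[str, Optional[str]]:
    """Map each unsatisfied pin to the installed version, or None if not installed."""
    mismatches: Dict[str, Optional[str]] = {}
    for name, wanted in pins.items():
        try:
            installed: Optional[str] = version(name)
        except PackageNotFoundError:
            installed = None
        # Plain string comparison; no PEP 440 version normalisation
        if installed != wanted:
            mismatches[name] = installed
    return mismatches


for name, found in pin_mismatches(PINNED).items():
    print(f"{name}: expected {PINNED[name]}, found {found or 'not installed'}")
```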