Stream Processor Python API
The Stream Processor Python API is provided in the sp module of the kxi package. The Python SDK can be used as part of a Stream Processor deployment or locally for development.
See the installation section below for more details on local deployment.
- General - general pipeline interaction functions
- Lifecycle - event hooks and task management
- Operators - pipeline building blocks
- Readers - pipeline data sources
- Decoders - streaming data deserializers
- Encoders - streaming data serializers
- State - runtime state management
- Transform - streaming data transformations
- Windows - windowing aggregations over data streams
- Writers - pipeline data sinks
- Machine Learning - machine learning and analytic functions
Pipeline operators are designed to be chained together to create a single pipeline. Operators are
joined using the pipe (|) syntax. Each operator has a number of required arguments that can be
provided positionally; any optional arguments must be passed by name.
from kxi import sp
from datetime import timedelta

sp.run(sp.read.from_kafka('trades')
    | sp.decode.json()
    | sp.window.tumbling(timedelta(seconds=5), 'time')
    | sp.write.to_stream())
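To make the chaining and argument rules concrete, the following is a toy model of how a pipe syntax can be built in plain Python. It is a sketch under stated assumptions, not the kxi.sp implementation: the Operator and Pipeline classes and the scale and keep_above helpers are hypothetical names invented for this illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Operator:
    """Hypothetical stand-in for a pipeline operator (not a kxi.sp class)."""
    name: str
    fn: Callable

    def __or__(self, other):
        # `a | b` wraps `a` in a Pipeline, then appends `b`.
        return Pipeline([self]) | other


@dataclass
class Pipeline:
    """Ordered chain of operators built up by repeated use of `|`."""
    stages: List[Operator] = field(default_factory=list)

    def __or__(self, other):
        extra = other.stages if isinstance(other, Pipeline) else [other]
        return Pipeline(self.stages + extra)

    def run(self, data):
        # Feed the output of each stage into the next one.
        for stage in self.stages:
            data = stage.fn(data)
        return data


def scale(factor, *, name="scale"):
    # `factor` is required and may be positional; `name` is keyword-only.
    return Operator(name, lambda xs: [x * factor for x in xs])


def keep_above(threshold, *, name="keep_above"):
    # Same convention: one required argument, optional ones by name only.
    return Operator(name, lambda xs: [x for x in xs if x > threshold])


pipeline = scale(2) | keep_above(5, name="filter") | scale(10)
result = pipeline.run([1, 2, 3, 4])
```

Here the bare `*` in each signature is what forces optional arguments to be named, so a call such as scale(2, "double") raises a TypeError instead of silently binding the wrong parameter.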
Use a virtual environment
Python packages should typically be installed in a virtual environment, which can be created with the venv module from the standard library.
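As a minimal sketch, a virtual environment can also be created programmatically through the same standard library module. The create_env helper and the ".venv" directory name are assumptions made for this example; most users will simply run python -m venv from a shell instead.

```python
import venv
from pathlib import Path


def create_env(path, with_pip=True):
    """Create a virtual environment at `path` and return its directory.

    Hypothetical helper for illustration; it only wraps venv.create.
    """
    env_dir = Path(path)
    # With with_pip=True this roughly mirrors `python -m venv <path>`.
    venv.create(env_dir, with_pip=with_pip)
    return env_dir
```

Calling create_env(".venv") creates the environment in the current directory. It still has to be activated from the shell before packages are installed into it.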
Ensure you have a recent version of pip:
pip install --upgrade pip
Then, to install kxi.sp using the KX PyPI server, run the following command:
pip install --extra-index-url=https://$KX_PYPI_USER:$KX_PYPI_PASS@nexus.dl.kx.com/repository/pykx/simple/ kxi.sp
Where the environment variables KX_PYPI_USER and KX_PYPI_PASS have been set as appropriate.
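Credentials that contain characters such as @ or : will break the URL if they are substituted in verbatim. The sketch below shows one way to check that both variables are set and to percent-encode them before building the index URL. The kx_index_url helper is a hypothetical name for this example, and the host and path are copied from the command above.

```python
import os
from urllib.parse import quote


def kx_index_url(env=None):
    """Build the extra index URL from KX_PYPI_USER and KX_PYPI_PASS.

    Hypothetical helper for illustration; not part of kxi.sp or pip.
    """
    env = os.environ if env is None else env
    missing = [k for k in ("KX_PYPI_USER", "KX_PYPI_PASS") if not env.get(k)]
    if missing:
        raise RuntimeError("unset environment variables: " + ", ".join(missing))
    # Percent-encode so characters such as '@' or ':' cannot break the URL.
    user = quote(env["KX_PYPI_USER"], safe="")
    password = quote(env["KX_PYPI_PASS"], safe="")
    return f"https://{user}:{password}@nexus.dl.kx.com/repository/pykx/simple/"
```

The returned string can then be passed to pip through its --extra-index-url option. Avoid printing or logging it, since it contains the password.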
Use a local mirror
pip installations default to using publicly available PyPI packages over ones listed under
an extra index URL. For security, consider creating a local mirror of the
kxi.sp depends on PyKX. Please refer to the PyKX installation documentation for more details.
kxi.sp depends on the following third-party Python packages: