Stream Processor Python API
The Stream Processor Python API is provided in the sp
module of the kxi
package. The
Python SDK can be used as part of a Stream Processor deployment or locally for development.
See the installation section below for more details on local deployment.
API Sections
- General - general pipeline interaction functions.
- Lifecycle - event hooks and task management.
- Operators - pipeline building blocks.
- Readers - pipeline data sources
- Decoders - streaming data deserializers
- Encoders - streaming data serializers
- Schema - schema operations
- State - runtime state management
- String - streaming data string operators
- Transform - streaming data transformations
- Windows - windowing aggregations over data streams
- Writers - pipeline data sinks
- Machine Learning - machine learning and analytic functions
Operator syntax
Pipeline operators are designed to be chained together to create a single pipeline. Operators are
joined using a pipe |
syntax. Each operator has a number of required arguments that can be
provided positionally, any optional arguments must use named arguments.
from kxi import sp
from datetime import timedelta
sp.run(sp.read.from_kafka('trades')
| sp.decode.json()
| sp.window.tumbling(timedelta(seconds=5), 'time')
| sp.write.to_stream())
Installation
Use a virtual environment
Python packages should typically be installed in a virtual environment. This can be done with the venv package from the standard library.
Ensure you have a recent version of pip
:
pip install --upgrade pip
Then to install kxi.sp
using the KX PyPI server run the following command:
pip install --extra-index-url=https://$KX_PYPI_USER:$KX_PYPI_PASS@nexus.dl.kx.com/repository/kxi/simple/ kxi.sp
Where the environment variables KX_PYPI_USER
and KX_PYPI_PASS
have been set as appropriate.
Use a local mirror
pip
installations default to using publicily available PyPI packages over ones listed under
an extra index URL. For security, consider creating a local mirror of the kxi
package.
Dependencies
Python Dependencies
kxi.sp
depends on PyKX. Please refer to the
PyKX installation for more details.
kxi.sp
depends on the following third-party Python packages:
joblib==1.1.0
typing-extensions==4.0.1