Python Pipeline Walkthrough
Note
To follow this walkthrough from scratch, continue below. To skip ahead, or to check your result against a working copy, download the completed example, python-walkthrough.zip, then unzip, push, and deploy it:
unzip python-walkthrough.zip
cd python-walkthrough
kxi pm push .
kxi pm deploy python-walkthrough
Including a Python Package
This walkthrough uses three PyPI packages to demonstrate dependency management:
- art — generates ASCII text art from strings.
- cowsay — renders messages inside ASCII speech bubbles spoken by an animal character.
- sparklines — renders a list of numbers as an inline ASCII bar chart.
Python dependencies are declared in manifest.yaml under dependencies. The Package Manager resolves and installs them into the Python runtime before the pipeline starts:
name: python-walkthrough
version: 0.0.9
dependencies:
- art==6.5
- cowsay==6.1
- sparklines==1.0.0
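One way to confirm that the pinned versions actually reached the runtime is to compare each pin against the installed distribution metadata. The sketch below is a hypothetical helper, not part of the Package Manager: `parse_pin` and `check_pins` are illustrative names, and it assumes the exact `name==version` pin style used in the manifest above.

```python
from importlib.metadata import PackageNotFoundError, version

# Pins copied from the manifest above.
PINS = ["art==6.5", "cowsay==6.1", "sparklines==1.0.0"]


def parse_pin(pin):
    """Split an exact pin such as 'art==6.5' into (name, version)."""
    name, sep, wanted = pin.partition("==")
    if not sep or not name.strip() or not wanted.strip():
        raise ValueError(f"not an exact pin: {pin!r}")
    return name.strip(), wanted.strip()


def check_pins(pins):
    """Return {name: status}; status is 'ok', 'missing', or 'found <version>'."""
    report = {}
    for pin in pins:
        name, wanted = parse_pin(pin)
        try:
            found = version(name)
        except PackageNotFoundError:
            report[name] = "missing"
            continue
        # Plain string comparison: an assumption that suits exact pins only.
        report[name] = "ok" if found == wanted else f"found {found}"
    return report


if __name__ == "__main__":
    for name, status in check_pins(PINS).items():
        print(f"{name}: {status}")
```

Running this inside the worker's Python environment should report `ok` for each dependency if installation succeeded; anything else suggests checking the manifest or the deployment logs.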
The package structure looks like this:
python-walkthrough/
├── manifest.yaml
├── udfs.yaml
├── src/
│   ├── pypeline.py    # pipeline spec
│   └── uda.py         # UDA entrypoints
├── tables/
│   └── mytable.yaml
├── pipelines/
│   └── pypeline.yaml
├── databases/
│   └── python-walkthrough/
│       └── shards/
│           └── python-walkthrough.yaml
└── router/
    └── router.yaml
To make the package's own source files importable within the runtime, add the package path to sys.path at the top of any Python file that needs it:
import os
import sys
sys.path.append(os.getenv("KX_PACKAGE_PATH"))
Note
This sys.path.append call is an explicit opt-in; it is not done automatically. Without it, imports of downloaded Python libraries will fail.
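If `KX_PACKAGE_PATH` is not set, `os.getenv` returns `None` and the append silently adds a useless entry. A slightly more defensive variant is sketched below; `add_package_path` is a hypothetical helper name, and the behaviour on a missing variable (warn and carry on) is an assumption you may want to change.

```python
import os
import sys


def add_package_path(env_var="KX_PACKAGE_PATH"):
    """Append the directory named by env_var to sys.path, at most once.

    Returns the path that was ensured, or None if the variable is unset.
    """
    path = os.getenv(env_var)
    if not path:
        # Appending None to sys.path does not raise, so the problem would
        # only surface later as an ImportError on the first package import.
        return None
    if path not in sys.path:
        sys.path.append(path)
    return path


if add_package_path() is None:
    print("KX_PACKAGE_PATH is not set; package imports may fail", file=sys.stderr)
```

Calling the helper more than once is harmless, which is convenient when several files in the package each need the path.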
Using Python Packages in a Pipeline
pipelines/pypeline.yaml selects the Python SP worker image with base: py and points at the spec file:
base: py
name: pypeline
type: spec
spec: file://src/pypeline.py
minWorkers: 1
maxWorkers: 10
src/pypeline.py imports its dependencies normally and uses the kxi.sp API to build the pipeline graph. This example generates synthetic market ticks on a timer, filters them to sales only, enriches them with a tax column, windows by time or row count, and writes to the database. The _art pipeline branch uses the art and cowsay packages to print a periodic liveness indicator to the console:
import art, cowsay

def to_art(data):
    ...
    output = cowsay.get_output_string(animal, art.text2art(msg, "fancy"))
    ...

_art = ... | sp.map(to_art) | sp.write.to_console(qlog=True)
... # other pipeline flows defined
sp.run(_sales_db, _sales_log, _art)
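The filter and enrichment steps can be illustrated in plain Python, independent of the kxi.sp API. This is only a sketch of the logic, not the pipeline code: rows are modelled as dictionaries, the function names are hypothetical, the column names come from the sample log output in the Troubleshooting section, and the 1.2 multiplier is an assumption inferred from those logs (173.25 becomes 207.9). The "quote" transaction type is made up for the example.

```python
TAX_MULTIPLIER = 1.2  # assumption: inferred from the sample logs, not from the spec


def sales_only(rows):
    """Keep only rows whose transaction type is 'sale'."""
    return [row for row in rows if row["transaction"] == "sale"]


def with_tax(rows, multiplier=TAX_MULTIPLIER):
    """Return copies of the rows with an added price_with_tax column."""
    return [
        {**row, "price_with_tax": round(row["price"] * multiplier, 2)}
        for row in rows
    ]


ticks = [
    {"sym": "AAPL", "transaction": "sale", "price": 173.25, "size": 920},
    {"sym": "AAPL", "transaction": "quote", "price": 173.30, "size": 100},  # hypothetical non-sale row
    {"sym": "GOOG", "transaction": "sale", "price": 138.6, "size": 300},
]

for row in with_tax(sales_only(ticks)):
    print(row["sym"], row["price"], row["price_with_tax"])
```

In the real pipeline these steps operate on whole tables per batch rather than on individual dictionaries, so treat this purely as a description of what each stage does to the data.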
Using Python Packages in a UDA
The entrypoints key in manifest.yaml tells the system which script to load for each process type:
entrypoints:
  data-access: src/uda.py
  aggregator: src/uda.py
src/uda.py defines and registers a UDA with two steps: a query step that produces a partial summary, and an aggregation step that merges the partial results. Just like the pipeline spec, it adds KX_PACKAGE_PATH to sys.path so it can import a downloaded package, here sparklines, which the aggregation step uses to render each symbol's recent price series as an inline ASCII trend column:
import os
import sys

# Add KX_PACKAGE_PATH to sys.path before importing sparklines.
sys.path.append(os.getenv("KX_PACKAGE_PATH"))

import pykx as kx
from sparklines import sparklines
from kxi_toolkit import Table, Timestamp, Symbol, select_table, uda, register_udas


@uda.name("salesSummary")
@uda.description("Sales summary by sym: count, avg price, total volume and price trend.")
def sales_summary(
    table: Symbol, startTS: Timestamp = None, endTS: Timestamp = None
) -> Table:
    # Query step: summarise the rows in the requested time range by sym.
    raw = select_table(table, startTS, endTS)
    return kx.q(
        "{0!select cnt:count i, avg_price:avg price, total_vol:sum size, prices:price by sym from x}",
        raw,
    )


@sales_summary.aggregation
def sales_summary_agg(partials) -> Table:
    # Aggregation step: stack the partial tables, then re-aggregate by sym.
    combined = kx.q.raze([kx.Table(data=d) for d in partials])
    # Weight each partial average by its row count (wavg); a plain avg of
    # averages is only correct when every partial has the same count.
    result = kx.q(
        "{0!select cnt:sum cnt, avg_price:cnt wavg avg_price, total_vol:sum total_vol, prices:raze prices by sym from x}",
        combined,
    )
    # Render the last 25 prices of each sym as a one-line sparkline.
    trend = kx.toq([sparklines(p[-25:])[0] for p in result["prices"].py()])
    result = kx.q("{[t;tr] delete prices from update trend:tr from t}", result, trend)
    return kx.q(".kxi.response.ok", result)


register_udas()
The UDA carries each symbol's price list through as a prices column; the aggregation
razes those lists, renders the last 25 points with sparklines, and drops the raw series before
returning.
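The merge logic can also be read as plain Python, which may help when reasoning about the q expressions. The sketch below is not the UDA itself: partials are modelled as lists of dictionaries, `merge_partials` is a hypothetical name, and `spark` is a simplified stand-in for the sparklines package rather than its actual algorithm. It weights each partial average by its count so that the merged value equals the overall mean; a plain average of partial averages matches only when every partial has the same count.

```python
BARS = "▁▂▃▄▅▆▇█"


def spark(values):
    """Map numbers onto eight bar heights (simplified stand-in for sparklines)."""
    lo, hi = min(values), max(values)
    span = (hi - lo) or 1  # avoid dividing by zero for a flat series
    return "".join(BARS[int((v - lo) / span * (len(BARS) - 1))] for v in values)


def merge_partials(partials, points=25):
    """Merge per-process summary rows into one row per sym.

    Assumes every partial row has cnt >= 1 and a non-empty prices list.
    """
    acc = {}
    for rows in partials:
        for row in rows:
            a = acc.setdefault(
                row["sym"],
                {"cnt": 0, "price_sum": 0.0, "total_vol": 0, "prices": []},
            )
            a["cnt"] += row["cnt"]
            # avg * cnt recovers the partial's price sum.
            a["price_sum"] += row["avg_price"] * row["cnt"]
            a["total_vol"] += row["total_vol"]
            a["prices"].extend(row["prices"])
    return [
        {
            "sym": sym,
            "cnt": a["cnt"],
            "avg_price": a["price_sum"] / a["cnt"],
            "total_vol": a["total_vol"],
            "trend": spark(a["prices"][-points:]),  # raw series is dropped
        }
        for sym, a in sorted(acc.items())
    ]


partials = [
    [{"sym": "AAPL", "cnt": 1, "avg_price": 10.0, "total_vol": 5, "prices": [10.0]}],
    [{"sym": "AAPL", "cnt": 3, "avg_price": 20.0, "total_vol": 9, "prices": [18.0, 20.0, 22.0]}],
]
print(merge_partials(partials))
```

With these two partials the merged average is 17.5 (70 divided by 4 rows), whereas averaging the two partial averages would give 15.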
Once deployed, query it:
kxi query uda .pyuda.salesSummary table=mytable
sym  cnt  avg_price total_vol trend
AAPL 1305 175.1156  646380    ▃▃▂▃▆▁▅▅▄▃▄▃▆▅▆▄▇▅▇▄█▅▂▅█
AMZN 1432 185.0524  734630    ▇▃▇▂▂▇▄▆▁▁▃▇▃█▅▂▁▆▁▄▇▃▃▄█
GOOG 1284 139.9753  649000    ▆▆▃█▄▆▄▁▂▇█▂▄▃▅▁▃▆▅▆▂▃▃▂▂
MSFT 1385 415.2954  702890    ▂▆▇▄▄▃▁▁▄▆▅▆▇▁▅▃▄▃▄▇▃█▇▆█
Troubleshooting
If the package or pipeline is not behaving as expected, use these commands to diagnose the issue.
Check the package is registered:
kxi pm list
You should see python-walkthrough in the table:
╭────────────────────┬─────────┬─────────┬──────┬────────┬──────────┬────────────────┬──────────╮
│ name │ version │ status │ data │ access │ owner │ id │ size │
├────────────────────┼─────────┼─────────┼──────┼────────┼──────────┼────────────────┼──────────┤
│ python-walkthrough │ 0.0.9 │ RUNNING │ yes │ ARWX │ ataylor1 │ 3b9e28cd-38b2… │ 7 KiB │
╰────────────────────┴─────────┴─────────┴──────┴────────┴──────────┴────────────────┴──────────╯
Check the deployment status:
kxi pm list deployment -f name=python-walkthrough
╭────────────────────┬─────────┬──────┬────────┬──────────┬─────────────────────────────────╮
│ name │ version │ data │ access │ owner │ components │
├────────────────────┼─────────┼──────┼────────┼──────────┼─────────────────────────────────┤
│ python-walkthrough │ 0.0.9 │ yes │ ARWX │ ataylor1 │ pipeline | pypeline | RUNNING │
│ │ │ │ │ │ database | python-w… | Ready │
╰────────────────────┴─────────┴──────┴────────┴──────────┴─────────────────────────────────╯
Stream the pipeline worker logs:
kxi obs logs --workload python-walkthrough --container spwork --tail-lines 20
You should see windowed sales batches flushing every 5 seconds:
{"component":"console","container":"spwork","level":"INFO","message":"time sym transaction price size price_with_tax","pod":"python-walkthrough-pypeline-1-spwork-0","time":"2026-05-07T13:37:51.984z"}
{"component":"console","container":"spwork","level":"INFO","message":"-------------------------------------------------------------------------","pod":"python-walkthrough-pypeline-1-spwork-0","time":"2026-05-07T13:37:51.984z"}
{"component":"console","container":"spwork","level":"INFO","message":"2026.05.07D13:37:51.500098000 AAPL sale 173.25 920 207.9 ","pod":"python-walkthrough-pypeline-1-spwork-0","time":"2026-05-07T13:37:51.984z"}
{"component":"console","container":"spwork","level":"INFO","message":"2026.05.07D13:37:51.500098000 GOOG sale 138.6 300 166.32 ","pod":"python-walkthrough-pypeline-1-spwork-0","time":"2026-05-07T13:37:51.984z"}
{"component":"console","container":"spwork","level":"INFO","message":"2026.05.07D13:37:51.500098000 MSFT sale 419.15 210 502.98 ","pod":"python-walkthrough-pypeline-1-spwork-0","time":"2026-05-07T13:37:51.984z"}
Every 10 seconds the _art pipeline branch prints a liveness indicator using the art and cowsay packages:
2026.05.07D13:41:31.994593000   __________________________________
| ρяσcεssιηg мαякεт ∂αтα! (133 яσωs) |
  ==================================
                    \
                     \
                      \
                       .---.
                      |o_o |
                      |:_/ |
                     //   \ \
                    (|     | )
                   /'\_   _/`\
                   \___)=(___/