Python Pipeline Walkthrough

Note

If you want to follow this walkthrough from scratch, continue below. To skip ahead or verify your result against a working copy, download the completed example, python-walkthrough.zip, then unzip, push, and deploy it:

    unzip python-walkthrough.zip
    cd python-walkthrough
    kxi pm push .
    kxi pm deploy python-walkthrough

Including a Python Package

This walkthrough uses three PyPI packages to demonstrate dependency management:

  • art — generates ASCII text art from strings.
  • cowsay — renders messages inside ASCII speech bubbles spoken by an animal character.
  • sparklines — renders a list of numbers as an inline ASCII bar chart.

Python dependencies are declared in manifest.yaml under dependencies. The Package Manager resolves and installs them into the Python runtime before the pipeline starts:

name: python-walkthrough
version: 0.0.9
dependencies:
  - art==6.5
  - cowsay==6.1
  - sparklines==1.0.0
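
As a quick sanity check, the pinned versions can be compared against what is actually installed in a Python environment. The sketch below is an illustration only and is not part of the Package Manager: the helper names parse_pin, installed_version, and find_mismatches are hypothetical, and it assumes every dependency uses the exact name==version form shown above.

```python
from importlib import metadata


def parse_pin(spec):
    """Split a 'name==version' pin into a (name, version) tuple."""
    name, sep, version = spec.partition("==")
    if not sep or not name.strip() or not version.strip():
        raise ValueError(f"not an exact pin: {spec!r}")
    return name.strip(), version.strip()


def installed_version(name):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(name)
    except metadata.PackageNotFoundError:
        return None


def find_mismatches(pins, lookup=installed_version):
    """Return {name: (wanted, found)} for pins that are missing or differ."""
    mismatches = {}
    for spec in pins:
        name, wanted = parse_pin(spec)
        found = lookup(name)
        if found != wanted:
            mismatches[name] = (wanted, found)
    return mismatches
```

Calling find_mismatches(["art==6.5", "cowsay==6.1", "sparklines==1.0.0"]) returns an empty dictionary when all three pins are satisfied. The lookup parameter exists only so the check can be exercised without the packages installed.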

The package structure looks like this:

python-walkthrough/
├── manifest.yaml
├── udfs.yaml
├── src/
│   ├── pypeline.py     # pipeline spec
│   └── uda.py          # UDA entrypoints
├── tables/
│   └── mytable.yaml
├── pipelines/
│   └── pypeline.yaml
├── databases/
│   └── python-walkthrough/
│       └── shards/
│           └── python-walkthrough.yaml
└── router/
    └── router.yaml

To make the package's own source files importable within the runtime, add the package path to sys.path at the top of any Python file that needs it:

import os
import sys

sys.path.append(os.getenv("KX_PACKAGE_PATH"))

Note

This sys.path.append call is an explicit opt-in — it is not done automatically. Without it, imports of downloaded Python libraries will fail.
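
A slightly more defensive variant of the same opt-in is sketched below. It assumes you would rather fail early when KX_PACKAGE_PATH is unset and avoid adding the same directory twice; the helper name add_package_path is hypothetical and is not provided by the platform.

```python
import os
import sys


def add_package_path(env_var="KX_PACKAGE_PATH"):
    """Append the package directory to sys.path once and return it."""
    path = os.getenv(env_var)
    if not path:
        # Design choice of this sketch: fail loudly instead of appending None.
        raise RuntimeError(f"{env_var} is not set; cannot locate the package")
    if path not in sys.path:
        sys.path.append(path)
    return path
```

Call add_package_path() at the top of a file, before importing anything that lives under the package path.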

Using Python Packages in a Pipeline

pipelines/pypeline.yaml selects the Python SP worker image with base: py and points at the spec file:

base: py
name: pypeline
type: spec
spec: file://src/pypeline.py
minWorkers: 1
maxWorkers: 10

src/pypeline.py imports dependencies normally and uses the kxi.sp API to build the pipeline graph. This example generates synthetic market ticks on a timer, filters to sales only, enriches with a tax column, windows by time or row count, and writes to the database. The _art pipeline branch uses the art and cowsay packages to print a playful periodic liveness indicator to the console:

import art
import cowsay
from kxi import sp


def to_art(data):
    ...
    output = cowsay.get_output_string(animal, art.text2art(msg, "fancy"))
    ...


_art = ... | sp.map(to_art) | sp.write.to_console(qlog=True)

...  # other pipeline flows defined

sp.run(_sales_db, _sales_log, _art)
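
To make the data flow concrete, the filter and enrichment steps can be pictured in plain Python, outside the kxi.sp API. This is a sketch rather than the walkthrough's actual operators: the function names and the "quote" transaction type are hypothetical, and the 1.2 multiplier is an assumption inferred from the sample log output in the Troubleshooting section, where price_with_tax equals price times 1.2.

```python
TAX_MULTIPLIER = 1.2  # assumption: inferred from the sample logs, not from the spec


def only_sales(rows):
    """Keep rows whose transaction type is 'sale'."""
    return [row for row in rows if row["transaction"] == "sale"]


def add_tax(rows, multiplier=TAX_MULTIPLIER):
    """Return copies of the rows with a price_with_tax column added."""
    return [
        # Rounded to two decimals for readability; the rounding is an assumption.
        {**row, "price_with_tax": round(row["price"] * multiplier, 2)}
        for row in rows
    ]


# Hypothetical input batch; only the two 'sale' rows should survive the filter.
ticks = [
    {"sym": "AAPL", "transaction": "sale", "price": 173.25, "size": 920},
    {"sym": "AAPL", "transaction": "quote", "price": 173.30, "size": 100},
    {"sym": "GOOG", "transaction": "sale", "price": 138.60, "size": 300},
]
enriched = add_tax(only_sales(ticks))
```

In the real pipeline these steps run as operators in the pipeline graph on each incoming batch; the list-of-dicts form here only illustrates the logic.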

Using Python Packages in a UDA

The entrypoints section of manifest.yaml tells the system which script to load for each process type:

entrypoints:
  data-access: src/uda.py
  aggregator: src/uda.py

src/uda.py defines and registers a UDA, together with an aggregation function that merges the partial results. Just like the pipeline spec, it adds KX_PACKAGE_PATH to sys.path so it can import a downloaded package. Here that package is sparklines, used in the aggregation step to render each symbol's recent price series as an inline trend column:

import os
import sys

sys.path.append(os.getenv("KX_PACKAGE_PATH"))

import pykx as kx
from sparklines import sparklines
from kxi_toolkit import Table, Timestamp, Symbol, select_table, uda, register_udas


@uda.name("salesSummary")
@uda.description("Sales summary by sym: count, avg price, total volume and price trend.")
def sales_summary(
    table: Symbol, startTS: Timestamp = None, endTS: Timestamp = None
) -> Table:
    raw = select_table(table, startTS, endTS)
    return kx.q(
        "{0!select cnt:count i, avg_price:avg price, total_vol:sum size, prices:price by sym from x}",
        raw,
    )


@sales_summary.aggregation
def sales_summary_agg(partials) -> Table:
    combined = kx.q.raze([kx.Table(data=d) for d in partials])
    # Weight each partial average by its row count so the merged avg_price is exact.
    result = kx.q(
        "{0!select cnt:sum cnt, avg_price:cnt wavg avg_price, total_vol:sum total_vol, prices:raze prices by sym from x}",
        combined,
    )
    trend = kx.toq([sparklines(p[-25:])[0] for p in result["prices"].py()])
    result = kx.q("{[t;tr] delete prices from update trend:tr from t}", result, trend)
    return kx.q(".kxi.response.ok", result)


register_udas()

The UDA carries each symbol's price list through as a prices column; the aggregation razes those lists, renders the last 25 points with sparklines, and drops the raw series before returning.
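
To illustrate what the trend column contains, here is a hand-rolled sparkline renderer in plain Python. It is a sketch under stated assumptions, not the sparklines package's own algorithm: the sparkline function and its linear scaling rule are hypothetical, and the real library may bucket values differently.

```python
BLOCKS = "▁▂▃▄▅▆▇█"  # eight block characters, lowest to highest


def sparkline(values, last_n=25):
    """Render the last `last_n` values as a string of block characters."""
    tail = list(values)[-last_n:]
    if not tail:
        return ""
    lo, hi = min(tail), max(tail)
    span = hi - lo
    if span == 0:
        # A flat series has no range to scale; draw it at the lowest level.
        return BLOCKS[0] * len(tail)
    top = len(BLOCKS) - 1
    # Scale each value linearly onto the block indices 0..7.
    return "".join(BLOCKS[round((v - lo) * top / span)] for v in tail)
```

Passing a longer series, such as a symbol's full price list, renders only its final 25 points, mirroring the p[-25:] slice in the aggregation function.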

Once deployed, query it:

kxi query uda .pyuda.salesSummary table=mytable
 sym  cnt  avg_price  total_vol                     trend
AAPL 1305   175.1156     646380 ▃▃▂▃▆▁▅▅▄▃▄▃▆▅▆▄▇▅▇▄█▅▂▅█
AMZN 1432   185.0524     734630 ▇▃▇▂▂▇▄▆▁▁▃▇▃█▅▂▁▆▁▄▇▃▃▄█
GOOG 1284   139.9753     649000 ▆▆▃█▄▆▄▁▂▇█▂▄▃▅▁▃▆▅▆▂▃▃▂▂
MSFT 1385   415.2954     702890 ▂▆▇▄▄▃▁▁▄▆▅▆▇▁▅▃▄▃▄▇▃█▇▆█

Troubleshooting

If the package or pipeline is not behaving as expected, use these commands to diagnose the issue.

Check the package is registered:

kxi pm list

You should see python-walkthrough in the table:

╭────────────────────┬─────────┬─────────┬──────┬────────┬──────────┬────────────────┬──────────╮
│ name               │ version │ status  │ data │ access │ owner    │ id             │ size     │
├────────────────────┼─────────┼─────────┼──────┼────────┼──────────┼────────────────┼──────────┤
│ python-walkthrough │ 0.0.9   │ RUNNING │ yes  │ ARWX   │ ataylor1 │ 3b9e28cd-38b2… │   7 KiB  │
╰────────────────────┴─────────┴─────────┴──────┴────────┴──────────┴────────────────┴──────────╯

Check the deployment status:

kxi pm list deployment -f name=python-walkthrough
╭────────────────────┬─────────┬──────┬────────┬──────────┬─────────────────────────────────╮
│ name               │ version │ data │ access │ owner    │ components                      │
├────────────────────┼─────────┼──────┼────────┼──────────┼─────────────────────────────────┤
│ python-walkthrough │ 0.0.9   │ yes  │ ARWX   │ ataylor1 │ pipeline  | pypeline  | RUNNING │
│                    │         │      │        │          │ database  | python-w… | Ready   │
╰────────────────────┴─────────┴──────┴────────┴──────────┴─────────────────────────────────╯

Stream the pipeline worker logs:

kxi obs logs --workload python-walkthrough --container spwork --tail-lines 20

You should see windowed sales batches flushing every 5 seconds:

{"component":"console","container":"spwork","level":"INFO","message":"time                          sym  transaction price  size price_with_tax","pod":"python-walkthrough-pypeline-1-spwork-0","time":"2026-05-07T13:37:51.984z"}
{"component":"console","container":"spwork","level":"INFO","message":"-------------------------------------------------------------------------","pod":"python-walkthrough-pypeline-1-spwork-0","time":"2026-05-07T13:37:51.984z"}
{"component":"console","container":"spwork","level":"INFO","message":"2026.05.07D13:37:51.500098000 AAPL sale        173.25 920  207.9         ","pod":"python-walkthrough-pypeline-1-spwork-0","time":"2026-05-07T13:37:51.984z"}
{"component":"console","container":"spwork","level":"INFO","message":"2026.05.07D13:37:51.500098000 GOOG sale        138.6  300  166.32        ","pod":"python-walkthrough-pypeline-1-spwork-0","time":"2026-05-07T13:37:51.984z"}
{"component":"console","container":"spwork","level":"INFO","message":"2026.05.07D13:37:51.500098000 MSFT sale        419.15 210  502.98        ","pod":"python-walkthrough-pypeline-1-spwork-0","time":"2026-05-07T13:37:51.984z"}
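
Because each log entry is a JSON object, the table rows can be recovered by extracting the message field. The following is a minimal sketch, assuming the logs have been captured one JSON object per line; the function name console_messages is hypothetical and the sample lines are abbreviated from the output above.

```python
import json


def console_messages(lines, container="spwork"):
    """Yield the message text of each JSON log line from one container."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not a JSON log entry
        if isinstance(entry, dict) and entry.get("container") == container:
            yield entry.get("message", "")


# Abbreviated sample lines, plus one non-JSON line that should be skipped.
sample = [
    '{"component":"console","container":"spwork","level":"INFO",'
    '"message":"time sym transaction price size price_with_tax"}',
    '{"component":"console","container":"spwork","level":"INFO",'
    '"message":"2026.05.07D13:37:51.500098000 AAPL sale 173.25 920 207.9"}',
    "not json",
]
messages = list(console_messages(sample))
```

In practice the input lines would come from the output of the kxi obs logs command shown above, for example after redirecting it to a file.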

Every 10 seconds the _art pipeline branch prints a liveness indicator using the art and cowsay packages:

2026.05.07D13:41:31.994593000   __________________________________
| ρяσcεssιηg мαякεт ∂αтα! (133 яσωs) |
  ==================================
                                       \
                                        \
                                         \
                                          .---.
                                         |o_o |
                                         |:_/ |
                                        //   \ \
                                       (|     | )
                                      /'\_   _/`\
                                      \___)=(___/