
HTTP publisher for kdb Insights Enterprise stream processing pipelines.

Use HTTPPublisher to publish data to a kdb Insights stream processing pipeline via an httpUpload reader node.

Supports raw bytes (no serialization) as well as CSV, JSON, and Q/IPC serialization. When a serialization format is set, files are read in chunks, so large datasets are never loaded into memory all at once.
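
Plain pandas can approximate the chunked reading described above. This is an illustration under stated assumptions, not kxi's implementation: it assumes files are read with pandas.read_csv(..., chunksize=...) and that each chunk is serialized on its own. serialize_in_chunks is a hypothetical helper.

```python
import io
from typing import Iterator

import pandas as pd


def serialize_in_chunks(source, chunksize: int) -> Iterator[bytes]:
    """Hypothetical helper: yield one CSV payload per chunk of `chunksize` rows."""
    # With chunksize set, read_csv returns an iterator of DataFrames instead of
    # loading the whole file, so only one chunk is held in memory at a time.
    for chunk in pd.read_csv(source, chunksize=chunksize):
        # Each chunk is an ordinary DataFrame and is serialized independently.
        yield chunk.to_csv(index=False).encode("utf-8")


# In-memory stand-in for a large file such as /data/trades.csv
csv_file = io.StringIO("sym,price\nAAPL,150.0\nGOOG,175.0\nMSFT,410.0\n")
payloads = list(serialize_in_chunks(csv_file, chunksize=2))
```

In this sketch every payload repeats the header row, so each chunk is a self-contained CSV document.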

Example
```python
import pandas as pd
from kxi.publish.http_publisher import HTTPPublisher

df = pd.DataFrame({"sym": ["AAPL", "GOOG"], "price": [150.0, 175.0]})

# Publish as Q/IPC binary in chunks
pub = HTTPPublisher("httpUpload_qipc", output="qipc", chunksize=10_000)
pub.publish(df, "trades", finish=True)

# Publish a CSV file
pub = HTTPPublisher("httpUpload_csv", output="csv")
pub.publish("/data/trades.csv", "trades", finish=True)
```

Classes:

  • HTTPPublisher – HTTP publisher for kdb Insights stream processing pipelines.
  • OutputFormat – Serialization format for HTTP upload.

HTTPPublisher

HTTPPublisher(name, *, coordinator=None, chunksize=None, output=OutputFormat.raw)

HTTP publisher for kdb Insights stream processing pipelines.

Connects to the stream processor coordinator and publishes serialized data into a pipeline via an httpUpload reader node. The node name in the pipeline must match the name parameter.

Supports raw bytes, CSV, JSON, Q/IPC, and custom callable serialization. File and directory publishing with a chunksize avoids loading large datasets into memory.
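
The standard library is enough to sketch how name maps to an endpoint. Only the /streamprocessor/<name> path comes from the parameter documentation. The base URL is a placeholder, and headers, authentication, and the way table and finish are transmitted are deliberately left out because they are not documented here (the real publisher presumably delegates them to its coordinator connection). build_upload_request is a hypothetical helper, and the request is built but never sent.

```python
from urllib.parse import quote
from urllib.request import Request


def build_upload_request(base_url: str, name: str, payload: bytes) -> Request:
    """Hypothetical helper: target the httpUpload reader node called `name`."""
    # Only the /streamprocessor/<name> path is documented; base_url is assumed.
    url = f"{base_url.rstrip('/')}/streamprocessor/{quote(name)}"
    # The payload is whatever the chosen serialization produced.
    return Request(url, data=payload, method="POST")


# The name must match the httpUpload reader node configured in the pipeline
req = build_upload_request(
    "https://insights.example.com/",  # placeholder host
    "httpUpload_csv",
    b"sym,price\nAAPL,150.0\n",
)
```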

Example
```python
import pandas as pd
from kxi.publish.http_publisher import HTTPPublisher

df = pd.DataFrame({"sym": ["AAPL", "GOOG"], "price": [150.0, 175.0]})

# Q/IPC binary in chunks
pub = HTTPPublisher("httpUpload_qipc", output="qipc", chunksize=10_000)
pub.publish(df, "trades", finish=True)

# CSV file
pub = HTTPPublisher("httpUpload_csv", output="csv")
pub.publish("/data/trades.csv", "trades", finish=True)

# Custom serialization
pub = HTTPPublisher(
    "httpUpload_custom",
    output=lambda df: df.to_csv(index=False).encode(),
)
pub.publish(df, "trades", finish=True)
```

Functions:

  • publish – Publish data to a table via the stream processor pipeline.

The constructor connects to the stream processor coordinator.

Parameters:

  • name (str) – Name of the httpUpload reader node in the pipeline. Maps to the endpoint /streamprocessor/<name>.
  • coordinator (SPCoordinatorClient | None) – Stream Processor API connection. If None, a new connection is created using environment variables.
  • chunksize (int | None) – Rows per chunk when reading files. Required when using a serialization format other than raw.
  • output (OutputFormat | Literal['csv', 'json', 'qipc', 'raw'] | Callable[[DataFrame], bytes | str]) – Serialization format or custom callable:
      • "raw": raw bytes, no serialization (default).
      • "csv": CSV via DataFrame.to_csv().
      • "json": JSON records via DataFrame.to_json(orient='records').
      • "qipc": Q/IPC binary via pykx serialization.
      • A callable that accepts a pandas.DataFrame and returns bytes or str.
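
A short sketch can show how the output argument might be resolved into a single serializer function. It is an assumption-based illustration, not kxi's code: resolve_serializer, to_body, and the BUILTIN registry are hypothetical, "qipc" is omitted because it needs pykx, and "raw" is omitted because raw bytes bypass serialization.

```python
import json
from typing import Callable, Dict, Union

import pandas as pd

# A serializer takes a DataFrame and returns the HTTP body as bytes or str
Serializer = Callable[[pd.DataFrame], Union[bytes, str]]

# Hypothetical registry of built-in formats (qipc and raw intentionally omitted)
BUILTIN: Dict[str, Serializer] = {
    "csv": lambda df: df.to_csv(index=False),
    "json": lambda df: df.to_json(orient="records"),
}


def resolve_serializer(output) -> Serializer:
    """Hypothetical helper: turn the `output` argument into a callable."""
    if callable(output):
        return output  # custom callable supplied by the caller
    # Accept enum members (via .value) as well as plain string literals
    key = getattr(output, "value", output)
    if key not in BUILTIN:
        raise ValueError(f"unsupported output format in this sketch: {output!r}")
    return BUILTIN[key]


def to_body(result: Union[bytes, str]) -> bytes:
    # Callables may return str or bytes; normalize to bytes for the request body
    return result if isinstance(result, bytes) else result.encode("utf-8")


df = pd.DataFrame({"sym": ["AAPL", "GOOG"], "price": [150.0, 175.0]})
json_body = to_body(resolve_serializer("json")(df))
custom_body = to_body(resolve_serializer(lambda d: d.to_csv(index=False).encode())(df))
records = json.loads(json_body)  # round-trip check of the JSON records
```

Checking callable first lets a custom function take priority over the built-in formats, and normalizing str to bytes keeps both return types valid.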

publish

publish(data, table=None, *, finish=False)

Publish data to a table via the stream processor pipeline.

Parameters:

  • data (str | Path | DataFrame) – Data source. Accepted types:
      • File path string or pathlib.Path (e.g. "/data/trades.csv").
      • Directory path: every file directly inside the directory is published.
      • pandas.DataFrame.
  • table (str | None) – Destination table name in the kdb Insights database.
  • finish (bool) – Signal the pipeline operator to finalize. Default: False.
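
The accepted data types can be expanded into individual items to publish with pathlib. This is a sketch under assumptions: iter_sources is a hypothetical helper, it reads "direct child files" as "files in the directory itself, without descending into subdirectories", and it sorts entries for a deterministic order because the real order is not documented.

```python
import tempfile
from pathlib import Path
from typing import Iterator, Union

import pandas as pd


def iter_sources(data: Union[str, Path, pd.DataFrame]) -> Iterator[Union[Path, pd.DataFrame]]:
    """Hypothetical helper: expand `data` into the items to publish."""
    if isinstance(data, pd.DataFrame):
        yield data  # a DataFrame is published as-is
        return
    path = Path(data)
    if path.is_dir():
        # Only direct child files; subdirectories are skipped, not traversed
        yield from sorted(p for p in path.iterdir() if p.is_file())
    else:
        yield path  # a single file path


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.csv").write_text("sym,price\nAAPL,150.0\n")
    (root / "b.csv").write_text("sym,price\nGOOG,175.0\n")
    (root / "nested").mkdir()
    (root / "nested" / "c.csv").write_text("sym,price\nMSFT,410.0\n")
    published = [p.name for p in iter_sources(root)]
    single = [p.name for p in iter_sources(root / "a.csv")]

frames = list(iter_sources(pd.DataFrame({"sym": ["AAPL"], "price": [150.0]})))
```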

OutputFormat

Bases: AutoNameEnum

Serialization format for HTTP upload.

Attributes:

  • csv – CSV serialized via DataFrame.to_csv().
  • json – JSON serialized via DataFrame.to_json(orient='records').
  • qipc – Q/IPC binary serialized via pykx.
  • raw – Raw bytes without serialization (default).
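
The only documented fact about AutoNameEnum is that OutputFormat derives from it. A common Python pattern for such a base, and the assumption behind this sketch, is an Enum whose auto() values equal the member names. Under that assumption the string literals accepted by output ("csv", "json", "qipc", "raw") map directly onto enum members. Both classes below are illustrative stand-ins, not the kxi definitions.

```python
from enum import Enum, auto


class AutoNameEnum(Enum):
    """Assumed base: auto() assigns each member its own name as its value."""

    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        return name


class OutputFormat(AutoNameEnum):
    """Mirror of the documented members, for illustration only."""

    csv = auto()
    json = auto()
    qipc = auto()
    raw = auto()


# Look up a member from the string literal a caller might pass as `output`
fmt = OutputFormat("csv")
```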