HTTP publisher for kdb Insights Enterprise stream processing pipelines.¶
Use HTTPPublisher to publish data to a kdb Insights stream processing pipeline via an httpUpload reader node.
Supports raw bytes, CSV, JSON, and Q/IPC serialization. Chunked file reading avoids loading large datasets into memory when a serialization format is set.
Example
import pandas as pd
from kxi.publish.http_publisher import HTTPPublisher
df = pd.DataFrame({"sym": ["AAPL", "GOOG"], "price": [150.0, 175.0]})
# Publish as Q/IPC binary in chunks
pub = HTTPPublisher("httpUpload_qipc", output="qipc", chunksize=10_000)
pub.publish(df, "trades", finish=True)
# Publish a CSV file
pub = HTTPPublisher("httpUpload_csv", output="csv")
pub.publish("/data/trades.csv", "trades", finish=True)
Classes:
- HTTPPublisher – HTTP publisher for kdb Insights stream processing pipelines.
- OutputFormat – Serialization format for HTTP upload.
HTTPPublisher¶
HTTPPublisher(name, *, coordinator=None, chunksize=None, output=OutputFormat.raw)
HTTP publisher for kdb Insights stream processing pipelines.
Connects to the stream processor coordinator and publishes serialized data
into a pipeline via an httpUpload reader node. The node name in the
pipeline must match the name parameter.
Supports raw bytes, CSV, JSON, Q/IPC, and custom callable serialization.
File and directory publishing with a chunksize avoids loading large
datasets into memory.
Example
import pandas as pd
from kxi.publish.http_publisher import HTTPPublisher
df = pd.DataFrame({"sym": ["AAPL", "GOOG"], "price": [150.0, 175.0]})
# Q/IPC binary in chunks
pub = HTTPPublisher("httpUpload_qipc", output="qipc", chunksize=10_000)
pub.publish(df, "trades", finish=True)
# CSV file
pub = HTTPPublisher("httpUpload_csv", output="csv")
pub.publish("/data/trades.csv", "trades", finish=True)
# Custom serialization
pub = HTTPPublisher(
    "httpUpload_custom",
    output=lambda df: df.to_csv(index=False).encode(),
)
pub.publish(df, "trades", finish=True)
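The memory benefit of a chunksize can be pictured with a small standard-library sketch. This is not the library's implementation: `iter_csv_chunks` is a hypothetical helper that reads a CSV file a fixed number of rows at a time and re-serializes each chunk to CSV bytes, so only one chunk is held in memory at once. Repeating the header in every chunk is an assumption made here so that each payload can be parsed on its own.

```python
import csv
import io
from itertools import islice
from pathlib import Path
from typing import Iterator


def iter_csv_chunks(path: Path, chunksize: int) -> Iterator[bytes]:
    """Yield CSV-encoded chunks of at most `chunksize` data rows each.

    Hypothetical helper for illustration only. Every chunk repeats the
    header row so it can be parsed independently of the others.
    """
    if chunksize < 1:
        raise ValueError("chunksize must be a positive integer")
    with path.open(newline="") as handle:
        reader = csv.reader(handle)
        header = next(reader, None)
        if header is None:
            return  # empty file: nothing to yield
        while True:
            # Pull the next `chunksize` rows without reading the whole file.
            rows = list(islice(reader, chunksize))
            if not rows:
                break
            buffer = io.StringIO()
            writer = csv.writer(buffer, lineterminator="\n")
            writer.writerow(header)
            writer.writerows(rows)
            yield buffer.getvalue().encode("utf-8")
```

Because the function is a generator, a caller could send each chunk as soon as it is produced instead of collecting them all first.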
Functions:
- publish – Publish data to a table via the stream processor pipeline.
Connect to the stream processor coordinator.
Parameters:
- name (str) – Name of the httpUpload reader node in the pipeline. Maps to the endpoint /streamprocessor/<name>.
- coordinator (SPCoordinatorClient | None) – Stream Processor API connection. If None, a new connection is created using environment variables.
- chunksize (int | None) – Rows per chunk when reading files. Required when using a serialization format other than raw.
- output (OutputFormat | Literal['csv', 'json', 'qipc', 'raw'] | Callable[[DataFrame], bytes | str]) – Serialization format or custom callable:
  - "raw": raw bytes, no serialization (default).
  - "csv": CSV via DataFrame.to_csv().
  - "json": JSON records via DataFrame.to_json().
  - "qipc": Q/IPC binary via pykx serialization.
  - Callable accepting a pandas.DataFrame, returning bytes or str.
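To make the mapping from name to endpoint concrete, here is a minimal sketch. The helper `upload_endpoint` and the base URL are hypothetical; only the /streamprocessor/<name> path shape comes from the parameter description above, and the real client may build its URLs differently.

```python
from urllib.parse import quote, urljoin


def upload_endpoint(base_url: str, name: str) -> str:
    """Build an upload URL for an httpUpload reader node (illustrative only)."""
    if not name:
        raise ValueError("name must be a non-empty node name")
    # Percent-encode the node name so it stays a single path segment.
    path = f"streamprocessor/{quote(name, safe='')}"
    return urljoin(base_url.rstrip("/") + "/", path)


# The host below is a placeholder, not a real deployment.
print(upload_endpoint("https://insights.example.com", "httpUpload_csv"))
```

The sketch shows why the node name in the pipeline has to match the name passed to the constructor: a different name would resolve to a different endpoint.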
publish¶
publish(data, table=None, *, finish=False)
Publish data to a table via the stream processor pipeline.
Parameters:
- data (str | Path | DataFrame) – Data source. Accepted types:
  - File path string or pathlib.Path (e.g. "/data/trades.csv")
  - Directory path: all direct child files are published
  - pandas.DataFrame
- table (str | None) – Destination table name in the kdb Insights database.
- finish (bool) – Signal the pipeline operator to finalize. Default: False.
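The phrase "direct child files" for directory paths can be read as a non-recursive listing. A rough sketch of that reading, using a hypothetical helper `direct_child_files`; the sorted order is an assumption made here for determinism, since the order in which the library publishes files is not stated:

```python
from pathlib import Path
from typing import List


def direct_child_files(directory: Path) -> List[Path]:
    """Return regular files directly inside `directory`, sorted by path.

    Subdirectories and anything nested inside them are skipped.
    Hypothetical helper, not part of the library.
    """
    if not directory.is_dir():
        raise NotADirectoryError(str(directory))
    return sorted(p for p in directory.iterdir() if p.is_file())
```

Under this reading, a file such as /data/2024/trades.csv would not be picked up when /data is passed as the data source.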
OutputFormat¶
Bases: AutoNameEnum
Serialization format for HTTP upload.
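The AutoNameEnum base suggests an enum whose member values equal their names. A minimal sketch of that pattern follows, assuming the members mirror the four literal strings accepted by the output parameter ("raw", "csv", "json", "qipc"). `AutoNameSketch`, `OutputFormatSketch`, and `normalize_output` are hypothetical stand-ins, not the library's code.

```python
from enum import Enum, auto


class AutoNameSketch(Enum):
    """Enum base whose auto() values equal the member names (hypothetical)."""

    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        return name


class OutputFormatSketch(AutoNameSketch):
    # Assumed members, taken from the literals accepted by `output`.
    raw = auto()
    csv = auto()
    json = auto()
    qipc = auto()


def normalize_output(output):
    """Map a format string or member to a member; pass callables through."""
    if callable(output):
        return output  # custom serializer supplied by the caller
    return OutputFormatSketch(output)  # raises ValueError for unknown formats
```

With values equal to names, a plain string such as "qipc" and the corresponding enum member can be accepted interchangeably, which matches the string usage in the examples above.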
Attributes: