Skip to content

Real-time publisher for kdb Insights Enterprise.

Use RtPublisher to insert data into kdb Insights Enterprise via the RT (real-time) persistence layer using the rtpy package.

Note

Requires kxi-rtpy, available on Linux x86_64 only.

Example
from kxi.publish.rtpublisher import RtPublisher

with RtPublisher(
    config_url="https://my-insights.kx.com/had7t28a2dyga",
    rt_dir="/tmp/rt_data",
) as pub:
    pub.fetch_schemas()
    pub.insert("trace", {"sensorID": [1, 2], "value": [3.14, 2.71]})

Classes:

  • RtPublisher – Real-time publisher for kdb Insights Enterprise.

RtPublisher

RtPublisher(*, config_url=None, rt_dir=None, query_timeout=None, fetch_config_sleep=None, config_max_age=None, local_persistence_period=None, log_level=None, console_log_level=None, ca_info_file=None, dedup_id=None, schemas=None, connection_timeout=60, rt_flush_timeout_secs=30)

Real-time publisher for kdb Insights Enterprise.

Publishes data into kdb Insights Enterprise via the RT persistence layer. Use as a context manager for automatic start/stop lifecycle management.

Note

Requires kxi-rtpy, available on Linux x86_64 only.

Example
from kxi.publish.rtpublisher import RtPublisher

with RtPublisher(
    config_url="https://my-insights.kx.com/had7t28a2dyga",
    rt_dir="/tmp/rt_data",
) as pub:
    pub.fetch_schemas()
    pub.insert("trace", {"sensorID": [1, 2], "value": [3.14, 2.71]})

Functions:

  • create – Create an RtPublisher from explicit stream connection parameters.
  • fetch_schemas – Fetch table schemas from a kdb Insights deployment.
  • insert – Insert data into a named table.
  • start – Start the RT connection.
  • stop – Stop the RT connection.

Initialise the RT publisher.

Parameters:

  • config_url (str | Path | None) – Config URL for the RT connection. Accepts:

  • HTTP URL: https://host/path/to/config-id

  • File URI: file:///home/user/config.json
  • Local path string or pathlib.Path
  • rt_dir (str | None) – Local directory for RT log storage.
  • query_timeout (int | None) – Timeout in seconds for RT queries.
  • fetch_config_sleep (int | None) – Interval in seconds between config fetch retries.
  • config_max_age (int | None) – Maximum age in seconds for a cached config.
  • local_persistence_period (int | None) – RT persistence flush period in seconds.
  • log_level (str | None) – RT log level (e.g. "info", "debug").
  • console_log_level (str | None) – Console log level for RT output.
  • ca_info_file (str | None) – Path to a CA certificate file for TLS connections.
  • dedup_id (str | None) – Publisher deduplication identifier.
  • schemas (dict[str, dict[str, QType]] | None) – Table schemas for type coercion before insertion. Use RtPublisher.fetch_schemas to retrieve schemas from a live deployment.
  • connection_timeout (int) – Seconds to wait for the initial RT connection. Default: 60.
  • rt_flush_timeout_secs (int | None) – Seconds to wait for RT to flush on stop. Default: 30.

Raises:

create

create(path, stream, publisher_id, endpoints)

Create an RtPublisher from explicit stream connection parameters.

Alternative constructor that generates a config from stream and endpoint parameters instead of a config URL.

Parameters:

  • path (str) – Parent directory for RT log files.
  • stream (str) – RT stream name (insert topic).
  • publisher_id (str) – Unique identifier for this publisher instance.
  • endpoints (str | list[str]) – RT endpoint address(es), e.g. ":127.0.0.1:5002".

Returns:

Example
from kxi.publish.rtpublisher import RtPublisher

pub = RtPublisher.create(
    path="/tmp/rt",
    stream="data",
    publisher_id="pypub",
    endpoints=":127.0.0.1:5002",
)

fetch_schemas

fetch_schemas(query_client=None, assembly=None)

Fetch table schemas from a kdb Insights deployment.

Schemas are used for type coercion before data is inserted via insert.

Parameters:

  • query_client (Query | None) – Authenticated Query connection. Creates one from environment variables if None.
  • assembly (str | None) – Assembly name to fetch schemas for. Fetches all assemblies if None.

Returns:

  • – Dict mapping table names to column-type dicts.

insert

insert(table, data)

Insert data into a named table.

Applies schema type coercion if schemas were loaded via fetch_schemas.

Parameters:

  • table (str) – Destination table name.
  • data (Any) – Data to insert as a dict mapping column names to value lists.

Raises:

  • RTException – RT is not started. Call start first or use the publisher as a context manager.

start

start()

Start the RT connection.

Retries until connected or connection_timeout is exceeded. Called automatically when used as a context manager.

Raises:

  • RTException – RT is already running.
  • Exception – Connection could not be established within the timeout.

stop

stop()

Stop the RT connection.

Called automatically when used as a context manager.

Raises:

Back to top