Real-time publisher for kdb Insights Enterprise.¶
Use RtPublisher to insert data into kdb
Insights Enterprise via the RT (real-time) persistence layer using the rtpy
package.
Note
Requires kxi-rtpy, available on Linux x86_64 only.
Example
from kxi.publish.rtpublisher import RtPublisher

with RtPublisher(
    config_url="https://my-insights.kx.com/had7t28a2dyga",
    rt_dir="/tmp/rt_data",
) as pub:
    pub.fetch_schemas()
    pub.insert("trace", {"sensorID": [1, 2], "value": [3.14, 2.71]})
Classes:
- RtPublisher – Real-time publisher for kdb Insights Enterprise.
RtPublisher¶
RtPublisher(*, config_url=None, rt_dir=None, query_timeout=None, fetch_config_sleep=None, config_max_age=None, local_persistence_period=None, log_level=None, console_log_level=None, ca_info_file=None, dedup_id=None, schemas=None, connection_timeout=60, rt_flush_timeout_secs=30)
Real-time publisher for kdb Insights Enterprise.
Publishes data into kdb Insights Enterprise via the RT persistence layer. Use as a context manager for automatic start/stop lifecycle management.
Note
Requires kxi-rtpy, available on Linux x86_64 only.
Example
from kxi.publish.rtpublisher import RtPublisher

with RtPublisher(
    config_url="https://my-insights.kx.com/had7t28a2dyga",
    rt_dir="/tmp/rt_data",
) as pub:
    pub.fetch_schemas()
    pub.insert("trace", {"sensorID": [1, 2], "value": [3.14, 2.71]})
Functions:
- create – Create an RtPublisher from explicit stream connection parameters.
- fetch_schemas – Fetch table schemas from a kdb Insights deployment.
- insert – Insert data into a named table.
- start – Start the RT connection.
- stop – Stop the RT connection.
Initialise the RT publisher.
Parameters:
- config_url (str | Path | None) – Config URL for the RT connection. Accepts:
  - HTTP URL: https://host/path/to/config-id
  - File URI: file:///home/user/config.json
  - Local path string or pathlib.Path
- rt_dir (str | None) – Local directory for RT log storage.
- query_timeout (int | None) – Timeout in seconds for RT queries.
- fetch_config_sleep (int | None) – Interval in seconds between config fetch retries.
- config_max_age (int | None) – Maximum age in seconds for a cached config.
- local_persistence_period (int | None) – RT persistence flush period in seconds.
- log_level (str | None) – RT log level (e.g. "info", "debug").
- console_log_level (str | None) – Console log level for RT output.
- ca_info_file (str | None) – Path to a CA certificate file for TLS connections.
- dedup_id (str | None) – Publisher deduplication identifier.
- schemas (dict[str, dict[str, QType]] | None) – Table schemas for type coercion before insertion. Use RtPublisher.fetch_schemas to retrieve schemas from a live deployment.
- connection_timeout (int) – Seconds to wait for the initial RT connection. Default: 60.
- rt_flush_timeout_secs (int | None) – Seconds to wait for RT to flush on stop. Default: 30.
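The three accepted config_url forms can be told apart with the standard library alone. The sketch below uses a hypothetical helper, describe_config_url, which is not part of kxi. It only illustrates how the forms differ and makes no claim about how RtPublisher parses the value internally.

```python
from pathlib import Path
from urllib.parse import urlparse


def describe_config_url(config_url):
    """Classify a config_url value (hypothetical helper, not part of kxi)."""
    if isinstance(config_url, Path):
        return "local path"
    scheme = urlparse(str(config_url)).scheme
    if scheme in ("http", "https"):
        return "HTTP URL"
    if scheme == "file":
        return "file URI"
    # Anything else, including plain strings such as "/home/user/config.json",
    # is treated as a local path in this sketch.
    return "local path"


print(describe_config_url("https://host/path/to/config-id"))
print(describe_config_url("file:///home/user/config.json"))
print(describe_config_url(Path("/home/user/config.json")))
```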
Raises:
- ImportError – rtpy is not installed.
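Because the RT dependency is Linux x86_64 only, code that also runs elsewhere may want to guard construction. The following is a minimal sketch. The helper names is_supported_platform and try_make_publisher are hypothetical, and the platform check simply mirrors the note above rather than any check performed by the library.

```python
import platform


def is_supported_platform():
    """Return True on Linux x86_64, the platform named in the note above."""
    return platform.system() == "Linux" and platform.machine() == "x86_64"


def try_make_publisher(**kwargs):
    """Return an RtPublisher, or None when the RT dependencies are missing.

    Hypothetical helper: ImportError is caught both at import time and at
    construction, since the constructor is documented to raise it.
    """
    try:
        from kxi.publish.rtpublisher import RtPublisher

        return RtPublisher(**kwargs)
    except ImportError:
        return None
```

A caller can then fall back to another ingestion path, or skip publishing, when try_make_publisher returns None.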
create¶
create(path, stream, publisher_id, endpoints)
Create an RtPublisher from explicit stream connection parameters.
Alternative constructor that generates a config from stream and endpoint parameters instead of a config URL.
Parameters:
- path (str) – Parent directory for RT log files.
- stream (str) – RT stream name (insert topic).
- publisher_id (str) – Unique identifier for this publisher instance.
- endpoints (str | list[str]) – RT endpoint address(es), e.g. ":127.0.0.1:5002".
Returns:
- RtPublisher – Configured RtPublisher.
Example
from kxi.publish.rtpublisher import RtPublisher

pub = RtPublisher.create(
    path="/tmp/rt",
    stream="data",
    publisher_id="pypub",
    endpoints=":127.0.0.1:5002",
)
fetch_schemas¶
fetch_schemas(query_client=None, assembly=None)
Fetch table schemas from a kdb Insights deployment.
Schemas are used for type coercion before data is inserted via insert.
Parameters:
- query_client (Query | None) – Authenticated Query connection. Creates one from environment variables if None.
- assembly (str | None) – Assembly name to fetch schemas for. Fetches all assemblies if None.
Returns:
- dict – Maps table names to column-type dicts.
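Since the returned mapping is keyed by table name and then by column name, it can also be used to sanity-check a payload before inserting it. The sketch below assumes only that shape. The helper missing_columns is hypothetical, and the string type names in the example stand in for the real QType values.

```python
def missing_columns(schemas, table, data):
    """Return the columns in `data` that the schema for `table` does not list."""
    if table not in schemas:
        raise KeyError(f"no schema for table {table!r}")
    return sorted(set(data) - set(schemas[table]))


# Placeholder schema; a real one would come from pub.fetch_schemas().
example_schemas = {"trace": {"sensorID": "long", "value": "float"}}

# The misspelled column "valu" is reported.
print(missing_columns(example_schemas, "trace", {"sensorID": [1], "valu": [3.14]}))
```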
insert¶
insert(table, data)
Insert data into a named table.
Applies schema type coercion if schemas were loaded via fetch_schemas.
Parameters:
- table (str) – Destination table name.
- data (Any) – Data to insert as a dict mapping column names to value lists.
Raises:
- RTException – RT is not started. Call start first or use the publisher as a context manager.
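Data often arrives as one dict per record, whereas insert expects a single dict of column lists. The conversion might be sketched as below. rows_to_columns is a hypothetical helper, not part of kxi, and the final insert call is left as a comment because it needs a started publisher.

```python
def rows_to_columns(rows):
    """Convert a list of row dicts into one dict of column name -> value list."""
    if not rows:
        return {}
    columns = {name: [] for name in rows[0]}
    for row in rows:
        # Reject ragged input so every column ends up the same length.
        if row.keys() != columns.keys():
            raise ValueError("every row must have the same columns")
        for name, value in row.items():
            columns[name].append(value)
    return columns


rows = [
    {"sensorID": 1, "value": 3.14},
    {"sensorID": 2, "value": 2.71},
]
data = rows_to_columns(rows)
# data == {"sensorID": [1, 2], "value": [3.14, 2.71]}
# pub.insert("trace", data)  # assumes `pub` is a started RtPublisher
```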
start¶
start()
Start the RT connection.
Retries until connected or connection_timeout is exceeded.
Called automatically when used as a context manager.
Raises:
- RTException – RT is already running.
- Exception – Connection could not be established within the timeout.
stop¶
stop()
Stop the RT connection.
Called automatically when used as a context manager.
Raises:
- RTException – RT is not currently running.
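When the context manager is not convenient, start and stop can be paired by hand. The sketch below shows one way to do that. publish_once is a hypothetical helper that accepts any object exposing start, insert and stop as documented above.

```python
def publish_once(pub, table, data):
    """Start the publisher, insert one batch, and always stop afterwards."""
    # start() stays outside the try block: if it fails, RT is not running,
    # and stop() is documented to raise in that state.
    pub.start()
    try:
        pub.insert(table, data)
    finally:
        pub.stop()
```

Keeping stop in a finally clause means the connection is closed even if insert raises, which is the same guarantee the with statement gives.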