Publishing

This page describes kxi.publish.rtpublisher.RtPublisher, the class you use to publish data to kdb Insights or kdb Insights Enterprise.

kxi.publish.rtpublisher.RtPublisher: Publisher interface for kdb Insights Enterprise

Publishing

| name | description |
| --- | --- |
| create | Creates an instance of the RtPublisher class for publishing data to kdb Insights (not Enterprise). Does not require specifying an RT JSON configuration file |
| start | Start the RT connection for kdb Insights and kdb Insights Enterprise |
| insert | Publish data to kdb Insights and kdb Insights Enterprise |
| stop | Stop the RT connection for kdb Insights and kdb Insights Enterprise |

kxi.publish.rtpublisher.RtPublisher

Create a Python publisher client for a kdb Insights or kdb Insights Enterprise instance.

Parameters:

| name | type | description | default |
| --- | --- | --- | --- |
| config_url | str | The URL/file that is called/read to get the RT endpoint(s) | None |
| rt_dir | str | RT directory for client log files | /tmp/rt |
| query_timeout | int | Milliseconds to wait for the connection to be established | 2000 |
| fetch_config_sleep | int | Time in ms to sleep between reads of the configuration details | 5000 |
| config_max_age | int | Maximum age of configuration in milliseconds | 300000 |
| local_persistence_period | int | Local persistence period in milliseconds | 600000 |
| log_level | str | Log level: info, warn, err, or off | info |
| console_log_level | str | Console log level: info, warn, err, or off | err |
| ca_info_file | str | Path to Certificate Authority certificate bundle | None |
| dedup_id | str | ID to dedup multiple publishers | None |
| schemas | dict[str, dict[str, QType]] | Table schemas in dict format. Tip: use Query.get_table_schemas() | None |
| connection_timeout | int | Timeout (seconds) for RT initial connection | 60 |

Example:

Create a publisher client from a configuration URL, writing client log files to /tmp/rt_data.

Details on how to obtain the KXI_CONFIG_URL

>>> from kxi.publish.rtpublisher import RtPublisher
>>> import os
>>> os.environ['INSIGHTS_URL']
'https://my-insights.kx.com'
>>> os.environ['KXI_CONFIG_URL']
'https://my-insights.kx.com/informationservice/details/5ed6e5b7c80c8e35d07249d12f32d9eb'
>>> pub = RtPublisher(config_url=os.environ['KXI_CONFIG_URL'], rt_dir="/tmp/rt_data")
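
The constructor arguments above can also be gathered in one place before the client is created. The following is a minimal sketch, not part of the kxi package: `rt_publisher_kwargs` is a hypothetical helper, and the numeric values simply repeat the defaults listed in the parameter table so that their units are visible at the call site.

```python
import os


def rt_publisher_kwargs(env=os.environ, rt_dir="/tmp/rt"):
    """Collect keyword arguments for RtPublisher from an environment mapping.

    Hypothetical helper: it only builds a dict and never imports kxi.
    """
    config_url = env.get("KXI_CONFIG_URL")
    if not config_url:
        raise KeyError("KXI_CONFIG_URL is not set")
    return {
        "config_url": config_url,
        "rt_dir": rt_dir,
        "query_timeout": 2000,     # milliseconds (documented default)
        "connection_timeout": 60,  # seconds (documented default)
    }


# Usage, assuming the kxi package is installed and KXI_CONFIG_URL is set:
#   from kxi.publish.rtpublisher import RtPublisher
#   pub = RtPublisher(**rt_publisher_kwargs(rt_dir="/tmp/rt_data"))
```

Failing early on a missing KXI_CONFIG_URL keeps the error close to its cause instead of surfacing later as a connection problem.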

create

Factory method to create an RtPublisher object. Required for kdb Insights.

Parameters:

| name | type | description | default |
| --- | --- | --- | --- |
| path | str | The location that your Insights Reliable Transport (RT) publisher client logs are written to | None |
| stream | str | The RT stream that you want to send data to | None |
| publisher_id | str | Identifier for the publisher client. Useful to distinguish multiple publishers on the same host | None |
| endpoints | str or list[str] | It is possible to completely override the logic that creates the connection URLs for the replicators by manually providing the URLs | None |

Returns:

| type | description |
| --- | --- |
| RtPublisher | RT publisher |

Example:

Create a publisher for a running instance of kdb Insights by specifying the log path, stream, publisher ID, and endpoint explicitly

>>> from kxi.publish.rtpublisher import RtPublisher
>>> params = { 'path': '/tmp/rt', 'stream': 'data', 'publisher_id': 'pypub', 'endpoints': ':127.0.0.1:5002',}
>>> pub = RtPublisher.create(**params)
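
Because `endpoints` accepts either a single string or a list of strings, the parameters for `create` can be assembled from host and port pairs. The sketch below is an illustration only: `create_params` is a hypothetical helper, and the `:host:port` endpoint form is inferred from the example above rather than from a documented specification.

```python
def create_params(path, stream, publisher_id, hosts):
    """Build the keyword arguments for RtPublisher.create.

    `hosts` is an iterable of (host, port) pairs. Each pair is rendered in
    the ':host:port' form seen in the example above (an assumption).
    """
    endpoints = [f":{host}:{port}" for host, port in hosts]
    if not endpoints:
        raise ValueError("at least one (host, port) pair is required")
    return {
        "path": path,
        "stream": stream,
        "publisher_id": publisher_id,
        "endpoints": endpoints,  # the list[str] form of the parameter
    }


params = create_params("/tmp/rt", "data", "pypub", [("127.0.0.1", 5002)])

# Usage, assuming the kxi package is installed:
#   from kxi.publish.rtpublisher import RtPublisher
#   pub = RtPublisher.create(**params)
```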

start

Start the RT connection for kdb Insights and kdb Insights Enterprise

insert

Publish data to kdb Insights and kdb Insights Enterprise

Parameters:

| name | type | description | default |
| --- | --- | --- | --- |
| table | str | The table in your database to send the data to | None |
| data | Any | The payload to send to the database | None |

Example:

Connect to a running instance of kdb Insights and publish data to RT

>>> from datetime import datetime
>>> from kxi.publish.rtpublisher import RtPublisher
>>> import kxi.query
>>> params = { 'path': '/tmp/rt', 'stream': 'data', 'publisher_id': 'pypub', 'endpoints': ':127.0.0.1:5002',}
>>> pub = RtPublisher.create(**params)
>>> pub.start()
>>> query = kxi.query.Query()
>>> pub.fetch_schemas(query)
>>> item = { 'vendor': ['Happy Cabs'], 'pickup': [datetime.now()], 'passengers': [2], 'distance': [5.25], 'fare': [20.50], 'payment_type': ['AMEX'] }
>>> pub.insert('taxi', item)
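
The payload in this example is column-oriented: each key is a column name and each value is a list with one entry per row. When records arrive as one dict per row, they can be pivoted into that shape first. This is a minimal sketch with a hypothetical helper, `rows_to_columns`. It assumes the dict-of-lists shape shown above is what `insert` expects, and other payload types may also be accepted since `data` is typed as Any.

```python
def rows_to_columns(rows):
    """Pivot a list of row dicts into a dict of equal-length column lists."""
    if not rows:
        return {}
    columns = list(rows[0])  # column order follows the first row
    out = {name: [] for name in columns}
    for index, row in enumerate(rows):
        if set(row) != set(columns):
            raise ValueError(f"row {index} does not match the first row's columns")
        for name in columns:
            out[name].append(row[name])
    return out


rows = [
    {"vendor": "Happy Cabs", "passengers": 2, "fare": 20.50},
    {"vendor": "Happy Cabs", "passengers": 1, "fare": 12.00},
]
item = rows_to_columns(rows)

# Usage, assuming `pub` is a started RtPublisher and the table has these columns:
#   pub.insert('taxi', item)
```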

Connect to a running instance of kdb Insights Enterprise and publish data to RT

>>> import os
>>> from datetime import datetime
>>> from kxi.publish.rtpublisher import RtPublisher
>>> from kxi.util import QType
>>> os.environ['KXI_CONFIG_URL']
'https://my-insights.kx.com/informationservice/details/5ed6e5b7c80c8e35d07249d12f32d9eb'
>>> schemas = { 'taxi': {'vendor': QType.get('symbol'), 'pickup': QType.get('timestamp'), 'passengers': QType.get('long'), 'distance': QType.get('float'), 'fare': QType.get('float'), 'payment_type': QType.get('symbol') }}
>>> pub = RtPublisher( config_url=os.environ['KXI_CONFIG_URL'], rt_dir="/tmp/rt_data", schemas=schemas)
>>> pub.start()
>>> item = { 'vendor': ['Happy Cabs'], 'pickup': [datetime.now()], 'passengers': [2], 'distance': [5.25], 'fare': [20.50], 'payment_type': ['AMEX'] }
>>> pub.insert('taxi', item)
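
When schemas are supplied by hand, as in the example above, it can help to compare a payload against the table schema before publishing. The sketch below uses a hypothetical helper, `check_payload`, that looks only at column names and column lengths. It does not check value types, and it is not a substitute for whatever validation the library itself performs.

```python
def check_payload(schema, payload):
    """Return a list of problems found when comparing payload to schema.

    `schema` maps column names to types (only the names are inspected);
    `payload` maps column names to lists of values.
    """
    problems = []
    missing = sorted(set(schema) - set(payload))
    extra = sorted(set(payload) - set(schema))
    if missing:
        problems.append(f"missing columns: {missing}")
    if extra:
        problems.append(f"unexpected columns: {extra}")
    lengths = {len(values) for values in payload.values()}
    if len(lengths) > 1:
        problems.append("columns have differing lengths")
    return problems


# Plain strings stand in for the QType values so the sketch runs without kxi.
taxi_schema = {"vendor": "symbol", "passengers": "long", "fare": "float"}
good = {"vendor": ["Happy Cabs"], "passengers": [2], "fare": [20.50]}
bad = {"vendor": ["Happy Cabs", "City Cars"], "passengers": [2]}

print(check_payload(taxi_schema, good))
print(check_payload(taxi_schema, bad))
```

An empty list means the column names line up and every column has the same number of rows.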

stop

Stop the RT connection for kdb Insights and kdb Insights Enterprise
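
Since `start` and `stop` bracket every publishing session, one way to make sure `stop` always runs is to wrap the pair in a context manager. This is a sketch only: `running` is a hypothetical helper, not part of the kxi package, and it assumes `start()` and `stop()` take no arguments, as the examples on this page suggest. `FakePublisher` is a stand-in so the sketch runs without an RT connection.

```python
from contextlib import contextmanager


@contextmanager
def running(publisher):
    """Start the publisher, yield it, and always stop it afterwards."""
    publisher.start()
    try:
        yield publisher
    finally:
        publisher.stop()  # runs even if publishing raises


class FakePublisher:
    """Stand-in that records calls instead of talking to RT."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append("start")

    def insert(self, table, data):
        self.calls.append(("insert", table))

    def stop(self):
        self.calls.append("stop")


fake = FakePublisher()
with running(fake) as pub:
    pub.insert("taxi", {"vendor": ["Happy Cabs"]})
print(fake.calls)
```

With a real client, the same pattern would read `with running(RtPublisher.create(**params)) as pub:` followed by the `insert` calls.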