Publishing
The functionality outlined below provides a breakdown of kxi.publish.rtpublisher.RtPublisher
, which you can use to publish data to kdb Insights or kdb Insights Enterprise.
kxi.publish.rtpublisher.RtPublisher Publisher interface for kdb Insights Enterprise
Publishing create Creates an instance of the RtPublisher class for publishing data to kdb Insights (not Enterprise). Does not require specifying a RT JSON configuration file start Start the RT connection for kdb Insights and kdb Insights Enterprise insert Publish data to kdb Insights and kdb Insights Enterprise stop Stop the RT connection for kdb Insights and kdb Insights Enterprise ..
kxi.publish.rtpublisher.RtPublisher
Create a python publisher client for kdb Insights or kdb Insights Enterprise instance.
Parameters:
name | type | description | default |
---|---|---|---|
config_url | str | The URL/file that is called/read to get the RT endpoint(s) | None |
rt_dir | str | RT directory for client log files | /tmp/rt |
query_timeout | int | Milliseconds to wait for the connection to be established | 2000 |
fetch_config_sleep | int | Time in ms to sleep between reads of the configuration details | 5000 |
config_max_age | int | Maximum age of configuration in milliseconds | 300000 |
local_persistence_period | int | Local persistence period in milliseconds | 600000 |
log_level | str | Log level: info, warn, err, or off | info |
console_log_level | str | Console log level: info, warn, err, or off | err |
ca_info_file | str | Path to Certificate Authority certificate bundle | None |
dedup_id | str | ID to dedup multiple publishers | None |
schemas | dict[str, dict[str, QType]] | Table schemas in dict format. Tip: use Query.get_table_schemas() | None |
connection_timeout | int | Timeout (seconds) for RT initial connection | 60 |
Example:
Create a python publisher client for kdb Insights or kdb Insights Enterprise instance.
Details on how to obtain the KXI_CONFIG_URL
>>> from kxi.publish.rtpublisher import RtPublisher
>>> import os
>>> os.environ['INSIGHTS_URL']
'https://my-insights.kx.com'
>>> os.environ['KXI_CONFIG_URL']
'https://my-insights.kx.com/informationservice/details/5ed6e5b7c80c8e35d07249d12f32d9eb'
>>> pub = RtPublisher(config_url=os.environ['KXI_CONFIG_URL'], rt_dir="/tmp/rt_data")
create
Factory method to create a RtPublisher object. Needed for kdb Insights
Parameters:
name | type | description | default |
---|---|---|---|
path | str | The location that your Insights Reliable Transport (RT) publisher client logs are written to | None |
stream | str | The RT stream that you want to send data to | None |
publisher_id | str | Identifier for the publisher client. Useful to distinguish multiple publishers on the same host | None |
endpoints | str or list[str] | It is possible to completely override the logic that creates the connection URLs for the replicators by manually providing the URLs | None |
Returns:
type | description |
---|---|
RtPublisher | RT publisher |
Example:
Connect to a running instance of kdb Insights and publish data to RT with default values querying a named table
>>> from kxi.publish.rtpublisher import RtPublisher
>>> params = { 'path': '/tmp/rt', 'stream': 'data', 'publisher_id': 'pypub', 'endpoints': ':127.0.0.1:5002',}
>>> pub = RtPublisher.create(**params)
start
Start the RT connection for kdb Insights and kdb Insights Enterprise
insert
Publish data to kdb Insights and kdb Insights Enterprise
Parameters:
name | type | description | default |
---|---|---|---|
table | str | The table in your database to send the data to | None |
data | Any | The payload to send to the database | None |
Example:
Connect to a running instance of kdb Insights and publish data to RT
>>> from datetime import datetime, timedelta
>>> from kxi.publish.rtpublisher import RtPublisher
>>> import kxi.query
>>> params = { 'path': '/tmp/rt', 'stream': 'data', 'publisher_id': 'pypub', 'endpoints': ':127.0.0.1:5002',}
>>> pub = RtPublisher.create(**params)
>>> pub.start()
>>> query = kxi.query.Query()
>>> pub.fetch_schemas(query)
>>> item = { 'vendor': ['Happy Cabs'], 'pickup': [datetime.now()], 'passengers': [2], 'distance': [5.25], 'fare': [20.50], 'payment_type': ['AMEX'] }
>>> pub.insert('taxi', item)
Connect to a running instance of kdb Insights Enterprise and publish data to RT
>>> from datetime import datetime, timedelta
>>> from kxi.publish.rtpublisher import RtPublisher
>>> from kxi.util import QType
>>> os.environ['KXI_CONFIG_URL']
'https://my-insights.kx.com/informationservice/details/5ed6e5b7c80c8e35d07249d12f32d9eb'
>>> schemas = { 'taxi': {'vendor': QType.get('symbol'), 'pickup': QType.get('timestamp'), 'passengers': QType.get('long'), 'distance': QType.get('long'), 'fare': QType.get('long'), 'payment_type': QType.get('symbol') }}
>>> pub = RtPublisher( config_url=os.environ['KXI_CONFIG_URL'], rt_dir="/tmp/rt_data", schemas=schemas)
>>> pub.start()
>>> item = { 'vendor': ['Happy Cabs'], 'pickup': [datetime.now()], 'passengers': [2], 'distance': [5.25], 'fare': [20.50], 'payment_type': ['AMEX'] }
>>> pub.insert('taxi', item)
stop
Stop the RT connection for kdb Insights and kdb Insights Enterprise