Using the kdb Insights Python interface
The Python interface allows you to publish data to RT, and subscribe to data from RT, in your own Python application.
The Python interface is a Python wrapper of the C interface to RT.
This section details how to get started with the Python interface.
Downloading the Python interface
You can download the Python interface from the KX Downloads Portal.
Supported Operating Systems
The Python interface is supported on the following operating systems:
- CentOS 8 or later
- Red Hat Enterprise Linux (RHEL) 8 or later
- Ubuntu 18.04 or later
- Windows
Note
The interface is only supported on x86 architectures. We currently do not support macOS.
Prerequisites
The Python interface requires the following system packages:
- libssl
- libcurl
Installation
Install the Python interface using pip:
- Ensure you have the latest version of pip:
pip install --upgrade pip
- Install the package:
pip install --extra-index-url https://portal.dl.kx.com/assets/pypi/ kxi-rtpy
The interface uses PyKX
The pip install command also installs PyKX if it is not already present.
Using the Python interface in your own application
To publish data to kdb Insights Enterprise from your own application, use the rtpy module. This module contains the functionality to connect to kdb Insights Enterprise and publish data into the system.
Authentication and Connectivity
kdb Insights Enterprise provides an Information Service that returns a client URL (KXI_CONFIG_URL).
This URL is used to authenticate to the kdb Insights Enterprise instance. If you are not using kdb Insights Enterprise, you can populate a local JSON file with the connectivity details. Further information is available in the getting started guide.
Initializing
- Start the Python interpreter and import the rtpy module:
from rtpy import Publisher, RTParams
- Create an RTParams configuration object. Sample params objects are shown below:
params = RTParams(config_url='file:///tmp/rt_config.json', rt_dir='/tmp/tmprt')
# This sends data to the RT stream 'my-data'
params = RTParams(prefix='my-', stream='data', client_name='mypublisher', rt_dir='/tmp/tmprt')
See the Parameters section for the full list of configuration parameters.
config_url
The config_url can be either a kdb Insights Enterprise client URL or the path to a local JSON file (using the file URI scheme) containing the RT configuration. See the getting started guide for more details.
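As a minimal sketch, assuming you have exported the Insights client URL in an environment variable named KXI_CONFIG_URL (an assumption for illustration, not something this page requires), the helper below returns that URL when it is set and otherwise converts the path of a local RT configuration file into a file URI. The helper name resolve_config_url is hypothetical.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def resolve_config_url(local_config: str,
                       env: Optional[Mapping[str, str]] = None) -> str:
    """Return a value suitable for RTParams(config_url=...).

    Hypothetical helper: prefers an Insights client URL stored in the
    (assumed) KXI_CONFIG_URL environment variable, otherwise converts the
    path of a local RT configuration JSON file into a file:// URI.
    """
    env = os.environ if env is None else env
    url = env.get("KXI_CONFIG_URL")
    if url:
        return url
    # Path.as_uri() requires an absolute path, so make the path absolute first.
    return Path(os.path.abspath(local_config)).as_uri()


if __name__ == "__main__":
    # Prints file:///tmp/rt_config.json on Linux when KXI_CONFIG_URL is not used.
    print(resolve_config_url("/tmp/rt_config.json", env={}))
```

The result could then be passed straight through, for example RTParams(config_url=resolve_config_url('/tmp/rt_config.json'), rt_dir='/tmp/tmprt'). Accepting env as an argument keeps the helper easy to test without touching the real environment.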
Parameters
The RTParams class contains the following RT connection configuration parameters.
parameter | required | description |
---|---|---|
rt_dir | Yes | RT directory |
config_url | No | The URL or file that is called or read to get the RT endpoint(s) |
prefix | No | The RT stream prefix you want to send data to |
stream | No | The RT stream name you want to send data to |
client_name | No | The name you can assign to your publisher. Useful when running multiple publishers from a single node or server |
query_timeout | No | Time in milliseconds to wait for the connection to be established |
fetch_config_sleep | No | Time in milliseconds to sleep between reads of the configuration details |
config_max_age | No | Maximum age of the configuration in milliseconds |
local_persistence_period | No | Local persistence period in milliseconds |
log_level | No | Log level: info, warn, err, or off |
console_log_level | No | Console log level: info, warn, err, or off |
ca_info_file | No | Path to the Certificate Authority certificate bundle |
dedup_id | No | ID used to deduplicate data from multiple publishers |
See the C interface documentation for the list of default values.
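The sketch below collects RTParams settings in a plain dictionary and checks them against the table above before they reach the interface. It assumes that every parameter in the table can be passed to RTParams as a keyword argument (as config_url, rt_dir, prefix, stream, and client_name are in the samples) and that the log levels are passed as strings. The numeric values are illustrative, not the defaults, and the helper name rt_params_kwargs is hypothetical.

```python
from typing import Any, Dict

# Parameter names taken from the RTParams table.
KNOWN_PARAMS = {
    "rt_dir", "config_url", "prefix", "stream", "client_name",
    "query_timeout", "fetch_config_sleep", "config_max_age",
    "local_persistence_period", "log_level", "console_log_level",
    "ca_info_file", "dedup_id",
}
LOG_LEVELS = {"info", "warn", "err", "off"}
MILLISECOND_PARAMS = ("query_timeout", "fetch_config_sleep",
                      "config_max_age", "local_persistence_period")


def rt_params_kwargs(**settings: Any) -> Dict[str, Any]:
    """Validate RTParams settings and return them as keyword arguments.

    Hypothetical helper; rtpy itself may perform different checks.
    """
    unknown = set(settings) - KNOWN_PARAMS
    if unknown:
        raise ValueError(f"unknown RTParams parameter(s): {sorted(unknown)}")
    if "rt_dir" not in settings:
        raise ValueError("rt_dir is required")
    for key in ("log_level", "console_log_level"):
        if key in settings and settings[key] not in LOG_LEVELS:
            raise ValueError(f"{key} must be one of {sorted(LOG_LEVELS)}")
    # Timing parameters are in milliseconds and must not be negative.
    for key in MILLISECOND_PARAMS:
        if key in settings and settings[key] < 0:
            raise ValueError(f"{key} must be a non-negative number of milliseconds")
    return settings


kwargs = rt_params_kwargs(
    config_url="file:///tmp/rt_config.json",
    rt_dir="/tmp/tmprt",
    client_name="mypublisher",
    query_timeout=5000,  # illustrative value, not the default
    log_level="warn",
)
```

With rtpy installed, you would then create the configuration object with RTParams(**kwargs).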
Publishing data
The Publisher class implements the Python context manager protocol. The functions to connect to and disconnect from RT are called implicitly when entering and exiting the with block below.
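To illustrate what the with statement does here, the sketch below uses a hypothetical stand-in class, FakePublisher, whose connect and disconnect methods only record that they were called. It is not the rtpy implementation; it only shows that Python calls __enter__ when the block is entered and __exit__ when it is left, including when the block raises an exception, which is why the publishing example contains no explicit connect or disconnect call.

```python
from typing import Any, List


class FakePublisher:
    """Hypothetical stand-in that mimics the Publisher context manager pattern."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def connect(self) -> None:
        self.calls.append("connect")

    def disconnect(self) -> None:
        self.calls.append("disconnect")

    def __call__(self, table: str, data: Any) -> None:
        # The real Publisher sends data to RT; this stand-in only records the call.
        self.calls.append(f"publish {table}")

    def __enter__(self) -> "FakePublisher":
        self.connect()      # runs when the with block is entered
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.disconnect()   # runs on exit, even if the block raised
        return False        # do not suppress exceptions


pub_stub = FakePublisher()
with pub_stub as pub:
    pub("test", ["hello", 1234])
print(pub_stub.calls)  # ['connect', 'publish test', 'disconnect']
```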
- Create some data and publish it to RT:
from rtpy import Publisher, RTParams
import pykx as kx
from datetime import datetime
import uuid

params = RTParams(config_url='file:///tmp/client.json', rt_dir='/tmp/tmprt')

with Publisher(params) as pub:
    now = datetime.utcnow()
    trade_id = uuid.uuid4()

    # Send a Python list
    data = ["hello", 1234, 3.14, now]
    pub("test", data)

    # A kdb+ license is required to use the PyKX Table object
    data = kx.Table([[now, "VOD", "LSE", "buy", 75.90, 100, trade_id]],
                    columns=['time', 'sym', 'exch', 'side', 'price', 'size', 'tradeID'])
    pub("trade", data)
Subscribing
You can also use the Python RT interface to subscribe to RT.
- Import the Subscriber class:
from rtpy import Subscriber
- Create a callback function to handle the RT messages:
import pykx as kx

def on_message(position, buffer):
    print(f"position: {position}")
    obj = kx.deserialize(buffer)
    print(obj)
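- Instantiate a Subscriber object:
import signal
import sys

def handle_sigint(signum, frame):
    # Raise SystemExit so the with block exits and the Subscriber is cleaned up
    sys.exit(0)

signal.signal(signal.SIGINT, handle_sigint)
with Subscriber(config_url="https://my-insights.kx.com/informationservice", on_message=on_message):
    print("Running... Press Ctrl+C to stop.")
    signal.pause()  # Unix only: blocks until a signal is received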
Parameters
The Subscriber constructor accepts the following parameters.
name | type | Default | Description |
---|---|---|---|
config_url | str | '' | URL to configuration JSON. |
endpoints | List[str] | [] | List of endpoints, each of the form <hostname>:<port>. |
stream_id | str | '' | Name of the RT stream. |
topic_prefix | str | '' | Prefix of the RT stream, prepended to stream_id. |
replicas | int | 3 | Number of RT nodes to connect to. |
sub_port | int | 5003 | Port of the subscription server on each RT node. |
repl_port | int | 5001 | Port of either the push server or the pull server on each RT node (used for replica count monitoring). |
fragment_slots | int | 10 | Data streamed from the subscription server to the subscriber is split into fragments of up to 1 MB. fragment_slots specifies the number of unacknowledged fragments that can be in flight at any time. |
connect_timeout | int | 5000 | Timeout in milliseconds when attempting a connection. |
client_name | str | '' | Name identifying this subscription. Used only for logging, on both the server and client side. |
position | int | 0 | Position in the RT stream from which the subscription should start. |
topic_filter | str | '' | Table name filter to apply to the subscription. Only messages matching this filter are delivered to the subscriber. |
on_message | Optional[Callable[[int, memoryview], None]] | None | Message callback function. |
on_event | Optional[Callable[[EventType, int, int], None]] | None | Event callback function. |
There are three ways to define the connection details for the RT stream you are subscribing to.
1. Endpoints list. The list of endpoints to the RT subscription server running on each RT node.
2. Config URL. The URL to the kdb Insights Information Service.
3. Stream ID. The stream ID and associated information to construct the list of endpoints using the naming convention: <topic_prefix>-<stream_id>-[0..<replicas>]:<sub_port>
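The third option can be sketched as a small function that expands the naming convention into the explicit list accepted by the endpoints parameter. It assumes that [0..<replicas>] means node indices 0 to replicas - 1, so the default of 3 replicas gives three endpoints. The function name build_endpoints and the values rt and mystream are hypothetical.

```python
from typing import List


def build_endpoints(topic_prefix: str, stream_id: str,
                    replicas: int = 3, sub_port: int = 5003) -> List[str]:
    """Expand <topic_prefix>-<stream_id>-<n>:<sub_port> for each RT node.

    Hypothetical helper; assumes node indices run from 0 to replicas - 1.
    """
    return [f"{topic_prefix}-{stream_id}-{n}:{sub_port}" for n in range(replicas)]


print(build_endpoints("rt", "mystream"))
# ['rt-mystream-0:5003', 'rt-mystream-1:5003', 'rt-mystream-2:5003']
```

The resulting list could be passed as Subscriber(endpoints=build_endpoints('rt', 'mystream'), on_message=on_message). Setting topic_prefix, stream_id, replicas, and sub_port on the Subscriber directly should achieve the same without the helper.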
The on_message callback's arguments are the position of the message in the RT stream and the serialized message data.
The on_event callback's arguments are the type of event, the position in the RT stream where the event occurred, and the next valid position in the RT stream following the event.
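A minimal on_event callback matching the documented signature might log each event and remember the next valid position, which could, for example, inform the position argument of a later subscription. The members of EventType are not listed in this section, so the sketch treats the event type opaquely. The EventLog class and the sample event values are hypothetical.

```python
from typing import Any, List, Optional, Tuple


class EventLog:
    """Hypothetical helper that records RT subscriber events."""

    def __init__(self) -> None:
        self.events: List[Tuple[Any, int, int]] = []
        self.next_position: Optional[int] = None

    def on_event(self, event_type: Any, position: int, next_position: int) -> None:
        # event_type would be an rtpy EventType value; it is treated opaquely here.
        print(f"event {event_type} at position {position}, "
              f"next valid position {next_position}")
        self.events.append((event_type, position, next_position))
        self.next_position = next_position


log = EventLog()
log.on_event("EXAMPLE_EVENT", 100, 250)  # hypothetical values for illustration
```

With rtpy, you would pass the bound method, for example Subscriber(config_url=..., on_message=on_message, on_event=log.on_event).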
Next Steps
Learn how to use our sample Python program to publish data.