Using the kdb Insights Python interface

The Python interface allows you to publish data to RT in your own Python application.

The Python interface is a Python wrapper around the C interface to RT.

This section details how to get started with the Python interface.

Downloading the Python interface

You can download the Python interface from the KX Downloads Portal.

Supported Operating Systems

The Python interface is supported on the following operating systems:

  • CentOS 8 or later
  • Red Hat Enterprise Linux (RHEL) 8 or later
  • Ubuntu 18.04 or later
  • Windows

Note

The interface is only supported on x86 architectures. We currently do not support macOS.

Prerequisites

The Python interface requires the following system packages:

  • libssl
  • libcurl
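Before installing, it can help to confirm that both libraries are visible to the dynamic loader. The following is a minimal sketch using only the Python standard library; it assumes a Linux host, and the helper name find_required_libraries is illustrative rather than part of the interface. A None result only means the loader lookup failed, so treat it as a hint, not as proof that the package is missing.

```python
from ctypes.util import find_library


def find_required_libraries():
    """Map each library short name to the file name the loader reports, or None."""
    return {name: find_library(name) for name in ("ssl", "curl")}


if __name__ == "__main__":
    for name, found in find_required_libraries().items():
        # find_library returns None when it cannot locate the library
        status = found if found else "NOT FOUND"
        print(f"lib{name}: {status}")
```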

Installation

Install the Python interface via pip.

  1. Ensure you have the latest version of pip:

    pip install --upgrade pip
    
  2. Install the package:

    pip install --extra-index-url https://portal.dl.kx.com/assets/pypi/ kxi-rtpy
    

    The interface uses PyKX

    The pip install command will also install PyKX if it is not already present.

Using the Python interface in your own application

To publish data to kdb Insights Enterprise using the Python interface as part of your own application, use the rtpy module. This module contains the functionality to connect to kdb Insights Enterprise and publish the data into the system.

Authentication and Connectivity

kdb Insights Enterprise provides an Information Service which returns a client URL (KXI_CONFIG_URL). This URL is used to authenticate to the kdb Insights Enterprise instance. If you are not using kdb Insights Enterprise you can populate a local JSON file with the connectivity details. Further information on this is available in the getting started guide.

Initializing

  1. Start the Python interpreter and import the rtpy module:

    from rtpy import Publisher, RTParams
    
  2. Create an RTParams configuration object. Two alternative examples are shown below:

    # Option 1: read the RT endpoints from a configuration URL or a local JSON file
    params = RTParams(config_url='file:///tmp/rt_config.json', rt_dir='/tmp/tmprt')
    # Option 2: name the stream directly; this sends data to the RT stream 'my-data'
    params = RTParams(prefix='my-', stream='data', client_name='mypublisher', rt_dir='/tmp/tmprt')
    

    See the Parameters section for the full list of configuration parameters.

    config_url

    The config_url can be either a kdb Insights Enterprise client URL or the path to a local JSON file (using the file URI scheme) containing the RT configuration. See the getting started guide for more details.
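When the configuration lives in a local JSON file, the path has to be expressed as a file URI. The sketch below shows one way to build that string with the standard library; the helper name to_file_uri is hypothetical, and the example assumes a POSIX system and an absolute path (as_uri raises ValueError for relative paths).

```python
from pathlib import Path


def to_file_uri(path: str) -> str:
    """Convert an absolute filesystem path to a file:// URI."""
    return Path(path).as_uri()


# The resulting string can be supplied as the config_url value
print(to_file_uri("/tmp/rt_config.json"))
```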

Parameters

The RTParams class contains the following RT connection configuration parameters.

parameter                 required  description
rt_dir                    Yes       RT directory
config_url                No        The URL/file that is called/read to get the RT endpoint(s)
prefix                    No        The RT stream prefix you want to send data to
stream                    No        The RT stream name you want to send data to
client_name               No        The client name that the user can assign to their publisher. Useful when running multiple publishers from a single node/server
query_timeout             No        Milliseconds to wait for the connection to be established
fetch_config_sleep        No        Time in ms to sleep between reads of the configuration details
config_max_age            No        Maximum age of configuration in milliseconds
local_persistence_period  No        Local persistence period in milliseconds
log_level                 No        Log level: info, warn, err, or off
console_log_level         No        Console log level: info, warn, err, or off
ca_info_file              No        Path to Certificate Authority certificate bundle
dedup_id                  No        ID to dedup multiple publishers

See the C interface documentation for the list of default values.
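Because only rt_dir is required and the log levels take a fixed set of values, settings loaded from a file or the environment can be checked before they reach RTParams. The following is a minimal sketch of such a check; check_rt_settings is a hypothetical helper, not part of rtpy, and passing every parameter in the table as a keyword argument to RTParams is an assumption (this page only shows config_url, rt_dir, prefix, stream and client_name used that way).

```python
KNOWN_PARAMS = {
    "rt_dir", "config_url", "prefix", "stream", "client_name",
    "query_timeout", "fetch_config_sleep", "config_max_age",
    "local_persistence_period", "log_level", "console_log_level",
    "ca_info_file", "dedup_id",
}
LOG_LEVELS = {"info", "warn", "err", "off"}


def check_rt_settings(settings: dict) -> dict:
    """Validate a settings dict against the parameter table and return it unchanged."""
    unknown = set(settings) - KNOWN_PARAMS
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")
    if not settings.get("rt_dir"):
        raise ValueError("rt_dir is required")
    for key in ("log_level", "console_log_level"):
        if key in settings and settings[key] not in LOG_LEVELS:
            raise ValueError(f"{key} must be one of {sorted(LOG_LEVELS)}")
    return settings


settings = check_rt_settings({"rt_dir": "/tmp/tmprt", "log_level": "warn"})
# Assumed usage: params = RTParams(**settings)
```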

Publishing data

The Publisher class implements the Python context manager protocol. The functions to connect and disconnect from RT are called implicitly when entering and exiting the with block below.

  1. Create some data and publish it to RT:

    from rtpy import Publisher, RTParams
    import pykx as kx
    from datetime import datetime
    import uuid
    
    params = RTParams(config_url='file:///tmp/client.json', rt_dir='/tmp/tmprt')
    
    with Publisher(params) as pub:
    
        now = datetime.utcnow()
        trade_id = uuid.uuid4()
    
        # Send a Python list
        data = ["hello", 1234, 3.14, now]
        pub("test", data)
    
        # A kdb+ license is required to use the PyKX Table object
        data = kx.Table([[now, "VOD", "LSE", "buy", 75.90, 100, trade_id]], columns=['time','sym','exch','side','price','size','tradeID'])
    
        pub("trade", data)
    

Subscribing

You can also use the Python RT interface to subscribe to RT.

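  1. Import the Subscriber class:

    from rtpy import Subscriber

  2. Create a callback function to handle the RT messages:

    import pykx as kx

    def on_message(position, buffer):
        # position: location of the message in the RT stream
        print(f"position: {position}")
        # buffer: serialized message data, decoded here with PyKX
        obj = kx.deserialize(buffer)
        print(obj)

  3. Instantiate a Subscriber object and wait for messages:

    import signal

    def handle_sigint(signum, frame):
        # Minimal handler: returning from it lets signal.pause() return,
        # so the with block exits and the subscriber is closed
        print("Stopping...")

    signal.signal(signal.SIGINT, handle_sigint)

    with Subscriber(config_url="https://my-insights.kx.com/informationservice", on_message=on_message):
        print("Running... Press Ctrl+C to stop.")
        signal.pause()  # signal.pause() is available on Unix only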

Parameters

The Subscriber constructor accepts the following parameters.

name             type                                             default  description
config_url       str                                              ''       URL to configuration JSON.
endpoints        List[str]                                        []       List of endpoints where each endpoint is of the form <hostname>:<port>.
stream_id        str                                              ''       Name of the RT stream.
topic_prefix     str                                              ''       Prefix of RT stream which is prepended to stream_id.
replicas         int                                              3        Number of RT nodes to connect to.
sub_port         int                                              5003     Port of the subscription server on each RT node.
repl_port        int                                              5001     Port of either the push server or pull server on each RT node (used for replica count monitoring).
fragment_slots   int                                              10       Data being streamed from the subscription server to the subscriber is chunked into fragments of up to 1MB. fragment_slots specifies the number of unacknowledged fragments that can be in flight at any time.
connect_timeout  int                                              5000     Timeout in milliseconds when attempting a connection.
client_name      str                                              ''       Name identifying this subscription. Only used for logging, both server and client side.
position         int                                              0        Position into the RT stream from which the subscription should start.
topic_filter     str                                              ''       Table name filter to apply to the subscription. Only messages matching this filter will be delivered to the subscriber.
on_message       Optional[Callable[[int, memoryview], None]]      None     Message callback function.
on_event         Optional[Callable[[EventType, int, int], None]]  None     Event callback function.

There are three ways to define the connection details for the RT stream you are subscribing to:

  1. Endpoints list. The list of endpoints to the RT subscription server running on each RT node.
  2. Config URL. The URL to the kdb Insights Information Service.
  3. Stream ID. The stream ID and associated information to construct the list of endpoints using the naming convention: <topic_prefix>-<stream_id>-[0..<replicas>]:<sub_port>
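To make the Stream ID naming convention concrete, the sketch below expands it into an explicit endpoint list. The function build_endpoints is illustrative and not part of rtpy; it assumes the node index runs from 0 to replicas - 1, and the prefix "rt" and stream "data" are made-up values. The defaults mirror the replicas and sub_port defaults in the table above.

```python
from typing import List


def build_endpoints(topic_prefix: str, stream_id: str,
                    replicas: int = 3, sub_port: int = 5003) -> List[str]:
    """Expand <topic_prefix>-<stream_id>-<index>:<sub_port> for each replica.

    Assumption: replica indices run from 0 to replicas - 1.
    """
    return [f"{topic_prefix}-{stream_id}-{i}:{sub_port}" for i in range(replicas)]


for endpoint in build_endpoints("rt", "data"):
    print(endpoint)
# rt-data-0:5003
# rt-data-1:5003
# rt-data-2:5003
```

A list built this way has the <hostname>:<port> form expected by the endpoints parameter.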

The on_message callback's arguments are the position in the RT stream of the message and the serialized message data.

The on_event callback's arguments are the type of event, the position in the RT stream where the event occurred, and the next valid position in the RT stream following the event.
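The two callback signatures can be exercised without a live stream. The following is a minimal sketch of a helper that records what the callbacks receive; StreamTracker is a hypothetical class, not part of rtpy, the event type is treated as an opaque value because EventType's members are not listed here, and the values passed at the bottom are made up to simulate calls from a subscriber.

```python
class StreamTracker:
    """Records what the Subscriber callbacks report (illustrative helper)."""

    def __init__(self):
        self.last_position = 0   # position of the most recent message seen
        self.message_count = 0
        self.events = []         # (event_type, position, next_position) tuples

    def on_message(self, position, buffer):
        # buffer holds the serialized message; only its size is inspected here
        self.last_position = position
        self.message_count += 1
        print(f"message at position {position}: {len(buffer)} bytes")

    def on_event(self, event_type, position, next_position):
        self.events.append((event_type, position, next_position))
        print(f"event {event_type} at position {position}, "
              f"next valid position {next_position}")


# Simulate the calls a subscriber would make, using made-up values
tracker = StreamTracker()
tracker.on_message(42, memoryview(b"abc"))
tracker.on_event("example-event", 43, 50)
```

The bound methods could then be supplied as on_message=tracker.on_message and on_event=tracker.on_event. Whether restarting a subscription at last_position redelivers that message is not stated here, so verify that before relying on it to resume.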

Next Steps

Learn how to use our sample Python program to publish data.