kdb Insights Python API Quickstart

This quickstart is a step-by-step guide to publishing data to, and retrieving data from, a running instance of kdb Insights Enterprise using API calls and SQL. Python APIs, also known as user defined analytics (UDAs), can be defined in a package and deployed to kdb Insights Enterprise, allowing you to add functionality tailored to your needs.

Prerequisites

  • You have installed the kdb Insights Python API following the installation guide.
  • You have access to a running version of kdb Insights Enterprise. For more information, refer to the installation guide.
  • You have installed the kdb Insights Command Line following the installation guide.

Quickstart

Authentication and Authorization

Before pushing a package or publishing data, use the KXI CLI to create users and clients and to assign roles. For more information on the CLI, refer to the kdb Insights Command Line documentation.

Administrator password

The administrator password below is defined during the installation of your kdb Insights Enterprise deployment. For more information, refer to the Administration Passwords documentation.

# Create a user
kxi user create user@domain.com --password <myPassword> --admin-password <adminPassword> --not-temporary
INSIGHTS_ROLES="insights.client.*,insights.package.*,insights.query.*,insights.admin.package.system"
kxi user assign-roles user@domain.com --roles $INSIGHTS_ROLES --admin-password <adminPassword> 

# Create a client
CLIENT_ID=svc1
INSIGHTS_ROLES="insights.client.*,insights.package.*,insights.query.*,insights.admin.package.system"
kxi user create-client $CLIENT_ID --admin-password <adminPassword>
kxi user assign-roles service-account-$CLIENT_ID --roles $INSIGHTS_ROLES --admin-password <adminPassword>
CLIENT_SECRET=$(kxi user get-client-secret $CLIENT_ID --admin-password <adminPassword>)

Package Deployment

Global Aggregator

kdb Insights Enterprise deploys a global aggregator that can traverse all databases, whereas an aggregator deployed as part of a package can only query the databases present in that package. This guide relies on the global aggregator, and the sample package does not deploy one of its own. Consequently, the following environment variables must be set for the aggregator in your base kdb Insights Enterprise deployment.

  aggregator:
   env:
     KXI_PACKAGES: "kxi-py:1.12.0"
     KX_PACKAGE_PATH: "/opt/kx/packages"
     KXI_SECURE_ENABLED: "false"
     PYTHONPATH: "/opt/kx/shared/packaging/packages/kxi-py/1.12.0/src"

Follow the steps below to download and deploy the sample package kxi-py-1.12.0.kxi, available from the KX Download Portal.

Step 1 - Download the package

Run the following command to download the package:

version=1.12.0
curl -L https://portal.dl.kx.com/assets/raw/package-samples/kxi-py/$version/kxi-py-$version.kxi -o kxi-py-$version.kxi
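
If you prefer to script the download from Python, the sketch below builds the same URL as the curl command above and fetches it with the standard library. The helper names (package_filename, package_url, download_package) are hypothetical, and the sketch assumes the portal serves the file without additional authentication, as the curl command implies.

```python
from urllib.request import urlretrieve

# Base location of the sample package, taken from the curl command above.
BASE_URL = "https://portal.dl.kx.com/assets/raw/package-samples/kxi-py"


def package_filename(version: str) -> str:
    """Return the local filename used for a given package version."""
    return f"kxi-py-{version}.kxi"


def package_url(version: str) -> str:
    """Return the download URL for a given package version."""
    return f"{BASE_URL}/{version}/{package_filename(version)}"


def download_package(version: str) -> str:
    """Download the package into the current directory and return its filename.

    urlretrieve follows redirects, which mirrors the -L flag passed to curl.
    """
    filename = package_filename(version)
    urlretrieve(package_url(version), filename)
    return filename
```

Calling download_package("1.12.0") would write kxi-py-1.12.0.kxi to the current directory, ready for the push in Step 2.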

Step 2 - Push the package to kdb Insights

Push the downloaded package to kdb Insights using the following command:

kxi pm push kxi-py-$version.kxi --client-id $CLIENT_ID --client-secret $CLIENT_SECRET

Step 3 - Deploy the package

Deploy the package to kdb Insights:

kxi pm deploy kxi-py/$version --client-id $CLIENT_ID --client-secret $CLIENT_SECRET

Publish to kdb Insights Enterprise

Follow the steps below to enrol a client and publish to kdb Insights Enterprise.

Step 1 - Enrol a client

If the publisher is outside the kdb Insights Enterprise cluster, use the Information Service to retrieve SSL certificates and endpoint details. For more information on client enrollment, refer to the client enrollment documentation.

To allow the publisher to use this service, you must enrol a client by running:

kxi --serviceaccount-id $CLIENT_ID --serviceaccount-secret $CLIENT_SECRET client enrol --name publisher1 --insert-topic ext-taxi 

The following response is returned:

{
  "message": "success",
  "detail": "Client enrolled",
  "url": "5ed6e5b7c80c8e35d07249d12f32d9eb",
  "config_url": "https://my-insights.kx.com/informationservice/details/5ed6e5b7c80c8e35d07249d12f32d9eb"
}

Step 2 - Define your authenticated kdb Insights Enterprise client URL (KXI_CONFIG_URL)

Copy the config_url value returned when the client was enrolled, and export it so that the Python session in the next step can read it from the environment:

export KXI_CONFIG_URL="https://my-insights.kx.com/informationservice/details/5ed6e5b7c80c8e35d07249d12f32d9eb"
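
When enrolment is scripted, the config_url can be read from the response rather than copied by hand. The following is a minimal sketch that assumes you have captured the JSON response shown in Step 1 as text; config_url_from_response is a hypothetical helper, not part of the kxi package.

```python
import json


def config_url_from_response(response_text: str) -> str:
    """Extract config_url from the JSON returned by client enrolment."""
    response = json.loads(response_text)
    # The sample response reports "success" in its message field;
    # treating any other value as a failure is an assumption of this sketch.
    if response.get("message") != "success":
        raise ValueError(f"Client enrolment did not succeed: {response}")
    return response["config_url"]


# The response shown in Step 1, captured as a string.
sample_response = """
{
  "message": "success",
  "detail": "Client enrolled",
  "url": "5ed6e5b7c80c8e35d07249d12f32d9eb",
  "config_url": "https://my-insights.kx.com/informationservice/details/5ed6e5b7c80c8e35d07249d12f32d9eb"
}
"""

config_url = config_url_from_response(sample_response)
print(config_url)
```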

Step 3 - Publish data to kdb Insights Enterprise

>>> from datetime import datetime, timedelta
>>> from kxi.publish.rtpublisher import RtPublisher
>>> from kxi.util import QType
>>> import kxi.query
>>> import os
>>> os.environ['KXI_CONFIG_URL']
'https://my-insights.kx.com/informationservice/details/5ed6e5b7c80c8e35d07249d12f32d9eb'
>>> item1 = {'borough': ['Brooklyn'],'vendor': ['Happy Cabs'], 'pickup': [datetime.now()], 'dropoff': [datetime.now() + timedelta(minutes=15)], 'passengers': [2], 'distance': [5], 'fare': [20], 'extra': [0],'tax': [0], 'tip': [2], 'tolls': [0], 'fees': [0], 'total': [22], 'payment_type': ['AMEX'] }
>>> item2 = {'borough': ['Brooklyn'], 'pickup': [datetime.now()], 'dropoff': [datetime.now() + timedelta(minutes=10)], 'traveldistance': [20], 'charge': [25.5], 'payment_type': ['AMEX'] }
>>> schemas =  { 'taxi': {'borough': QType.get('symbol'), 'vendor': QType.get('symbol'), 'pickup': QType.get('timestamp'), 'dropoff': QType.get('timestamp'), 'passengers': QType.get('long'), 'distance': QType.get('long'), 'fare': QType.get('long'), 'extra': QType.get('long'), 'tax': QType.get('long'), 'tip': QType.get('long'), 'tolls': QType.get('long'), 'fees': QType.get('long'), 'total': QType.get('long'), 'payment_type': QType.get('symbol') }, 'uber': {'borough': QType.get('symbol'), 'pickup': QType.get('timestamp'), 'dropoff': QType.get('timestamp'), 'traveldistance': QType.get('long'), 'charge': QType.get('float'), 'payment_type': QType.get('symbol') }}
>>> pub = RtPublisher(config_url=os.environ['KXI_CONFIG_URL'], rt_dir="/tmp/rt_data", schemas=schemas)
>>> pub.start()
>>> pub.insert('taxi', item1)
>>> pub.insert('uber', item2)
>>> pub.stop()
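
Each record above is a dictionary of column name to a list of values, and its keys line up with the columns of the matching schema. As a rough illustration, the sketch below checks a record against a list of expected column names before it is passed to pub.insert. The check_record helper is hypothetical, and this guide does not state whether the publisher performs its own validation.

```python
def check_record(record: dict, schema_columns: list) -> list:
    """Return a list of problems found in a record; an empty list means none."""
    problems = []
    missing = [col for col in schema_columns if col not in record]
    unexpected = [col for col in record if col not in schema_columns]
    if missing:
        problems.append(f"missing columns: {missing}")
    if unexpected:
        problems.append(f"unexpected columns: {unexpected}")
    # Every column should carry the same number of values (one per row).
    lengths = {len(values) for values in record.values()}
    if len(lengths) > 1:
        problems.append(f"columns have differing lengths: {sorted(lengths)}")
    return problems


# Column names of the 'uber' schema defined above.
uber_columns = ["borough", "pickup", "dropoff", "traveldistance", "charge", "payment_type"]

# A record shaped like item2, with placeholder strings instead of datetimes.
good = {"borough": ["Brooklyn"], "pickup": ["t0"], "dropoff": ["t1"],
        "traveldistance": [20], "charge": [25.5], "payment_type": ["AMEX"]}
bad = {key: value for key, value in good.items() if key != "charge"}

print(check_record(good, uber_columns))  # []
print(check_record(bad, uber_columns))   # ["missing columns: ['charge']"]
```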

Query kdb Insights Enterprise

The following snippets provide two examples each. The first example returns the data as PyKX objects, which requires PyKX to be installed. The second returns the data in JSON format.

  • Create an authenticated connection to your instance of kdb Insights Enterprise using the kxi.query.Query class outlined here.

    >>> import kxi.query
    >>> import os
    >>> conn = kxi.query.Query('https://my-insights.kx.com',client_id=os.environ['CLIENT_ID'],client_secret=os.environ['CLIENT_SECRET'])
    
    >>> import kxi.query
    >>> import os
    >>> import json
    >>> os.environ['INSIGHTS_URL']
    'https://my-insights.kx.com'
    >>> os.environ['CLIENT_ID']
    'my-client-id'
    >>> os.environ['CLIENT_SECRET']
    'my-client-secret'
    >>> conn = kxi.query.Query('https://my-insights.kx.com',client_id=os.environ['CLIENT_ID'],client_secret=os.environ['CLIENT_SECRET'],data_format='application/json')
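
Both connection examples read credentials from the environment, so a missing variable surfaces as a bare KeyError. The sketch below is one possible way to fail with a clearer message. It assumes INSIGHTS_URL, CLIENT_ID and CLIENT_SECRET have been exported in the shell that starts Python; load_settings is a hypothetical helper.

```python
import os

# Environment variables read by the connection examples above.
REQUIRED = ("INSIGHTS_URL", "CLIENT_ID", "CLIENT_SECRET")


def load_settings(env=None) -> dict:
    """Return the required settings, or raise naming every missing variable."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED}
```

The returned values can then be passed to kxi.query.Query in place of the individual os.environ lookups.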
    

APIs

kdb Insights includes the get_meta and get_data APIs, which are used here for illustration.

User Defined Analytics (UDAs)

The get_meta API can also be used to identify custom UDAs available in the data-access (DAP) and aggregator (AGG) services. In the example below, the .asofJoin and .select APIs are user defined analytics found in the deployed kxi-py package. For more information on custom UDAs in a package, refer to the packaging quickstart guide.

  • Query the kdb Insights deployment to retrieve the metadata relating to your deployed database, including the list of available APIs.

    >>> conn.get_meta()
    pykx.Dictionary(pykx.q('
    rc      | +`api`agg`assembly`schema`rc`labels`started!(0 6;2 2;0 2;0 2;`andre..
    dap     | +`assembly`instance`startTS`endTS!(`python-test`crypto`crypto`pytho..
    api     | +`api`kxname`aggFn`custom`full`metadata`procs!(`.kxi.getData`.kxi.p..
    agg     | +`aggFn`custom`full`metadata`procs!(`.sgagg.aggFnDflt`.sgagg.getDat..
    assembly| +`assembly`kxname`tbls!(`s#`crypto`python-test;`crypto`python-test;..
    schema  | +`table`assembly`typ`pkCols`updTsCol`prtnCol`sortColsMem`sortColsID..
    '))
    >>> conn.get_meta()['api']
    pykx.Table(pykx.q('
    api            kxname aggFn          custom full metadata                    ..
    -----------------------------------------------------------------------------..
    addition      mydb   myavg          1      1    `description`params`return`m..
    asofJoin      mydb   pyagg          1      1    `description`params`return`m..
    .example.daAPI mydb                  1      1    `description`params`return`m..
    .kxi.getData   mydb   .sgagg.getData 0      1    `description`params`return`m..
    .kxi.ping      mydb                  0      1    `description`params`return`m..
    .kxi.preview   mydb   .sgagg.preview 0      1    `description`params`return`m..
    select        mydb                  1      1    `description`params`return`m..
    '))
    
    >>> conn.get_meta()
    {'header': {'rcvTS': '2023-02-13T21:34:16.257000000', 'corr': '18d77741-a11f-4c45-bcbd-a555ba93b32a', 'protocol': 'gw', 'logCorr': '18d77741-a11f-4c45-bcbd-a555ba93b32a', 'client': ':10.30.143.195:5050', 'http': 'json', ...
    >>> conn.get_meta()['payload']['api']
    [{'api': '.kxi.getData', 'kxname': ['python-test', 'crypto'], 'aggFn': '.sgagg.getData', 'custom': False, 'full': True, 'metadata': {'description': 'Get timeseries and splayed data from data access processes.', 'params': [{'name': 'table', 'type': -11, 'isReq': True, 'description': 'Table to select from.'}, {'name': 'startTS', 'type': -12, 'isReq': False, 'description': 'Inclusive start time for period of interest.'}, {'name': 'endTS', 'type': -12, 'isReq': False, 'description': 'Exclusive end time for period of interest.'}, {'name': 'filter', 'type': 0, 'isReq': False, 'default': '', 'description': 'Custom filter.'}, {'name': 'groupBy', 'type': [-11, 11], 'isReq': False, 'default': '', 'description': 'Column(s) to determine aggregation grouping'}, {'name': 'agg', 'type': [-11, 11, 0], 'isReq': False, 'default': '', 'description': 'Column(s) to return or aggregations to perform (where ` returns all columns).'}, {'name': 'sortCols', 'type': [-11, 11], 'isReq': False, 'default': '', 'description': 'Column names to sort response payload on.'}, {'name': 'slice', 'type': [16, 18, 19], 'isReq': False, 'default': '', 'description': 'Timerange to grab from each date in request range'}, {'name': 'fill', 'type': -11, 'isReq': False, 'default': '', 'description': 'Fills option to apply to null data'}, {'name': 'temporality', 'type': -11, 'isReq': False, 'default': '', 'description': 'How to interpret time range, as continuous or as slice'}], 'return': {'type': 0, 'description': 'Triple consisting of sort columns (symbol[]), return columns (symbol[]), and table data (table)'}, 'misc': {'safe': True}, 'aggReturn': {'type': 98, 'description': 'Returns the raze of the tables.'}}, 'procs': []}, {'api': '.kxi.ping', 'kxname': ['python-test', 'crypto'], 'aggFn': '', 'custom': False, 'full': True, 'metadata': {'description': 'Returns true if it reaches a target process.', 'params': [], 'return': {'type': -1, 'description': 'True.'}, 'misc': {'safe': True}, 'aggReturn': {}}, 
    'procs': []}]
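
In the JSON form, each entry under payload['api'] carries a custom flag, so the custom UDAs can be listed with a short filter. The sketch below runs on a trimmed, illustrative sample rather than a live response: the '.select' entry is an assumption added for demonstration, and custom_api_names is a hypothetical helper.

```python
def custom_api_names(meta: dict) -> list:
    """Return the names of APIs flagged as custom in a JSON get_meta response."""
    return [entry["api"] for entry in meta["payload"]["api"] if entry.get("custom")]


# Trimmed stand-in for conn.get_meta(); only the fields used here are kept.
sample_meta = {
    "payload": {
        "api": [
            {"api": ".kxi.getData", "custom": False},
            {"api": ".kxi.ping", "custom": False},
            {"api": ".select", "custom": True},  # illustrative custom entry
        ]
    }
}

print(custom_api_names(sample_meta))  # ['.select']
```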
    
  • To query the table taxi, use a get_data API call. Apply a filter and set start_time to the time from which you want to retrieve data.

    >>> conn.get_data('taxi', start_time='2023.01.11D09:15:52.447785984', filter=[['=', 'vendor', 'Happy Cabs']])
    pykx.Table(pykx.q('
    timestamp                     market   sym    from_symbol to_symbol side acti..
    -----------------------------------------------------------------------------..
    2023.01.11D09:15:52.447785984 Kraken   BTCGBP BTC         GBP       BID  ADD ..
    2023.01.11D09:15:53.058736128 Kraken   BTCGBP BTC         GBP       BID  REMO..
    2023.01.11D09:15:53.067506944 Kraken   BTCGBP BTC         GBP       BID  ADD ..
    2023.01.11D09:15:53.287335936 Kraken   BTCGBP BTC         GBP       BID  CHAN..
    2023.01.11D09:15:53.287882752 Bitfinex BTCGBP BTC         GBP       BID  CHAN..
    2023.01.11D09:15:53.462057984 Kraken   BTCGBP BTC         GBP       BID  REMO..
    
    >>> conn.get_data('taxi', start_time='2023.01.11D09:15:52.447785984', filter=[['=', 'vendor', 'Happy Cabs']])['payload']
    [{'timestamp': '2023-01-11T09:15:52.447785984', 'market': 'Kraken', 'sym': 'BTCGBP', 'from_symbol': 'BTC', 'to_symbol': 'GBP', 'side': 'BID', 'action': 'ADD', 'ccseq': 1264949000, 'price': 14356.6, 'quantity': 0.09919806, 'seq': None, 'delay_timespan': '0D00:00:00.000806731'},
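
The start_time above is written as a q-style timestamp string. If your starting point is a Python datetime, a small formatter such as the sketch below produces a string of the same shape. The to_q_timestamp helper is hypothetical, and because Python datetimes only carry microseconds, the final three nanosecond digits are always zero.

```python
from datetime import datetime


def to_q_timestamp(moment: datetime) -> str:
    """Format a datetime as YYYY.MM.DDDHH:MM:SS.nnnnnnnnn."""
    # %f yields six microsecond digits; pad with zeros to nine digits.
    return moment.strftime("%Y.%m.%dD%H:%M:%S.%f") + "000"


start_time = to_q_timestamp(datetime(2023, 1, 11, 9, 15, 52, 447785))
print(start_time)  # 2023.01.11D09:15:52.447785000
```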
    

SQL

  • Query the table taxi using an SQL statement.

    >>> conn.sql('SELECT * from taxi')
    pykx.Table(pykx.q('
    date       label_kxname borough  vendor     pickup                        dro..
    -----------------------------------------------------------------------------..
    2024.11.13 mydb         Brooklyn Happy Cabs 2024.11.13D12:23:23.140902000 202..
    
    >>> conn.sql('select * from taxi')['payload']
    'payload': [{'timestamp': '2023-01-11T09:15:52.447785984', 'market': 'Kraken', 'sym': 'BTCGBP', 'from_symbol': 'BTC', 'to_symbol': 'GBP', 'side': 'BID', 'action': 'ADD', 'ccseq': 1264949000, 'price': 14356.6, 'quantity': 0.09919806, 'seq': None, 'delay_timespan': '0D00:00:00.000806731', 'date': '2023-01-11', 'kxname': 'crypto'}, {'timestamp': '2023-01-11T09:15:52.985120000', 'market': 'Kraken', 'sym': 'BTCGBP', 'from_symbol': 'BTC',
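
The JSON responses shown in this guide place the rows under payload, alongside a header that includes a response code (rc). The sketch below checks that code before returning the rows. It assumes that an rc of 0 indicates success, as in the responses shown in this guide; payload_or_raise is a hypothetical helper.

```python
def payload_or_raise(response: dict) -> list:
    """Return the payload of a JSON response, raising if rc is non-zero."""
    header = response["header"]
    if header["rc"] != 0:
        # 'ai' is included in the message when present, purely as context.
        raise RuntimeError(f"Request failed: rc={header['rc']} ai={header.get('ai', '')!r}")
    return response["payload"]


# Minimal stand-ins for responses; real headers carry more fields.
ok = {"header": {"rc": 0, "ac": 0, "ai": ""}, "payload": [{"borough": "Brooklyn"}]}
failed = {"header": {"rc": 1, "ac": 1, "ai": "illustrative error"}, "payload": []}

print(payload_or_raise(ok))  # [{'borough': 'Brooklyn'}]
```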
    

UDAs

You can deploy your own Python UDAs through a package. This quickstart guide includes a sample set of UDAs that are part of the deployed kxi-py package. The two UDAs executed below are:

  1. select, an example call to the function .kxi.selectTable
  2. asofJoin, an example call using aj

    >>> from datetime import datetime,timedelta,date
    >>> conn.fetch_custom_apis()
    >>> end=datetime.strftime(datetime.now()+timedelta(days=1), "%Y-%m-%d")
    >>> start=datetime.strftime(datetime.now()-timedelta(days=1), "%Y-%m-%d")
    >>> params = {"table1": "uber", "table2": "taxi" , "start": start, "end": end}
    >>> conn.asofJoin(json=params)[1]
    pykx.Table(pykx.q('
    borough    pickup             dropoff            traveldistance charge payment_type vendor       passengers distance fare extra tax tip tolls fees total
    --------------------------------------------------------------------------------------------------------------------------------------------------------
    "Brooklyn" 784815809308703000 784816703140908000 20             25.5   "AMEX"       "Happy Cabs" 2          5        20   0     0   2   0     0    22
    '))
    >>>
    >>> params = {"table": "uber", "start": start, "end": end}
    >>> conn.select(json=params)[1]
    pykx.Table(pykx.q('
    borough  pickup                        dropoff                       traveldi..
    -----------------------------------------------------------------------------..
    Brooklyn 2024.11.13D12:23:29.308703000 2024.11.13D12:33:29.308709000 20      ..
    '))
    
    >>> from datetime import datetime,timedelta,date
    >>> conn.fetch_custom_apis()
    >>> params = {"table": "uber", "start": "2024-11-13", "end": "2024-11-14"}
    >>> conn.select(json=params)
    {'header': {'rcvTS': '2024-11-18T12:45:20.162000000', 'corr': '8b21775b-db69-4e89-84ca-cd1b03879bcf', 'logCorr': '8b21775b-db69-4e89-84ca-cd1b03879bcf', 'http': 'json', 'api': '.select', 'agg': ':10.8.148.130:5070', 'refVintage': -9223372036854775807, 'rc': 0, 'ac': 0, 'ai': ''}, 'payload': [{'borough': 'Brooklyn', 'pickup': '2024-11-13T12:23:29.308703000', 'dropoff': '2024-11-13T12:33:29.308709000', 'traveldistance': 20, 'charge': 25.5, 'payment_type': 'AMEX'}]}
    >>>
    >>> params = {"table1": "uber", "table2": "taxi" , "start": "2024-11-13", "end": "2024-11-14"}
    >>> conn.asofJoin(json=params)
    {'header': {'rcvTS': '2024-11-18T12:43:46.274000000', 'corr': '2c31e67c-5a3c-4626-8b17-177a2e5476d7', 'logCorr': '2c31e67c-5a3c-4626-8b17-177a2e5476d7', 'http': 'json', 'api': '.asofJoin', 'agg': ':10.8.148.130:5070', 'refVintage': -9223372036854775807, 'rc': 0, 'ac': 0, 'ai': ''}, 'payload': [{'borough': 'Brooklyn', 'pickup': 784815809308703000, 'dropoff': 784816703140908000, 'traveldistance': 20, 'charge': 25.5, 'payment_type': 'AMEX', 'vendor': 'Happy Cabs', 'passengers': 2, 'distance': 5, 'fare': 20, 'extra': 0, 'tax': 0, 'tip': 2, 'tolls': 0, 'fees': 0, 'total': 22}]}
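
In the asofJoin results above, pickup and dropoff come back as integers rather than formatted timestamps. These values are consistent with kdb+ timestamps, which count nanoseconds from 2000-01-01. A minimal sketch of the conversion follows; from_kdb_nanos is a hypothetical helper, and sub-microsecond digits are dropped because Python datetimes stop at microseconds.

```python
from datetime import datetime, timedelta

# kdb+ timestamps count nanoseconds from this epoch.
KDB_EPOCH = datetime(2000, 1, 1)


def from_kdb_nanos(nanos: int) -> datetime:
    """Convert nanoseconds since 2000-01-01 into a datetime."""
    return KDB_EPOCH + timedelta(microseconds=nanos // 1000)


pickup = from_kdb_nanos(784815809308703000)
print(pickup.isoformat())  # 2024-11-13T12:23:29.308703
```

The converted pickup matches the value returned by the select UDA for the same row.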