kdb Insights Python API Quickstart
This quickstart provides you with a step-by-step guide to publish and retrieve data from a running instance of kdb Insights Enterprise using API calls and SQL. Python APIs, also known as user defined analytics (UDAs), can be defined in a package and deployed to your kdb Insights Enterprise deployment, allowing you to create tailored functionalities.
Prerequisites
- You have installed the kdb Insights Python API following the installation guide.
- You have access to a running version of kdb Insights Enterprise. For more information, refer to the installation guide.
- You have installed the kdb Insights Command Line following the installation guide.
Quickstart
Authentication and Authorization
Before pushing a package or publishing data, use the KXI CLI to create users, clients, and assign roles. For more information on the CLI, refer to the kdb Insights Command Line documentation.
Administrator password
The administrator password below is defined during the installation of your kdb Insights Enterprise deployment. For more information, refer to the Administration Passwords documentation.
# Create a user
kxi user create user@domain.com --password <myPassword> --admin-password <adminPassword> --not-temporary
INSIGHTS_ROLES="insights.client.*,insights.package.*,insights.query.*,insights.admin.package.system"
kxi user assign-roles user@domain.com --roles $INSIGHTS_ROLES --admin-password <adminPassword>
## Create a client
CLIENT_ID=svc1
INSIGHTS_ROLES="insights.client.*,insights.package.*,insights.query.*,insights.admin.package.system"
kxi user create-client $CLIENT_ID --admin-password <adminPassword>
kxi user assign-roles service-account-$CLIENT_ID --roles $INSIGHTS_ROLES --admin-password <adminPassword>
CLIENT_SECRET=$(kxi user get-client-secret $CLIENT_ID --admin-password <adminPassword>)
Package Deployment
Global Aggregator
There is an aggregator deployed as part of kdb Insights Enterprise. This is a global aggregator that can traverse all databases. An aggregator deployed as part of a package can only query the databases present in that package. In this guide, we rely on the global aggregator. There is no aggregator deployed in the package. Consequently, certain environment variables are required in your base kdb Insights Enterprise deployment.
aggregator:
env:
KXI_PACKAGES: "kxi-py:1.12.0"
KX_PACKAGE_PATH: "/opt/kx/packages"
KXI_SECURE_ENABLED: "false"
PYTHONPATH: "/opt/kx/shared/packaging/packages/kxi-py/1.12.0/src"
Follow the steps below to download and deploy the sample package kxi-py-1.12.0.kxi
, available from the KX Download Portal.
Step 1 - Download the package
Run the following command to download the package:
version=1.12.0
curl -L https://portal.dl.kx.com/assets/raw/package-samples/kxi-py/$version/kxi-py-$version.kxi -o kxi-py-$version.kxi
Step 2 - Push the package to kdb Insights
Push the downloaded package to kdb Insights using the following command:
kxi pm push kxi-py-$version.kxi --client-id $CLIENT_ID --client-secret $CLIENT_SECRET
Step 3 - Deploy the package
Deploy the package to kdb Insights:
kxi pm deploy kxi-py/$version --client-id $CLIENT_ID --client-secret $CLIENT_SECRET
Publish to kdb Insights Enterprise
Follow the steps below to enroll a client and publish to kdb Insights Enterprise
Step 1 - Enrol a client
If the publisher is outside the kdb Insights Enterprise cluster, use the Information Service to retrieve SSL certificates and endpoint details. For more information on client enrollment, refer to the client enrollment documentation.
To allow the publisher to use this service, you must enrol a client by running:
kxi --serviceaccount-id $CLIENT_ID --serviceaccount-secret $CLIENT_SECRET client enrol --name publisher1 --insert-topic ext-taxi
The following response is returned:
{
"message": "success",
"detail": "Client enrolled",
"url": "5ed6e5b7c80c8e35d07249d12f32d9eb",
"config_url": "https://my-insights.kx.com/informationservice/details/5ed6e5b7c80c8e35d07249d12f32d9eb"
}
Step 2 - Define your authenticated kdb Insights Enterprise client URL (KXI_CONFIG_URL)
Copy the value returned for the config_url
key returned when the client is enrolled
KXI_CONFIG_URL="https://my-insights.kx.com/informationservice/details/5ed6e5b7c80c8e35d07249d12f32d9eb"
Step 3 - Publish data to kdb Insights Enterprise
>>> from datetime import datetime, timedelta
>>> from kxi.publish.rtpublisher import RtPublisher
>>> from kxi.util import QType
>>> import kxi.query
>>> import os
>>> os.environ['KXI_CONFIG_URL']
'https://my-insights.kx.com/informationservice/details/5ed6e5b7c80c8e35d07249d12f32d9eb'
>>> item1 = {'borough': ['Brooklyn'],'vendor': ['Happy Cabs'], 'pickup': [datetime.now()], 'dropoff': [datetime.now() + timedelta(minutes=15)], 'passengers': [2], 'distance': [5], 'fare': [20], 'extra': [0],'tax': [0], 'tip': [2], 'tolls': [0], 'fees': [0], 'total': [22], 'payment_type': ['AMEX'] }
>>> item2 = {'borough': ['Brooklyn'], 'pickup': [datetime.now()], 'dropoff': [datetime.now() + timedelta(minutes=10)], 'traveldistance': [20], 'charge': [25.5], 'payment_type': ['AMEX'] }
>>> schemas = { 'taxi': {'borough': QType.get('symbol'), 'vendor': QType.get('symbol'), 'pickup': QType.get('timestamp'), 'dropoff': QType.get('timestamp'), 'passengers': QType.get('long'), 'distance': QType.get('long'), 'fare': QType.get('long'), 'extra': QType.get('long'), 'tax': QType.get('long'), 'tip': QType.get('long'), 'tolls': QType.get('long'), 'fees': QType.get('long'), 'total': QType.get('long'), 'payment_type': QType.get('symbol') }, 'uber': {'borough': QType.get('symbol'), 'pickup': QType.get('timestamp'), 'dropoff': QType.get('timestamp'), 'traveldistance': QType.get('long'), 'charge': QType.get('float'), 'payment_type': QType.get('symbol') }}
>>> pub = RtPublisher(config_url=os.environ['KXI_CONFIG_URL'], rt_dir="/tmp/rt_data", schemas=schemas)
>>> pub.start()
>>> pub.insert('taxi', item1)
>>> pub.insert('uber', item2)
>>> pub.stop()
Query kdb Insights Enterprise
The following snippets provide two examples each. The first example returns the data as PyKX objects, which requires PyKX to be installed. The second returns the data in JSON format.
-
Create an authenticated connection to your instance of kdb Insights Enterprise using the
kxi.query.Query
class outlined here.>>> import kxi.query >>> import os >>> conn = kxi.query.Query('https://my-insights.kx.com',client_id=os.environ['CLIENT_ID'],client_secret=os.environ['CLIENT_SECRET'])
>>> import kxi.query >>> import os >>> import json >>> os.environ['INSIGHTS_URL'] 'https://my-insights.kx.com' >>> os.environ['CLIENT_ID'] 'my-client-id' >>> os.environ['CLIENT_SECRET'] 'my-client-secret' >>> conn = kxi.query.Query('https://my-insights.kx.com',client_id=os.environ['CLIENT_ID'],client_secret=os.environ['CLIENT_SECRET'],data_format='application/json')
APIs
kdb Insights includes the get_meta
and get_data
APIs that are used here for illustration purposes.
User Defined Analytics (UDAs)
The get_meta
API can also be used to identify custom UDAs available in the data-access (DAP) and aggregator (AGG) services. In the example below, the .asofJoin
and .select
APIs are user defined analytics found in the deployed kxi-py
package. For more information on custom UDAs in a package, refer to the packaging quickstart guide.
-
Query the kdb Insights Deployment to retrieve the metadata relating to your deployed database. Including filtering for available API's.
>>> conn.get_meta() pykx.Dictionary(pykx.q(' rc | +`api`agg`assembly`schema`rc`labels`started!(0 6;2 2;0 2;0 2;`andre.. dap | +`assembly`instance`startTS`endTS!(`python-test`crypto`crypto`pytho.. api | +`api`kxname`aggFn`custom`full`metadata`procs!(`.kxi.getData`.kxi.p.. agg | +`aggFn`custom`full`metadata`procs!(`.sgagg.aggFnDflt`.sgagg.getDat.. assembly| +`assembly`kxname`tbls!(`s#`crypto`python-test;`crypto`python-test;.. schema | +`table`assembly`typ`pkCols`updTsCol`prtnCol`sortColsMem`sortColsID.. ')) >>> conn.get_meta()['api'] pykx.Table(pykx.q(' api kxname aggFn custom full metadata .. -----------------------------------------------------------------------------.. addition mydb myavg 1 1 `description`params`return`m.. asofJoin mydb pyagg 1 1 `description`params`return`m.. .example.daAPI mydb 1 1 `description`params`return`m.. .kxi.getData mydb .sgagg.getData 0 1 `description`params`return`m.. .kxi.ping mydb 0 1 `description`params`return`m.. .kxi.preview mydb .sgagg.preview 0 1 `description`params`return`m.. select mydb 1 1 `description`params`return`m.. '))
>>> conn.get_meta() {'header': {'rcvTS': '2023-02-13T21:34:16.257000000', 'corr': '18d77741-a11f-4c45-bcbd-a555ba93b32a', 'protocol': 'gw', 'logCorr': '18d77741-a11f-4c45-bcbd-a555ba93b32a', 'client': ':10.30.143.195:5050', 'http': 'json', ... >>> conn.get_meta()['payload']['api'] [{'api': '.kxi.getData', 'kxname': ['python-test', 'crypto'], 'aggFn': '.sgagg.getData', 'custom': False, 'full': True, 'metadata': {'description': 'Get timeseries and splayed data from data access processes.', 'params': [{'name': 'table', 'type': -11, 'isReq': True, 'description': 'Table to select from.'}, {'name': 'startTS', 'type': -12, 'isReq': False, 'description': 'Inclusive start time for period of interest.'}, {'name': 'endTS', 'type': -12, 'isReq': False, 'description': 'Exclusive end time for period of interest.'}, {'name': 'filter', 'type': 0, 'isReq': False, 'default': '', 'description': 'Custom filter.'}, {'name': 'groupBy', 'type': [-11, 11], 'isReq': False, 'default': '', 'description': 'Column(s) to determine aggregation grouping'}, {'name': 'agg', 'type': [-11, 11, 0], 'isReq': False, 'default': '', 'description': 'Column(s) to return or aggregations to perform (where ` returns all columns).'}, {'name': 'sortCols', 'type': [-11, 11], 'isReq': False, 'default': '', 'description': 'Column names to sort response payload on.'}, {'name': 'slice', 'type': [16, 18, 19], 'isReq': False, 'default': '', 'description': 'Timerange to grab from each date in request range'}, {'name': 'fill', 'type': -11, 'isReq': False, 'default': '', 'description': 'Fills option to apply to null data'}, {'name': 'temporality', 'type': -11, 'isReq': False, 'default': '', 'description': 'How to interpret time range, as continuous or as slice'}], 'return': {'type': 0, 'description': 'Triple consisting of sort columns (symbol[]), return columns (symbol[]), and table data (table)'}, 'misc': {'safe': True}, 'aggReturn': {'type': 98, 'description': 'Returns the raze of the tables.'}}, 'procs': []}, {'api': '.kxi.ping', 'kxname': ['python-test', 'crypto'], 'aggFn': '', 'custom': False, 'full': True, 'metadata': {'description': 'Returns true if it reaches a target process.', 'params': [], 'return': {'type': -1, 'description': 'True.'}, 'misc': {'safe': True}, 'aggReturn': {}}, 'procs': []}]
-
To query the table
taxi
, use aget_data
API call. Make sure to apply a filter and denote the time from which you want to retrieve data.>>> conn.get_data('taxi', start_time='2023.01.11D09:15:52.447785984', filter=[['=', 'vendor', 'Happy Cabs']]) pykx.Table(pykx.q(' timestamp market sym from_symbol to_symbol side acti.. -----------------------------------------------------------------------------.. 2023.01.11D09:15:52.447785984 Kraken BTCGBP BTC GBP BID ADD .. 2023.01.11D09:15:53.058736128 Kraken BTCGBP BTC GBP BID REMO.. 2023.01.11D09:15:53.067506944 Kraken BTCGBP BTC GBP BID ADD .. 2023.01.11D09:15:53.287335936 Kraken BTCGBP BTC GBP BID CHAN.. 2023.01.11D09:15:53.287882752 Bitfinex BTCGBP BTC GBP BID CHAN.. 2023.01.11D09:15:53.462057984 Kraken BTCGBP BTC GBP BID REMO..
>>> conn.get_data('taxi', start_time='2023.01.11D09:15:52.447785984', filter=[['=', 'vendor', 'Happy Cabs']])['payload'] [{'timestamp': '2023-01-11T09:15:52.447785984', 'market': 'Kraken', 'sym': 'BTCGBP', 'from_symbol': 'BTC', 'to_symbol': 'GBP', 'side': 'BID', 'action': 'ADD', 'ccseq': 1264949000, 'price': 14356.6, 'quantity': 0.09919806, 'seq': None, 'delay_timespan': '0D00:00:00.000806731'},
SQL
-
Query the table
taxi
using an SQL statement.>>> conn.sql('SELECT * from taxi') pykx.Table(pykx.q(' date label_kxname borough vendor pickup dro.. -----------------------------------------------------------------------------.. 2024.11.13 mydb Brooklyn Happy Cabs 2024.11.13D12:23:23.140902000 202..
>>> conn.sql('select * from taxi')['payload'] 'payload': [{'timestamp': '2023-01-11T09:15:52.447785984', 'market': 'Kraken', 'sym': 'BTCGBP', 'from_symbol': 'BTC', 'to_symbol': 'GBP', 'side': 'BID', 'action': 'ADD', 'ccseq': 1264949000, 'price': 14356.6, 'quantity': 0.09919806, 'seq': None, 'delay_timespan': '0D00:00:00.000806731', 'date': '2023-01-11', 'kxname': 'crypto'}, {'timestamp': '2023-01-11T09:15:52.985120000', 'market': 'Kraken', 'sym': 'BTCGBP', 'from_symbol': 'BTC',
UDAs
You can deploy your own Python UDAs through a package. This quickstart guide includes a sample set of UDAs that are part of the deployed kxi-py
package.
The two UDAs executed below are:
select
, an example call to the function.kxi.selectTable
-
asofJoin
, an example call usingaj
>>> from datetime import datetime,timedelta,date >>> conn.fetch_custom_apis() >>> end=datetime.strftime(datetime.now()+timedelta(days=1), "%Y-%m-%d") >>> start=datetime.strftime(datetime.now()-timedelta(days=1), "%Y-%m-%d") >>> params = {"table1": "uber", "table2": "taxi" , "start": start, "end": end} >>> conn.asofJoin(json=params)[1] >>> pykx.Table(pykx.q(' borough pickup dropoff traveldistance charge payment_type vendor passengers distance fare extra tax tip tolls fees total -------------------------------------------------------------------------------------------------------------------------------------------------------- "Brooklyn" 784815809308703000 784816703140908000 20 25.5 "AMEX" "Happy Cabs" 2 5 20 0 0 2 0 0 22 ')) >>> >>> params = {"table": "uber", "start": start, "end": end} >>> conn.select(json=params)[1] pykx.Table(pykx.q(' borough pickup dropoff traveldi.. -----------------------------------------------------------------------------.. Brooklyn 2024.11.13D12:23:29.308703000 2024.11.13D12:33:29.308709000 20 .. '))
>>> from datetime import datetime,timedelta,date >>> conn.fetch_custom_apis() >>> params = {"table": "uber", "start": "2024-11-13", "end": "2024-11-14"} >>> conn.select(json=params) {'header': {'rcvTS': '2024-11-18T12:45:20.162000000', 'corr': '8b21775b-db69-4e89-84ca-cd1b03879bcf', 'logCorr': '8b21775b-db69-4e89-84ca-cd1b03879bcf', 'http': 'json', 'api': '.select', 'agg': ':10.8.148.130:5070', 'refVintage': -9223372036854775807, 'rc': 0, 'ac': 0, 'ai': ''}, 'payload': [{'borough': 'Brooklyn', 'pickup': '2024-11-13T12:23:29.308703000', 'dropoff': '2024-11-13T12:33:29.308709000', 'traveldistance': 20, 'charge': 25.5, 'payment_type': 'AMEX'}]} >>> >>> params = {"table1": "uber", "table2": "taxi" , "start": "2024-11-13", "end": "2024-11-14"} >>> conn.asofJoin(json=params) {'header': {'rcvTS': '2024-11-18T12:43:46.274000000', 'corr': '2c31e67c-5a3c-4626-8b17-177a2e5476d7', 'logCorr': '2c31e67c-5a3c-4626-8b17-177a2e5476d7', 'http': 'json', 'api': '.asofJoin', 'agg': ':10.8.148.130:5070', 'refVintage': -9223372036854775807, 'rc': 0, 'ac': 0, 'ai': ''}, 'payload': [{'borough': 'Brooklyn', 'pickup': 784815809308703000, 'dropoff': 784816703140908000, 'traveldistance': 20, 'charge': 25.5, 'payment_type': 'AMEX', 'vendor': 'Happy Cabs', 'passengers': 2, 'distance': 5, 'fare': 20, 'extra': 0, 'tax': 0, 'tip': 2, 'tolls': 0, 'fees': 0, 'total': 22}]}