kdb Insights Python API Quickstart
This quickstart provides you with a step-by-step guide to publishing data to, and retrieving data from, a running instance of kdb Insights Enterprise using API calls and SQL.
Pre-requisites
- You have installed the kdb Insights Python API following the installation guide.
- You have access to a running instance of kdb Insights Enterprise.
- You have access to some data within a database in your kdb Insights Enterprise instance.
- You have access to and have set the environment variables `INSIGHTS_URL`, `INSIGHTS_CLIENT_ID` and `INSIGHTS_CLIENT_SECRET`, which are associated with your instance of kdb Insights Enterprise and are required to complete a query. The values for `INSIGHTS_CLIENT_ID` and `INSIGHTS_CLIENT_SECRET` can be retrieved through Keycloak as outlined here. The Publish example below additionally reads the `KXI_CONFIG_URL` environment variable, so set it as well before publishing.
Quickstart
Publish
>>> from datetime import datetime, timedelta
>>> from kxi.publish.rtpublisher import RtPublisher
>>> from kxi.util import QType
>>> import kxi.query
>>> import os
>>> os.environ['KXI_CONFIG_URL']
'https://my-insights.kx.com/informationservice/details/5ed6e5b7c80c8e35d07249d12f32d9eb'
>>> item = {'vendor': ['Happy Cabs'], 'pickup': [datetime.now()], 'dropoff': [datetime.now() + timedelta(minutes=15)], 'passengers': [2], 'distance': [5], 'fare': [20], 'extra': [0],'tax': [0], 'tip': [2], 'tolls': [0], 'fees': [0], 'total': [22], 'payment_type': ['AMEX'] }
>>> schemas = { 'taxi': {'vendor': QType.get('symbol'), 'pickup': QType.get('timestamp'), 'dropoff': QType.get('timestamp'), 'passengers': QType.get('long'), 'distance': QType.get('long'), 'fare': QType.get('long'), 'extra': QType.get('long'), 'tax': QType.get('long'), 'tip': QType.get('long'), 'tolls': QType.get('long'), 'fees': QType.get('long'), 'total': QType.get('long'), 'payment_type': QType.get('symbol') }}
>>> pub = RtPublisher(config_url=os.environ['KXI_CONFIG_URL'], rt_dir="/tmp/rt_data", schemas=schemas)
>>> pub.start()
>>> pub.insert('taxi', item)
>>> pub.stop()
Query
Each step below shows two examples: the first returns the data as PyKX objects, which requires PyKX to be installed; the second returns the data as JSON.
-
Create an authenticated connection to your instance of kdb Insights Enterprise using the `kxi.query.Query` class outlined here.
>>> import kxi.query
>>> import os
>>> os.environ['INSIGHTS_URL']
'https://my-insights.kx.com'
>>> os.environ['INSIGHTS_CLIENT_ID']
'my-client-id'
>>> os.environ['INSIGHTS_CLIENT_SECRET']
'my-client-secret'
>>> conn = kxi.query.Query()
>>> import kxi.query
>>> import os
>>> import json
>>> os.environ['INSIGHTS_URL']
'https://my-insights.kx.com'
>>> os.environ['INSIGHTS_CLIENT_ID']
'my-client-id'
>>> os.environ['INSIGHTS_CLIENT_SECRET']
'my-client-secret'
>>> conn = kxi.query.Query(data_format='application/json')
-
Query the kdb Insights deployment to retrieve the metadata relating to your deployed database, including filtering the metadata for the available APIs.
>>> conn.get_meta() pykx.Dictionary(pykx.q(' rc | +`api`agg`assembly`schema`rc`labels`started!(0 6;2 2;0 2;0 2;`andre.. dap | +`assembly`instance`startTS`endTS!(`python-test`crypto`crypto`pytho.. api | +`api`kxname`aggFn`custom`full`metadata`procs!(`.kxi.getData`.kxi.p.. agg | +`aggFn`custom`full`metadata`procs!(`.sgagg.aggFnDflt`.sgagg.getDat.. assembly| +`assembly`kxname`tbls!(`s#`crypto`python-test;`crypto`python-test;.. schema | +`table`assembly`typ`pkCols`updTsCol`prtnCol`sortColsMem`sortColsID.. ')) >>> conn.get_meta()['api'] pykx.Table(pykx.q(' api kxname aggFn custom full metadata .. -----------------------------------------------------------------------------.. .kxi.getData python-test crypto .sgagg.getData 0 1 `description`param.. .kxi.ping python-test crypto 0 1 `description`param.. '))
>>> conn.get_meta() {'header': {'rcvTS': '2023-02-13T21:34:16.257000000', 'corr': '18d77741-a11f-4c45-bcbd-a555ba93b32a', 'protocol': 'gw', 'logCorr': '18d77741-a11f-4c45-bcbd-a555ba93b32a', 'client': ':10.30.143.195:5050', 'http': 'json', ... >>> conn.get_meta()['payload']['api'] [{'api': '.kxi.getData', 'kxname': ['python-test', 'crypto'], 'aggFn': '.sgagg.getData', 'custom': False, 'full': True, 'metadata': {'description': 'Get timeseries and splayed data from data access processes.', 'params': [{'name': 'table', 'type': -11, 'isReq': True, 'description': 'Table to select from.'}, {'name': 'startTS', 'type': -12, 'isReq': False, 'description': 'Inclusive start time for period of interest.'}, {'name': 'endTS', 'type': -12, 'isReq': False, 'description': 'Exclusive end time for period of interest.'}, {'name': 'filter', 'type': 0, 'isReq': False, 'default': '', 'description': 'Custom filter.'}, {'name': 'groupBy', 'type': [-11, 11], 'isReq': False, 'default': '', 'description': 'Column(s) to determine aggregation grouping'}, {'name': 'agg', 'type': [-11, 11, 0], 'isReq': False, 'default': '', 'description': 'Column(s) to return or aggregations to perform (where ` returns all columns).'}, {'name': 'sortCols', 'type': [-11, 11], 'isReq': False, 'default': '', 'description': 'Column names to sort response payload on.'}, {'name': 'slice', 'type': [16, 18, 19], 'isReq': False, 'default': '', 'description': 'Timerange to grab from each date in request range'}, {'name': 'fill', 'type': -11, 'isReq': False, 'default': '', 'description': 'Fills option to apply to null data'}, {'name': 'temporality', 'type': -11, 'isReq': False, 'default': '', 'description': 'How to interpret time range, as continuous or as slice'}], 'return': {'type': 0, 'description': 'Triple consisting of sort columns (symbol[]), return columns (symbol[]), and table data (table)'}, 'misc': {'safe': True}, 'aggReturn': {'type': 98, 'description': 'Returns the raze of the tables.'}}, 'procs': []}, {'api': 
'.kxi.ping', 'kxname': ['python-test', 'crypto'], 'aggFn': '', 'custom': False, 'full': True, 'metadata': {'description': 'Returns true if it reaches a target process.', 'params': [], 'return': {'type': -1, 'description': 'True.'}, 'misc': {'safe': True}, 'aggReturn': {}}, 'procs': []}]
-
Query a table using an SQL statement. The examples below query the taxi and obUpdates tables; substitute the name of your own table (my_table).
>>> conn.sql('SELECT * from taxi')
pykx.Table(pykx.q('
date label_kxname vendor pickup dropoff ..
-----------------------------------------------------------------------------..
2024.05.14 na Happy Cabs 2024.05.14D15:09:11.992705000 2024.05.14D1..
2024.05.14 na Happy Cabs 2024.05.14D15:30:40.666830000 2024.05.14D1..
2024.05.14 na Happy Cabs 2024.05.14D15:47:55.346594000 2024.05.14D1..
>>> conn.sql('select * from obUpdates')['payload'] 'payload': [{'timestamp': '2023-01-11T09:15:52.447785984', 'market': 'Kraken', 'sym': 'BTCGBP', 'from_symbol': 'BTC', 'to_symbol': 'GBP', 'side': 'BID', 'action': 'ADD', 'ccseq': 1264949000, 'price': 14356.6, 'quantity': 0.09919806, 'seq': None, 'delay_timespan': '0D00:00:00.000806731', 'date': '2023-01-11', 'kxname': 'crypto'}, {'timestamp': '2023-01-11T09:15:52.985120000', 'market': 'Kraken', 'sym': 'BTCGBP', 'from_symbol': 'BTC',
-
Query a table using a get_data API call, applying a filter and specifying the time from which data should be retrieved. The examples below query the obUpdates table; substitute the name of your own table (my_table).
>>> conn.get_data('obUpdates', start_time='2023.01.11D09:15:52.447785984', filter=[['=', 'side', 'BID']])
pykx.Table(pykx.q('
timestamp market sym from_symbol to_symbol side acti..
-----------------------------------------------------------------------------..
2023.01.11D09:15:52.447785984 Kraken BTCGBP BTC GBP BID ADD ..
2023.01.11D09:15:53.058736128 Kraken BTCGBP BTC GBP BID REMO..
2023.01.11D09:15:53.067506944 Kraken BTCGBP BTC GBP BID ADD ..
2023.01.11D09:15:53.287335936 Kraken BTCGBP BTC GBP BID CHAN..
2023.01.11D09:15:53.287882752 Bitfinex BTCGBP BTC GBP BID CHAN..
2023.01.11D09:15:53.462057984 Kraken BTCGBP BTC GBP BID REMO..
>>> conn.get_data('obUpdates', start_time='2023.01.11D09:15:52.447785984', filter=[['=', 'side', 'BID']])['payload'] [{'timestamp': '2023-01-11T09:15:52.447785984', 'market': 'Kraken', 'sym': 'BTCGBP', 'from_symbol': 'BTC', 'to_symbol': 'GBP', 'side': 'BID', 'action': 'ADD', 'ccseq': 1264949000, 'price': 14356.6, 'quantity': 0.09919806, 'seq': None, 'delay_timespan': '0D00:00:00.000806731'},