Querying with API
kdb Insights includes services for persisting and accessing data.
The Service Gateway offers an authenticated, secure, OpenAPI-compatible API for retrieving data from the system.
An Assembly Operator is used to dynamically provision data access processes and Storage Manager nodes.
Deployment
To query data, you must first deploy an assembly, which configures any data publishers and databases.
To apply a new assembly, use the kdb Insights CLI.
kxi assembly deploy --filepath sdk_sample_assembly.yaml
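To confirm the assembly is running before you query, you can list deployed assemblies with the kdb Insights CLI:
# List deployed assemblies and their status
kxi assembly list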
SQL Usage
To use SQL, you must augment the assembly to set queryEnvironment. For more information, refer to SQL.
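As a rough sketch of an SQL call once queryEnvironment is set (the endpoint path below is an assumption; confirm it on the SQL page):
# Hypothetical SQL query via the Service Gateway; endpoint path assumed
curl -X POST --header "Content-Type: application/json" \
    --header "Accept: application/json" \
    --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    --data '{"query": "select * from trades limit 5"}' \
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/sql"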
Role-based Access
All service gateway endpoints starting with /kxi use a single insights.query.data role. This role must be applied to the user or service account before acquiring a new bearer token. Invoking a UDA requires the insights.query.custom role.
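The exact token flow is described in Authentication and authorization. As a hedged sketch, a client-credentials token is typically obtained from the deployment's Keycloak endpoint; the realm and client names below are placeholders, and jq is assumed to be installed:
# Hypothetical client-credentials token request; realm and client names are placeholders
INSIGHTS_TOKEN=$(curl -s -X POST \
    --data "grant_type=client_credentials&client_id=test-client&client_secret=$CLIENT_SECRET" \
    "https://${INSIGHTS_HOSTNAME}/auth/realms/insights/protocol/openid-connect/token" | jq -r .access_token)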
Querying Data
All DA processes come equipped with an API for simple data retrieval, called .kxi.getData. To query data using this API, make a REST API call to servicegateway/kxi/getData.
A query minimally includes the name of the table, start timestamp, end timestamp, and one or more user-defined labels.
For an example user-defined label assetClass:
START=$(date "+%Y.%m.%dD00:00:00.000000000")
END=$(date "+%Y.%m.%dD23:59:59.999999999")
# Set $INSIGHTS_TOKEN to your OAuth2 Token
curl -X POST --header "Content-Type: application/json" \
    --header "Accept: application/json" \
    --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    --data "{\"table\":\"trades\",\"startTS\":\"$START\",\"endTS\":\"$END\",\"assetClass\": \"manufacturing\"}" \
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"
Tokens, users, and Service Accounts
For information on how to acquire a token, and instructions on querying as a user or service account, refer to Authentication and authorization.
The getData API supports additional parameters for reducing the columns returned and for basic filtering. For more details, refer to the getData API page.
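For illustration, here is a sketch of a getData call that restricts the returned columns and applies a basic filter, reusing START and END from above; the agg and filter parameter shapes are assumptions to verify against the getData API page:
# Hypothetical column selection and filtering; confirm parameter shapes on the getData API page
curl -X POST --header "Content-Type: application/json" \
    --header "Accept: application/json" \
    --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    --data "{\"table\":\"trades\",\"startTS\":\"$START\",\"endTS\":\"$END\",\"agg\":[\"sym\",\"price\"],\"filter\":[[\"=\",\"sym\",\"AAPL\"]]}" \
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"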
Case-sensitive labels
Labels are case-sensitive. Make sure the key/value pairs you supply match the labels given when the assembly was applied.
Using QIPC responses
By including the HTTP Accept header "application/octet-stream", you can get query results as a serialized QIPC byte array. This allows for significantly reduced overhead and faster response times, at the cost of some minor complexity when handling the results. Using any of the kdb+ as-client interfaces, you can deserialize the responses and then process them as normal.
Added Bonus
Using this strategy has the additional benefit of preserving type information. JSON responses have the disadvantage of converting all numbers to floats, and may truncate the precision of timestamps.
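You can see the JSON limitation directly in q: integers round-tripped through JSON come back as floats.
q).j.k .j.j 1 2 3        / the integer list returns as floats
1 2 3f
q)type .j.k .j.j 1 2 3
9h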
Each of the following examples assumes you have INSIGHTS_TOKEN and INSIGHTS_HOSTNAME defined in your environment.
# Save results to results.dat
curl -X POST --header "Content-Type: application/json" \
    --header "Accept: application/octet-stream" \
    --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    -o results.dat \
    --data "{\"table\":\"trades\"}" \
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"
Start q and deserialize the response:
-9!read1`:results.dat
URL:"https://",getenv[`INSIGHTS_HOSTNAME],"/servicegateway/kxi/getData";
headers:("Accept";"Content-Type";"Authorization")!(
"application/octet-stream";
"application/json";
"Bearer ",getenv `INSIGHTS_TOKEN);
body:.j.j enlist[`table]!enlist "trades";
resp:.kurl.sync (URL; `POST; `binary`headers`body!(1b;headers;body));
if[200 <> first resp; 'last resp];
show -9!last resp
Ensure your copy of c.js has decompression support (look for the changelog line // 2021.04.05 added decompress support):
const https = require('https');
const c = require('./c');

let TOKEN = process.env.INSIGHTS_TOKEN;

const options = {
    host    : process.env.INSIGHTS_HOSTNAME,
    path    : '/servicegateway/kxi/getData',
    method  : 'POST',
    headers : {
        'Accept'        : 'application/octet-stream',
        'Content-Type'  : 'application/json',
        'Authorization' : 'Bearer ' + TOKEN
    }
};

let body = {'table' : 'trades'};

let request = https.request(options, (res) => {
    res.setEncoding('binary');
    if (res.statusCode !== 200) {
        console.error(`Non 200 error code ${res.statusCode}`);
        res.resume();
        return;
    }
    let chunks = [];
    res.on('data', (chunk) => {
        chunks.push(Buffer.from(chunk, 'binary'));
    });
    res.on('end', () => {
        let b = Buffer.concat(chunks);
        console.log(c.deserialize(b));
    });
});

request.write(JSON.stringify(body));
request.end();
request.on('error', (err) => {
    console.error(`Encountered an error trying to make a request: ${err.message}`);
});
kdb Insights Enterprise REST client
For more details on using the REST client, refer to Authentication and authorization.
Installing User Defined Analytics
You can use User Defined Analytics (UDAs) to add new functions to the databases or to define new aggregation functions. UDAs are housed within a folder referred to as a package.
For instructions, refer to Installing UDAs in the kdb Insights or kdb Insights Enterprise documentation.
Within a package, you can define any number of functions. To make a function a callable API for a database, you must call .kxi.registerUDA. For more information on UDA registration, metadata descriptions, and parameter types, refer to UDA overview.
API registration considerations
Take care within the package to ensure that query and aggregation functions do not conflict when registering, and keep DAP-specific and aggregation-specific logic in separate files (for example, da.q and agg.q). Setting entrypoints for your package can help organize this.
Your package manifest.json file must define entrypoints for data-access and aggregator, respectively, if you want to add UDAs to them:
"entrypoints": {
"aggregator": "agg.q",
"data-access": "da.q"
}
Calling UDAs
UDAs are callable through the servicegateway. The name of the API is used to generate the HTTP endpoint. To call a UDA, you need the insights.query.custom permission.
A UDA named .example.api produces the endpoint /servicegateway/example/api.
Only namespaced UDAs are currently supported.
The input to a custom aggregation API is a list of results returned by the database API you are calling, one from each database accessed by the query. For example, if your UDA named .example.api returns a table, and your query's startTS and endTS cause the query to distribute to the Intraday Database (IDB) and the Historical Database (HDB), the aggregation function receives a list of two tables.
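For instance, a minimal aggregation that just concatenates the per-database tables (equivalent to the default raze behavior) could look like the following sketch; .example.joinAgg is a hypothetical name:
// Hypothetical aggregation: res is a list of tables, one per database (e.g. IDB and HDB)
.example.joinAgg:{[res]
  .kxi.response.ok raze res  / combine per-database results into one table
  };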
Example
This example requires the kdb Insights Package CLI and a running install of kdb Insights Enterprise. It creates a new package, pushes it to kdb Insights Enterprise, and describes how to set up the Aggregator and DAPs to load the APIs.
To push your package, you must also have run kxi configure to save your hostname and namespace for the CLI's use.
Assumption
This example assumes your assembly schema is for the trades table and contains at minimum sym and price. If you are not using that schema, you can still follow along by substituting columns relevant to you.
kxi configure
Hostname []: https://kxdeveloper.aws-dev.kxi-dev.kx.com
Namespace []: jbloggs
Client ID []: test-client
Client Secret (input hidden):
Create a new package called custom:
mkdir packages
cd packages
kxi package init custom
cd custom
touch agg.q
touch da.q
touch uda.q
Update manifest.json to replace the default entrypoint of init with entrypoints for DA functions and aggregation functions:
{
  "name": "custom",
  "version": "0.0.1",
  "entrypoints": {
    "aggregator": "agg.q",
    "data-access": "da.q"
  },
  "metadata": {
    "description": null,
    "authors": {
      "kxdeveloper": {
        "email": null
      }
    }
  },
  "dependencies": {}
}
Add the following to uda.q:
// DA query function
.example.queryFn:{[table;columns]
  columns:$[-11h = type columns;enlist columns;columns]; // accept a single column or a list
  filter:enlist (<;`i;100); // Note: for partitioned tables, this returns the first 100 rows per date
  .kxi.response.ok ?[table;filter;0b;columns!columns]
  };
// Aggregation function
.example.aggFn:{[res]
  .kxi.response.ok 100?raze res
  };
// Metadata for registering
metadata:.kxi.metaDescription["Returns first 100 records of each DAP"], / Description of the UDA
  .kxi.metaParam[`name`type`isReq`description!(`table;-11h;1b;"Name of the table to query.")], / Describe `table` parameter
  .kxi.metaParam[`name`type`isReq`description!(`columns;11 -11h;1b;"Columns in table to select")], / Describe `columns` parameter
  .kxi.metaReturn[`type`description!(98h;"Return first 100 rows.")], / Describe the return of the UDA
  .kxi.metaMisc[enlist[`safe]!enlist 1b]; / States that this UDA is safe to retry in the event of an error
// Registration
.kxi.registerUDA `name`query`aggregation`metadata!(`.example.testUDA;`.example.queryFn;`.example.aggFn;metadata)
Add the following to da.q and agg.q:
\l uda.q
Next, push the APIs to kdb Insights Enterprise by calling packit and push:
mkdir -p /tmp/packages
mkdir -p /tmp/artifacts
export KX_PACKAGE_PATH=/tmp/packages
export KX_ARTIFACT_PATH=/tmp/artifacts
cd ..
kxi package packit custom --version 1.0.0 --tag
kxi package push custom/1.0.0
If configured correctly, kdb Insights Enterprise provides confirmation that the push command succeeded:
{
  "custom": [
    {
      "version": "1.0.0",
      "_status": "InstallationStatus.SUCCESS"
    }
  ]
}
Now, if you deploy an assembly with KXI_PACKAGES set in the DA environment variables, and also update the Aggregator's Helm configuration with KXI_PACKAGES, your APIs are installed.
Example DA configuration with an entrypoint for the custom package:
dap:
  instances:
    rdb:
      env:
        - name: KXI_PACKAGES
          value: "custom:1.0.0"
    idb:
      env:
        - name: KXI_PACKAGES
          value: "custom:1.0.0"
    hdb:
      env:
        - name: KXI_PACKAGES
          value: "custom:1.0.0"
Example Aggregator configuration for the custom package:
service-gateway:
  aggregator:
    env:
      KXI_PACKAGES: "custom:1.0.0"
qe-gateway:
  aggregator:
    env:
      KXI_PACKAGES: "custom:1.0.0"
You may now call your UDA over HTTPS by supplying the API parameters and a startTS and endTS:
# Example that uses the UDA on data within the current hour
startTS=$(date -u '+%Y.%m.%dD%H:00:00')
endTS=$(date -u '+%Y.%m.%dD%H:%M:%S')
curl -X POST --header "Content-Type: application/json" \
    --header "Accept: application/json" \
    --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    --data "{\"table\": \"trades\", \"columns\":[\"sym\",\"price\"], \"startTS\": \"$startTS\", \"endTS\": \"$endTS\"}" \
    "https://${INSIGHTS_HOSTNAME}/servicegateway/example/api"
By default, the aggregation is a raze. To override the aggregation, update the query to set opts.aggFn:
# Example that uses a custom aggregation function on the query within the current hour
startTS=$(date -u '+%Y.%m.%dD%H:00:00')
endTS=$(date -u '+%Y.%m.%dD%H:%M:%S')
curl -X POST --header "Content-Type: application/json" \
    --header "Accept: application/json" \
    --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    --data "{\"table\": \"trades\", \"columns\":[\"sym\",\"price\"], \"startTS\": \"$startTS\", \"endTS\": \"$endTS\", \"opts\": {\"aggFn\":\"avPrice\"}}" \
    "https://${INSIGHTS_HOSTNAME}/servicegateway/example/api"
Data Tiers and Life-cycle
Databases in kdb Insights are distributed across tiers, and data migrates across tiers as it ages.
Data tiers are configured in the assembly specification; this involves describing mounts and a data retention life cycle.
Newly received data can be made available in memory for a number of days before being migrated to on-disk or cloud storage. This enables faster response times for recent data.
An example mount description detailing that the IDB/HDB are to be kept in a Rook CephFS partition, under the root /data/db:
mounts:
  rdb:
    type: stream            # real-time data served from memory
    baseURI: none
    partition: none
  idb:
    type: local
    baseURI: file:///data/db/idb
    partition: ordinal      # intraday data partitioned by interval number
    volume:
      storageClass: "rook-cephfs"
      accessModes:
        - ReadWriteMany
  hdb:
    type: local
    baseURI: file:///data/db/hdb
    partition: date         # historical data partitioned by date
    dependency:
      - idb
    volume:
      storageClass: "rook-cephfs"
      accessModes:
        - ReadWriteMany
An example showing the corresponding data tiering configuration, saved under the Storage Manager elements.
In this example, intraday data migrates from memory to disk every ten minutes, migrates again to the HDB once a day (snapped to 01:35), and is retained for three months.
elements:
  sm:
    source: south
    tiers:
      - name: streaming
        mount: rdb
      - name: interval
        mount: idb
        schedule:
          freq: 00:10:00      # write down from memory every 10 minutes
      - name: recent
        mount: hdb
        schedule:
          freq: 1D00:00:00    # migrate to the HDB once per day
          snap: 01:35:00      # snapped to 01:35
        retain:
          time: 3 Months      # keep HDB data for three months
For a fully detailed description of data tiering, including data compression, refer to the Elements section of the Database configuration page.
Querying is tier agnostic
Do not specify a tier when accessing data; instead, use labels to query data.
Troubleshooting
For troubleshooting information, refer to Troubleshooting.