Querying with API

The KX Insights Platform includes services for persisting and accessing data.

The Service Gateway offers an authenticated, secure and OpenAPI compatible API to retrieve data from the system.

An Assembly Operator is used to dynamically provision data access processes and storage manager nodes.

Deployment

To query data, you must first deploy an assembly, which configures any data publishers and databases.

To apply a new assembly, use kubectl.

kubectl apply -f sdk_sample_assembly.yaml

For an example of an SDK sample assembly, see deploying an Assembly.

SQL Usage

To use SQL, you need to augment the assembly to set queryEnvironment. See SQL.

Role based Access

All service gateway endpoints starting with /kxi use a single role, insights.query.data.

This role must be applied to the user or service account before acquiring a new bearer token.

Invoking a custom API requires the insights.query.custom role.

Querying Data

All DA processes come equipped with an API for simple data retrieval, called .kxi.getData.

To query data using this API, you may make a REST API call to servicegateway/kxi/getData.

A query minimally includes the name of the table, start timestamp, end timestamp, and one or more user defined labels.

For example, with a user-defined label assetClass:

START=$(date "+%Y.%m.%dD00:00:00.000000000")
END=$(date "+%Y.%m.%dD23:59:59.999999999")

# Set $INSIGHTS_TOKEN to your OAuth2 Token
curl -X POST --header "Content-Type: application/json"\
    --header "Accept: application/json"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    --data "{\"table\":\"trades\",\"startTS\":\"$START\",\"endTS\":\"$END\",\"assetClass\": \"manufacturing\"}"\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"

Tokens, users, and Service Accounts

For information on how to acquire a token, and instructions on querying as a user or service account, see Authentication.

The getData API supports additional parameters for reducing the columns returned and for basic filtering.

For more details see getData API.
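The request body used in the curl call earlier in this section can also be built programmatically. The following is a minimal sketch in Node.js, not part of the platform: the helper names toQTimestamp and buildGetDataBody are hypothetical, and the sketch assumes timestamps are supplied in UTC. It only builds the JSON body and does not send a request.

```javascript
// Format a JavaScript Date as a q timestamp string in UTC,
// for example 2021.04.05D01:02:03.004000000.
// JavaScript dates carry millisecond precision only, so the last six digits are always zero.
// NOTE: toQTimestamp is a hypothetical helper for illustration.
function toQTimestamp(date) {
    const pad = (n, width) => String(n).padStart(width, '0');
    return pad(date.getUTCFullYear(), 4) + '.' +
        pad(date.getUTCMonth() + 1, 2) + '.' +
        pad(date.getUTCDate(), 2) + 'D' +
        pad(date.getUTCHours(), 2) + ':' +
        pad(date.getUTCMinutes(), 2) + ':' +
        pad(date.getUTCSeconds(), 2) + '.' +
        pad(date.getUTCMilliseconds(), 3) + '000000';
}

// Build a getData request body: table, time range, then any user-defined labels.
// NOTE: buildGetDataBody is a hypothetical helper for illustration.
function buildGetDataBody(table, start, end, labels = {}) {
    return { table, startTS: toQTimestamp(start), endTS: toQTimestamp(end), ...labels };
}

const body = buildGetDataBody(
    'trades',
    new Date(Date.UTC(2021, 3, 5, 0, 0, 0, 0)),
    new Date(Date.UTC(2021, 3, 5, 23, 59, 59, 999)),
    { assetClass: 'manufacturing' }
);
console.log(JSON.stringify(body));
```

The resulting string can be written to the request in place of the hand-escaped --data argument.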

Case-sensitive labels

Labels are case-sensitive.

Make sure the label key/value pairs you supply match the labels given when the assembly was applied.
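A client-side check can catch case mistakes before a query is sent. The sketch below is an illustration only: findCaseMismatches is a hypothetical helper, and it assumes you keep a local copy of the labels the assembly was applied with.

```javascript
// Compare the labels in a query against the labels an assembly was applied with.
// Returns a description of every key or value that matches only when case is ignored.
// NOTE: findCaseMismatches is a hypothetical helper, not a platform API.
function findCaseMismatches(assemblyLabels, queryLabels) {
    const problems = [];
    for (const [key, value] of Object.entries(queryLabels)) {
        if (Object.prototype.hasOwnProperty.call(assemblyLabels, key)) {
            const expected = assemblyLabels[key];
            if (expected !== value &&
                String(expected).toLowerCase() === String(value).toLowerCase()) {
                problems.push(`value "${value}" for label "${key}" should be "${expected}"`);
            }
            continue;
        }
        const match = Object.keys(assemblyLabels)
            .find((k) => k.toLowerCase() === key.toLowerCase());
        if (match !== undefined) {
            problems.push(`label "${key}" should be "${match}"`);
        }
    }
    return problems;
}

const assemblyLabels = { assetClass: 'manufacturing' };
console.log(findCaseMismatches(assemblyLabels, { assetclass: 'manufacturing' }));
console.log(findCaseMismatches(assemblyLabels, { assetClass: 'Manufacturing' }));
```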

Using qipc responses

By including the HTTP Accept header "application/octet-stream", you can get query results as a serialized qipc byte array.

This header allows for significantly reduced overhead and faster response times at the cost of some minor complexity when handling the results.

Using any of the kdb+ as client interfaces, you can deserialize the responses and then process them as normal.

Added Bonus

Using this strategy has the additional benefit of preserving type information. JSON responses have the disadvantage of converting all numbers to floats, and may truncate the precision of timestamps.
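The loss of precision is easy to reproduce in any JavaScript JSON parser. The sketch below shows the general behavior of JSON numbers parsed as double-precision floats. The values are made up for illustration, and it makes no claim about how the gateway encodes any particular column.

```javascript
// 2^53 + 1 cannot be represented exactly as a double-precision float,
// so the parsed value silently becomes 2^53.
const parsed = JSON.parse('{"size": 9007199254740993}');
console.log(parsed.size); // 9007199254740992

// A nanosecond count since the Unix epoch is far beyond the safe integer range,
// so the low-order digits are lost when it is parsed as a JSON number.
const nanos = JSON.parse('1617584523123456789');
console.log(Number.isSafeInteger(nanos)); // false
console.log(BigInt(nanos) === 1617584523123456789n); // false
```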

For each of the following examples, we assume you have INSIGHTS_TOKEN and INSIGHTS_HOSTNAME defined in your environment.

# Save results to results.dat
curl -X POST --header "Content-Type: application/json"\
    --header "Accept: application/octet-stream"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    -o results.dat\
    --data "{\"table\":\"trades\"}"\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"

Start qce and deserialize the response:

-9!read1`:results.dat

Alternatively, make the request from q with .kurl.sync and deserialize the response directly:

URL:"https://",getenv[`INSIGHTS_HOSTNAME],"/servicegateway/kxi/getData";
headers:("Accept";"Content-Type";"Authorization")!(
    "application/octet-stream";
    "application/json";
    "Bearer ",getenv `INSIGHTS_TOKEN);
body:.j.j enlist[`table]!enlist "trades";
resp:.kurl.sync (URL; `POST; `binary`headers`body!(1b;headers;body));
if[200 <> first resp; 'last resp];
show -9!last resp

To make the same request from Node.js, ensure your copy of c.js has decompression support (look for the comment // 2021.04.05 added decompress support):

const https = require('https');
const c = require('./c');
let TOKEN = process.env.INSIGHTS_TOKEN;
const options = {
    host    : process.env.INSIGHTS_HOSTNAME,
    path    : '/servicegateway/kxi/getData',
    method  : 'POST',
    headers : {
        'Accept'      : 'application/octet-stream',
        'Content-Type'  : 'application/json',
        'Authorization' : 'Bearer ' + TOKEN
    }
};
let body = {'table' : 'trades'};
let request = https.request(options, (res) => {
    res.setEncoding('binary');
    if (res.statusCode !== 200) {
        console.error(`Non 200 error code ${res.statusCode}`)
        res.resume();
        return;
    }
    let chunks = [];
    res.on('data', (chunk) => {
        chunks.push(Buffer.from(chunk, 'binary'));
    });
    res.on('end', () => {
        let b = Buffer.concat(chunks);
        console.log(c.deserialize(b));
    });
});
request.write(JSON.stringify(body));
request.end();
request.on('error', (err) => {
    console.error(`Encountered an error trying to make a request: ${err.message}`);
});

KX Insights Platform REST client

For more details on using the REST client, see the KX Insights Platform REST client documentation.

Installing custom API

Custom APIs may be used to add new functions to the databases, or define new aggregation functions.

Custom APIs are housed within packages made by the Insights Packaging APIs. The kxi package CLI may be used to initialize a new package.

Within a package, you may define any number of functions. To make a function a callable API for a database, you must call .da.registerAPI. To make an aggregation function available, you must call .sgagg.registerAggFn. Both registrations take the name of your function and descriptions of its metadata. For more information on API metadata descriptions and parameter types, see metadata registration.

API registration considerations

Care should be taken within the package to ensure that databases do not call .sgagg.registerAggFn, and that Aggregators do not call .da.registerAPI. Setting an entrypoint for your package can help organize this.

Your package manifest.json must define entrypoints for data-access and aggregator if you wish to add APIs to them respectively:

    "entrypoints": {
        "aggregator": "agg.q",
        "data-access": "da.q"
    }

When deploying the insights platform, set KXI_PACKAGES to a comma-separated list of packages you want to load on the aggregators.

When deploying an assembly, set KXI_PACKAGES to a comma-separated list of packages you want to load on the databases.

For example, with two packages named fin and ml both versioned as 1.0.0, you may set the following environment:

This will load the aggregator entrypoint for the ml and fin packages. Put these values into the insights values.yaml you install with.

service-gateway:
  aggregator:
   env:
     KXI_PACKAGES: "ml:1.0.0,fin:1.0.0"

qe-gateway:
  aggregator:
   env:
     KXI_PACKAGES: "ml:1.0.0,fin:1.0.0"

This will load the data-access entrypoint for the ml and fin packages. Put these values into an assembly yaml.

dap:
  instances:
    rdb:
      env:
        - name: KXI_PACKAGES
          value: "ml:1.0.0,fin:1.0.0"
    idb:
      env:
        - name: KXI_PACKAGES
          value: "ml:1.0.0,fin:1.0.0"
    hdb:
      env:
        - name: KXI_PACKAGES
          value: "ml:1.0.0,fin:1.0.0"
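The KXI_PACKAGES value is a comma-separated list of name:version entries. As a minimal sketch, the hypothetical helper below (kxiPackages is not part of the kxi tooling) builds that string from a list of packages. The optional third field mirrors the name:version:entrypoint form used in the worked example later on this page.

```javascript
// Build a KXI_PACKAGES value from a list of package descriptions.
// Each entry becomes name:version, or name:version:entrypoint when an entrypoint is given.
// NOTE: kxiPackages is a hypothetical helper for illustration.
function kxiPackages(packages) {
    return packages
        .map(({ name, version, entrypoint }) =>
            [name, version, entrypoint].filter((part) => part !== undefined).join(':'))
        .join(',');
}

console.log(kxiPackages([
    { name: 'ml', version: '1.0.0' },
    { name: 'fin', version: '1.0.0' }
])); // ml:1.0.0,fin:1.0.0

console.log(kxiPackages([
    { name: 'custom', version: '1.0.0', entrypoint: 'da' }
])); // custom:1.0.0:da
```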

The metadata for a custom API should describe the parameters the API will take. For DA APIs the parameters must match the function parameter names, or alternatively be a single parameter named args which will result in a dictionary input.

Examples of registration:

An example custom API that takes a table and returns the first 100 results for the given columns:

.example.api:{[table;columns]
    columns:$[-11h = type columns;enlist columns;columns];
    filter:enlist (<;`i;100); // Note for partitioned tables, will return first 100 per date
    :?[table;filter;0b;columns!columns]
    };

// NOTE: Alternatively, .example.api can take a single parameter called `args`. This will result in a dictionary with keys `table and `columns:
// Uncomment me for an alternative example:
// .example.api:{[args]
//    show args `table`columns
//    };

.da.registerAPI[`.example.api;
    .sapi.metaDescription["Simple 'select ... from ...' API."],
    .sapi.metaParam[`name`type`isReq`description!(`table;-11h;1b;"Table to query")],
    .sapi.metaParam[`name`type`isReq`default`description!(`columns;11 -11h;0b;`sym`time;"Column(s) to select.")],
    .sapi.metaReturn[`type`description!(98h;"Result of the select.")],
    .sapi.metaMisc[enlist[`safe]!enlist 1b]];

An example custom aggregation API that computes the average price by sym:

//
// @desc Agg function that computes the average price by sym.
//
// @param tbls  {table[]}   List of tables with ``` `sym`price``` columns.
//
// @return      {table}     Average price by sym
//
avPrice:{[tbls]
    .sapi.ok select avg price by sym from raze tbls
    }
.sgagg.registerAggFn[`avPrice;
    .sapi.metaDescription["Average price by sym aggregation"],
    .sapi.metaParam[`name`type`description!(`tbls;0h;"Tables received from DAPs")],
    .sapi.metaReturn`type`description!(98h;"The average price by sym across the tables");
    `$()
    ]

.sapi.ok

A custom aggregation API must return a response of (rcHeader; payload). Use .sapi.ok to return this response shape.

Refer to custom API metadata for descriptions of metadata builders, and for more examples.

For examples on custom aggregation functions see custom file example.

For examples on response see generating a response header.

Calling custom API

Custom DA APIs are callable through the servicegateway. The name of the API is used to generate the HTTP endpoint.

In order to call a custom API, you will need the permission: insights.query.custom.

A custom DA API named .example.api will produce an endpoint: /servicegateway/example/api.

Only namespaced custom DA APIs are currently supported.
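The mapping from API name to endpoint can be sketched as follows. The function endpointFor is hypothetical, and the handling of names nested more than one namespace deep is an assumption extrapolated from the single .example.api case given above.

```javascript
// Derive the service gateway path for a namespaced custom DA API.
// ".example.api" becomes "/servicegateway/example/api".
// NOTE: endpointFor is a hypothetical helper; deeper nesting (".a.b.c") is an assumption.
function endpointFor(apiName) {
    // A namespaced q name starts with a dot: ".example.api" splits into ["", "example", "api"].
    const parts = apiName.split('.');
    const isNamespaced = parts.length >= 3 &&
        parts[0] === '' &&
        parts.slice(1).every((part) => part !== '');
    if (!isNamespaced) {
        throw new Error(`Not a namespaced API name: ${apiName}`);
    }
    return '/servicegateway/' + parts.slice(1).join('/');
}

console.log(endpointFor('.example.api')); // /servicegateway/example/api
```

A name without a namespace, such as avPrice, is rejected by this sketch, reflecting the restriction that only namespaced custom DA APIs are supported.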

Custom aggregation APIs are used by supplying opts.aggFn in the JSON body of any request.

The input to a custom aggregation API is a list of the results returned by the database API you are calling, one from each database that is accessed by the query.

For example, if your custom API called .example.api returns a table, and your query's startTS and endTS cause the query to be distributed to the IDB and HDB, the aggregation function receives a list of two tables.
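To make the shape of that input concrete, here is a sketch in JavaScript that models each table as an array of row objects (an assumption for illustration; the real aggregation functions are written in q and receive q tables). It contrasts a raze-style join of the partial results with an average-price-by-sym aggregation in the spirit of avPrice.

```javascript
// Raze-style aggregation: join the partial results end to end.
function raze(partials) {
    return partials.flat();
}

// Custom aggregation in the spirit of avPrice: average price by sym
// across all rows from all partial results.
function avPrice(partials) {
    const totals = new Map();
    for (const row of raze(partials)) {
        const t = totals.get(row.sym) || { sum: 0, count: 0 };
        t.sum += row.price;
        t.count += 1;
        totals.set(row.sym, t);
    }
    return [...totals].map(([sym, t]) => ({ sym, price: t.sum / t.count }));
}

// Two partial results, as if one came from the IDB and one from the HDB.
const fromIdb = [{ sym: 'AAPL', price: 10 }, { sym: 'MSFT', price: 20 }];
const fromHdb = [{ sym: 'AAPL', price: 30 }];

console.log(raze([fromIdb, fromHdb]).length); // 3
console.log(avPrice([fromIdb, fromHdb]));
```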

Example

This example requires the kxi package CLI and a running install of insights. It creates a new package, pushes it to the insights platform, and shows how to set up the Aggregator and DAPs to load the APIs.

To push your package, you also need to have run kxi configure to save your hostname and namespace for the CLI to use.

Assumption

This example assumes your assembly schema is for the trades table, and contains at minimum sym and price. If you are not using that schema, you can still follow along and substitute columns that are relevant to you.

kxi configure
Hostname []: https://kxdeveloper.aws-dev.kxi-dev.kx.com
Namespace []: bweber
Client ID []: test-client
Client Secret (input hidden):

Create a new package called custom:

mkdir packages
cd packages
kxi package init custom
cd custom
touch agg.q
touch da.q

Update manifest.json to replace the default entrypoint of init, with entrypoints for DA functions and Aggregation functions:

{
    "name": "custom",
    "version": "0.0.1",
    "entrypoints": {
        "aggregator": "agg.q",
        "data-access": "da.q"
    },
    "metadata": {
        "description": null,
        "authors": {
            "kxdeveloper": {
                "email": null
            }
        }
    },
    "dependencies": {}
}

Add the following to agg.q:

//
// @desc Agg function that computes the average price by sym.
//
// @param tbls  {table[]}   List of tables with ``` `sym`price``` columns.
//
// @return      {table}     Average price by sym
//
avPrice:{[tbls]
    .sapi.ok select avg price by sym from raze tbls
    }
.sgagg.registerAggFn[`avPrice;
    .sapi.metaDescription["Average price by sym aggregation"],
    .sapi.metaParam[`name`type`description!(`tbls;0h;"Tables received from DAPs")],
    .sapi.metaReturn`type`description!(98h;"The average price by sym across the tables");
    `$()
    ]

Add the following to da.q:

.example.api:{[table;columns]
    columns:$[-11h = type columns;enlist columns;columns];
    filter:enlist (<;`i;100); // Note that this will be incorrect for partitioned table, would return 100 per date
    :?[table;filter;0b;columns!columns]
    };

.da.registerAPI[`.example.api;
    .sapi.metaDescription["Simple 'select ... from ...' API."],
    .sapi.metaParam[`name`type`isReq`description!(`table;-11h;1b;"Table to query")],
    .sapi.metaParam[`name`type`isReq`default`description!(`columns;11 -11h;0b;`sym`time;"Column(s) to select.")],
    .sapi.metaReturn[`type`description!(98h;"Result of the select.")],
    .sapi.metaMisc[enlist[`safe]!enlist 1b]];

Next, push the APIs to your insights platform by calling packit and push:

mkdir -p /tmp/packages
mkdir -p /tmp/artifacts
export KX_PACKAGE_PATH=/tmp/packages
export KX_ARTIFACT_PATH=/tmp/artifacts

cd ..
kxi package packit custom --version 1.0.0 --tag
kxi package push custom/1.0.0

If configured correctly, the response you should see is a confirmation that the push completed:

{
    "custom": [
        {
            "version": "1.0.0",
            "_status": "InstallationStatus.SUCCESS"
        }
    ]
}

Now, if you deploy an assembly with KXI_PACKAGES set in the DA environment variables, and update the aggregator through helm with KXI_PACKAGES set, your APIs will be installed.

Example DA Configuration with entrypoint=da:

    dap:
      instances:
        rdb:
          env:
            - name: KXI_PACKAGES
              value: "custom:1.0.0:da"
        idb:
          env:
            - name: KXI_PACKAGES
              value: "custom:1.0.0:da"
        hdb:
          env:
            - name: KXI_PACKAGES
              value: "custom:1.0.0:da"

Example Aggregator Configuration with entrypoint=agg:

    service-gateway:
      aggregator:
       env:
         KXI_PACKAGES: "custom:1.0.0:agg"

    qe-gateway:
      aggregator:
       env:
         KXI_PACKAGES: "custom:1.0.0:agg"

You may now call your custom API over HTTPS by supplying the API parameters and a startTS/endTS:

# Example that uses custom API on data within the current hour
startTS=$(date -u '+%Y.%m.%dD%H:00:00')
endTS=$(date -u '+%Y.%m.%dD%H:%M:%S')
curl -X POST --header "Content-Type: application/json"\
    --header "Accept: application/json"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    --data "{\"table\": \"trades\", \"columns\":[\"sym\",\"price\"], \"startTS\": \"$startTS\", \"endTS\": \"$endTS\"}"\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/example/api"

By default, the aggregation is a raze. To override the aggregation, update the query to set opts.aggFn:

# Example that uses custom aggregation API on `getData` within the current hour
startTS=$(date -u '+%Y.%m.%dD%H:00:00')
endTS=$(date -u '+%Y.%m.%dD%H:%M:%S')
curl -X POST --header "Content-Type: application/json"\
    --header "Accept: application/json"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    --data "{\"table\": \"trades\", \"columns\":[\"sym\",\"price\"],  \"startTS\": \"$startTS\", \"endTS\": \"$endTS\", \"opts\": {\"aggFn\":\"avPrice\"}}"\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/example/api"

Data Tiers and Life-cycle

Databases in insights are distributed across tiers. Data migrates across tiers as the data ages.

Data tiers are configured in the assembly specification. Configuring them involves describing mounts and a data retention life cycle.

Newly received data can be made available in-memory for a number of days, before being migrated to on-disk storage or cloud storage. This enables a faster response time for recent data.

The following example mount description specifies that the IDB and HDB are kept in a Rook CephFS partition, under the root /data/db:

  mounts:
    rdb:
      type: stream
      baseURI: none
      partition: none
    idb:
      type: local
      baseURI: file:///data/db/idb
      partition: ordinal
      volume:
        storageClass: "rook-cephfs"
        accessModes:
          - ReadWriteMany
    hdb:
      type: local
      baseURI: file:///data/db/hdb
      partition: date
      dependency:
      - idb
      volume:
        storageClass: "rook-cephfs"
        accessModes:
          - ReadWriteMany

An example showing corresponding data tiering configuration, saved under the storage manager elements.

With this configuration, intra-day data migrates from memory to disk every ten minutes (freq: 00:10:00), migrates again once a day (freq: 1D00:00:00, snap: 01:35:00), and is retained for 3 months.

  elements:
    sm:
      source: south
      tiers:
        - name: streaming
          mount: rdb
        - name: interval
          mount: idb
          schedule:
            freq: 00:10:00
            snap: 00:00:00
        - name: recent
          mount: hdb
          schedule:
            freq: 1D00:00:00
            snap:   01:35:00
          retain:
            time: 3 Months

For a fully detailed description of data tiering, including data compression, see the storage manager elements configuration.

Querying is tier agnostic

Do not specify a tier when accessing data. Instead, use labels to query data.

Troubleshooting

See the Troubleshooting section of the data-access microservice documentation.