Skip to content

Querying with API

The KX Insights Platform includes services for persisting, and accessing data.

The Service Gateway offers an authenticated, secure and OpenAPI compatible API to retrieve data from the system.

An Assembly Operator is used to dynamically provision data access processes, and storage manager nodes.

Deployment

To query data, first you must deploy an assembly, which will configure any data publishers, and databases.

To apply a new assembly, use kubectl.

kubectl apply -f sdk_sample_assembly.yaml
For an example of a sdk sample assembly see deploying an Assembly

Role based Access

All service gateway endpoints starting with /kxi use a singular insights.query.data role.

This role must be applied to the user or service account before acquiring a new bearer token.

Invoking a custom API requires the insights.query.custom role.

Querying Data

All DA processes come equipped with an API for simple data retrieval, called .kxi.getData.

To query data using this API, you may make a REST API call to servicegateway/kxi/getData.

A query minimally includes the name of the table, start timestamp, end timestamp, and one or more user defined labels.

For an example user-defined label assetClass:

START=$(date "+%Y.%m.%dD00:00:00.000000000")
END=$(date "+%Y.%m.%dD23:59:59.999999999")


# Set $INSIGHTS_TOKEN to your OAuth2 Token
curl -X POST --header "Content-Type: application/json"\
    --header "Accepted: application/json"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    --data "{\"table\":\"trace\",\"startTS\":\"$START\",\"endTS\":\"$END\",\"assetClass\": \"manufacturing\"}"\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"

Tokens, users, and Service Accounts

For information on how to acquire a token, and instructions on querying as a user or service account, see Authentication.

The getData API supports additional parameters for reducing the columns returned, and basic filtering.

For more details see getData API.

Case-sensitive labels

Labels are case-sensitive.

Make sure label's key/value pairs supplied match the labels given when the assembly was applied.

Using qipc responses

By including the HTTP Accepted header "application/octet-stream", you can get query results as a serialized qipc byte array.

This header allows for significantly reduced overhead and faster response times at the cost of some minor complexity when handling the results.

By using any of the kdb+ as client interfaces, you can deserialize the responses, and then process as normal.

Added Bonus

Using this strategy has the additional benefit of preserving type information. JSON responses have the disadvantage of converting all numbers to floats, and may truncate the precision of timestamps.

For each of the following examples, we assume you have INSIGHTS_TOKEN and INSIGHTS_HOSTNAME defined in your environment.

# Save results to results.dat
curl -X POST --header "Content-Type: application/json"\
    --header "Accepted: application/octet-stream"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    -o results.dat\
    --data "{\"table\":\"trace\"}"\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"

Start qce and deserialize the response:

-9!read1`:results.dat
URL:"https://",getenv[`INSIGHTS_HOSTNAME],"/servicegateway/kxi/getData";
headers:("Accepted";"Content-Type";"Authorization")!(
    "application/octet-stream";
    "application/json";
    "Bearer ",getenv `INSIGHTS_TOKEN);
body:.j.j enlist[`table]!enlist "trace";
resp:.kurl.sync (URL; `POST; `binary`headers`body!(1b;headers;body));
if[200 <> first resp; 'last resp];
show -9!last resp

Ensure your copy of c.js has decompression support: // 2021.04.05 added decompress support

const https = require('https');
const c = require('./c');
let TOKEN = process.env.INSIGHTS_TOKEN;
const options = {
    host    : process.env.INSIGHTS_HOSTNAME,
    path    : '/servicegateway/kxi/getData',
    method  : 'POST',
    headers : {
        'Accepted'      : 'application/octet-stream',
        'Content-Type'  : 'application/json',
        'Authorization' : 'Bearer ' + TOKEN
    }
};
let body = {'table' : 'trace'};
let request = https.request(options, (res) => {
    res.setEncoding('binary');
    if (res.statusCode !== 200) {
        console.error(`Non 200 error code ${res.statusCode}`)
        res.resume();
        return;
    }
    let chunks = [];
    res.on('data', (chunk) => {
        chunks.push(Buffer.from(chunk, 'binary'));
    });
    res.on('end', () => {
        let b = Buffer.concat(chunks);
        console.log(c.deserialize(b));
    });
    });
request.write(JSON.stringify(body));
request.end();
request.on('error', (err) => {
    console.error(`Encountered an error trying to make a request: ${err.message}`);
});

KX Insights Platform REST client

For more details on using the rest client see here

Installing a custom API

When deploying an assembly, you may install a custom API by setting a file location on a pre-existing volume mount.

  • Define any volumes under assembly.spec.
  • Define volume mounts for each key under assembly.spec.elements.dap.instances.
  • Define customFile for each key under assembly.spec.elements.dap.instances.

Volumes and Volume Mounts

Volumes and Volume mounts use the standard Kubernetes definitions.

Using a pre-existing config map named custom-api-configmap:

spec:
  volumes:
   - name: custom-api-configmap
     configMap:
       name: custom-api-configmap
  elements:
    dap:
      instances:
        idb:
          mountName: idb
          customFile: /tmp/src/fx.q
          volumeMounts:
            - name: custom-api-configmap
              mountPath: /tmp/src/fx.q
        hdb:
          mountName: hdb
          customFile: /tmp/src/fx.q
          volumeMounts:
            - name: custom-api-configmap
              mountPath: /tmp/src/fx.q
        rdb:
          mountName: rdb
          customFile: /tmp/src/fx.q
          volumeMounts:
            - name: custom-api-configmap
              mountPath: /tmp/src/fx.q

Below is an example of what this config map might look like:

apiVersion: v1
kind: ConfigMap
metadata:
  name: custom-api-configmap
data:
  fx.q: |
     // Define your API body here:
     ...
     // Now register your API, providing parameter names/types/descriptions, an api level description:
     .da.registerAPI[`myAPI;
       .sapi.metaDescription["Simple 'select ... from ... where ...' API."],
       .sapi.metaParam[`name`type`isReq`description!(`table;-11h;1b;"Table to query")],
       .sapi.metaParam[`name`type`isReq`default`description!(`filter;0 10h;0b;();"Filter (in functional or string form).")],
       .sapi.metaParam[`name`type`isReq`default`description!(`col;11 -11h;0b;`sym`time;"Column(s) to select.")],
       .sapi.metaReturn[`type`description!(98h;"Result of the select.")],
       .sapi.metaMisc[enlist[`safe]!enlist 1b]
       ]

For more information on API metadata descriptions and parameter types see metadata registration.

Updates to custom API

To make updates to custom API, simply modify the source file on the volume mount itself.

In order for a pod to utilize the new API, restart the data access pods.

Config maps

If you are using a config map as your volume, you may edit the volume interactively with kubectl edit.

Installing custom aggregator API

When installing insights, you may set custom aggregator APIs using values in the base configuration file you supply to helm.

You must use a pre-existing volume mount.

Using a pre-existing config map named custom-agg-configmap:

service-gateway:
  aggregator:
    volumes:
     - name: custom-agg-configmap
       configMap:
         name: custom-agg-configmap
    customFile: /tmp/src/customAgg.q
    volumeMounts:
      - name: custom-agg-configmap
        mountPath: /tmp/src
Below is an example of what this config map might look like:

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: custom-agg-configmap
data:
  customAgg.q: |
    //
    // @desc Agg function that does an average daily count by sym.
    //
    // @param tbls  {table[]}   List of tables with ``` `sym`date`cnt``` columns.
    //
    // @return      {table}     Average count by sym
    //
    avAgg:{[tbls]
        res:select sum cnt by sym,date from raze 0!'tbls; / Join common dates
        select avg cnt by sym from res / Average
        }
    .sgagg.registerAggFn[`avAgg;
        .sapi.metaDescription["Average join aggregation"],
        .sapi.metaParams[`name`type`description!(`tbls;0h;"Tables received from DAPs")],
        .sapi.metaReturn`type`description!(98h;"The average join (over) of the tables");
        `$()
        ]

Volumes and Volume mounts use the standard Kubernetes definitions.

For more information on API metadata descriptions and parameter types see metadata registration.

For more examples on custom aggregation functions see custom file example.

Data Tiers and Life-cycle

Databases in insights are distributed across tiers. Data migrates across tiers as the data ages.

Data tiers are configured in the assembly specification, and involves describing mounts, and a data retention life cycle.

Newly received data can be made available in-memory for a number of days, before being migrated to on-disk storage or cloud storage. This enables a faster response time for recent data.

An example mount description detailing that the IDB/HDB are to be kept in a Rook CephFS partition, under the root /data/db.

  mounts:
    rdb:
      type: stream
      baseURI: none
      partition: none
    idb:
      type: local
      baseURI: file:///data/db/idb
      partition: ordinal
      volume:
        storageClass: "rook-cephfs"
        accessModes:
          - ReadWriteMany
    hdb:
      type: local
      baseURI: file:///data/db/hdb
      partition: date
      dependency:
      - idb
      volume:
        storageClass: "rook-cephfs"
        accessModes:
          - ReadWriteMany

An example showing corresponding data tiering configuration, saved under the storage manager elements.

Intra-day data would migrate from memory, to on disk every ten hours, again every midnight, and be retained for 3 months.

  elements:
    sm:
      source: south
      tiers:
        - name: streaming
          mount: rdb
        - name: interval
          mount: idb
          schedule:
            freq: 00:10:00
            snap: 00:00:00
        - name: recent
          mount: hdb
          schedule:
            freq: 1D00:00:00
            snap:   01:35:00
          retain:
            time: 3 Months

For a full detail description of data tiering, such as data compression, see the storage manager elements configuration.

Querying is tier agnostic

Do do not specify a tier when accessing data, instead use labels to query data.

Querying inside a sandbox

Sandboxes were primarily designed to isolate freeform queries from API-driven critical queries. However, sandbox functionality is flexible and can accommodate getData calls and your custom APIs.

To use this functionality, create a distributed sandbox exactly as is done for SQL.

Now, with your sandbox running, you can issue requests to getData through the /servicegateway API, using the SANDBOX_ID you have generated.

When you issue this query, it will operate against your real-time and historical data.

curl -X POST\
    --header "Content-Type: application/json"\
    --header "Accepted: application/json"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    --data '{"table":"trace", "filter":[["<", "sensorID", 100]]}\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/sandbox/${SANDBOX_ID}/kxi/getData"

When you are finished, you can tear down the sandbox:

curl -X POST --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    https://${INSIGHTS_HOSTNAME}/kxicontroller/sandbox/${SANDBOX_ID}/teardown

Troubleshooting

Please see the Troubleshooting section from the data-access microservices