Skip to content

Reference Data

Reference data is non-timeseries data that has some meaningful relation to other data.

Reference data is typically small, static, and/or slowly changing.

A typical example of reference data would be organizational data, which may include postal codes, addresses, names, dates of birth, and more. Reference data are pieces of information that you want to reference when you query or analyze other data.

To define a reference table, define the schema and pick one or more keys as primary keys. A primary key should uniquely identify the reference data.

In the corresponding time series data table, set the foreign property to the table and column name of a primary key column in the reference data.

spec:
  tables:
    trade:
      description: Trade date
      type: partitioned
      columns:
        - name: sym
          description: trade symbol
          type: symbol
          attrMem: grouped
          attrDisk: parted
          attrOrd: parted
        - name: code
          type: symbol
          foreign: markets.code
          description: Code for the market the stock was exchanged on
    markets:
       description: reference market data
       type: splayed
       primaryKeys:
        - code
       columns:
        - name: code
          type: symbol
          description: Market code
        - name: opCode
          type: string
          description: Market operating (parent) code
        - name: updateTS
          description: Timestamp of last mutation
          type: timestamp

Importing Reference Data

Importing reference data is done by publishing to the reference data table, with the results being upserted to the reference-data table. Examples here are provided using the Stream Processor.

Examples that follow show reference metadata for stock listings, where the metadata is simply the stock's full name.

First, create a .qsp pipeline that reads from a callback and writes to a steam named after the reference table

.qsp.run
    .qsp.read.fromCallback[`pubListings]
    .qsp.write.toStream[`listings];

To update or append to reference data, pass a table to the callback

listings:getFromCSV[];
-1 string[.z.p]," publishing reference data";
pubListings listings

Scheduled updates to reference data

If reference data needs to be updated on some condition or schedule, you can invoke the callback with a timer.

For example, the name of our timer function is .pub.listings which calls the callback pubListings.

.pub.listings:{
    // Read CSV from disk, scrape it the web, etc
    listings:getCSVData[];
    -1 string[.z.p]," publishing reference data";
    pubListings listings
    };

.qsp.onStart {
    // Reload reference data every 10 seconds
    .tm.add[`listings;(`.pub.listings;::);10000;0];
    };

.qsp.run
    .qsp.read.fromCallback[`pubListings]
    .qsp.write.toStream[`listings];

Example Reference Data

This example will demonstrate how to query timeseries data, with reference data from a CSV source joined.

In this example, we have timeseries data in the table trade, and reference data in the table markets.

Both the trade data and the market data are randomly generated and saved on a schedule.

The example deploys two pipelines:

  • One pipeline scrapes market identifier codes data from: https://www.iso20022.org/market-identifier-codes.

  • Another pipeline generates random trades data and assigns a random market code for that trade.

Example Assembly

The example assembly content can be seen below.

apiVersion: insights.kx.com/v1
kind: Assembly
metadata:
  name: ref-data
  labels:
    env: dev
spec:
  labels:
    type: basic
  tables:
    trade:
      description: Trade date
      type: partitioned
      blockSize: 10000
      prtnCol: time
      sortColsOrd: [sym]
      sortColsDisk: [sym]
      columns:
        - name: sym
          description: trade symbol
          type: symbol
          attrMem: grouped
          attrDisk: parted
          attrOrd: parted
        - name: code
          type: symbol
          foreign: markets.code
          description: Code for the market the stock was exchanged on
        - name: price
          type: float
        - name: time
          description: timestamp
          type: timestamp
    markets:
       description: reference market data
       type: splayed
       primaryKeys:
        - code
       columns:
        - name: code
          type: symbol
          description: Market code
        - name: opCode
          type: string
          description: Market operating (parent) code
        - name: site
          type: string
          description: Market website
        - name: updateTS
          description: Timestamp of last mutation
          type: timestamp
  mounts:
    rdb:
      type: stream
      baseURI: none
      partition: none
      dependency:
      - idb
    idb:
      type: local
      baseURI: file:///data/db/idb
      partition: ordinal
    hdb:
      type: local
      baseURI: file:///data/db/hdb
      partition: date
      dependency:
      - idb
  elements:
    sp:
      description: Stream for scrapping market identifier codes
      pipelines:
        scrapper:
          protectedExecution: false
          source: north
          destination: south
          spec: |-
            getMarketIdentifierCodes:{[]
                -1 "Downloading Market Identifier Codes";
                resp:.kurl.sync ("https://www.iso20022.org/sites/default/files/ISO10383_MIC/ISO10383_MIC.csv";`GET;()!());
                if[200i <> resp 0;
                    -2 "Downloading data response code is: ", string resp 0;
                    -2 "Reason: ", resp 1
                    '"Failed to download market identifier codes";
                    ];
                -1 "Parsing market identifier codes from CSV";
                t:(12#"S";enlist ",") 0: "\r\n" vs last resp;
                // Rename cols so they are q friendly
                t:`country`iso`code`opCode`os`insitution`acronym`city`site`statusDate`status`creationDate xcol t;
                // For brevity, we only save a few columns.
                -1 "Returning code + opCode + site data";
                :select code, opCode, site:string site from t;
                };

            onDownloadError:{[x]
                -2 "Failed to download initial market identifier codes. ",x,". Returning mock";
                :([] code:`XCHI`XNYS; opCode:`XNYS`XNYS; site:("WWW.NYSE.COM";"WWW.NYSE.COM"))
                };

            // Refreshing market codes info
            .pub.markets:{
                -1 string[.z.p]," reloading market reference data";
                pubMarkets @[getMarketIdentifierCodes;::;onDownloadError];
                };

            // Create timers to update markets codes
            .qsp.onStart {
                .tm.add[`markets;(`.pub.markets;::);14400000;0]; // Redownload every 4 hours
                };

            .qsp.run
                .qsp.read.fromCallback[`pubMarkets]
                  .qsp.map[{ update updateTS: .z.p from x }]
                  .qsp.write.toStream[`markets]
        trade-feed:
          protectedExecution: false
          source: north
          destination: south
          spec: |-
            N:100; // Rows of random trade date per update

            // Append N random trades that reference random markets
            // Trades occur randomly on different branches of NYSE
            .pub.trades:{
                -1 string[.z.p]," publishing trades";
                pubTrades ([]
                    sym    : N?`AAPL`MSFT`EBAY`SHOP;
                    code   : N?`XNLI`NYSD`AMXO`ARCD`ARCO`XNYS`XCHI;
                    price  : N?2000f
                )};

            // Create timers to append trades
            .qsp.onStart {
                .tm.add[`trade;(`.pub.trades;::);5000;0]; // Add N rows every 5 seconds
                }

            .qsp.run
                .qsp.read.fromCallback[`pubTrades]
                  .qsp.map[{ update time: .z.p from x }]
                  .qsp.write.toStream[`trade];
    sm:
      source: south
      tiers:
        - name: streaming
          mount: rdb
        - name: interval
          mount: idb
          schedule:
            freq: 01:00:00
        - name: recent
          mount: hdb
          schedule:
            freq: 1D00:00:00
            snap:   01:35:00
    dap:
      instances:
        idb:
          size: 1
          mountName: idb
        hdb:
          size: 1
          mountName: hdb
        rdb:
          size: 1
          mountName: rdb
          source: south
    sequencer:
      south:
        external: false
        k8sPolicy:
          resources:
            requests:
              memory: "512Mi"
              cpu: 0.5
            limits:
              memory: "512Mi"
              cpu: 0.5
      north:
        k8sPolicy:
          resources:
            requests:
              memory: "512Mi"
              cpu: 0.5
            limits:
              memory: "512Mi"
              cpu: 0.5
        external: true
        topicConfig:
          subTopic: "data"

Apply the assembly using the kdb Insights CLI:

kxi assembly deploy --filepath ref-data.yml

Using the Market Identifier Code (MIC) for a given trade, we can lookup the parent market identifier (operating MIC) in the reference data.

For example, the NYSE Chicago has the MIC XCHI, which in turn has the operating MIC of XNYS which is the parent: The NYSE. This means that the New York Stock Exchange is the parent exchange for the Chicago branch of the NYSE.

To query trades data with the getData API, use the agg parameter to specify the list of columns you want back. By specifying our foreign key, and a column in the market data, we can get back the operating MIC for a given trade.

# Get all data within the current hour
startTS=$(date -u '+%Y.%m.%dD%H:00:00')
endTS=$(date -u '+%Y.%m.%dD%H:%M%:%S')

echo "start=$startTS"
echo "end=$endTS"

curl -X POST --insecure --header "Content-Type: application/json"\
    --header "Accepted: application/json"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    --data "{\"table\":\"trade\", \"startTS\": \"$startTS\", \"endTS\": \"$endTS\", \"agg\": [\"sym\",\"price\",\"code\", "markets.opCode"]}"\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"

Inspecting the JSON results will show that the query returned codes for each trade and the related operating MIC populated as markets.opCode.

    {
      "sym": "AAPL",
      "price": 159.1893,
      "code": "XCHI",
      "markets.opCode": "XNYS"
    }

Now, by taking one of the operating codes from our result, and using it as a code, we can lookup more about the parent exchange from the market data:

curl -X POST --insecure --header "Content-Type: application/json"\
    --header "Accepted: application/json"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    --data "{\"table\":\"markets\", \"filter\": [[\"=\",\"code\",\"XNYS\"]]}"\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"

An example response shows the parent exchange and when it was last refreshed:

{"header":{"rcvTS":"2022-05-12T12:33:23.379000000","corr":"a372737f-4fef-4744-8b19-e6ad1d60f67b","protocol":"gw","logCorr":"a372737f-4fef-4744-8b19-e6ad1d60f67b","client":":10.0.2.7:5050","http":"json","api":".kxi.getData","ogRcID":"10.0.2.5:5060","retryCount":0,"to":"2022-05-12T12:33:53.379000000","agg":":10.0.5.92:5070","pvVer":2,"rc":0,"ac":0,"ai":""},"payload":[{"code":"XNYS","opCode":"XNYS","site":"WWW.NYSE.COM","updateTS":"2022-05-12T12:16:39.339100303"}]}

Shards

If setting up multiple assemblies, that each publish a reference table, and that table is not mirrored, set the isSharded property for the table to true. This specifies that the table is intended to be sharded across multiple assemblies.

Queries against a sharded table will aggregate.

For example, let's say we have two feeds for user data, one from the US, another from EU, and we want to save this to an accounts reference table.

If shards are not enabled, a query would return either the US or the EU users, which is not what we necessarily want. In this configuration, the database assumes the data is mirrored copies.

With shards are enabled, the query will return the combined US and EU users.

spec:
  tables:
    accounts:
        isSharded: true

For more information see routing.