
Reference Data

Reference data is non-timeseries data that has some meaningful relation to other data.

Reference data is typically small, static, and/or slowly changing.

A typical example of reference data would be organizational data, which may include postal codes, addresses, names, dates of birth, and more. Reference data are pieces of information that you want to reference when you query or analyze other data.

To define a reference table, define the schema and pick one or more keys as primary keys. A primary key should uniquely identify the reference data.

In the corresponding timeseries data table, set the foreign property of a column to the table name and primary key column of the reference data, in the form table.column (for example, markets.code).

tables/trade.yaml:

name: trade
description: Trade data
type: partitioned
columns:
  - name: sym
    description: trade symbol
    type: symbol
    attrMem: grouped
    attrDisk: parted
    attrOrd: parted
  - name: code
    type: symbol
    foreign: markets.code
    description: Code for the market the stock was exchanged on

tables/markets.yaml:

name: markets
description: reference market data
type: splayed
primaryKeys:
  - code
columns:
  - name: code
    type: symbol
    description: Market code
  - name: opCode
    type: string
    description: Market operating (parent) code
  - name: updateTS
    description: Timestamp of last mutation
    type: timestamp

Memory considerations

For reference data, the table type determines the memory required to hold the table. When tables.*.type is splayed or basic, the table is stored on disk and memory mapped by the Data Access Processes (DAPs). With these table types, only recent records or updates are held in memory until the Storage Manager writes them down at end-of-interval. If you instead want the table held fully in memory within the DAPs for performance reasons, set the table type to splayed_mem.

Importing Reference Data

Reference data is imported by publishing to the reference data table; published records are upserted into that table. The examples here use the Stream Processor.

Examples that follow show reference metadata for stock listings, where the metadata is simply the stock's full name.

First, create a .qsp pipeline that reads from a callback and writes to a stream named after the reference table:

.qsp.run
    .qsp.read.fromCallback[`pubListings]
    .qsp.write.toStream[`listings];

To update or append to reference data, pass a table to the callback:

listings:getCSVData[];
-1 string[.z.p]," publishing reference data";
pubListings listings
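The upsert behavior described in this section can be modeled outside the database to show what happens to existing and new keys. This is only a sketch of the semantics on CSV files, not how kdb Insights stores data: a row whose key already exists replaces the old row, and a row with a new key is appended. The upsert_csv helper and the sample listings are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch of upsert-by-key semantics on CSV files; not how kdb Insights stores data.
# Assumes both files share a header row and keep the key in the first column.
upsert_csv() {
    local existing="$1" updates="$2"
    awk -F, '
        FNR == 1 { if (NR == 1) print; next }   # emit the header once, skip the second
        !($1 in row) { order[++n] = $1 }        # remember keys in first-seen order
        { row[$1] = $0 }                        # a later row replaces an earlier one
        END { for (i = 1; i <= n; i++) print row[order[i]] }
    ' "$existing" "$updates"
}

# Hypothetical listings: MSFT is updated, EBAY is appended, AAPL is untouched.
existing=$(mktemp); updates=$(mktemp)
printf '%s\n' 'sym,name' 'AAPL,Apple' 'MSFT,Microsoft' > "$existing"
printf '%s\n' 'sym,name' 'MSFT,Microsoft Corp' 'EBAY,eBay' > "$updates"
upsert_csv "$existing" "$updates"
rm -f "$existing" "$updates"
```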

Scheduled updates to reference data

If reference data needs to be updated on some condition or schedule, you can invoke the callback with a timer.

In the example below, the timer function .pub.listings calls the callback pubListings.

.pub.listings:{
    // Read the CSV from disk, scrape it from the web, etc.
    listings:getCSVData[];
    -1 string[.z.p]," publishing reference data";
    pubListings listings
    };

.qsp.onStart {
    // Reload reference data every 10 seconds
    .tm.add[`listings;(`.pub.listings;::);10000;0];
    };

.qsp.run
    .qsp.read.fromCallback[`pubListings]
    .qsp.write.toStream[`listings];

Example Reference Data

This example demonstrates how to query timeseries data joined with reference data loaded from a CSV source.

In this example, we have timeseries data in the table trade, and reference data in the table markets.

The trade data is randomly generated and the market data is downloaded; both are published on a schedule.

The example deploys two pipelines:

  • One pipeline scrapes market identifier code (MIC) data from https://www.iso20022.org/market-identifier-codes.

  • Another pipeline generates random trade data and assigns a random market code to each trade.

Example Package

The example package content can be seen below.

manifest.yaml:

# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Manifest
uuid: f54d5cfb-c20f-47e9-9d2c-7e6b7641f1fd
name: ref-data
version: 0.0.1
metadata: {}
tables:
  markets:
    file: tables/markets.yaml
  trade:
    file: tables/trade.yaml
databases:
  ref-data-db:
    dir: databases/ref-data-db
    shards:
    - ref-data-db-shard
    tables: []
pipelines:
  trade-feed:
    file: pipelines/trade-feed.yaml
  scraper:
    file: pipelines/scraper.yaml
router: router/router.yaml
deployment_config: deployment_config/deployment_config.yaml

databases/ref-data-db/shards/ref-data-db-shard.yaml:

# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Shard
uuid: 2e81bdf5-d716-4ccb-8615-7ded1054a1db
name: ref-data-db-shard
labels:
  type: basic
dbSettings:
  encryption:
    encryptAll: true
sm:
  clusterMode: shared
  k8sPolicy:
    serviceAccountConfigure:
      create: true
  reloadTimeout: 0D01:00:00
  rtLogVolume:
    mountPath: /logs/rt/
    persistLogs: true
    size: 20Gi
  single: true
  size: 1
  source: south
  tiers:
  - mount: rdb
    name: streaming
  - mount: idb
    name: interval
    schedule:
      freq: 01:00:00
  - mount: hdb
    name: recent
    schedule:
      freq: 1D00:00:00
      snap: 01:35:00
daps:
  instances:
    idb:
      allowPartialResults: true
      enforceSchema: false
      k8sPolicy:
        serviceAccountConfigure:
          create: true
      mapPartitions: false
      mountName: idb
      rtLogVolume:
        mountPath: /logs/rt/
        persistLogs: true
        size: 20Gi
      size: 1
    hdb:
      allowPartialResults: true
      enforceSchema: false
      k8sPolicy:
        serviceAccountConfigure:
          create: true
      mapPartitions: false
      mountName: hdb
      rtLogVolume:
        mountPath: /logs/rt/
        persistLogs: true
        size: 20Gi
      size: 1
    rdb:
      allowPartialResults: true
      enforceSchema: false
      k8sPolicy:
        serviceAccountConfigure:
          create: true
      mapPartitions: false
      mountName: rdb
      rtLogVolume:
        mountPath: /logs/rt/
        persistLogs: true
        size: 20Gi
      size: 1
      source: south
  lateData: true
sequencers:
  south:
    external: false
    k8sPolicy:
      resources:
        limits:
          memory: 512Mi
          cpu: '0.5'
        requests:
          memory: 512Mi
          cpu: '0.5'
        tmpDirSize: 100Mi
      serviceAccountConfigure:
        create: true
    maxDiskUsagePercent: 90
    size: 3
    topicConfig:
      topicPrefix: rt-
    topicConfigDir: /config/topics/
    volume:
      mountPath: /s/
      size: 20Gi
      subPaths:
        cp: state
        in: in
        out: out
  north:
    external: true
    k8sPolicy:
      resources:
        limits:
          memory: 512Mi
          cpu: '0.5'
        requests:
          memory: 512Mi
          cpu: '0.5'
        tmpDirSize: 100Mi
      serviceAccountConfigure:
        create: true
    maxDiskUsagePercent: 90
    size: 3
    topicConfig:
      subTopic: data
      topicPrefix: rt-
    topicConfigDir: /config/topics/
    volume:
      mountPath: /s/
      size: 20Gi
      subPaths:
        cp: state
        in: in
        out: out
mounts:
  rdb:
    baseURI: none
    dependency:
    - idb
    description: KXI Database Mount
    partition: none
    type: stream
    volume:
      size: 20Gi
  idb:
    baseURI: file:///data/db/idb
    description: KXI Database Mount
    partition: ordinal
    type: local
    volume:
      size: 20Gi
  hdb:
    baseURI: file:///data/db/hdb
    dependency:
    - idb
    description: KXI Database Mount
    partition: date
    type: local
    volume:
      size: 20Gi

deployment_config/deployment_config.yaml:

# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/DeploymentConfig
uuid: 1b7b7e86-f48f-4c58-beef-73a58b842ff8
attach: false
imagePullPolicy: IfNotPresent
qlog:
  formatMode: json

router/router.yaml:

# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Router
uuid: 1be87ec4-6311-44b7-889c-9ee6fa739b75
name: router

tables/.uuid:

a762157a-b5e7-4e8c-aff0-b0a9e49d6384

tables/trade.yaml:

# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Table
name: trade
blockSize: 10000
columns:
- attrDisk: parted
  attrMem: grouped
  attrOrd: parted
  description: trade symbol
  name: sym
  type: symbol
- description: Code for the market the stock was exchanged on
  foreign: markets.code
  name: code
  type: symbol
- name: price
  type: float
- description: timestamp
  name: time
  type: timestamp
description: Trade data
prtnCol: time
sortColsDisk:
- sym
sortColsOrd:
- sym
type: partitioned

tables/markets.yaml:

# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Table
name: markets
columns:
- description: Market code
  name: code
  type: symbol
- description: Market operating (parent) code
  name: opCode
  type: string
- description: Market website
  name: site
  type: string
- description: Timestamp of last mutation
  name: updateTS
  type: timestamp
description: reference market data
primaryKeys:
- code
type: splayed

pipelines/trade-feed.yaml:

# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Pipeline
base: q
destination: south
name: trade-feed
protectedExecution: false
replicaAffinityTopologyKey: zone
replicas: 1
source: north
spec: "N:100; // Rows of random trade data per update\n\n// Append N random trades
  that reference random markets\n// Trades occur randomly on different branches of
  NYSE\n.pub.trades:{\n    -1 string[.z.p],\" publishing trades\";\n    pubTrades
  ([]\n        sym    : N?`AAPL`MSFT`EBAY`SHOP;\n        code   : N?`XNLI`NYSD`AMXO`ARCD`ARCO`XNYS`XCHI;\n
  \       price  : N?2000f\n    )};\n\n// Create timers to append trades\n.qsp.onStart
  {\n    .tm.add[`trade;(`.pub.trades;::);5000;0]; // Add N rows every 5 seconds\n
  \   }\n\n.qsp.run\n    .qsp.read.fromCallback[`pubTrades]\n      .qsp.map[{ update
  time: .z.p from x }]\n      .qsp.write.toStream[`trade];"
type: spec

pipelines/scraper.yaml:

# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Pipeline
base: q
destination: south
name: scraper
protectedExecution: false
replicaAffinityTopologyKey: zone
replicas: 1
source: north
spec: "getMarketIdentifierCodes:{[]\n    -1 \"Downloading Market Identifier Codes\";\n
  \   resp:.kurl.sync (\"https://www.iso20022.org/sites/default/files/ISO10383_MIC/ISO10383_MIC.csv\";`GET;()!());\n
  \   if[200i <> resp 0;\n        -2 \"Downloading data response code is: \", string
  resp 0;\n        -2 \"Reason: \", resp 1;\n        '\"Failed to download market identifier
  codes\";\n        ];\n    -1 \"Parsing market identifier codes from CSV\";\n    t:(12#\"S\";enlist
  \",\") 0: \"\\r\\n\" vs last resp;\n    // Rename cols so they are q friendly\n
  \   t:`country`iso`code`opCode`os`institution`acronym`city`site`statusDate`status`creationDate
  xcol t;\n    // For brevity, we only save a few columns.\n    -1 \"Returning code
  + opCode + site data\";\n    :select code, opCode, site:string site from t;\n    };\n\nonDownloadError:{[x]\n
  \   -2 \"Failed to download initial market identifier codes. \",x,\". Returning
  mock\";\n    :([] code:`XCHI`XNYS; opCode:`XNYS`XNYS; site:(\"WWW.NYSE.COM\";\"WWW.NYSE.COM\"))\n
  \   };\n\n// Refreshing market codes info\n.pub.markets:{\n    -1 string[.z.p],\"
  reloading market reference data\";\n    pubMarkets @[getMarketIdentifierCodes;::;onDownloadError];\n
  \   };\n\n// Create timers to update markets codes\n.qsp.onStart {\n    .tm.add[`markets;(`.pub.markets;::);14400000;0];
  // Redownload every 4 hours\n    };\n\n.qsp.run\n    .qsp.read.fromCallback[`pubMarkets]\n
  \     .qsp.map[{ update updateTS: .z.p from x }]\n      .qsp.write.toStream[`markets]"
type: spec

Apply the package using the kdb Insights CLI:

kxi pm deploy ref-data

Using the Market Identifier Code (MIC) for a given trade, we can look up the parent market identifier (operating MIC) in the reference data.

For example, NYSE Chicago has the MIC XCHI, whose operating MIC is XNYS, the MIC of its parent, the NYSE. In other words, the New York Stock Exchange is the parent exchange of the Chicago branch of the NYSE.

To query trade data with the getData API, use the agg parameter to list the columns you want returned. By including a column from the market data, written as markets.opCode, alongside the foreign key column code, the query returns the operating MIC for each trade.

# Get all data within the current hour
startTS=$(date -u '+%Y.%m.%dD%H:00:00')
endTS=$(date -u '+%Y.%m.%dD%H:%M:%S')

echo "start=$startTS"
echo "end=$endTS"

curl -X POST --insecure --header "Content-Type: application/json"\
    --header "Accept: application/json"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    --data "{\"table\":\"trade\", \"startTS\": \"$startTS\", \"endTS\": \"$endTS\", \"agg\": [\"sym\",\"price\",\"code\", \"markets.opCode\"]}"\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"
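Hand-escaping every quote in the request body is easy to get wrong. As one alternative, the body can be built with a here-document so that the JSON is written literally. This is a sketch only: build_trade_query is a hypothetical helper name, and the request is sent only when INSIGHTS_HOSTNAME and INSIGHTS_TOKEN are set in the environment.

```shell
#!/usr/bin/env bash
# Sketch: build the getData request body with a here-document instead of
# hand-escaped quotes. build_trade_query is a hypothetical helper name.
build_trade_query() {
    local start_ts="$1" end_ts="$2"
    cat <<EOF
{"table":"trade","startTS":"${start_ts}","endTS":"${end_ts}","agg":["sym","price","code","markets.opCode"]}
EOF
}

# Query window: from the start of the current hour (UTC) until now.
startTS=$(date -u '+%Y.%m.%dD%H:00:00')
endTS=$(date -u '+%Y.%m.%dD%H:%M:%S')
body=$(build_trade_query "$startTS" "$endTS")
echo "$body"

# Only send the request when the environment is configured.
if [ -n "${INSIGHTS_HOSTNAME:-}" ] && [ -n "${INSIGHTS_TOKEN:-}" ]; then
    curl -X POST --insecure \
        --header "Content-Type: application/json" \
        --header "Accept: application/json" \
        --header "Authorization: Bearer $INSIGHTS_TOKEN" \
        --data "$body" \
        "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"
fi
```

Printing the body before sending makes it easy to confirm that the JSON is well formed.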

Inspecting the JSON results will show that the query returned codes for each trade and the related operating MIC populated as markets.opCode.

    {
      "sym": "AAPL",
      "price": 159.1893,
      "code": "XCHI",
      "markets.opCode": "XNYS"
    }

Now, by taking one of the operating codes from our result and using it as a code, we can look up more about the parent exchange in the market data:

curl -X POST --insecure --header "Content-Type: application/json"\
    --header "Accept: application/json"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    --data "{\"table\":\"markets\", \"filter\": [[\"=\",\"code\",\"XNYS\"]]}"\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"

An example response shows the parent exchange and when it was last refreshed:

{"header":{"rcvTS":"2022-05-12T12:33:23.379000000","corr":"a372737f-4fef-4744-8b19-e6ad1d60f67b","protocol":"gw","logCorr":"a372737f-4fef-4744-8b19-e6ad1d60f67b","client":":10.0.2.7:5050","http":"json","api":".kxi.getData","ogRcID":"10.0.2.5:5060","retryCount":0,"to":"2022-05-12T12:33:53.379000000","agg":":10.0.5.92:5070","pvVer":2,"rc":0,"ac":0,"ai":""},"payload":[{"code":"XNYS","opCode":"XNYS","site":"WWW.NYSE.COM","updateTS":"2022-05-12T12:16:39.339100303"}]}
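To read only the rows out of a response like this one, the payload array can be flattened with jq. The sketch below assumes that jq is installed and that an rc of 0 in the header signals success; that reading is inferred from the sample response and is not a documented contract. The payload_rows helper and the trimmed sample response are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: print each payload row as tab-separated values.
# Assumes jq is installed and that header.rc of 0 signals success
# (inferred from the sample response, not a documented contract).
payload_rows() {
    jq -r '
        if .header.rc != 0
        then error("query failed with rc=\(.header.rc)")
        else .payload[] | [.code, .opCode, .site, .updateTS] | @tsv
        end'
}

# Trimmed-down sample response (hypothetical) for a local dry run.
response='{"header":{"rc":0},"payload":[{"code":"XNYS","opCode":"XNYS","site":"WWW.NYSE.COM","updateTS":"2022-05-12T12:16:39.339100303"}]}'
if command -v jq >/dev/null 2>&1; then
    printf '%s\n' "$response" | payload_rows
fi
```

In practice the curl output would be piped into payload_rows in place of the sample string.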

Shards

If you set up multiple packages that each publish to a reference table, and that table is not mirrored, set the isSharded property of the table to true. This specifies that the table is intended to be sharded across multiple packages.

Queries against a sharded table aggregate the results from all of its shards.

For example, let's say we have two feeds for user data, one from the US and another from the EU, and we want to save both to an accounts reference table.

If shards are not enabled, the database assumes each package holds a mirrored copy of the data, so a query returns either the US or the EU users, which is not necessarily what we want.

With shards enabled, the query returns the combined US and EU users.

tables/accounts.yaml:

name: accounts
isSharded: true
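The difference between the two behaviors can be pictured with a toy model. In this sketch each package is represented by a CSV file, a mirrored query reads a single copy, and a sharded query combines all of them. The query_accounts helper and the sample rows are hypothetical; the real routing and aggregation are performed by kdb Insights.

```shell
#!/usr/bin/env bash
# Toy model of the two query behaviors; the real routing is done by kdb Insights.
# Each argument after the mode is a CSV file (with header) held by one package.
query_accounts() {
    local mode="$1"; shift
    if [ "$mode" = "sharded" ]; then
        # Sharded: every package holds different rows, so combine them all,
        # keeping only the first header line.
        awk 'FNR == 1 && NR != 1 { next } { print }' "$@"
    else
        # Mirrored: every package is assumed to hold the same rows,
        # so a single copy answers the query.
        cat "$1"
    fi
}

us=$(mktemp); eu=$(mktemp)
printf '%s\n' 'id,region' '1,US' > "$us"
printf '%s\n' 'id,region' '2,EU' > "$eu"
echo "mirrored:"; query_accounts mirrored "$us" "$eu"
echo "sharded:";  query_accounts sharded "$us" "$eu"
rm -f "$us" "$eu"
```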

For more information see routing.