Reference Data
Reference data is non-timeseries data that has some meaningful relation to other data.
Reference data is typically small, static, and/or slowly changing.
A typical example of reference data is organizational data, which may include postal codes, addresses, names, dates of birth, and more. Reference data is information that you want to reference when you query or analyze other data.
To define a reference table, define the schema and pick one or more columns as primary keys. A primary key should uniquely identify the reference data.
In the corresponding timeseries data table, set the foreign property to the table and column name of a primary key column in the reference data.
tables/trade.yaml
:
name: trade
description: Trade data
type: partitioned
columns:
  - name: sym
    description: trade symbol
    type: symbol
    attrMem: grouped
    attrDisk: parted
    attrOrd: parted
  - name: code
    type: symbol
    foreign: markets.code
    description: Code for the market the stock was exchanged on
tables/markets.yaml
:
name: markets
description: reference market data
type: splayed
primaryKeys:
  - code
columns:
  - name: code
    type: symbol
    description: Market code
  - name: opCode
    type: string
    description: Market operating (parent) code
  - name: updateTS
    description: Timestamp of last mutation
    type: timestamp
Memory considerations
For reference data, the table type is important in determining the memory requirements of hosting the table. For a tables.*.type of splayed or basic, the table is stored on disk and memory mapped by the Data Access Processes (DAPs). With these table types, only recent records and updates are held in memory until the Storage Manager writes them down at end-of-interval. If instead, for performance reasons, you want the table to be held fully in memory within the DAPs, set the table type to splayed_mem.
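For example, a minimal sketch of an in-memory variant of the markets table above changes only the type field; everything else stays as defined earlier:
name: markets
description: reference market data
type: splayed_mem # held fully in memory within the DAPs
primaryKeys:
  - code
columns:
  - name: code
    type: symbol
    description: Market code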
Importing Reference Data
Importing reference data is done by publishing to the reference data table; the published records are upserted into the table. The examples here use the Stream Processor.
The examples that follow show reference metadata for stock listings, where the metadata is simply the stock's full name.
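The listings examples assume a reference table along these lines. This is a minimal hypothetical sketch in the same format as markets.yaml above; it is not part of the example package:
name: listings
description: reference listing metadata
type: splayed
primaryKeys:
  - sym
columns:
  - name: sym
    description: stock symbol
    type: symbol
  - name: name
    description: stock's full name
    type: string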
First, create a Stream Processor pipeline that reads from a callback and writes to a stream named after the reference table:
.qsp.run
  .qsp.read.fromCallback[`pubListings]
  .qsp.write.toStream[`listings];
To update or append to reference data, pass a table to the callback:
// Build the listings table, for example from a CSV
listings:getFromCSV[];
-1 string[.z.p]," publishing reference data";
pubListings listings
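Because the reference table is keyed, publishing behaves like an upsert: rows whose primary key already exists are updated in place, and rows with new keys are appended. A minimal plain-q sketch of the same semantics, outside of Insights and assuming a listings table keyed on sym:
listings:([sym:`AAPL`MSFT] name:("Apple Inc";"Microsoft Corp"));
// Re-publishing `MSFT updates its row; `EBAY is a new key and is appended
listings:listings upsert ([sym:`MSFT`EBAY] name:("Microsoft Corporation";"eBay Inc"));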
Scheduled updates to reference data
If reference data needs to be updated on some condition or schedule, you can invoke the callback from a timer.
In the example below, the timer function .pub.listings calls the callback pubListings.
.pub.listings:{
  // Read CSV from disk, scrape it from the web, etc.
  listings:getCSVData[];
  -1 string[.z.p]," publishing reference data";
  pubListings listings
  };

.qsp.onStart {
  // Reload reference data every 10 seconds
  .tm.add[`listings;(`.pub.listings;::);10000;0];
  };

.qsp.run
  .qsp.read.fromCallback[`pubListings]
  .qsp.write.toStream[`listings];
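The getCSVData helper above is not defined by the example. A minimal sketch of such a helper, assuming a two-column CSV of sym and name at a hypothetical path, could look like:
// Hypothetical helper: load listings metadata (symbol and full name) from a CSV on disk
getCSVData:{[]
  ("S*";enlist ",") 0: `:/data/listings.csv
  }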
Example Reference Data
This example demonstrates how to query timeseries data joined with reference data sourced from a CSV.
In this example, we have timeseries data in the table trade
, and reference data in the table markets
.
The trade data is randomly generated on a schedule, while the market reference data is downloaded and refreshed on a schedule.
The example deploys two pipelines:
- One pipeline scrapes Market Identifier Code (MIC) data from https://www.iso20022.org/market-identifier-codes.
- Another pipeline generates random trade data and assigns a random market code to each trade.
Example Package
The contents of the example package are shown below.
manifest.yaml
:
# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Manifest
uuid: f54d5cfb-c20f-47e9-9d2c-7e6b7641f1fd
name: ref-data
version: 0.0.1
metadata: {}
tables:
  markets:
    file: tables/markets.yaml
  trade:
    file: tables/trade.yaml
databases:
  ref-data-db:
    dir: databases/ref-data-db
    shards:
      - ref-data-db-shard
    tables: []
pipelines:
  trade-feed:
    file: pipelines/trade-feed.yaml
  scraper:
    file: pipelines/scraper.yaml
router: router/router.yaml
deployment_config: deployment_config/deployment_config.yaml
databases/ref-data-db/shards/ref-data-db-shard.yaml
:
# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Shard
uuid: 2e81bdf5-d716-4ccb-8615-7ded1054a1db
name: ref-data-db-shard
labels:
type: basic
dbSettings:
encryption:
encryptAll: true
sm:
clusterMode: shared
k8sPolicy:
serviceAccountConfigure:
create: true
reloadTimeout: 0D01:00:00
rtLogVolume:
mountPath: /logs/rt/
persistLogs: true
size: 20Gi
single: true
size: 1
source: south
tiers:
- mount: rdb
name: streaming
- mount: idb
name: interval
schedule:
freq: 01:00:00
- mount: hdb
name: recent
schedule:
freq: 1D00:00:00
snap: 01:35:00
daps:
instances:
idb:
allowPartialResults: true
enforceSchema: false
k8sPolicy:
serviceAccountConfigure:
create: true
mapPartitions: false
mountName: idb
rtLogVolume:
mountPath: /logs/rt/
persistLogs: true
size: 20Gi
size: 1
hdb:
allowPartialResults: true
enforceSchema: false
k8sPolicy:
serviceAccountConfigure:
create: true
mapPartitions: false
mountName: hdb
rtLogVolume:
mountPath: /logs/rt/
persistLogs: true
size: 20Gi
size: 1
rdb:
allowPartialResults: true
enforceSchema: false
k8sPolicy:
serviceAccountConfigure:
create: true
mapPartitions: false
mountName: rdb
rtLogVolume:
mountPath: /logs/rt/
persistLogs: true
size: 20Gi
size: 1
source: south
lateData: true
sequencers:
south:
external: false
k8sPolicy:
resources:
limits:
memory: 512Mi
cpu: '0.5'
requests:
memory: 512Mi
cpu: '0.5'
tmpDirSize: 100Mi
serviceAccountConfigure:
create: true
maxDiskUsagePercent: 90
size: 3
topicConfig:
topicPrefix: rt-
topicConfigDir: /config/topics/
volume:
mountPath: /s/
size: 20Gi
subPaths:
cp: state
in: in
out: out
north:
external: true
k8sPolicy:
resources:
limits:
memory: 512Mi
cpu: '0.5'
requests:
memory: 512Mi
cpu: '0.5'
tmpDirSize: 100Mi
serviceAccountConfigure:
create: true
maxDiskUsagePercent: 90
size: 3
topicConfig:
subTopic: data
topicPrefix: rt-
topicConfigDir: /config/topics/
volume:
mountPath: /s/
size: 20Gi
subPaths:
cp: state
in: in
out: out
mounts:
rdb:
baseURI: none
dependency:
- idb
description: KXI Database Mount
partition: none
type: stream
volume:
size: 20Gi
idb:
baseURI: file:///data/db/idb
description: KXI Database Mount
partition: ordinal
type: local
volume:
size: 20Gi
hdb:
baseURI: file:///data/db/hdb
dependency:
- idb
description: KXI Database Mount
partition: date
type: local
volume:
size: 20Gi
deployment_config/deployment_config.yaml
:
# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/DeploymentConfig
uuid: 1b7b7e86-f48f-4c58-beef-73a58b842ff8
attach: false
imagePullPolicy: IfNotPresent
qlog:
  formatMode: json
router/router.yaml
:
# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Router
uuid: 1be87ec4-6311-44b7-889c-9ee6fa739b75
name: router
tables/.uuid
:
a762157a-b5e7-4e8c-aff0-b0a9e49d6384
tables/trade.yaml
:
# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Table
name: trade
blockSize: 10000
columns:
  - attrDisk: parted
    attrMem: grouped
    attrOrd: parted
    description: trade symbol
    name: sym
    type: symbol
  - description: Code for the market the stock was exchanged on
    foreign: markets.code
    name: code
    type: symbol
  - name: price
    type: float
  - description: timestamp
    name: time
    type: timestamp
description: Trade data
prtnCol: time
sortColsDisk:
  - sym
sortColsOrd:
  - sym
type: partitioned
tables/markets.yaml
:
# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Table
name: markets
columns:
  - description: Market code
    name: code
    type: symbol
  - description: Market operating (parent) code
    name: opCode
    type: string
  - description: Market website
    name: site
    type: string
  - description: Timestamp of last mutation
    name: updateTS
    type: timestamp
description: reference market data
primaryKeys:
  - code
type: splayed
pipelines/trade-feed.yaml
:
# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Pipeline
base: q
destination: south
name: trade-feed
protectedExecution: false
replicaAffinityTopologyKey: zone
replicas: 1
source: north
spec: "N:100; // Rows of random trade date per update\n\n// Append N random trades
that reference random markets\n// Trades occur randomly on different branches of
NYSE\n.pub.trades:{\n -1 string[.z.p],\" publishing trades\";\n pubTrades
([]\n sym : N?`AAPL`MSFT`EBAY`SHOP;\n code : N?`XNLI`NYSD`AMXO`ARCD`ARCO`XNYS`XCHI;\n
\ price : N?2000f\n )};\n\n// Create timers to append trades\n.qsp.onStart
{\n .tm.add[`trade;(`.pub.trades;::);5000;0]; // Add N rows every 5 seconds\n
\ }\n\n.qsp.run\n .qsp.read.fromCallback[`pubTrades]\n .qsp.map[{ update
time: .z.p from x }]\n .qsp.write.toStream[`trade];"
type: spec
pipelines/scraper.yaml
:
# yaml-language-server: $schema=https://code.kx.com/insights/enterprise/packaging/schemas/package.json#/$defs/Pipeline
base: q
destination: south
name: scraper
protectedExecution: false
replicaAffinityTopologyKey: zone
replicas: 1
source: north
spec: "getMarketIdentifierCodes:{[]\n -1 \"Downloading Market Identifier Codes\";\n
\ resp:.kurl.sync (\"https://www.iso20022.org/sites/default/files/ISO10383_MIC/ISO10383_MIC.csv\";`GET;()!());\n
\ if[200i <> resp 0;\n -2 \"Downloading data response code is: \", string
resp 0;\n -2 \"Reason: \", resp 1\n '\"Failed to download market identifier
codes\";\n ];\n -1 \"Parsing market identifier codes from CSV\";\n t:(12#\"S\";enlist
\",\") 0: \"\\r\\n\" vs last resp;\n // Rename cols so they are q friendly\n
\ t:`country`iso`code`opCode`os`institution`acronym`city`site`statusDate`status`creationDate
xcol t;\n // For brevity, we only save a few columns.\n -1 \"Returning code
+ opCode + site data\";\n :select code, opCode, site:string site from t;\n };\n\nonDownloadError:{[x]\n
\ -2 \"Failed to download initial market identifier codes. \",x,\". Returning
mock\";\n :([] code:`XCHI`XNYS; opCode:`XNYS`XNYS; site:(\"WWW.NYSE.COM\";\"WWW.NYSE.COM\"))\n
\ };\n\n// Refreshing market codes info\n.pub.markets:{\n -1 string[.z.p],\"
reloading market reference data\";\n pubMarkets @[getMarketIdentifierCodes;::;onDownloadError];\n
\ };\n\n// Create timers to update markets codes\n.qsp.onStart {\n .tm.add[`markets;(`.pub.markets;::);14400000;0];
// Redownload every 4 hours\n };\n\n.qsp.run\n .qsp.read.fromCallback[`pubMarkets]\n
\ .qsp.map[{ update updateTS: .z.p from x }]\n .qsp.write.toStream[`markets]"
type: spec
Apply the package using the kdb Insights CLI:
kxi pm deploy ref-data
Using the Market Identifier Code (MIC) of a given trade, we can look up the parent market identifier (operating MIC) in the reference data.
For example, NYSE Chicago has the MIC XCHI, whose operating MIC is XNYS: the MIC of the New York Stock Exchange, the parent exchange of its Chicago branch.
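Conceptually, the foreign key lets a trade row dereference columns of the markets row it points to. A minimal plain-q sketch of that lookup, outside of Insights and using made-up rows:
// Keyed reference table and a trade table with a foreign key into it
markets:([code:`XCHI`XNYS] opCode:`XNYS`XNYS);
trade:([] sym:`AAPL`MSFT; price:2?2000f; code:`markets$`XCHI`XNYS);
// Follow the foreign key to pull the operating MIC for each trade
select sym, price, code, opCode:code.opCode from trade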
To query trade data with the getData API, use the agg parameter to specify the list of columns you want back. By including the foreign key column and a column from the market data, we can get back the operating MIC for each trade.
# Get all data within the current hour
startTS=$(date -u '+%Y.%m.%dD%H:00:00')
endTS=$(date -u '+%Y.%m.%dD%H:%M:%S')
echo "start=$startTS"
echo "end=$endTS"
curl -X POST --insecure --header "Content-Type: application/json"\
--header "Accepted: application/json"\
--header "Authorization: Bearer $INSIGHTS_TOKEN"\
--data "{\"table\":\"trade\", \"startTS\": \"$startTS\", \"endTS\": \"$endTS\", \"agg\": [\"sym\",\"price\",\"code\", "markets.opCode"]}"\
"https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"
Inspecting the JSON results shows that the query returned the market code for each trade, with the related operating MIC populated as markets.opCode.
{
"sym": "AAPL",
"price": 159.1893,
"code": "XCHI",
"markets.opCode": "XNYS"
}
Now, by taking one of the operating MICs from the result and using it as a code filter, we can look up more about the parent exchange in the market data:
curl -X POST --insecure --header "Content-Type: application/json"\
--header "Accepted: application/json"\
--header "Authorization: Bearer $INSIGHTS_TOKEN"\
--data "{\"table\":\"markets\", \"filter\": [[\"=\",\"code\",\"XNYS\"]]}"\
"https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"
An example response shows the parent exchange and when it was last refreshed:
{"header":{"rcvTS":"2022-05-12T12:33:23.379000000","corr":"a372737f-4fef-4744-8b19-e6ad1d60f67b","protocol":"gw","logCorr":"a372737f-4fef-4744-8b19-e6ad1d60f67b","client":":10.0.2.7:5050","http":"json","api":".kxi.getData","ogRcID":"10.0.2.5:5060","retryCount":0,"to":"2022-05-12T12:33:53.379000000","agg":":10.0.5.92:5070","pvVer":2,"rc":0,"ac":0,"ai":""},"payload":[{"code":"XNYS","opCode":"XNYS","site":"WWW.NYSE.COM","updateTS":"2022-05-12T12:16:39.339100303"}]}
Shards
If you set up multiple packages that each publish to the same reference table, and that table is not mirrored, set the isSharded property for the table to true. This specifies that the table is intended to be sharded across multiple packages.
Queries against a sharded table aggregate results across all shards.
For example, suppose we have two feeds of user data, one from the US and another from the EU, and we want to save both to an accounts reference table.
If shards are not enabled, a query returns either the US or the EU users, which is not necessarily what we want; in this configuration, the database assumes the shards hold mirrored copies of the data.
With shards enabled, the query returns the combined US and EU users.
tables/accounts.yaml
:
name: accounts
isSharded: true
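A query against the sharded table is then the same as any other getData call; this is a hedged sketch reusing the variables from the earlier examples, and it returns the combined accounts from every shard:
# Hypothetical query against the sharded accounts table
curl -X POST --insecure --header "Content-Type: application/json"\
--header "Accept: application/json"\
--header "Authorization: Bearer $INSIGHTS_TOKEN"\
--data "{\"table\":\"accounts\"}"\
"https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"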
For more information see routing.