Reference Data
Reference data is non-timeseries data that has some meaningful relation to other data.
Reference data is typically small, static, and/or slowly changing.
A typical example of reference data would be organizational data, which may include postal codes, addresses, names, dates of birth, and more. Reference data are pieces of information that you want to reference when you query or analyze other data.
To define a reference table, define the schema and pick one or more keys as primary keys. A primary key should uniquely identify the reference data.
In the corresponding time series data table, set the foreign
property to the table and column name of a primary key column in the reference data.
spec:
tables:
trade:
description: Trade date
type: partitioned
columns:
- name: sym
description: trade symbol
type: symbol
attrMem: grouped
attrDisk: parted
attrOrd: parted
- name: code
type: symbol
foreign: markets.code
description: Code for the market the stock was exchanged on
markets:
description: reference market data
type: splayed
primaryKeys:
- code
columns:
- name: code
type: symbol
description: Market code
- name: opCode
type: string
description: Market operating (parent) code
- name: updateTS
description: Timestamp of last mutation
type: timestamp
Importing Reference Data
Importing reference data is done by publishing to the reference data table, with the results being upsert
ed to the reference-data table. Examples here are provided using the Stream Processor.
Examples that follow show reference metadata for stock listings, where the metadata is simply the stock's full name.
First, create a .qsp
pipeline that reads from a callback and writes to a steam named after the reference table
.qsp.run
.qsp.read.fromCallback[`pubListings]
.qsp.write.toStream[`listings];
To update or append to reference data, pass a table to the callback
listings:getFromCSV[];
-1 string[.z.p]," publishing reference data";
pubListings listings
Scheduled updates to reference data
If reference data needs to be updated on some condition or schedule, you can invoke the callback with a timer.
For example, the name of our timer function is .pub.listings
which calls the callback pubListings
.
.pub.listings:{
// Read CSV from disk, scrape it the web, etc
listings:getCSVData[];
-1 string[.z.p]," publishing reference data";
pubListings listings
};
.qsp.onStart {
// Reload reference data every 10 seconds
.tm.add[`listings;(`.pub.listings;::);10000;0];
};
.qsp.run
.qsp.read.fromCallback[`pubListings]
.qsp.write.toStream[`listings];
Example Reference Data
This example will demonstrate how to query timeseries data, with reference data from a CSV source joined.
In this example, we have timeseries data in the table trade
, and reference data in the table markets
.
Both the trade data and the market data are randomly generated and saved on a schedule.
The example deploys two pipelines: - One pipeline scrapes market identifier codes data from: https://www.iso20022.org/market-identifier-codes. - Another pipeline generates random trades data and assigns a random market code for that trade.
Example Assembly
The example assembly content can be seen below.
apiVersion: insights.kx.com/v1
kind: Assembly
metadata:
name: ref-data
labels:
env: dev
spec:
labels:
type: basic
tables:
trade:
description: Trade date
type: partitioned
blockSize: 10000
prtnCol: time
sortColsOrd: [sym]
sortColsDisk: [sym]
columns:
- name: sym
description: trade symbol
type: symbol
attrMem: grouped
attrDisk: parted
attrOrd: parted
- name: code
type: symbol
foreign: markets.code
description: Code for the market the stock was exchanged on
- name: price
type: float
- name: time
description: timestamp
type: timestamp
markets:
description: reference market data
type: splayed
primaryKeys:
- code
columns:
- name: code
type: symbol
description: Market code
- name: opCode
type: string
description: Market operating (parent) code
- name: site
type: string
description: Market website
- name: updateTS
description: Timestamp of last mutation
type: timestamp
mounts:
rdb:
type: stream
baseURI: none
partition: none
dependency:
- idb
idb:
type: local
baseURI: file:///data/db/idb
partition: ordinal
hdb:
type: local
baseURI: file:///data/db/hdb
partition: date
dependency:
- idb
elements:
sp:
description: Stream for scrapping market identifier codes
pipelines:
scrapper:
protectedExecution: false
source: north
destination: south
spec: |-
getMarketIdentifierCodes:{[]
-1 "Downloading Market Identifier Codes";
resp:.kurl.sync ("https://www.iso20022.org/sites/default/files/ISO10383_MIC/ISO10383_MIC.csv";`GET;()!());
if[200i <> resp 0;
-2 "Downloading data response code is: ", string resp 0;
-2 "Reason: ", resp 1
'"Failed to download market identifier codes";
];
-1 "Parsing market identifier codes from CSV";
t:(12#"S";enlist ",") 0: "\r\n" vs last resp;
// Rename cols so they are q friendly
t:`country`iso`code`opCode`os`insitution`acronym`city`site`statusDate`status`creationDate xcol t;
// For brevity, we only save a few columns.
-1 "Returning code + opCode + site data";
:select code, opCode, site:string site from t;
};
onDownloadError:{[x]
-2 "Failed to download initial market identifier codes. ",x,". Returning mock";
:([] code:`XCHI`XNYS; opCode:`XNYS`XNYS; site:("WWW.NYSE.COM";"WWW.NYSE.COM"))
};
// Refreshing market codes info
.pub.markets:{
-1 string[.z.p]," reloading market reference data";
pubMarkets @[getMarketIdentifierCodes;::;onDownloadError];
};
// Create timers to update markets codes
.qsp.onStart {
.tm.add[`markets;(`.pub.markets;::);14400000;0]; // Redownload every 4 hours
};
.qsp.run
.qsp.read.fromCallback[`pubMarkets]
.qsp.map[{ update updateTS: .z.p from x }]
.qsp.write.toStream[`markets]
trade-feed:
protectedExecution: false
source: north
destination: south
spec: |-
N:100; // Rows of random trade date per update
// Append N random trades that reference random markets
// Trades occur randomly on different branches of NYSE
.pub.trades:{
-1 string[.z.p]," publishing trades";
pubTrades ([]
sym : N?`AAPL`MSFT`EBAY`SHOP;
code : N?`XNLI`NYSD`AMXO`ARCD`ARCO`XNYS`XCHI;
price : N?2000f
)};
// Create timers to append trades
.qsp.onStart {
.tm.add[`trade;(`.pub.trades;::);5000;0]; // Add N rows every 5 seconds
}
.qsp.run
.qsp.read.fromCallback[`pubTrades]
.qsp.map[{ update time: .z.p from x }]
.qsp.write.toStream[`trade];
sm:
source: south
tiers:
- name: streaming
mount: rdb
- name: interval
mount: idb
schedule:
freq: 01:00:00
- name: recent
mount: hdb
schedule:
freq: 1D00:00:00
snap: 01:35:00
dap:
instances:
idb:
size: 1
mountName: idb
hdb:
size: 1
mountName: hdb
rdb:
size: 1
mountName: rdb
source: south
sequencer:
south:
external: false
k8sPolicy:
resources:
requests:
memory: "512Mi"
cpu: 0.5
limits:
memory: "512Mi"
cpu: 0.5
north:
k8sPolicy:
resources:
requests:
memory: "512Mi"
cpu: 0.5
limits:
memory: "512Mi"
cpu: 0.5
external: true
topicConfig:
subTopic: "data"
Apply the assembly using the kdb Insights CLI:
kxi assembly deploy --filepath ref-data.yml
Using the Market Identifier Code (MIC) for a given trade, we can lookup the parent market identifier (operating MIC) in the reference data.
For example, the NYSE Chicago has the MIC XCHI
, which in turn has the operating MIC of XNYS
which is the parent: The NYSE. This means that the New York Stock Exchange is the parent exchange for the Chicago branch of the NYSE.
To query trades data with the getData
API, use the agg
parameter to specify the list of columns you want back. By specifying our foreign key, and a column in the market data, we can get back the operating MIC for a given trade.
# Get all data within the current hour
startTS=$(date -u '+%Y.%m.%dD%H:00:00')
endTS=$(date -u '+%Y.%m.%dD%H:%M%:%S')
echo "start=$startTS"
echo "end=$endTS"
curl -X POST --insecure --header "Content-Type: application/json"\
--header "Accepted: application/json"\
--header "Authorization: Bearer $INSIGHTS_TOKEN"\
--data "{\"table\":\"trade\", \"startTS\": \"$startTS\", \"endTS\": \"$endTS\", \"agg\": [\"sym\",\"price\",\"code\", "markets.opCode"]}"\
"https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"
Inspecting the JSON results will show that the query returned codes for each trade and the related operating MIC populated as markets.opCode
.
{
"sym": "AAPL",
"price": 159.1893,
"code": "XCHI",
"markets.opCode": "XNYS"
}
Now, by taking one of the operating codes from our result, and using it as a code, we can lookup more about the parent exchange from the market data:
curl -X POST --insecure --header "Content-Type: application/json"\
--header "Accepted: application/json"\
--header "Authorization: Bearer $INSIGHTS_TOKEN"\
--data "{\"table\":\"markets\", \"filter\": [[\"=\",\"code\",\"XNYS\"]]}"\
"https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"
An example response shows the parent exchange and when it was last refreshed:
{"header":{"rcvTS":"2022-05-12T12:33:23.379000000","corr":"a372737f-4fef-4744-8b19-e6ad1d60f67b","protocol":"gw","logCorr":"a372737f-4fef-4744-8b19-e6ad1d60f67b","client":":10.0.2.7:5050","http":"json","api":".kxi.getData","ogRcID":"10.0.2.5:5060","retryCount":0,"to":"2022-05-12T12:33:53.379000000","agg":":10.0.5.92:5070","pvVer":2,"rc":0,"ac":0,"ai":""},"payload":[{"code":"XNYS","opCode":"XNYS","site":"WWW.NYSE.COM","updateTS":"2022-05-12T12:16:39.339100303"}]}
Shards
If setting up multiple assemblies, that each publish a reference table, and that table is not mirrored, set the isSharded
property for the table to true. This specifies that the table is intended to be sharded across multiple assemblies.
Queries against a sharded table will aggregate.
For example, let's say we have two feeds for user data, one from the US, another from EU, and we want to save this to an accounts reference table.
If shards are not enabled, a query would return either the US or the EU users, which is not what we necessarily want. In this configuration, the database assumes the data is mirrored copies.
With shards are enabled, the query will return the combined US and EU users.
spec:
tables:
accounts:
isSharded: true
For more information see routing.