Reference Data

Reference data is non-timeseries data that has some meaningful relation to other data.

Reference data is typically small, static, and/or slowly changing.

A typical example of reference data would be organizational data, which may include postal codes, addresses, names, dates of birth, and more. Reference data are pieces of information that you want to reference when you query or analyze other data.

To define a reference table, define the schema and pick one or more keys as primary keys. A primary key should uniquely identify the reference data.

In the corresponding time series table, set the foreign property of the referencing column to the reference table and its primary key column, written as table.column (for example, markets.code).

spec:
  tables:
    trade:
      description: Trade data
      type: partitioned
      columns:
        - name: sym
          description: trade symbol
          type: symbol
          attrMem: grouped
          attrDisk: parted
          attrOrd: parted
        - name: code
          type: symbol
          foreign: markets.code
          description: Code for the market the stock was exchanged on
    markets:
      description: reference market data
      type: splayed
      primaryKeys:
        - code
      columns:
        - name: code
          type: symbol
          description: Market code
        - name: opCode
          type: string
          description: Market operating (parent) code
        - name: updateTS
          description: Timestamp of last mutation
          type: timestamp

Importing Reference Data

To import reference data, publish it to the reference table. Published rows are upserted: rows whose primary key already exists are updated, and rows with new keys are appended. The examples here use the Stream Processor.

Examples that follow show reference metadata for stock listings, where the metadata is simply the stock's full name.

First, create a Stream Processor pipeline that reads from a callback and writes to a stream named after the reference table:

.qsp.run
    .qsp.read.fromCallback[`pubListings]
    .qsp.write.toStream[`listings];

To update or append to reference data, pass a table to the callback:

// getCSVData is a placeholder for your own function that returns the listings table
listings:getCSVData[];
-1 string[.z.p]," publishing reference data";
pubListings listings

Scheduled updates to reference data

If reference data needs to be updated on some condition or schedule, you can invoke the callback with a timer.

For example, define a timer function named .pub.listings that loads the latest data and passes it to the pubListings callback, then register that function with a timer when the pipeline starts.

.pub.listings:{
    // Read CSV from disk, scrape it from the web, etc
    listings:getCSVData[];
    -1 string[.z.p]," publishing reference data";
    pubListings listings
    };

.qsp.onStart {
    // Reload reference data every 10 seconds
    .tm.add[`listings;(`.pub.listings;::);10000;0];
    };

.qsp.run
    .qsp.read.fromCallback[`pubListings]
    .qsp.write.toStream[`listings];

Example Reference Data

This example demonstrates how to query timeseries data joined with reference data loaded from a CSV source.

In this example, we have timeseries data in the table trade, and reference data in the table markets.

Both tables are populated on a schedule. The example deploys two pipelines:

- One pipeline scrapes market identifier code data from https://www.iso20022.org/market-identifier-codes.
- Another pipeline generates random trade data and assigns a random market code to each trade.

The example YAML can be downloaded here.

To apply the assembly, run:

kubectl apply -f ref-data.yml

Using the Market Identifier Code (MIC) for a given trade, we can look up the parent market identifier (operating MIC) in the reference data.

For example, NYSE Chicago has the MIC XCHI, and its operating MIC is XNYS, the MIC of the NYSE itself. In other words, the New York Stock Exchange is the parent exchange of its Chicago branch.

To query trade data with the getData API, use the agg parameter to list the columns you want back. Including the foreign key column (code) and a dotted reference to a column in the reference table (markets.opCode) returns the operating MIC for each trade.

# Get all data within the current hour
startTS=$(date -u '+%Y.%m.%dD%H:00:00')
endTS=$(date -u '+%Y.%m.%dD%H:%M:%S')

echo "start=$startTS"
echo "end=$endTS"

curl -X POST --insecure --header "Content-Type: application/json"\
    --header "Accept: application/json"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    --data "{\"table\":\"trade\", \"startTS\": \"$startTS\", \"endTS\": \"$endTS\", \"agg\": [\"sym\",\"price\",\"code\", \"markets.opCode\"]}"\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"

Inspecting the JSON results will show that the query returned codes for each trade and the related operating MIC populated as markets.opCode.

    {
      "sym": "AAPL",
      "price": 159.1893,
      "code": "XCHI",
      "markets.opCode": "XNYS"
    }
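Hand-escaping JSON inside a double-quoted shell string is easy to get wrong. As a minimal sketch, the request body for the trade query could instead be produced by a small helper. The function name trade_query_body is hypothetical and not part of any tooling described here; the fixed timestamps are illustrative only.

```shell
# Hypothetical helper: prints a getData request body for the trade table.
# Single quotes around the printf format mean the JSON needs no backslash
# escaping.
# $1 = start timestamp, $2 = end timestamp
trade_query_body() {
    printf '{"table":"trade","startTS":"%s","endTS":"%s","agg":["sym","price","code","markets.opCode"]}\n' "$1" "$2"
}

# Fixed timestamps keep the example reproducible; in practice pass the
# startTS and endTS values computed with date.
body=$(trade_query_body "2022.05.12D12:00:00" "2022.05.12D12:33:23")
echo "$body"
```

The resulting string can then be passed to curl as --data "$body" in place of the inline escaped JSON.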

Now, by taking one of the operating codes from our result and using it as a code, we can look up more about the parent exchange in the market data:

curl -X POST --insecure --header "Content-Type: application/json"\
    --header "Accept: application/json"\
    --header "Authorization: Bearer $INSIGHTS_TOKEN"\
    --data "{\"table\":\"markets\", \"filter\": [[\"=\",\"code\",\"XNYS\"]]}"\
    "https://${INSIGHTS_HOSTNAME}/servicegateway/kxi/getData"

An example response shows the parent exchange and when it was last refreshed:

{
  "header": {
    "rcvTS": "2022-05-12T12:33:23.379000000",
    "corr": "a372737f-4fef-4744-8b19-e6ad1d60f67b",
    "protocol": "gw",
    "logCorr": "a372737f-4fef-4744-8b19-e6ad1d60f67b",
    "client": ":10.0.2.7:5050",
    "http": "json",
    "api": ".kxi.getData",
    "ogRcID": "10.0.2.5:5060",
    "retryCount": 0,
    "to": "2022-05-12T12:33:53.379000000",
    "agg": ":10.0.5.92:5070",
    "pvVer": 2,
    "rc": 0,
    "ac": 0,
    "ai": ""
  },
  "payload": [
    {
      "code": "XNYS",
      "opCode": "XNYS",
      "site": "WWW.NYSE.COM",
      "updateTS": "2022-05-12T12:16:39.339100303"
    }
  ]
}
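In a script, the fields of interest can be pulled out of a response like this one. The following is a rough sketch using only grep and cut. It assumes that an rc of 0 in the header indicates success, which the example response suggests but this page does not state. The response variable holds a trimmed copy of the example, and a proper JSON parser would be more robust where one is available.

```shell
# Trimmed copy of the example response (most header fields omitted).
response='{"header":{"rc":0,"ac":0,"ai":""},"payload":[{"code":"XNYS","opCode":"XNYS","site":"WWW.NYSE.COM","updateTS":"2022-05-12T12:16:39.339100303"}]}'

# Extract the first "rc" value from the header.
rc=$(printf '%s' "$response" | grep -o '"rc":[0-9]*' | head -n 1 | cut -d: -f2)

# Extract the first "opCode" value from the payload.
op_code=$(printf '%s' "$response" | grep -o '"opCode":"[^"]*"' | head -n 1 | cut -d'"' -f4)

# Assumption: rc of 0 means the request succeeded.
if [ "$rc" = "0" ]; then
    echo "parent exchange: $op_code"
else
    echo "request failed with rc=$rc" >&2
fi
```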

Sharding

If you set up multiple assemblies that each publish the same reference table, and the table is not mirrored across them, set the isSharded property for that table to true. This declares that the table's rows are split across the assemblies.

Queries against a sharded table aggregate the results from all of the assemblies.

For example, let's say we have two feeds for user data, one from the US, another from EU, and we want to save this to an accounts reference table.

With isSharded set to false, a query returns either the US or the EU users, but not both, which is usually not what we want. This is because a table that is not sharded is expected to hold mirrored copies of the same data in each assembly.

With sharding enabled, the query will return the combined US and EU users.

spec:
  tables:
    accounts:
        isSharded: true
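The difference between the two settings can be pictured with a small conceptual model. This is only an illustration of the behavior described above, not how queries are actually routed or aggregated; the account rows are made up, and the choice of which assembly answers the unsharded query is arbitrary.

```shell
# Hypothetical rows held by each assembly (name,region).
us_accounts='alice,US
bob,US'
eu_accounts='carla,EU'

# isSharded: true -> the query result is the union of every assembly's rows.
sharded_result=$(printf '%s\n%s\n' "$us_accounts" "$eu_accounts")

# isSharded: false -> the assemblies are assumed to hold mirrored copies, so
# only one assembly's rows are returned. The US assembly is picked
# arbitrarily here for illustration.
unsharded_result=$us_accounts

sharded_count=$(printf '%s\n' "$sharded_result" | wc -l | tr -d ' ')
unsharded_count=$(printf '%s\n' "$unsharded_result" | wc -l | tr -d ' ')
echo "sharded: $sharded_count rows, unsharded: $unsharded_count rows"
```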

For more information see Purview and Routing.