Ingesting Data

Data is ingested into kdb Insights Enterprise using pipelines. These pipelines are responsible for fetching data from various sources and publishing it downstream to the kdb Insights Database.

Available generic FSI pipelines are:

  • Corporate Actions: Reads and ingests historical corporate actions data from an exported CSV.
  • Dividend Records: Reads and ingests historical dividend records data from an exported CSV.
  • Instrument Reference Data: Reads and ingests historical reference data for financial instruments from an exported CSV.
  • Exchange Holiday Calendar Data: Reads and ingests exchange holiday calendar data.

Feed Handlers

The FSI Accelerator provides a real-time feed handler for the following data sources:

  • ICE: Handles real-time data from ICE.

Pipeline Configuration

There are 8 main points of configuration for the FSI accelerator pipelines:

  1. Assembly Name: The name of the assembly to write to.
  2. Target Table: The name of a table in the kdb Insights database to write to.
  3. Reader: kdb Insights reader to use.
  4. File Path: Path to file to read data from (for historical pipelines).
  5. Region: Region where the file exists (when reading from cloud storage such as Amazon S3).
  6. Column Mapping: Mapping of the source file column names to target table column names.
  7. Value Mapping: Mapping of specific values from the source data to different values when written to the target table e.g. mapping 'buy' to 'B' and 'sell' to 'S' for a side column.
  8. Schema: Schema of the destination table in the kdb Insights database.

Each pipeline has an associated pipeline specification or 'spec' file where these configurations can be made. These pipeline specifications can be found in the src folder of the fsi-data-assembly package and follow the naming convention PIPELINENAME-pipeline-spec.q. Within a spec file, there is a section clearly labelled CONFIGURATION, as in the example below. In general, this should be the only part of the file where changes need to be made.

//######################### CONFIGURATION #######################
//assembly name
.fsi.assemblyName:`$"fsi-data-assembly";
//target table
.fsi.targetTable:`ExchangeHolidayCal;
//file path and region
.fsi.filePath:`$":s3://kxi-rnd-dev/acceldev/exchangecalendar.csv";
.fsi.region:"us-east-2";
//reader
.fsi.reader:.qsp.read.fromAmazonS3[.fsi.filePath;.qsp.use `mode`region!(`text;.fsi.region)];
//define transformation args
args[`colMapping]:flip `KXColumnName`foreignColumnName!flip (
        (`srcSys;`$"srcSys");
        (`exchangeID;`$"exchangeID");
        (`date;`date);
        (`description;`description)
        );

args[`schema]:([]
    srcSys:`$();
    exchangeID:`$();
    date:`date$();
    description:()
    );
//###############################################################

Assembly and Target Table

The assembly and table to write the data to can be configured by updating .fsi.assemblyName and .fsi.targetTable.

Note

If the assembly name changes, any references to that assembly name in the pipeline's yaml file must also change. This likely means the destination in the pipeline's yaml file must change as well. Changing a pipeline's destination is discussed further in Creating Custom Ingestion Pipelines.

Reader, File Path and Region

kdb Insights Enterprise offers various methods for reading data into a pipeline. See the list of readers here.

The default reader used by pipelines in the FSI accelerator package is .qsp.read.fromAmazonS3, which reads from Amazon S3. The file path and region to use when reading from S3 can be configured with .fsi.filePath and .fsi.region. Depending on the reader used, .fsi.filePath and .fsi.region may or may not be applicable.

Column Mapping

Column mappings allow you to specify how the incoming data columns should be mapped to the corresponding fields of the target table in the kdb Insights database. The column mapping is defined as a kdb+ table with 2 columns, foreignColumnName and KXColumnName. foreignColumnName lists the columns of the source data table to be mapped, KXColumnName lists the names they will be mapped to. Each KXColumnName should match the name of a column in the schema of the target table in the kdb Insights database.

Mappings are usually 1-to-1 (a single KX column mapped to a single foreign column). It is also possible to map a single KXColumnName to multiple foreign columns by joining their names with '+', for example:

`$"foreignCol1+foreignCol2"

In this example the values in foreignCol1 would be concatenated into a single column with the values in foreignCol2, separated by '+'.
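To make the '+' behaviour concrete, the sketch below reproduces the described concatenation in plain q, outside of any pipeline. The table src, its values, and the target column name instrumentKey are hypothetical. The code only illustrates the outcome described above and is not the fsi-lib implementation.

```q
// Hypothetical source data with the two foreign columns
src:([] foreignCol1:`AAPL`MSFT; foreignCol2:`XNAS`XNYS)

// A colMapping that maps both foreign columns to one KX column
// (instrumentKey is an assumed name, not part of the shipped schema)
colMapping:flip `KXColumnName`foreignColumnName!flip (
        (`srcSys;`srcSys);
        (`instrumentKey;`$"foreignCol1+foreignCol2")
        )

// Row-by-row string join of the two columns, separated by "+"
joined:{x,"+",y}'[string src`foreignCol1;string src`foreignCol2]
```

Here joined holds one string per source row, each built from the foreignCol1 value, a "+", and the foreignCol2 value.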

Value Mapping

Value mappings enable you to map specific values from the source data to different values when writing it to the target table in kdb Insights Enterprise. This can be a useful feature when ingesting data from multiple sources, where there is often a need to consolidate the values used by each source into a normalized value set. The value mapping is defined as a kdb+ table with 3 columns, columnName, foreignValue and KXValue. The columnName should correspond to a column in the schema of the target table in the kdb Insights database. For each columnName, the foreignValue is mapped to the KXValue in the table. See the below example of a value mapping:

args[`valueMapping]:flip `columnName`KXValue`foreignValue!flip (
        (`eventType;"new";"NEWO");
        (`eventType;"triggered";"TRIG");
        (`eventType;"replaced";"REME");
        (`eventType;"replaced";"REMA");
        (`eventType;"replaced";"REMH");
        (`eventType;"changeOfStatus";"CHME");
        (`eventType;"changeOfStatus";"CHMO");
        (`eventType;"cancelled";"CAME");
        (`eventType;"cancelled";"CAMO");
        (`eventType;"rejected";"REMO");
        (`eventType;"expired";"EXPI");
        (`eventType;"partiallyFilled";"PARF");
        (`eventType;"filled";"FILL");
        (`price;"0";"NOAP");
        (`orderType;"limit";"LMTO");
        (`orderType;"stop";"STOP");
        (`side;"B";"BUYI");
        (`side;"S";"SELL");
        (`venueID;"NYSE";"")
        );

In this example each foreignValue is replaced by the KXValue in the associated column. For example, in the first row:

(`eventType;"new";"NEWO");

every instance of NEWO in the eventType column will be replaced by new.

By default, most of the pipelines in the FSI accelerator package do not have a value mapping defined. If you would like to add one, simply paste in a value mapping in the same format as the example above, anywhere in the CONFIGURATION section of the pipeline spec file.
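The effect of a value mapping can be sketched in plain q as follows. This is an illustration of the replacement semantics described above, not the code that .dl.rule.mapColumnValues runs. The data table, the helper translate, and the choice to leave unmapped values unchanged are assumptions made for the example.

```q
// Small value mapping in the same format as the example above
vm:flip `columnName`KXValue`foreignValue!flip (
        (`eventType;"new";"NEWO");
        (`eventType;"replaced";"REME");
        (`eventType;"replaced";"REMA")
        )

// Hypothetical incoming data; "FILL" has no mapping in vm
data:([] eventType:("NEWO";"REMA";"FILL"))

// Foreign values and their replacements for the eventType column
fv:exec foreignValue from vm where columnName=`eventType
kv:exec KXValue from vm where columnName=`eventType

// Replace v with its KX value when a mapping exists, else keep v
translate:{[fv;kv;v] i:fv?v; $[i<count fv;kv i;v]}

mapped:update eventType:translate[fv;kv] each eventType from data
```

In mapped, NEWO becomes new and REMA becomes replaced, while FILL is kept because this small mapping has no row for it.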

Schema

The schema defined in the CONFIGURATION section is generally the same as the schema of the target table, although it can differ in some cases.

Generic Pipelines

The FSI Accelerator package includes a generic pipeline for Exchange Holiday calendar data called exchangecalendar. This pipeline is used to ingest data and write it to the ExchangeHolidayCal table in the kdb Insights database. FSI accelerator APIs such as getStats use this data to exclude exchange holidays from their aggregations.

ICE Reference Data

The FSI accelerator has several pipelines to ingest ICE reference data. If these pipelines are in use, an expanded Instrument schema is provided to persist the data. The pipelines are:

  • icecorereference: Ingests ICE Core reference files, those with format COREREF__[YYYYMMDD].txt.gz.
  • icecrossreference: Ingests ICE Cross-reference files, those with format CROSSREF__[YYYYMMDD].txt.gz. This pipeline can also ingest ICE CUSIP/ISIN files, those with format CUSIP__[YYYYMMDD].txt.gz, and ICE SEDOL files, those with format SEDOL__[YYYYMMDD].txt.gz.

To target the files you want to ingest, change the .fsi.filePath variable in the CONFIGURATION section of the pipeline spec file, as described in the Pipeline Configuration section above. A pipeline can be duplicated if, for example, you want to ingest cross-reference, CUSIP and SEDOL files simultaneously.

ICE Reference field mapping

The mapping of fields in the ICE reference files to columns in the expanded Instrument schema is detailed in a file named iceReference.config.q in the FSI accelerator. This file can be updated to change the existing mapping or to add additional fields.

The steps to adjust configuration are:

  • unpack your assembly package
  • edit the src/iceReference.config.q file and update the configuration to the desired values
  • re-package your assembly package
  • push the package

Commands to unpack and re-package a package can be found here. These commands would also be used when editing pipelines.

Creating Custom Ingestion Pipelines

The easiest way to quickly add a customized ingestion pipeline to your fsi-accelerator package is using the command line. It is similar to the Pipeline Configuration process discussed earlier, once the necessary files have been created.

Let's walk through the steps:

Prerequisites

The prerequisites for creating a custom ingestion pipeline are:

  • kdb Insights Enterprise is installed.
  • The kxi CLI is installed.
  • The FSI accelerator packages fsi-lib and fsi-data-assembly are downloaded.
  • You are familiar with packaging in kdb Insights Enterprise.

Unpack the package

Unpack the fsi-data-assembly package.

> kxi package unpack fsi-data-assembly-1.0.0.kxi
> ls
fsi-data-assembly  fsi-data-assembly-1.0.0.kxi

Add a pipeline

Add a new pipeline called mynewpipeline.

> kxi package add --to fsi-data-assembly pipeline --name mynewpipeline
Writing fsi-data-assembly/manifest.yaml
> ls fsi-data-assembly/pipelines|grep mynewpipeline*
mynewpipeline.yaml
>

Add a pipeline spec file

Every pipeline must have an associated spec file. The following pipeline spec can be used as a template.

//load data loader code
@[.kxi.packages.load;"fsi-lib";{x}];
.kxi.packages.file.load["src/dl.q"];
args:()!();
//######################### CONFIGURATION #######################
//assembly name
.fsi.assemblyName:`$"fsi-data-assembly";
//target table
.fsi.targetTable:`ExchangeHolidayCal;
//file path and region
.fsi.filePath:`$":s3://kxi-rnd-dev/acceldev/exchangecalendar.csv";
.fsi.region:"us-east-2";
//reader
.fsi.reader:.qsp.read.fromAmazonS3[.fsi.filePath;.qsp.use `mode`region!(`text;.fsi.region)];
//define transformation args
args[`colMapping]:flip `KXColumnName`foreignColumnName!flip (
        (`srcSys;`$"srcSys");
        (`exchangeID;`$"exchangeID");
        (`date;`date);
        (`description;`description)
        );

args[`schema]:([]
    srcSys:`$();
    exchangeID:`$();
    date:`date$();
    description:()
    );
//###############################################################

// Start a pipeline that sends all incoming data through
// the transform rules
.qsp.run
    .fsi.reader
    .qsp.decode.csv[]
    .qsp.map[.dl.rule.mapSimpleColumns[args;]]
    .qsp.map[.dl.rule.appendUnmappedRequiredCols[args;]]
    .qsp.map[.dl.rule.selectRequiredCols[args;]]
    .qsp.map[.dl.rule.mapColumnValues[args;]]
    .qsp.map[.dl.rule.convertColumnTypes[args;]]
    .qsp.write.toDatabase[.fsi.targetTable;.fsi.assemblyName]

The spec file should have the naming convention PIPELINENAME-pipeline-spec.q and be placed in the src folder of the fsi-data-assembly package.

Configure the pipeline

Assuming the example spec file from Add a pipeline spec file is being used, you can configure your new pipeline by updating the configurable parameters outlined in Pipeline Configuration.

Update the pipeline yaml

In the pipelines folder of the fsi-data-assembly package, update the pipeline yaml mynewpipeline.yaml so that it is aware of the new pipeline spec file, and update the destination to ensure it writes to the correct assembly. In practice, this means:

  • Updating spec: init.q to spec: PIPELINENAME-pipeline-spec.q
  • Updating destination: south to destination: fsi-data

For more details on the fields in the pipeline yaml file, see here.

Re-package

Re-package the fsi-data-assembly package.

> kxi package packit fsi-data-assembly --tag
Refreshing package before packing...
Writing fsi-data-assembly/manifest.yaml
Creating package from fsi-data-assembly
Package created: /tmp/artifacts/fsi-data-assembly-1.0.0.kxi
fsi-data-assembly-1.0.0.kxi
>

Deploy

Finally, deploy the fsi-data-assembly package with the new changes. See How to Deploy the FSI Accelerator for these steps.

Using a cronjob to schedule a pipeline

A cronjob in Kubernetes can be created using a config yaml. The config includes the schedule and the commands to run; for example, it could run a script that performs the required actions. See Creating a CronJob.

The .spec.schedule field sets the schedule for the cronjob and is required. To run the job every day at five minutes after midnight the schedule would be:

5 0 * * *

For the schedule syntax, see Schedule a CronJob.

Any environment variables required can be passed into the cronjob pod by adding them to the cronjob config yaml. They should be referenced using the key name they are given in the Kubernetes secret that defines them. See Kubernetes Secrets.

If using a cronjob to schedule a pipeline there is more than one approach that could be taken:

Curl

In the script run by the cronjob, use apt-get to install curl in the pod running the job. Curl commands can then be used to interact with the cluster, for example a POST request to the SP coordinator to create a pipeline from an existing spec file. See Coordinator OpenAPI.

CLI

In the script run by the cronjob, install the Insights CLI. See Install CLI.

The cronjob could then interact with the cluster using CLI commands, including kxi package which can be used to deploy and tear down a pipeline.

FSI Pipeline APIs

This section details the APIs from the fsi-lib package used in the FSI accelerator pipeline specs.

Note

Any APIs used and not listed here are likely core kdb Insights Enterprise APIs. The documentation can be found here.

.dl.rule.appendUnmappedRequiredCols

Appends missing required columns to the data table.

Parameters:

name | type | description
args | dict |
args.schema | table | Empty schema containing required columns from the data table
data | table | Table of data

Returns:

type | description
table | Returns the data table with any missing required columns appended
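As a rough illustration of what appending missing required columns means, the plain q sketch below pads a table with null-filled columns taken from a schema. The schema, data, and helper names are hypothetical, the schema is restricted to typed columns to keep the example simple, and the real fsi-lib function may behave differently.

```q
// Hypothetical schema (typed columns only) and incoming data lacking srcSys
schema:([] srcSys:`$(); exchangeID:`$(); date:`date$())
data:([] exchangeID:`XNYS`XLON; date:2024.01.01 2024.12.25)

// Schema columns that the data does not yet have
missing:cols[schema] except cols data

// For each missing column, build count[data] nulls of the schema type
nulls:{[s;n;c] n#first s c}[schema;count data] each missing

// Append the null-filled columns to the data
padded:flip flip[data],missing!nulls
```

The result keeps the original columns and rows and gains a srcSys column of null symbols.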

.dl.rule.convertColumnTypes

Converts the types of the columns in the data table based on a schema.

Parameters:

name | type | description
args | dict |
args.schema | table | Empty schema containing required columns from the data table
data | table | Table of data

Returns:

type | description
table | Returns the data table with column types converted to that of the schema
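The idea of schema-driven type conversion can be sketched in plain q as below. The example assumes the incoming columns are strings, as is typical after CSV decoding, and casts each one to the type of the matching schema column. The tables and names are hypothetical, and this is not the fsi-lib implementation.

```q
// Hypothetical schema and string-typed incoming data
schema:([] exchangeID:`$(); date:`date$())
data:([] exchangeID:("XNYS";"XLON"); date:("2024.01.01";"2024.12.25"))

// Type characters of the schema columns, upper-cased for parsing strings
typeChars:upper exec t from meta schema

// Parse each column with its schema type and rebuild the table
converted:flip cols[schema]!typeChars$data cols schema
```

After this, exchangeID is a symbol column and date is a date column, matching the schema.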

.dl.rule.mapColumnValues

Updates the values of each column in the data table based on a value mapping table.

Parameters:

name | type | description
args | dict |
args.valMapping | table | Table with 3 columns: columnName (symbol), foreignValue (string), KXValue (string). columnName should correspond to a column in the data table. For each columnName, the foreignValue will be mapped to the KXValue in the data table.
data | table | Table of data

Returns:

type | description
table | Returns the data table with its column values updated based on the value mapping

.dl.rule.mapSimpleColumns

Updates the column names of the data table based on a column mapping table.

Parameters:

name | type | description
args | dict |
args.colMapping | table | Table with 2 symbol columns: foreignColumnName lists the columns of the data table to be mapped, KXColumnName lists the names they will be mapped to. A KXColumnName can be mapped to a string join of multiple foreign columns using '+', e.g. foreignCol1+foreignCol2.
data | table | Table of data

Returns:

type | description
table | Returns the data table with its column names updated based on the column mapping

.dl.rule.selectRequiredCols

Selects required columns in the data table based on a schema and drops all others.

Parameters:

name | type | description
args | dict |
args.schema | table | Empty schema containing required columns from the data table
data | table | Table of data

Returns:

type | description
table | Returns the required columns from the data table
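Selecting only the schema's columns can be expressed in plain q with take, as in the sketch below. The tables are hypothetical, and the example only illustrates the described result of keeping required columns and dropping the rest; it is not the fsi-lib code.

```q
// Hypothetical schema and data carrying an extra, unrequired column
schema:([] exchangeID:`$(); date:`date$())
data:([] date:2024.01.01 2024.12.25; exchangeID:`XNYS`XLON; extra:`x`y)

// Keep only the columns named in the schema, in schema order
selected:cols[schema]#data
```

The extra column is dropped and the remaining columns follow the order of the schema.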