Ingesting Data
Data is ingested into kdb Insights Enterprise using pipelines. These pipelines are responsible for fetching data from various sources and publishing it downstream to the kdb Insights Database.
Available generic FSI pipelines are:
- Corporate Actions: Reads and ingests historical corporate actions data from an exported CSV.
- Dividend Records: Reads and ingests historical dividend records data from an exported CSV.
- Instrument Reference Data: Reads and ingests historical reference data for financial instruments from an exported CSV.
- Exchange Holiday Calendar Data: Reads and ingests exchange holiday calendar data.
Feed Handlers
The FSI Accelerator provides a real-time feed handler for the following data sources:
- ICE: Handles real-time data from ICE.
Pipeline Configuration
There are 8 main points of configuration for the FSI accelerator pipelines:
- Assembly Name: The name of the assembly to write to.
- Target Table: The name of a table in the kdb Insights database to write to.
- Reader: kdb Insights reader to use.
- File Path: Path to file to read data from (for historical pipelines).
- Region: Region where the file exists (when reading from cloud storage such as Amazon S3).
- Column Mapping: Mapping of the source file column names to target table column names.
- Value Mapping: Mapping of specific values from the source data to different values when written to the target table e.g. mapping 'buy' to 'B' and 'sell' to 'S' for a side column.
- Schema: Schema of the destination table in the kdb Insights database.
Each pipeline has an associated pipeline specification, or 'spec', file where these configurations can be made. The spec files are in the src folder of the fsi-data-assembly package and follow the naming convention PIPELINENAME-pipeline-spec.q. Within a spec file there is a section clearly labelled CONFIGURATION, as in the example below. In general, this is the only part of the file that needs to change.
//######################### CONFIGURATION #######################
//assembly name
.fsi.assemblyName:`$"fsi-data-assembly";
//target table
.fsi.targetTable:`ExchangeHolidayCal;
//file path and region
.fsi.filePath:`$":s3://kxi-rnd-dev/acceldev/exchangecalendar.csv";
.fsi.region:"us-east-2";
//reader
.fsi.reader:.qsp.read.fromAmazonS3[.fsi.filePath;.qsp.use `mode`region!(`text;.fsi.region)];
//define transformation args
args[`colMapping]:flip `KXColumnName`foreignColumnName!flip (
  (`srcSys;`$"srcSys");
  (`exchangeID;`$"exchangeID");
  (`date;`date);
  (`description;`description)
  );
args[`schema]:([]
  srcSys:`$();
  exchangeID:`$();
  date:`date$();
  description:()
  );
//###############################################################
Assembly and Target Table
The assembly and table to write the data to can be configured by updating .fsi.assemblyName and .fsi.targetTable.
Note
If the assembly name changes, any references to that assembly name in the pipeline's yaml file must also change. This usually means the destination in the pipeline's yaml file must change too. Changing a pipeline's destination is discussed further in Creating Custom Ingestion Pipelines.
Reader, File Path and Region
kdb Insights Enterprise offers various methods for reading data into a pipeline. See the list of readers here.
The default reader used by pipelines in the FSI accelerator package is .qsp.read.fromAmazonS3, which reads from Amazon S3. The file path and region to use when reading from S3 can be configured with .fsi.filePath and .fsi.region. Depending on the reader used, .fsi.filePath and .fsi.region may or may not be applicable.
Column Mapping
Column mappings allow you to specify how the incoming data columns should be mapped to the corresponding fields of the target table in the kdb Insights database. The column mapping is defined as a kdb+ table with 2 columns, foreignColumnName and KXColumnName. foreignColumnName lists the columns of the source data table to be mapped, and KXColumnName lists the names they will be mapped to. Each KXColumnName should match the name of a column in the schema of the target table in the kdb Insights database.
Usually mappings are 1-to-1 (a single KX column mapped to a single foreign column). However, it is also possible to map a single KX column to multiple foreign columns using '+', for example:
`$"foreignCol1+foreignCol2"
In this example, the values in foreignCol1 are concatenated row by row with the values in foreignCol2 into a single column, separated by '+'.
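The renaming and '+' concatenation described above can be sketched in plain q. mapCols is a hypothetical helper written for illustration only; it is not the fsi-lib implementation of .dl.rule.mapSimpleColumns. It assumes the mapping table is built as in the CONFIGURATION example, and that any column joined with '+' holds symbols, numbers or strings.

```q
/ Hypothetical helper for illustration; not the fsi-lib source.
/ m: column mapping table (KXColumnName, foreignColumnName), t: source table
mapCols:{[m;t]
  str:{$[0h=type x; x; string x]};          / leave string columns, stringify others
  get1:{[t;str;fc]
    src:`$"+" vs string fc;                 / `a+b -> `a`b
    $[1=count src;
      t first src;                          / 1-to-1: copy the column as-is
      "+" sv/: flip str each t src]         / many-to-1: join row values with "+"
    }[t;str];
  flip (m`KXColumnName)!get1 each m`foreignColumnName
 };

/ usage
t:([] sym:`AAPL`MSFT; venue:`XNAS`XNAS; px:189.5 402.1);
m:flip `KXColumnName`foreignColumnName!flip (
  (`instrument;`$"sym+venue");
  (`price;`px)
  );
r:mapCols[m;t];   / r`instrument is ("AAPL+XNAS";"MSFT+XNAS")
```

Source columns that are not named in the mapping are dropped by this sketch; in the spec template, required columns are then added and selected by later .dl.rule steps.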
Value Mapping
Value mappings enable you to map specific values from the source data to different values when writing it to the target table in kdb Insights Enterprise. This is useful when ingesting data from multiple sources, where the values used by each source often need to be consolidated into a normalized value set. The value mapping is defined as a kdb+ table with 3 columns: columnName, foreignValue and KXValue. The columnName should correspond to a column in the schema of the target table in the kdb Insights database. For each columnName, the foreignValue is mapped to the KXValue in the table. See the example value mapping below:
args[`valueMapping]:flip `columnName`KXValue`foreignValue!flip (
  (`eventType;"new";"NEWO");
  (`eventType;"triggered";"TRIG");
  (`eventType;"replaced";"REME");
  (`eventType;"replaced";"REMA");
  (`eventType;"replaced";"REMH");
  (`eventType;"changeOfStatus";"CHME");
  (`eventType;"changeOfStatus";"CHMO");
  (`eventType;"cancelled";"CAME");
  (`eventType;"cancelled";"CAMO");
  (`eventType;"rejected";"REMO");
  (`eventType;"expired";"EXPI");
  (`eventType;"partiallyFilled";"PARF");
  (`eventType;"filled";"FILL");
  (`price;"0";"NOAP");
  (`orderType;"limit";"LMTO");
  (`orderType;"stop";"STOP");
  (`side;"B";"BUYI");
  (`side;"S";"SELL");
  (`venueID;"NYSE";"")
  );
In this example each foreignValue is replaced by the KXValue in the associated column. For example, in the first row:
(`eventType;"new";"NEWO");
every instance of NEWO in the eventType column will be replaced by new.
By default, most of the pipelines in the FSI accelerator package do not have a value mapping defined. To add one, paste a value mapping in the same format as the example above anywhere in the CONFIGURATION section of the pipeline spec file.
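The replacement rule described above can be sketched in plain q. mapCol and mapVals are hypothetical helpers for illustration, not the fsi-lib implementation of .dl.rule.mapColumnValues. The sketch assumes the mapped columns hold strings, that values are multi-character strings (as in the eventType rows), and that values with no mapping are left unchanged; it takes the mapping table directly rather than through args.

```q
/ Hypothetical sketch of value mapping; not the fsi-lib source.
/ vm: value mapping table (columnName, KXValue, foreignValue), values as strings
/ d:  column dictionary of the data (flip of a table); c: column to remap
mapCol:{[vm;d;c]
  m:select from vm where columnName=c;      / mapping rows for column c
  v:d c;                                    / current values, assumed strings
  i:(`$m`foreignValue)?`$v;                 / match position; count m if no match
  hit:where i<count m;                      / rows that have a mapping
  @[d;c;:;@[v;hit;:;m[`KXValue] i hit]]     / replace matched values only
 };
mapVals:{[vm;t] flip mapCol[vm]/[flip t;distinct vm`columnName]};

/ usage
vm:flip `columnName`KXValue`foreignValue!flip (
  (`eventType;"new";"NEWO");
  (`eventType;"triggered";"TRIG")
  );
t:([] eventType:("NEWO";"TRIG";"FILL"); qty:100 200 300);
r:mapVals[vm;t];   / r`eventType is ("new";"triggered";"FILL")
```

Matching on symbols keeps the lookup a simple vector search; an empty foreignValue such as the venueID row would match empty strings, because both cast to the null symbol.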
Schema
Generally, the schema defined in the CONFIGURATION section is the same as the schema of the target table, but it can differ. For example, in the TRTH Market Data pipelines, the gmtOffset column exists in the pipeline spec schema so that it can be applied to any timestamp columns; it is then deleted because it is not part of the target table schema.
Generic Pipelines
The FSI Accelerator package includes a generic pipeline for Exchange Holiday calendar data called exchangecalendar. This pipeline ingests data and writes it to the ExchangeHolidayCal table in the kdb Insights database. FSI accelerator APIs such as getStats use this data to exclude exchange holidays from their aggregations.
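How getStats uses the calendar internally is not described here, so the snippet below only illustrates the idea of excluding an exchange's holidays from a set of dates, using a table shaped like the ExchangeHolidayCal schema from the CONFIGURATION example. The table contents and the tradingDays helper are hypothetical.

```q
/ Illustrative ExchangeHolidayCal-shaped data (hypothetical values)
hol:([] srcSys:`demo`demo; exchangeID:`XNYS`XNYS;
  date:2024.01.01 2024.01.15; description:("New Year's Day";"Martin Luther King Jr. Day"));
/ Hypothetical helper: drop an exchange's holidays from a list of dates
tradingDays:{[hol;ex;d] d except exec date from hol where exchangeID=ex};
days:tradingDays[hol;`XNYS;2024.01.01+til 5];   / 2024.01.02 2024.01.03 2024.01.04 2024.01.05
```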
ICE Reference Data
The FSI accelerator has several pipelines to ingest ICE reference data. If these pipelines are in use, an expanded Instrument schema is provided to persist the data. The pipelines are:
- icecorereference:
  - Ingests ICE Core reference files, those with format COREREF_ _[YYYYMMDD].txt.gz.
- icecrossreference:
  - Ingests ICE Cross-reference files, those with format CROSSREF_ _[YYYYMMDD].txt.gz.
  - This pipeline can also ingest ICE CUSIP/ISIN files, those with format CUSIP_ _[YYYYMMDD].txt.gz.
  - This pipeline can also ingest ICE SEDOL files, those with format SEDOL_ _[YYYYMMDD].txt.gz.
To target the files you want to ingest, change the .fsi.filePath variable in the CONFIGURATION section of the pipeline spec file, as described in Pipeline Configuration above. For example, the pipelines could be duplicated to ingest cross-reference, CUSIP and SEDOL files simultaneously.
ICE Reference field mapping
The mapping of fields in the ICE reference files to columns in the expanded Instrument schema is detailed in a file named iceReference.config.q in the FSI accelerator. This file can be updated to change the existing mapping or to add additional fields.
The steps to adjust configuration are:
- unpack your assembly package
- edit the src/iceReference.config.q file and update the configuration to the desired values
- re-package your assembly package
- push the package
Commands to unpack and re-package a package can be found here. These commands would also be used when editing pipelines.
Creating Custom Ingestion Pipelines
The easiest way to add a customized ingestion pipeline to your fsi-data-assembly package is to use the command line. Once the necessary files have been created, the process is similar to the Pipeline Configuration process discussed earlier.
Let's walk through the steps:
Prerequisites
The prerequisites for creating a custom ingestion pipeline are:
- kdb Insights Enterprise is installed.
- The kxi CLI is installed.
- The FSI accelerator packages fsi-lib and fsi-data-assembly are downloaded.
- You are familiar with packaging in kdb Insights Enterprise.
Unpack the package
Unpack the fsi-data-assembly package.
> kxi package unpack fsi-data-assembly-1.0.0.kxi
> ls
fsi-data-assembly fsi-data-assembly-1.0.0.kxi
Add a pipeline
Add a new pipeline called mynewpipeline.
> kxi package add --to fsi-data-assembly pipeline --name mynewpipeline
Writing fsi-data-assembly/manifest.yaml
> ls fsi-data-assembly/pipelines|grep mynewpipeline*
mynewpipeline.yaml
>
Add a pipeline spec file
Every pipeline must have an associated spec file. The following pipeline spec can be used as a template.
//load data loader code
@[.kxi.packages.load;"fsi-lib";{x}];
.kxi.packages.file.load["src/dl.q"];
args:()!();
//######################### CONFIGURATION #######################
//assembly name
.fsi.assemblyName:`$"fsi-data-assembly";
//target table
.fsi.targetTable:`ExchangeHolidayCal;
//file path and region
.fsi.filePath:`$":s3://kxi-rnd-dev/acceldev/exchangecalendar.csv";
.fsi.region:"us-east-2";
//reader
.fsi.reader:.qsp.read.fromAmazonS3[.fsi.filePath;.qsp.use `mode`region!(`text;.fsi.region)];
//define transformation args
args[`colMapping]:flip `KXColumnName`foreignColumnName!flip (
  (`srcSys;`$"srcSys");
  (`exchangeID;`$"exchangeID");
  (`date;`date);
  (`description;`description)
  );
args[`schema]:([]
  srcSys:`$();
  exchangeID:`$();
  date:`date$();
  description:()
  );
//###############################################################
// Start a pipeline that sends all incoming data through
// the transform rules
.qsp.run
  .fsi.reader
  .qsp.decode.csv[]
  .qsp.map[.dl.rule.mapSimpleColumns[args;]]
  .qsp.map[.dl.rule.appendUnmappedRequiredCols[args;]]
  .qsp.map[.dl.rule.selectRequiredCols[args;]]
  .qsp.map[.dl.rule.mapColumnValues[args;]]
  .qsp.map[.dl.rule.convertColumnTypes[args;]]
  .qsp.write.toDatabase[.fsi.targetTable;.fsi.assemblyName]
The spec file should follow the naming convention PIPELINENAME-pipeline-spec.q and be placed in the src folder of the fsi-data-assembly package.
Configure the pipeline
Assuming the example spec file from Add a pipeline spec file is being used, you can configure your new pipeline by updating the configurable parameters outlined in Pipeline Configuration.
Update the pipeline yaml
In the pipelines folder of the fsi-data-assembly package, update the pipeline yaml mynewpipeline.yaml so that it references the new pipeline spec file, and update the destination so that it writes to the correct assembly. In practice, this means:
- Updating spec: init.q to spec: PIPELINENAME-pipeline-spec.q
- Updating destination: south to destination: fsi-data
For more details on the fields in the pipeline yaml file, see here.
Re-package
Re-package the fsi-data-assembly package.
> kxi package packit fsi-data-assembly --tag
Refreshing package before packing...
Writing fsi-data-assembly/manifest.yaml
Creating package from fsi-data-assembly
Package created: /tmp/artifacts/fsi-data-assembly-1.0.0.kxi
fsi-data-assembly-1.0.0.kxi
>
Deploy
Finally, deploy the fsi-data-assembly package with the new changes. See How to Deploy the FSI Accelerator for these steps.
Using a cronjob to schedule a pipeline
A CronJob in Kubernetes can be created from a config yaml, which includes the schedule and the commands to run. For example, it could run a script that performs the required actions (see Creating a CronJob).
The .spec.schedule field sets the schedule for the cronjob and is required. To run the job every day at five minutes after midnight, the schedule would be:
5 0 * * *
Any environment variables the job requires can be passed into the cronjob pod by adding them to the cronjob config yaml. Reference each one by the key name it has in the Kubernetes secret where it is defined (see Kubernetes Secrets).
There is more than one approach to scheduling a pipeline with a cronjob:
Curl
In the script run by the cronjob, use apt-get to install curl in the pod running the job. Curl commands can then be used to interact with the cluster, for example a POST request to the Stream Processor (SP) coordinator to create a pipeline from an existing spec file (see Coordinator OpenAPI).
CLI
In the script run by the cronjob, install the kdb Insights CLI (see Install CLI). The cronjob can then interact with the cluster using CLI commands, including kxi package, which can be used to deploy and tear down a pipeline.
FSI Pipeline APIs
This section details the APIs from the fsi-lib package used in the FSI accelerator pipeline specs.
Note
Any APIs used and not listed here are likely core kdb Insights Enterprise APIs. The documentation can be found here.
.dl.rule.appendUnmappedRequiredCols
Appends missing required columns to the data table.
Parameters:
name | type | description |
---|---|---|
args | dict | |
args.schema | table | Empty schema containing required columns from the data table |
data | table | Table of data |
Returns:
type | description |
---|---|
table | Returns the data table with any missing required columns appended |
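The documented behaviour can be sketched in plain q as below. appendMissing is a hypothetical helper, not the fsi-lib source of .dl.rule.appendUnmappedRequiredCols. It assumes missing typed columns should be filled with that type's null, and missing general-list columns (such as description:() in the example schema) with empty strings.

```q
/ Hypothetical sketch of the documented behaviour; not the fsi-lib source.
/ schema: empty table of required columns, t: data table
appendMissing:{[schema;t]
  missing:(cols schema) except cols t;           / required but absent
  if[0=count missing; :t];
  n:count t;
  blank:{[n;x] $[0h=type x; n#enlist ""; x n#0]}; / typed nulls, or empty strings
  flip (flip t),missing!blank[n] each schema missing
 };

/ usage
schema:([] srcSys:`$(); exchangeID:`$(); date:`date$(); description:());
t:([] exchangeID:`XNYS`XLON; date:2024.01.01 2024.12.25);
r:appendMissing[schema;t];   / cols r is `exchangeID`date`srcSys`description
```

Indexing an empty typed vector out of range (x n#0) is a compact way to get n nulls of that vector's type.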
.dl.rule.convertColumnTypes
Converts the types of the columns in the data table based on a schema.
Parameters:
name | type | description |
---|---|---|
args | dict | |
args.schema | table | Empty schema containing required columns from the data table |
data | table | Table of data |
Returns:
type | description |
---|---|
table | Returns the data table with column types converted to that of the schema |
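A plain-q sketch of converting columns to the schema's types follows. convertTypes is a hypothetical helper, not the fsi-lib source of .dl.rule.convertColumnTypes. It assumes the incoming columns hold strings and uses .Q.t to turn each schema column's type into the uppercase character that q's $ operator accepts for parsing strings; general-list schema columns (type 0h) are left unchanged.

```q
/ Hypothetical sketch of the documented behaviour; not the fsi-lib source.
/ Assumes t holds strings; schema columns of type 0h are left unchanged.
convertTypes:{[schema;t]
  c:(cols schema) inter cols t;              / schema columns present in t
  tc:upper .Q.t abs type each schema c;      / e.g. "S" symbol, "D" date, " " general
  conv:{[ch;v] $[" "=ch; v; ch$v]};          / parse strings using the type char
  flip @[flip t;c;:;conv'[tc;t c]]
 };

/ usage
schema:([] exchangeID:`$(); date:`date$(); description:());
t:([] exchangeID:("XNYS";"XLON"); date:("2024.01.01";"2024.12.25"); description:("New Year";"Christmas"));
r:convertTypes[schema;t];   / types of exchangeID, date, description: 11 14 0h
```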
.dl.rule.mapColumnValues
Updates the values of each column in the data table based on a value mapping table.
Parameters:
name | type | description |
---|---|---|
args | dict | |
args.valMapping | table | Table with 3 columns; columnName (symbol), foreignValue (string), KXValue (string). columnName should correspond to a column in the data table. For each columnName, the foreignValue will be mapped to the KXValue in the data table. |
data | table | Table of data |
Returns:
type | description |
---|---|
table | Returns the data table with its column values updated based on the value mapping |
.dl.rule.mapSimpleColumns
Updates the column names of the data table based on a column mapping table.
Parameters:
name | type | description |
---|---|---|
args | dict | |
args.colMapping | table | Table with 2 symbol columns; foreignColumnName lists the columns of the data table to be mapped, KXColumnName lists the names they will be mapped to. A KXColumn can be mapped to a string join of multiple foreignColumns using '+' e.g. foreignCol1+foreignCol2. |
data | table | Table of data |
Returns:
type | description |
---|---|
table | Returns the data table with its column names updated based on the column mapping |
.dl.rule.selectRequiredCols
Selects required columns in the data table based on a schema and drops all others.
Parameters:
name | type | description |
---|---|---|
args | dict | |
args.schema | table | Empty schema containing required columns from the data table |
data | table | Table of data |
Returns:
type | description |
---|---|
table | Returns the required columns from the data table |
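A minimal plain-q sketch of this step, using a hypothetical selectRequired helper (not the fsi-lib source of .dl.rule.selectRequiredCols): it keeps the schema's columns in schema order and drops the rest. In the spec template, appendUnmappedRequiredCols runs before this step, so every schema column should already exist; the inter here simply skips any that do not.

```q
/ Hypothetical sketch of the documented behaviour; not the fsi-lib source.
/ Keeps only the schema's columns, in schema order; all others are dropped.
selectRequired:{[schema;t] ((cols schema) inter cols t)#t};

/ usage
schema:([] exchangeID:`$(); date:`date$());
t:([] date:2024.01.01 2024.12.25; exchangeID:`XNYS`XLON; extra:1 2);
r:selectRequired[schema;t];   / cols r is `exchangeID`date
```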