Pipeline Administration

This section describes the steps to customize existing pipelines, add new pipelines, or remove pipelines that are no longer needed.

The examples in this section use the fsi-data-assembly accelerator package.

Removing Pipelines

The accelerator packages come with a suite of pipelines. Depending on your use case, some of these pipelines may not be required. To remove them from the assembly, do the following:

Unpack fsi-data-assembly

Unpack the assembly package:

user@A-LPTP-5VuBgQiX:/tmp/artifacts$ kxi package unpack fsi-data-assembly-1.0.0.kxi
user@A-LPTP-5VuBgQiX:/tmp/artifacts$ ls
fsi-data-assembly  fsi-data-assembly-1.0.0.kxi
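
If the unpack step is scripted, it can help to confirm that the unpacked directory contains a manifest.yaml before going further. The following is a minimal sketch, not part of the kxi CLI: unpack_and_check is a hypothetical helper name, and it assumes the directory name is passed in explicitly rather than derived from the archive name.

```shell
# Sketch: unpack a .kxi archive and confirm the result contains a manifest.
# unpack_and_check is a hypothetical helper, not a kxi command.
unpack_and_check() {
    local archive="$1"   # e.g. fsi-data-assembly-1.0.0.kxi
    local pkg_dir="$2"   # e.g. fsi-data-assembly

    # Same command as in the transcript above.
    kxi package unpack "$archive" || return 1

    # manifest.yaml is the file that later kxi package commands rewrite.
    if [ ! -f "$pkg_dir/manifest.yaml" ]; then
        echo "expected $pkg_dir/manifest.yaml after unpacking $archive" >&2
        return 1
    fi
    echo "unpacked $archive into $pkg_dir"
}
```

For the package used here, the call would be `unpack_and_check fsi-data-assembly-1.0.0.kxi fsi-data-assembly`.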

Remove the Pipelines

Remove unneeded pipelines from the fsi-data-assembly package using kxi package rm. In the example below, all pipelines other than trthtrades, trthquotes and refinitivrealtime are removed:

user@A-LPTP-5VuBgQiX:/tmp/artifacts$ ls
fsi-data-assembly  fsi-data-assembly-1.0.0.kxi  fsi-lib-1.0.0.kxi
user@A-LPTP-5VuBgQiX:/tmp/artifacts$
user@A-LPTP-5VuBgQiX:/tmp/artifacts$
user@A-LPTP-5VuBgQiX:/tmp/artifacts$ kxi package info fsi-data-assembly
==PATH==
/tmp/artifacts/fsi-data-assembly

==OBJECT==
Package

==Manifest==
name: fsi-data-assembly
version: 1.0.0
metadata:
  uuid: f70e9722-19f1-407c-8032-bc98ebb61efb
  pakxtime: '2023-07-06T15:33:42.249290'
entrypoints:
  aggregator: src/fsiassembly.agg.q
  data-access: src/fsiassembly.da.q
system:
  _pakx_version: 1.6.0rc3.dev2+g5a0a4eb.d20230504
databases:
  fsi-data-assembly-db:
    dir: databases/fsi-data-assembly-db
    shards:
    - fsi-data-assembly-db-shard
    tables:
    - Account
    - Auction
    - Book
    - Broker
    - CanCor
    - Contract
    - RelativeContract
    - CoraxCapChange
    - CoraxDividends
    - CorpActions
    - Counterparty
    - Country
    - Currency
    - CurrencyPair
    - Depth
    - DividendRecord
    - FILastQuote
    - FIQuote
    - FITrade
    - fsi_bar_Quote_dayStats
    - fsi_bar_Quote_minStats
    - fsi_bar_Trade_dayStats
    - fsi_bar_Trade_minStats
    - IndexDetail
    - Instrument
    - Issuer
    - LastOpenClose
    - LastQuote
    - LastTrade
    - LegalEntity
    - MIC
    - OpenClose
    - Quote
    - FutQuote
    - Region
    - SalesTeam
    - SettleSystem
    - SourceSystems
    - Summary
    - Trade
    - FutTrade
    - TradeType
    - Trader
    - TriParty
    - Exchange
    - ExchangeHolidayCal
pipelines:
  trthtrades:
    file: pipelines/trthtrades.yaml
  trthquotes:
    file: pipelines/trthquotes.yaml
  trthcancor:
    file: pipelines/trthcancor.yaml
  refinstrument:
    file: pipelines/refinstrument.yaml
  refdividendrecord:
    file: pipelines/refdividendrecord.yaml
  refcorpactions:
    file: pipelines/refcorpactions.yaml
  exchangecalendar:
    file: pipelines/exchangecalendar.yaml
  refinitivrealtime:
    file: pipelines/refinitivrealtime.yaml
  cancortoday:
    file: pipelines/cancortoday.yaml
  cancoryday:
    file: pipelines/cancoryday.yaml
  cancordaybeforeyday:
    file: pipelines/cancordaybeforeyday.yaml
  cancoreachsecondtoday:
    file: pipelines/cancoreachsecondtoday.yaml
router: router/router.yaml
deployment_config: deployment_config/deployment_config.yaml

user@A-LPTP-5VuBgQiX:/tmp/artifacts$
user@A-LPTP-5VuBgQiX:/tmp/artifacts$
user@A-LPTP-5VuBgQiX:/tmp/artifacts$ kxi package rm --from fsi-data-assembly pipeline --name refdividendrecord;kxi package rm --from fsi-data-assembly pipeline --name exchangecalendar;kxi package rm --from fsi-data-assembly pipeline --name trthcancor;kxi package rm --from fsi-data-assembly pipeline --name refinstrument;kxi package rm --from fsi-data-assembly pipeline --name refcorpactions;kxi package rm --from fsi-data-assembly pipeline --name cancortoday;kxi package rm --from fsi-data-assembly pipeline --name cancoryday;kxi package rm --from fsi-data-assembly pipeline  --name cancordaybeforeyday;kxi package rm --from fsi-data-assembly pipeline --name cancoreachsecondtoday
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
user@A-LPTP-5VuBgQiX:/tmp/artifacts$
user@A-LPTP-5VuBgQiX:/tmp/artifacts$
user@A-LPTP-5VuBgQiX:/tmp/artifacts$ kxi package info fsi-data-assembly
==PATH==
/tmp/artifacts/fsi-data-assembly

==OBJECT==
Package

==Manifest==
name: fsi-data-assembly
version: 1.0.0
metadata:
  uuid: 99296d9a-a110-4331-963c-46e74aea95d0
  pakxtime: '2023-07-12T06:41:08.790448'
entrypoints:
  aggregator: src/fsiassembly.agg.q
  data-access: src/fsiassembly.da.q
system:
  _pakx_version: 1.6.0rc3.dev2+g5a0a4eb.d20230504
databases:
  fsi-data-assembly-db:
    dir: databases/fsi-data-assembly-db
    shards:
    - fsi-data-assembly-db-shard
    tables:
    - Account
    - Auction
    - Book
    - Broker
    - CanCor
    - CanCor_quke
    - CoraxCapChange
    - CoraxDividends
    - CorpActions
    - Counterparty
    - Country
    - Currency
    - CurrencyPair
    - Depth
    - DividendRecord
    - FILastQuote
    - FIQuote
    - FITrade
    - IndexDetail
    - Instrument
    - Issuer
    - LastOpenClose
    - LastQuote
    - LastQuote_quke
    - LastTrade
    - LegalEntity
    - MIC
    - OpenClose
    - Quote
    - FutQuote
    - Quote_quke
    - Region
    - SalesTeam
    - SettleSystem
    - SourceSystems
    - Summary
    - Trade
    - Trade_quke
    - TradeType
    - Trader
    - TriParty
    - ExchangeHolidayCal
pipelines:
  trthtrades:
    file: pipelines/trthtrades.yaml
  trthquotes:
    file: pipelines/trthquotes.yaml
  refinitivrealtime:
    file: pipelines/refinitivrealtime.yaml
router: router/router.yaml
deployment_config: deployment_config/deployment_config.yaml

user@A-LPTP-5VuBgQiX:/tmp/artifacts$
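
The chained one-line command above can also be written as a loop over a keep-list. The sketch below uses only the `kxi package rm --from ... pipeline --name ...` form shown in the transcript. The helper names list_pipelines and keep_only_pipelines are hypothetical, and the sketch assumes that manifest.yaml uses the same layout as the `kxi package info` output: pipeline names indented by two spaces under a top-level `pipelines:` key.

```shell
# list_pipelines MANIFEST: print the pipeline names found under `pipelines:`.
# Assumption: names are indented by exactly two spaces, as in the
# `kxi package info` output above.
list_pipelines() {
    awk '
        /^pipelines:/ { in_block = 1; next }   # start of the pipelines block
        /^[^ ]/       { in_block = 0 }         # any other top-level key ends it
        in_block && /^  [^ ].*:[ ]*$/ {        # two-space indent, trailing colon
            name = $1
            sub(/:$/, "", name)
            print name
        }
    ' "$1"
}

# keep_only_pipelines PKG_DIR NAME...: remove every pipeline not in the keep-list.
keep_only_pipelines() {
    local pkg_dir="$1"; shift
    local keep=" $* "
    local name
    # The list is read once, before kxi starts rewriting manifest.yaml.
    for name in $(list_pipelines "$pkg_dir/manifest.yaml"); do
        case "$keep" in
            *" $name "*) ;;   # in the keep-list, leave it in place
            *) kxi package rm --from "$pkg_dir" pipeline --name "$name" || return 1 ;;
        esac
    done
}
```

Run from /tmp/artifacts, `keep_only_pipelines fsi-data-assembly trthtrades trthquotes refinitivrealtime` would issue the same nine removals as the one-line command. Listing what to keep rather than what to remove means the call does not need to change if the package ships with additional pipelines.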

Re-package

Re-package the fsi-data-assembly package. Renaming the package while repackaging is recommended so that it can be identified as a custom package. In this example it is renamed to client-assembly:

user@A-LPTP-5VuBgQiX:/tmp/artifacts$ kxi package packit fsi-data-assembly --tag --package-name client-assembly --version 1.0.0
Refreshing package before packing...
Writing fsi-data-assembly/manifest.yaml
Creating package from fsi-data-assembly
Package created: /tmp/artifacts/client-assembly-1.0.0.kxi
client-assembly-1.0.0.kxi
user@A-LPTP-5VuBgQiX:/tmp/artifacts$
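
When repackaging is part of a script, the new name and version can be passed as parameters and the resulting archive checked. This is a sketch only: repack is a hypothetical helper name, the flags are exactly those in the transcript above, and the check assumes the archive is written to the current directory as <name>-<version>.kxi, as the transcript output suggests.

```shell
# repack PKG_DIR NEW_NAME VERSION: repackage under a new name, then check
# that the expected archive exists. Hypothetical helper, not a kxi command.
repack() {
    local pkg_dir="$1"
    local new_name="$2"
    local version="$3"

    # Same flags as in the transcript above.
    kxi package packit "$pkg_dir" --tag --package-name "$new_name" --version "$version" || return 1

    # Assumption: the archive lands in the current directory.
    local archive="$new_name-$version.kxi"
    if [ ! -f "$archive" ]; then
        echo "expected $archive was not created" >&2
        return 1
    fi
}
```

For this example the call would be `repack fsi-data-assembly client-assembly 1.0.0`.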

Deploy

Finally, deploy the client-assembly package with the new changes. See How to Deploy the FSI Accelerator for these steps.

Adding a New Pipeline

You can see how to add a new pipeline to fsi-data-assembly in Ingesting Data.

Customizing an Existing Pipeline

The steps below show how to customize a pipeline in the fsi-data-assembly package. The pipeline customized in this example is trthtrades.

Unpack fsi-data-assembly

Unpack the fsi-data-assembly package:

user@A-LPTP-5VuBgQiX:/tmp/artifacts$ kxi package unpack fsi-data-assembly-1.0.0.kxi
user@A-LPTP-5VuBgQiX:/tmp/artifacts$ ls
fsi-data-assembly  fsi-data-assembly-1.0.0.kxi

Customize the Trade Schema

Customize the Trade schema by adding the following field:

Qualifier   | string

See the steps for how to customize the schema in Overlaying a Schema with Custom Columns.

Add New Columns to the Pipeline Spec

Add the new columns to the trthtrades pipeline spec, trthtrades-pipeline-spec.q, by updating its column mapping and schema definition. For more details on configuring pipeline specs, see Pipeline Configuration.

user@A-LPTP-5VuBgQiX:/tmp/artifacts$ cd fsi-data-assembly
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$ ls
databases  deployment_config  feed  manifest.yaml  patches  pipelines  router  src
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$ #BEFORE
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$ cat src/trthtrades-pipeline-spec.q | awk 'NR==13, NR==25'
trargs[`colMapping]:flip `KXColumnName`foreignColumnName!flip (
    (`eventTimestamp;`$"Date-Time");
    (`instrumentID;`$"#RIC");
    (`exchTime;`$"Exch Time");
    (`price;`Price);
    (`volume;`Volume);
    (`tickDirection;`$"Tick Dir.");
    (`sequenceNumber;`$"Seq. No.");
    (`accVol;`$"Acc Volume.");
    (`executionID;`$"Unique Trade Identification");
    (`gmtOffset;`$"GMT Offset");
    (`dt;`Date)
    );
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$ vi src/trthtrades-pipeline-spec.q
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$ #AFTER
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$ cat src/trthtrades-pipeline-spec.q | awk 'NR==13, NR==26'
trargs[`colMapping]:flip `KXColumnName`foreignColumnName!flip (
    (`eventTimestamp;`$"Date-Time");
    (`instrumentID;`$"#RIC");
    (`exchTime;`$"Exch Time");
    (`price;`Price);
    (`volume;`Volume);
    (`tickDirection;`$"Tick Dir.");
    (`sequenceNumber;`$"Seq. No.");
    (`accVol;`$"Acc Volume.");
    (`executionID;`$"Unique Trade Identification");
    (`gmtOffset;`$"GMT Offset");
    (`dt;`Date);
    (`qualifier;`Qualifier)
    );
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$ #BEFORE
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$ cat src/trthtrades-pipeline-spec.q | awk 'NR==28, NR==45'
trargs[`schema]:([]
    eventTimestamp:"p"$();
    instrumentID:`$();
    exchTime:"n"$();
    price:"f"$();
    volume:"j"$();
    conditions:();
    normalisedCondition:`$();
    tickDirection:`$();
    tradedExchange:`$();
    sequenceNumber:"j"$();
    accVol:"j"$();
    tickCount:"j"$();
    executionID:();
    srcSys:`$();
    gmtOffset:"n"$();
    dt:"d"$()
    );
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$ vi src/trthtrades-pipeline-spec.q
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$ #AFTER
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$ cat src/trthtrades-pipeline-spec.q | awk 'NR==28, NR==46'
trargs[`schema]:([]
    qualifier:();
    eventTimestamp:"p"$();
    instrumentID:`$();
    exchTime:"n"$();
    price:"f"$();
    volume:"j"$();
    conditions:();
    normalisedCondition:`$();
    tickDirection:`$();
    tradedExchange:`$();
    sequenceNumber:"j"$();
    accVol:"j"$();
    tickCount:"j"$();
    executionID:();
    srcSys:`$();
    gmtOffset:"n"$();
    dt:"d"$()
    );
user@A-LPTP-5VuBgQiX:/tmp/artifacts/fsi-data-assembly$

Note

In the pipeline schema definition, new overlay columns must be added at the start.
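
The two manual vi edits above can also be applied non-interactively, which is useful when the customization has to be repeated for each new release of the package. The following is a sketch under stated assumptions: add_qualifier is a hypothetical helper name, the `dt` entry is assumed to be the last line of the column mapping (as in the BEFORE listing), and the spec is assumed not to already contain a `qualifier:()` column.

```shell
# add_qualifier SPEC: add the qualifier column to the column mapping and to
# the start of the schema in a pipeline spec. Hypothetical helper.
add_qualifier() {
    local spec="$1"

    # Do nothing if the schema column is already present.
    if grep -q 'qualifier:()' "$spec"; then
        echo "qualifier already present in $spec" >&2
        return 0
    fi

    awk '
        /^trargs\[`colMapping\]/ { in_map = 1 }

        # Assumption: `dt` is the last mapping entry and has no trailing
        # semicolon. Terminate it and append the new mapping after it.
        in_map && /^[ ]*\(`dt;`Date\)[ ]*$/ {
            sub(/\)[ ]*$/, ");")
            print
            print "    (`qualifier;`Qualifier)"
            in_map = 0
            next
        }

        # New overlay columns go at the start of the schema definition.
        /^trargs\[`schema\]:\(\[\]/ {
            print
            print "    qualifier:();"
            next
        }

        { print }
    ' "$spec" > "$spec.tmp" && mv "$spec.tmp" "$spec"
}
```

Run from the fsi-data-assembly directory as `add_qualifier src/trthtrades-pipeline-spec.q`, then compare the result against the AFTER listings above before repackaging.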

Re-package

Re-package the fsi-data-assembly package. Renaming the package while repackaging is recommended so that it can be identified as a custom package. In this example it is renamed to client-assembly:

user@A-LPTP-5VuBgQiX:/tmp/artifacts$ kxi package packit fsi-data-assembly --tag --package-name client-assembly --version 1.0.0
Refreshing package before packing...
Writing fsi-data-assembly/manifest.yaml
Creating package from fsi-data-assembly
Package created: /tmp/artifacts/client-assembly-1.0.0.kxi
client-assembly-1.0.0.kxi
user@A-LPTP-5VuBgQiX:/tmp/artifacts$

Deploy

Finally, deploy the client-assembly package with the new changes. See How to Deploy the FSI Accelerator for these steps.