Skip to content

Pipeline Administration

This page provides the steps to customize existing pipelines, add new pipelines, or remove pipelines that are no longer needed.

In this example we address a fsi-data-assembly accelerator package.

Remove pipelines

The accelertaor packages come with a suite of pipelines. Depending on your use-case, some of these pipelines may not be required. In such cases, these pipelines can be removed from the assembly by doing the following:

Unpack fsi-data-assembly

Unpack the assembly package

> kxi package unpack fsi-data-assembly-1.0.0.kxi
> ls
fsi-data-assembly  fsi-data-assembly-1.0.0.kxi

Remove the pipelines

Remove unneeded pipelines from the fsi-data-assembly package using kxi package rm. In the example below, all pipelines other than trthtrades, trthquotes and refinitivrealtime are removed:

> ls
fsi-data-assembly  fsi-data-assembly-1.0.0.kxi  fsi-lib-1.0.0.kxi
>
>
> kxi package info fsi-data-assembly
==PATH==
/tmp/artifacts/fsi-data-assembly

==OBJECT==
Package

==Manifest==
name: fsi-data-assembly
version: 1.0.0
metadata:
  uuid: f70e9722-19f1-407c-8032-bc98ebb61efb
  pakxtime: '2023-07-06T15:33:42.249290'
entrypoints:
  aggregator: src/fsiassembly.agg.q
  data-access: src/fsiassembly.da.q
system:
  _pakx_version: 1.6.0rc3.dev2+g5a0a4eb.d20230504
databases:
  fsi-data-assembly-db:
    dir: databases/fsi-data-assembly-db
    shards:
    - fsi-data-assembly-db-shard
    tables:
    - Account
    - Auction
    - Book
    - Broker
    - CanCor
    - Contract
    - RelativeContract
    - CoraxCapChange
    - CoraxDividends
    - CorpActions
    - Counterparty
    - Country
    - Currency
    - CurrencyPair
    - Depth
    - DividendRecord
    - FILastQuote
    - FIQuote
    - FITrade
    - fsi_bar_Quote_dayStats
    - fsi_bar_Quote_minStats
    - fsi_bar_Trade_dayStats
    - fsi_bar_Trade_minStats
    - IndexDetail
    - Instrument
    - Issuer
    - LastOpenClose
    - LastQuote
    - LastTrade
    - LegalEntity
    - MIC
    - OpenClose
    - Quote
    - FutQuote
    - Region
    - SalesTeam
    - SettleSystem
    - SourceSystems
    - Summary
    - Trade
    - FutTrade
    - TradeType
    - Trader
    - TriParty
    - Exchange
    - ExchangeHolidayCal
pipelines:
  trthtrades:
    file: pipelines/trthtrades.yaml
  trthquotes:
    file: pipelines/trthquotes.yaml
  trthcancor:
    file: pipelines/trthcancor.yaml
  refinstrument:
    file: pipelines/refinstrument.yaml
  refdividendrecord:
    file: pipelines/refdividendrecord.yaml
  refcorpactions:
    file: pipelines/refcorpactions.yaml
  exchangecalendar:
    file: pipelines/exchangecalendar.yaml
  refinitivrealtime:
    file: pipelines/refinitivrealtime.yaml
  cancortoday:
    file: pipelines/cancortoday.yaml
  cancoryday:
    file: pipelines/cancoryday.yaml
  cancordaybeforeyday:
    file: pipelines/cancordaybeforeyday.yaml
  cancoreachsecondtoday:
    file: pipelines/cancoreachsecondtoday.yaml
router: router/router.yaml
deployment_config: deployment_config/deployment_config.yaml

>
>
> kxi package rm --from fsi-data-assembly pipeline --name refdividendrecord;kxi package rm --from fsi-data-assembly pipeline --name exchangecalendar;kxi package rm --from fsi-data-assembly pipeline --name trthcancor;kxi package rm --from fsi-data-assembly pipeline --name refinstrument;kxi package rm --from fsi-data-assembly pipeline --name refcorpactions;kxi package rm --from fsi-data-assembly pipeline --name cancortoday;kxi package rm --from fsi-data-assembly pipeline --name cancoryday;kxi package rm --from fsi-data-assembly pipeline  --name cancordaybeforeyday;kxi package rm --from fsi-data-assembly pipeline --name cancoreachsecondtoday
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
>
>
> kxi package info fsi-data-assembly
==PATH==
/tmp/artifacts/fsi-data-assembly

==OBJECT==
Package

==Manifest==
name: fsi-data-assembly
version: 1.0.0
metadata:
  uuid: 99296d9a-a110-4331-963c-46e74aea95d0
  pakxtime: '2023-07-12T06:41:08.790448'
entrypoints:
  aggregator: src/fsiassembly.agg.q
  data-access: src/fsiassembly.da.q
system:
  _pakx_version: 1.6.0rc3.dev2+g5a0a4eb.d20230504
databases:
  fsi-data-assembly-db:
    dir: databases/fsi-data-assembly-db
    shards:
    - fsi-data-assembly-db-shard
    tables:
    - Account
    - Auction
    - Book
    - Broker
    - CanCor
    - CanCor_quke
    - CoraxCapChange
    - CoraxDividends
    - CorpActions
    - Counterparty
    - Country
    - Currency
    - CurrencyPair
    - Depth
    - DividendRecord
    - FILastQuote
    - FIQuote
    - FITrade
    - IndexDetail
    - Instrument
    - Issuer
    - LastOpenClose
    - LastQuote
    - LastQuote_quke
    - LastTrade
    - LegalEntity
    - MIC
    - OpenClose
    - Quote
    - FutQuote
    - Quote_quke
    - Region
    - SalesTeam
    - SettleSystem
    - SourceSystems
    - Summary
    - Trade
    - Trade_quke
    - TradeType
    - Trader
    - TriParty
    - ExchangeHolidayCal
pipelines:
  trthtrades:
    file: pipelines/trthtrades.yaml
  trthquotes:
    file: pipelines/trthquotes.yaml
  refinitivrealtime:
    file: pipelines/refinitivrealtime.yaml
router: router/router.yaml
deployment_config: deployment_config/deployment_config.yaml

>

Re-package

Re-package the fsi-data-assembly package. It's recommended that the package is renamed during repackaging so that it can be distinguished as a custom package. In this example it's renamed to client-assembly.

> kxi package packit fsi-data-assembly --tag --package-name client-assembly --version 1.0.0
Refreshing package before packing...
Writing fsi-data-assembly/manifest.yaml
Creating package from fsi-data-assembly
Package created: /tmp/artifacts/client-assembly-1.0.0.kxi
client-assembly-1.0.0.kxi
>

Deploy

Finally, deploy the client-assembly package with the new changes. See How to Deploy the FSI Accelerator for these steps.

Add a new pipeline

You can see how to add a new pipeline to fsi-data-assembly in Ingesting Data.

Customize an existing pipeline

Below outlines the steps to customize a pipeline in the fsi-data-assembly package. The customized pipeline in this example is trthtrades.

Unpack fsi-data-assembly

Unpack the fsi-data-assembly package

> kxi package unpack fsi-data-assembly-1.0.0.kxi
> ls
fsi-data-assembly  fsi-data-assembly-1.0.0.kxi

Customize the trade schema

Customize the Trade schema to add in the following field:

Qualifier   | string

See the steps for how to customize the schema in Overlaying a Schema with Custom Columns.

Add new columns to the pipeline spec

Add new columns in the trthtrades pipeline spec trthtrades-pipeline-spec.q by updating it's column mapping and schema definition. For more details on configuring pipeline specs see Pipeline Configuration.

> cd fsi-data-assembly
> ls
databases  deployment_config  feed  manifest.yaml  patches  pipelines  router  src
>
>
> #BEFORE
> cat src/trthtrades-pipeline-spec.q | awk 'NR==13, NR==25'
trargs[`colMapping]:flip `KXColumnName`foreignColumnName!flip (
    (`eventTimestamp;`$"Date-Time");
    (`instrumentID;`$"#RIC");
    (`exchTime;`$"Exch Time");
    (`price;`Price);
    (`volume;`Volume);
    (`tickDirection;`$"Tick Dir.");
    (`sequenceNumber;`$"Seq. No.");
    (`accVol;`$"Acc Volume.");
    (`executionID;`$"Unique Trade Identification");
    (`gmtOffset;`$"GMT Offset");
    (`dt;`Date)
    );
>
>
> vi src/trthtrades-pipeline-spec.q
>
> #AFTER
> cat src/trthtrades-pipeline-spec.q | awk 'NR==13, NR==26'
trargs[`colMapping]:flip `KXColumnName`foreignColumnName!flip (
    (`eventTimestamp;`$"Date-Time");
    (`instrumentID;`$"#RIC");
    (`exchTime;`$"Exch Time");
    (`price;`Price);
    (`volume;`Volume);
    (`tickDirection;`$"Tick Dir.");
    (`sequenceNumber;`$"Seq. No.");
    (`accVol;`$"Acc Volume.");
    (`executionID;`$"Unique Trade Identification");
    (`gmtOffset;`$"GMT Offset");
    (`dt;`Date);
    (`qualifier;`Qualifier)
    );
>
> #BEFORE
> cat src/trthtrades-pipeline-spec.q | awk 'NR==28, NR==45'
trargs[`schema]:([]
    eventTimestamp:"p"$();
    instrumentID:`$();
    exchTime:"n"$();
    price:"f"$();
    volume:"j"$();
    conditions:();
    normalisedCondition:`$();
    tickDirection:`$();
    tradedExchange:`$();
    sequenceNumber:"j"$();
    accVol:"j"$();
    tickCount:"j"$();
    executionID:();
    srcSys:`$();
    gmtOffset:"n"$();
    dt:"d"$()
    );
>
>
>
> vi src/trthtrades-pipeline-spec.q
>
> #AFTER
> cat src/trthtrades-pipeline-spec.q | awk 'NR==28, NR==46'
trargs[`schema]:([]
    qualifier:();
    eventTimestamp:"p"$();
    instrumentID:`$();
    exchTime:"n"$();
    price:"f"$();
    volume:"j"$();
    conditions:();
    normalisedCondition:`$();
    tickDirection:`$();
    tradedExchange:`$();
    sequenceNumber:"j"$();
    accVol:"j"$();
    tickCount:"j"$();
    executionID:();
    srcSys:`$();
    gmtOffset:"n"$();
    dt:"d"$()
    );
>

Note

In the pipeline schema definition, new overlay columns must be added at the start.

Re-package

Re-package the fsi-data-assembly package. It's recommended that the package is renamed during repackaging so that it can be distinguished as a custom package. In this example it's renamed to client-assembly.

> kxi package packit fsi-data-assembly --tag --package-name client-assembly --version 1.0.0
Refreshing package before packing...
Writing fsi-data-assembly/manifest.yaml
Creating package from fsi-data-assembly
Package created: /tmp/artifacts/client-assembly-1.0.0.kxi
client-assembly-1.0.0.kxi
>

Deploy

Finally, deploy the client-assembly package with the new changes. See How to Deploy the FSI Accelerator for these steps.