Pipeline Administration
This section describes the steps to customize existing pipelines, add new pipelines, or remove pipelines that are no longer needed.
The examples in this section use the fsi-data-assembly accelerator package.
Removing Pipelines
The accelerator packages come with a suite of pipelines. Depending on your use case, some of these pipelines may not be required. To remove them from the assembly, do the following:
Unpack fsi-data-assembly
Unpack the assembly package
> kxi package unpack fsi-data-assembly-1.0.0.kxi
> ls
fsi-data-assembly fsi-data-assembly-1.0.0.kxi
Remove the Pipelines
Remove unneeded pipelines from the fsi-data-assembly package using kxi package rm. In the example below, all pipelines other than trthtrades, trthquotes and refinitivrealtime are removed:
> ls
fsi-data-assembly fsi-data-assembly-1.0.0.kxi fsi-lib-1.0.0.kxi
>
>
> kxi package info fsi-data-assembly
==PATH==
/tmp/artifacts/fsi-data-assembly
==OBJECT==
Package
==Manifest==
name: fsi-data-assembly
version: 1.0.0
metadata:
uuid: f70e9722-19f1-407c-8032-bc98ebb61efb
pakxtime: '2023-07-06T15:33:42.249290'
entrypoints:
aggregator: src/fsiassembly.agg.q
data-access: src/fsiassembly.da.q
system:
_pakx_version: 1.6.0rc3.dev2+g5a0a4eb.d20230504
databases:
fsi-data-assembly-db:
dir: databases/fsi-data-assembly-db
shards:
- fsi-data-assembly-db-shard
tables:
- Account
- Auction
- Book
- Broker
- CanCor
- Contract
- RelativeContract
- CoraxCapChange
- CoraxDividends
- CorpActions
- Counterparty
- Country
- Currency
- CurrencyPair
- Depth
- DividendRecord
- FILastQuote
- FIQuote
- FITrade
- fsi_bar_Quote_dayStats
- fsi_bar_Quote_minStats
- fsi_bar_Trade_dayStats
- fsi_bar_Trade_minStats
- IndexDetail
- Instrument
- Issuer
- LastOpenClose
- LastQuote
- LastTrade
- LegalEntity
- MIC
- OpenClose
- Quote
- FutQuote
- Region
- SalesTeam
- SettleSystem
- SourceSystems
- Summary
- Trade
- FutTrade
- TradeType
- Trader
- TriParty
- Exchange
- ExchangeHolidayCal
pipelines:
trthtrades:
file: pipelines/trthtrades.yaml
trthquotes:
file: pipelines/trthquotes.yaml
trthcancor:
file: pipelines/trthcancor.yaml
refinstrument:
file: pipelines/refinstrument.yaml
refdividendrecord:
file: pipelines/refdividendrecord.yaml
refcorpactions:
file: pipelines/refcorpactions.yaml
exchangecalendar:
file: pipelines/exchangecalendar.yaml
refinitivrealtime:
file: pipelines/refinitivrealtime.yaml
cancortoday:
file: pipelines/cancortoday.yaml
cancoryday:
file: pipelines/cancoryday.yaml
cancordaybeforeyday:
file: pipelines/cancordaybeforeyday.yaml
cancoreachsecondtoday:
file: pipelines/cancoreachsecondtoday.yaml
router: router/router.yaml
deployment_config: deployment_config/deployment_config.yaml
>
>
> kxi package rm --from fsi-data-assembly pipeline --name refdividendrecord;kxi package rm --from fsi-data-assembly pipeline --name exchangecalendar;kxi package rm --from fsi-data-assembly pipeline --name trthcancor;kxi package rm --from fsi-data-assembly pipeline --name refinstrument;kxi package rm --from fsi-data-assembly pipeline --name refcorpactions;kxi package rm --from fsi-data-assembly pipeline --name cancortoday;kxi package rm --from fsi-data-assembly pipeline --name cancoryday;kxi package rm --from fsi-data-assembly pipeline --name cancordaybeforeyday;kxi package rm --from fsi-data-assembly pipeline --name cancoreachsecondtoday
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
Writing fsi-data-assembly/manifest.yaml
>
>
> kxi package info fsi-data-assembly
==PATH==
/tmp/artifacts/fsi-data-assembly
==OBJECT==
Package
==Manifest==
name: fsi-data-assembly
version: 1.0.0
metadata:
uuid: 99296d9a-a110-4331-963c-46e74aea95d0
pakxtime: '2023-07-12T06:41:08.790448'
entrypoints:
aggregator: src/fsiassembly.agg.q
data-access: src/fsiassembly.da.q
system:
_pakx_version: 1.6.0rc3.dev2+g5a0a4eb.d20230504
databases:
fsi-data-assembly-db:
dir: databases/fsi-data-assembly-db
shards:
- fsi-data-assembly-db-shard
tables:
- Account
- Auction
- Book
- Broker
- CanCor
- CanCor_quke
- CoraxCapChange
- CoraxDividends
- CorpActions
- Counterparty
- Country
- Currency
- CurrencyPair
- Depth
- DividendRecord
- FILastQuote
- FIQuote
- FITrade
- IndexDetail
- Instrument
- Issuer
- LastOpenClose
- LastQuote
- LastQuote_quke
- LastTrade
- LegalEntity
- MIC
- OpenClose
- Quote
- FutQuote
- Quote_quke
- Region
- SalesTeam
- SettleSystem
- SourceSystems
- Summary
- Trade
- Trade_quke
- TradeType
- Trader
- TriParty
- ExchangeHolidayCal
pipelines:
trthtrades:
file: pipelines/trthtrades.yaml
trthquotes:
file: pipelines/trthquotes.yaml
refinitivrealtime:
file: pipelines/refinitivrealtime.yaml
router: router/router.yaml
deployment_config: deployment_config/deployment_config.yaml
>
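The chained kxi package rm command above can also be written as a loop. The following is a minimal sketch, not part of the accelerator: the KEEP and ALL lists are copied from the example manifest shown above, and the DRY_RUN variable and the function names are hypothetical helpers introduced for illustration. By default it only prints the commands it would run.

```shell
#!/bin/sh
# Pipelines to keep (taken from the example above); all others are removed.
KEEP="trthtrades trthquotes refinitivrealtime"

# All pipelines listed in the example manifest before removal.
ALL="trthtrades trthquotes trthcancor refinstrument refdividendrecord refcorpactions exchangecalendar refinitivrealtime cancortoday cancoryday cancordaybeforeyday cancoreachsecondtoday"

# Print every name in ALL that is not in KEEP, one per line.
pipelines_to_remove() {
  for p in $ALL; do
    case " $KEEP " in
      *" $p "*) ;;          # in the keep list: skip
      *) echo "$p" ;;       # not in the keep list: mark for removal
    esac
  done
}

# DRY_RUN is a hypothetical switch for this sketch.
# DRY_RUN=1 (the default) prints the commands; DRY_RUN=0 runs them.
remove_pipelines() {
  for p in $(pipelines_to_remove); do
    if [ "${DRY_RUN:-1}" = "1" ]; then
      echo "kxi package rm --from fsi-data-assembly pipeline --name $p"
    else
      kxi package rm --from fsi-data-assembly pipeline --name "$p"
    fi
  done
}

remove_pipelines
```

Review the printed commands first, then rerun with DRY_RUN=0 from the directory that contains the unpacked fsi-data-assembly folder.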
Re-package
Re-package the fsi-data-assembly package. We recommend renaming the package during repackaging so that it can be distinguished as a custom package. In this example it is renamed to client-assembly.
> kxi package packit fsi-data-assembly --tag --package-name client-assembly --version 1.0.0
Refreshing package before packing...
Writing fsi-data-assembly/manifest.yaml
Creating package from fsi-data-assembly
Package created: /tmp/artifacts/client-assembly-1.0.0.kxi
client-assembly-1.0.0.kxi
>
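Before deploying, it can help to confirm that the package lists only the intended pipelines. The sketch below is one possible check, not an accelerator feature: list_pipelines and expect_pipelines are hypothetical helper names, and the sketch assumes each pipeline entry in the kxi package info output has the form "file: pipelines/<name>.yaml", as in the listings above.

```shell
#!/bin/sh
# list_pipelines: read `kxi package info` output on stdin and print pipeline names.
# Assumes each entry looks like "file: pipelines/<name>.yaml".
list_pipelines() {
  sed -n 's|^[[:space:]]*file: pipelines/\(.*\)\.yaml[[:space:]]*$|\1|p'
}

# expect_pipelines "name1 name2 ...": succeed only when stdin lists
# exactly those pipelines (order is ignored).
expect_pipelines() {
  actual=$(list_pipelines | sort | tr '\n' ' ')
  wanted=$(printf '%s\n' $1 | sort | tr '\n' ' ')
  [ "$actual" = "$wanted" ]
}

# Example usage (requires the kxi CLI and the unpacked package directory):
#   kxi package info fsi-data-assembly \
#     | expect_pipelines "trthtrades trthquotes refinitivrealtime" \
#     && echo "pipeline list OK"
```

The check compares sorted names, so it does not depend on the order in which the manifest lists the pipelines.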
Deploy
Finally, deploy the client-assembly package with the new changes. See How to Deploy the FSI Accelerator for these steps.
Adding a New Pipeline
To add a new pipeline to fsi-data-assembly, see Ingesting Data.
Customizing an Existing Pipeline
The steps below customize a pipeline in the fsi-data-assembly package. The customized pipeline in this example is trthtrades.
Unpack fsi-data-assembly
Unpack the fsi-data-assembly package
> kxi package unpack fsi-data-assembly-1.0.0.kxi
> ls
fsi-data-assembly fsi-data-assembly-1.0.0.kxi
Customize the Trade Schema
Customize the Trade schema by adding the following field (name | type):
Qualifier | string
For the steps to customize the schema, see Overlaying a Schema with Custom Columns.
Add New Columns to the Pipeline Spec
Add the new columns to the trthtrades pipeline spec, trthtrades-pipeline-spec.q, by updating its column mapping and schema definition. For more details on configuring pipeline specs, see Pipeline Configuration.
> cd fsi-data-assembly
> ls
databases deployment_config feed manifest.yaml patches pipelines router src
>
>
> #BEFORE
> cat src/trthtrades-pipeline-spec.q | awk 'NR==13, NR==25'
trargs[`colMapping]:flip `KXColumnName`foreignColumnName!flip (
(`eventTimestamp;`$"Date-Time");
(`instrumentID;`$"#RIC");
(`exchTime;`$"Exch Time");
(`price;`Price);
(`volume;`Volume);
(`tickDirection;`$"Tick Dir.");
(`sequenceNumber;`$"Seq. No.");
(`accVol;`$"Acc Volume.");
(`executionID;`$"Unique Trade Identification");
(`gmtOffset;`$"GMT Offset");
(`dt;`Date)
);
>
>
> vi src/trthtrades-pipeline-spec.q
>
> #AFTER
> cat src/trthtrades-pipeline-spec.q | awk 'NR==13, NR==26'
trargs[`colMapping]:flip `KXColumnName`foreignColumnName!flip (
(`eventTimestamp;`$"Date-Time");
(`instrumentID;`$"#RIC");
(`exchTime;`$"Exch Time");
(`price;`Price);
(`volume;`Volume);
(`tickDirection;`$"Tick Dir.");
(`sequenceNumber;`$"Seq. No.");
(`accVol;`$"Acc Volume.");
(`executionID;`$"Unique Trade Identification");
(`gmtOffset;`$"GMT Offset");
(`dt;`Date);
(`qualifier;`Qualifier)
);
>
> #BEFORE
> cat src/trthtrades-pipeline-spec.q | awk 'NR==28, NR==45'
trargs[`schema]:([]
eventTimestamp:"p"$();
instrumentID:`$();
exchTime:"n"$();
price:"f"$();
volume:"j"$();
conditions:();
normalisedCondition:`$();
tickDirection:`$();
tradedExchange:`$();
sequenceNumber:"j"$();
accVol:"j"$();
tickCount:"j"$();
executionID:();
srcSys:`$();
gmtOffset:"n"$();
dt:"d"$()
);
>
>
>
> vi src/trthtrades-pipeline-spec.q
>
> #AFTER
> cat src/trthtrades-pipeline-spec.q | awk 'NR==28, NR==46'
trargs[`schema]:([]
qualifier:();
eventTimestamp:"p"$();
instrumentID:`$();
exchTime:"n"$();
price:"f"$();
volume:"j"$();
conditions:();
normalisedCondition:`$();
tickDirection:`$();
tradedExchange:`$();
sequenceNumber:"j"$();
accVol:"j"$();
tickCount:"j"$();
executionID:();
srcSys:`$();
gmtOffset:"n"$();
dt:"d"$()
);
>
Note
In the pipeline schema definition, new overlay columns must be added at the start.
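After editing the spec, a quick check can confirm that the new column is mapped and sits first in the schema definition. This is a minimal sketch under stated assumptions: check_overlay_first and has_mapping are hypothetical helper names, the schema block is assumed to start with the line "trargs[`schema]:([]" as in the listing above, and has_mapping only handles foreign column names written as plain symbols such as `Qualifier.

```shell
#!/bin/sh
# check_overlay_first FILE COLUMN
# Succeeds when COLUMN is the first column declared on the line
# following "trargs[`schema]:([]" in FILE.
check_overlay_first() {
  first=$(awk '
    found { sub(/^[ \t]+/, ""); split($0, parts, ":"); print parts[1]; exit }
    index($0, "trargs[`schema]:([]") { found = 1 }
  ' "$1")
  [ "$first" = "$2" ]
}

# has_mapping FILE KXCOLUMN FOREIGNCOLUMN
# Succeeds when FILE contains a mapping entry of the form (`kx;`foreign).
has_mapping() {
  grep -F "(\`$2;\`$3)" "$1" >/dev/null
}

# Run the checks only when the spec file is present in the current directory tree.
spec=src/trthtrades-pipeline-spec.q
if [ -f "$spec" ]; then
  if check_overlay_first "$spec" qualifier && has_mapping "$spec" qualifier Qualifier; then
    echo "qualifier is mapped and is the first schema column"
  else
    echo "qualifier is missing from the mapping or is not the first schema column" >&2
  fi
fi
```

Run it from inside the unpacked fsi-data-assembly directory, where src/trthtrades-pipeline-spec.q lives.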
Re-package
Re-package the fsi-data-assembly package. We recommend renaming the package during repackaging so that it can be distinguished as a custom package. In this example it is renamed to client-assembly.
> kxi package packit fsi-data-assembly --tag --package-name client-assembly --version 1.0.0
Refreshing package before packing...
Writing fsi-data-assembly/manifest.yaml
Creating package from fsi-data-assembly
Package created: /tmp/artifacts/client-assembly-1.0.0.kxi
client-assembly-1.0.0.kxi
>
Deploy
Finally, deploy the client-assembly package with the new changes. See How to Deploy the FSI Accelerator for these steps.