Creating pipelines¶
When modifying the YAML files for Refinery, it is highly recommended to use Visual Studio Code with the YAML extension and configure it to reference the Refinery YAML schemas, which can be found as part of the Refinery package that you installed at [DELTA_HOME]/kxinstall/refinery/packages/DaaSCore_*/yaml-schemas.
System¶
This configuration defines system-level properties of the application. A single YAML file is supported and should be stored at $DELTADATA_HOME/refinery/system-config/system/system.yaml.
Supported parameters:
| Parameter name | Description |
|---|---|
| `layout` | Array of tag names and the associated servers for each |
| `data-hierarchy` | Array of routing parameters used to uniquely identify pipelines within the system |
| `delta-messaging-server` | The configuration parameter for the Delta Messaging Server to use |
| `time-sort` | If `true`, all table results will be sorted by their primary time column |
| `timezone` | The default time zone that all data is stored in |
| `default-cpu-taskset` | A default setting for the CPU cores that all Refinery processes should run on |
Pipeline¶
This configuration defines a single data capture stack of processes. Each YAML file should contain a single pipeline and be stored within $DELTADATA_HOME/refinery/system-config/pipelines.
Info
The link between the pipelines and schemas is defined by the taxonomy parameter
Supported parameters:
| Parameter name | Description | Supported values |
|---|---|---|
| `name` | Unique name for the pipeline | |
| `type` | The type of pipeline | `realtime`, `entrypoint` |
| `environment` | Environment variable to parse procLayout / processes (if defined) | (see below) |
| `proc-layout` | The layout of the pipeline based on the system server layout | (see below) |
| `enable-auto-data-copy` | If automatic data copy between pipeline instances should be enabled | `true`, `false` |
| `custom-data-copy` | Additional configuration for data copy | (see below) |
| `enable-monitoring` | Enable Refinery Monitoring within this pipeline | `true`, `false` |
| `taxonomy` | The taxonomy of this pipeline based on the system data hierarchy | |
| `startup-timeout` | How long to wait for the pipeline to start up before timing out (seconds) | |
| `additional-q-libraries` | A list of additional q libraries to load into all processes on boot | |
| `processes` | The processes that should be included in the pipeline (type-specific) | |
To run multiple pipeline instances (for high availability), multiple entries in the proc-layout object must be defined.
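To illustrate the taxonomy link mentioned in the Info note above, here is a hedged sketch using names drawn from the examples later in this page: a table becomes available within a pipeline when their taxonomy entries match.

```yaml
# Pipeline YAML (fragment)
pipeline:
  name: DemoPipeline
  taxonomy:
    region: global
    data-source: demo

# Table YAML (fragment) - the taxonomy entry matches the pipeline's
# taxonomy, so the table is available within DemoPipeline
table:
  name: DemoTable
  taxonomy:
    -
      region: global
      data-source: demo
```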
Environment¶
Note
This is an optional pipeline parameter and is only necessary if maintaining multiple environments. See Process layout by environment and Process configuration by environment.
To utilise the environment parameter within the procLayout and/or processes, you must first define an environment variable on the system running Refinery, which can be done in one of two ways:
- Add the environment variable to `$DELTABIN_HOME/delta.profile` (Recommended)

OR

- Export the environment variable via the command-line:

```shell
export KX_REFINERY_ENV=prod
```
For either approach, after defining the environment variable you will need to restart the Delta Control and Refinery processes so that they detect it. See the Quick Start Guide for restarting processes.
Process layout¶
proc-layout defines how the processes within a pipeline are physically laid out on the servers available. Servers are "tagged" in the system configuration, and these tags are used in this element for layout.
Each element of the array is an object that defines an instance of the pipeline. Running multiple instances allows for redundancy in the data capture.
The supported keys of the object are:
- `all`: All processes in the pipeline
- `*process-type*`: All processes of the specified type in the pipeline
- `*process-type*.*process-instance*`: The specified instance of the process type in the pipeline
Within each object, the additional supported keys are:

- `host`: Process host for `*process-type*`
- `instances`: Number of processes for `*process-type*`
Note
host and instances must be supplied together when defined in proc-layout for each *process-type*
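As a sketch of supplying `host` and `instances` together (server names are hypothetical), the following runs two `rdb` processes on a single server:

```yaml
proc-layout:
  -
    tp: tp-server
    rdb:
      -
        host: rdb-server
        instances: 2
```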
all example¶
```yaml
proc-layout:
  -
    all: all-processes
```
*process-type* example¶
```yaml
proc-layout:
  -
    tp: tp-server
    rdb: rdb-server
    hdb: disk-processes-server
    ipdb: disk-processes-server
    epdb: disk-processes-server
```
*process-type*.*process-instance* example¶
```yaml
proc-layout:
  -
    tp: tp-server
    rdb.0: rdb-server-1
    rdb.1: rdb-server-2
    hdb: disk-processes-server
    ipdb: disk-processes-server
    epdb: disk-processes-server
```
Process layout by environment¶
Note
This is an optional additional level of defining process layouts based on an environment variable.
```yaml
proc-layout:
  -
    all-environments:
      all: default-server
    dev:
      tp: tp-server
      rdb: rdb-server
      hdb: disk-processes-server
      ipdb: disk-processes-server
      epdb: disk-processes-server
    prod:
      tp: tp-server
      rdb:
        -
          host: rdb-server
          instances: 2
        -
          host: rdb-server2
          instances: 2
      hdb: disk-processes-server
      ipdb: disk-processes-server
      epdb: disk-processes-server
```
In this example, there will be a total of 1 rdb instance for dev, on rdb-server. There will be 4 rdb instances for prod: 2 on rdb-server and 2 on rdb-server2. This example assigns all of the processes to the same cluster-number of 0.
See Creating environment-based pipelines for more information.
Warning
It is not possible to mix and match the default proc-layout format with environment-based tags; within a given pipeline.yaml, only one layout format is supported. For example, the following is an invalid template.
```yaml
proc-layout:
  -
    all: default-server
    tp: tp-server
    rdb: rdb-server
    hdb: disk-processes-server
    ipdb: disk-processes-server
    epdb: disk-processes-server
  -
    dev:
      tp: tp-server
      rdb: rdb-server
      hdb: disk-processes-server
      ipdb: disk-processes-server
      epdb: disk-processes-server
```
Pipeline naming structure¶
Within any pipeline naming structure, you will always have 4 sections:
- Pipeline name
- Cluster number
- Process type
- Process instance
These are combined to make the unique pipeline process names. Using the default entrypoint gateway as an example, DefaultEntrypoint.0.gw.0: the pipeline structure is visible here, with pipeline-name being DefaultEntrypoint, cluster-number being 0, process-type being gw and process-instance being 0.
This follows through when using a pipeline in a hot-hot system where you are looking for fault tolerance. If you assign multiple hosts/servers to your pipeline's proc-layout, as seen in this example, then you'll see pipeline process names of pipeline-name.0.process-type.process-instance and pipeline-name.1.process-type.process-instance, as there are two different servers in this example.
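As a sketch, a hypothetical pipeline named DemoPipeline with two entries in proc-layout would produce process names such as:

```
DemoPipeline.0.tp.0    # first proc-layout entry  -> cluster-number 0
DemoPipeline.1.tp.0    # second proc-layout entry -> cluster-number 1
DemoPipeline.1.rdb.1   # second rdb instance within cluster 1
```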
Real-time pipeline configuration¶
If type is realtime, these additional parameters are supported:
| Parameter name | Description | Supported values |
|---|---|---|
| `journal-replay-on-restart` | Should all TP clients replay the journal if they restart intra-day | `true`, `false` |
| `expose-to-gw` | Should the data processes in this pipeline be available for GW query | `true`, `false` |
| `routing` | Additional routing configuration for GW | |
| `allow-non-timeseries-data` | Configuration to allow non-time series data to be loaded into the system | `true`, `false` |
| `timezone` | Assigns individual pipelines to different time zones | Supported time zones |
| `gw-rerouting-procs` | Re-routes gateway queries to another available instance if a process instance fails | `true`, `false` |
| `rollover-exclude-today` | Prevents today's data from being rolled over to the HDB | `true`, `false` |
The following processes are supported in a real-time pipeline (additional custom processes can be defined as required):
- `tp`: TickerPlant (required)
- `ipdb`: Intraday Persisting Database
- `epdb`: End of day Persisting Database
- `idb`: Intraday Database
- `hdb`: Historical Database
- `rdb`: Real-time Database
- `ctp`: Chained TickerPlant
- `rte`: Real-Time Processing Engine
- `ldidb`: Late Data Intraday Database
Re-routing gateway queries¶
The gw-rerouting-procs configuration allows rerouting of gateway queries if one of your database process instances fails. This occurs when the RDB, HDB, or RTE instances (0, 1, or 2) are distributed across three different servers, and one server becomes unavailable. In this case, the queries are rerouted to the remaining available instances.
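As a hedged sketch, a realtime pipeline fragment that enables rerouting across three servers might look like the following (server names are hypothetical):

```yaml
type: realtime
expose-to-gw: true
gw-rerouting-procs: true
proc-layout:
  -
    tp: tp-server
    rdb.0: server-a
    rdb.1: server-b
    rdb.2: server-c
```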
Exclude today end-of-day rollover¶
The rollover-exclude-today configuration allows the pipeline to disable the end-of-day (EOD) rollover that writes today's data to the HDB. This means that at EOD (00:00:00) any data written to the RDB after midnight remains in the RDB and is not transferred to the HDB. As a result, .z.D-1 represents the 'current' day, and any data older than this is considered late data.
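For illustration, a minimal realtime pipeline fragment enabling this behaviour:

```yaml
type: realtime
rollover-exclude-today: true
```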
Entrypoint pipeline configuration¶
If type is entrypoint, these additional parameters are supported:
| Parameter name | Description | Supported values |
|---|---|---|
| `enable-legacy-routing` | If true, use the legacy-routes state configuration file | `true`, `false` |
| `expose-to-qr` | If true, allow routing via the QR / QP mechanism | `true`, `false` |
The following processes are supported in an entrypoint pipeline:
- `gw`: Refinery Gateway
- `udfp`: UDF processor
gw¶
| Parameter name | Description | Supported values |
|---|---|---|
| `multi-pipe-route` | If true, queries to the gateway can target multiple pipelines with the same schema. Allows further data sharding. | `true`, `false` |
Process configuration¶
The following process types have no additional configuration other than the common configuration described below:
- `hdb`
- `gw`
- `udfp`
Common¶
ALL processes have a common set of properties that can be set:
| Parameter name | Description | Supported values |
|---|---|---|
| `console-size` | Console size (`\c`) setting | |
| `tls-server-mode` | Sets the TLS mode for the process | `0` = plain, `1` = mixed, `2` = TLS only |
| `garbage-collection` | Sets the garbage collection mode | `0` = deferred, `1` = immediate |
| `timeout` | Timeout in seconds for client queries (`\T`) | |
| `secondary-threads` | Number of secondary threads to run the process with (`-s`) | |
| `cpu-taskset` | The CPU cores to allow this process to run on | |
| `port` | The port to listen on. A random port is selected if not specified | |
| `instances` | The number of instances of this process type to run | |
| `enable-analyst` | Allow analyst connections to this process type | `true`, `false` |
| `additional-q-libraries` | A list of additional q libraries to load into this process type on boot | |
TP consumers¶
The following process types have no additional configuration other than the TP Consumer configuration defined here:
- `rdb`
- `rte`
| Parameter name | Description | Supported values |
|---|---|---|
| `receive-mode` | Defines how the data is received from the upstream Tickerplants | `all`, `sharded` |
If receive-mode is sharded:
| Parameter name | Description | Supported values |
|---|---|---|
| `shard-type` | The shard type to use (custom must be added to the 'shard' library) | `md5`, `firstLetter` |
| `shard-col` | Column used as the argument to the sharding function | |
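A hedged sketch of a sharded RDB configuration; the `sym` column is an assumption borrowed from the schema example later on this page:

```yaml
rdb:
  instances: 2
  receive-mode: sharded
  shard-type: md5
  shard-col: sym
```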
tp¶
| Parameter name | Description | Supported values |
|---|---|---|
| `pub-mode` | The downstream publication method | `direct`, `timer`, `timer-with-row-batch` |
| `log-to-journal` | Should the TP log every update to a local file | `true`, `false` |
| `rollover-mode` | How the pipeline rollover should be initiated | `passthrough`, `daily-at-time` |
| `subscribe-from-delta-messaging` | Allow updates to be received via DMS. Topic name will be the process name | `true`, `false` |
If pub-mode is timer:
| Parameter name | Description | Supported values |
|---|---|---|
| `pub-freq-ms` | Frequency in milliseconds to publish batches | |
If pub-mode is timer-with-row-batch:
| Parameter name | Description | Supported values |
|---|---|---|
| `pub-freq-ms` | Frequency in milliseconds to publish batches | |
| `pub-row-limit` | Row limit that would trigger a batch to be published | |
If rollover-mode is daily-at-time:
| Parameter name | Description | Supported values |
|---|---|---|
| `rollover-time` | The time of day to initiate a rollover | |
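Combining the above, a hedged tp fragment using batched publication and a daily rollover (the row limit value is illustrative; the other values mirror the environment example below):

```yaml
tp:
  pub-mode: timer-with-row-batch
  pub-freq-ms: 100
  pub-row-limit: 10000   # illustrative value
  log-to-journal: true
  rollover-mode: daily-at-time
  rollover-time: "00:00:00.001"
```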
ipdb¶
The IPDB uses all the "TP Consumers" configuration above as well as:
| Parameter name | Description | Supported values |
|---|---|---|
| `write-freq` | The frequency in milliseconds to write to disk | |
| `write-row-limit` | The number of rows that, once received, triggers a write to disk | `0` = disabled |
ctp¶
| Parameter name | Description | Supported values |
|---|---|---|
| `slow-consumer-disconnect` | Should consumers be disconnected if they cannot process updates quickly enough | `true`, `false` |
| `slow-consumer-disconnect-bytes` | The number of pending bytes outstanding that causes a disconnect | |
| `publish-to-delta-messaging` | Allow updates to be sent via DMS. Topic name will be the process name | `true`, `false` |
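A hedged ctp fragment; the byte threshold is an illustrative assumption, not a recommended value:

```yaml
ctp:
  slow-consumer-disconnect: true
  slow-consumer-disconnect-bytes: 104857600   # hypothetical 100 MiB threshold
  publish-to-delta-messaging: true
```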
Process configuration by environment¶
Note
This is an optional additional level of defining process setting overrides based on an environment variable
```yaml
processes:
  all-environments:
    tp:
      pub-mode: timer
      pub-freq-ms: 100
      log-to-journal: true
      rollover-mode: daily-at-time
      rollover-time: "00:00:00.001"
      enable-analyst: true
      subscribe-from-delta-messaging: true
    rdb:
      timeout: 30
      enable-analyst: true
      instances: 2
    hdb:
      timeout: 30
      enable-analyst: true
    ipdb:
      write-freq: 30000
      write-row-limit: 0
    epdb:
      timeout: 0
  dev:
    ipdb:
      write-freq: 20000
  prod:
    ipdb:
      write-freq: 60000
```
See Setting environment-based process settings for more information.
Schema¶
This configuration defines a kdb+ table within the system, which pipelines it should be available in, and how the table should be persisted to disk. Each YAML file should contain a single table and be stored within $DELTADATA_HOME/refinery/system-config/tables.
Supported parameters:
| Parameter name | Description | Supported values |
|---|---|---|
| `name` | The name of the table within the system (API parameter dataType) | |
| `columns` | Array of columns | (see below) |
| `id-col` | The primary identifier column of the table (API parameter idList and parted column for persistence) | |
| `time-col` | The primary time column of the table (sorted on query and persistence) | |
| `taxonomy` | Array of taxonomies used to determine which pipelines will have this table available | |
| `intra-persist-type` | The intra-day persistence type. For most tables, we recommend using splay | `none`, `splay`, `parted` |
| `end-persist-type` | The persistence type into the HDB | `date-partition`, `month-partition`, `year-partition`, `root-splay` |
| `end-of-day-sort` | Optional sorting of intra-day data at EOD. Defaults to `true` if not specified | `true`, `false` |
| `is-reference-data` | Defines that this table contains reference data. Defaults to `false` if not specified | `true`, `false` |
End persist type¶
Date partitions vs year/month partitions¶
Date partitions should be your default partitioning method. They store your data in partitions by date (e.g. 2023.01.01), and are best suited to a large number of data points.
Month partitions will store your data in partitions by month (e.g. 2023.01), and year partitions in partitions by year (e.g. 2023). These must only be used with a low number of data points; otherwise query times will be affected.
The only configuration required to apply date-partition is adding it as the end-persist-type parameter.
Note
Within a single pipeline, your tables must be of the same partition type, date, month or year.
Columns¶
Each element of the columns array defines a column in the table. Supported parameters are:
| Parameter name | Description | Supported values |
|---|---|---|
| `name` | The name of the column | |
| `data-type` | The type of the column. For complex typed columns, do not specify | `boolean`, `guid`, `byte`, `short`, `int`, `long`, `real`, `float`, `char`, `symbol`, `timestamp`, `month`, `date`, `datetime`, `timespan`, `minute`, `second`, `time` |
| `attribute` | The attribute to apply to the column | `sorted`, `grouped`, `unique`, `parted` |
| `exchangeColumn` | Optional definition that a column is an exchange column; default is `false` | `true`, `false` |
Exchange column¶
Exchange columns are timestamp-only columns that DO NOT get converted during time zone conversions, as they are assumed to have a time zone associated with the local exchange. These columns are for specific timestamp values, typically ingested from stock exchanges.
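For illustration, a hedged column definition marking an exchange timestamp (the column name is hypothetical):

```yaml
columns:
  -
    name: exchangeTime   # hypothetical column name
    data-type: timestamp
    exchangeColumn: true
```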
Is-reference-data¶
This parameter instructs the HDB to treat the table as part of an HDB-only pipeline, where no process boundary exists.
You can pair is-reference-data tables with the non-timeseries parameter to enable tables without a time column to function within Refinery.
These tables cannot be queried directly using getTicks or getStats. However, you can reference them through a kdb foreign key link.
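A hedged sketch of a reference-data table definition; the table and column names are hypothetical, and a table without a time column would additionally rely on the pipeline-level allow-non-timeseries-data parameter described earlier:

```yaml
table:
  name: InstrumentReference   # hypothetical table name
  is-reference-data: true
  id-col: sym
  columns:
    -
      name: sym
      data-type: symbol
    -
      name: description
      data-type: symbol
```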
Runtime¶
The Process Manager will look for all YAML configuration within $DELTADATA_HOME/refinery/system-config:
- `system`: System-wide configuration
- `pipelines`: Pipeline definition
- `tables`: Table schema and persistence definition
Currently, any change to the YAML configuration files within this folder requires the Process Manager to be restarted.
Examples¶
System example¶
```yaml
system:
  layout:
    -
      name: all-processes
      nodes:
        -
          host: localhost
  default-cpu-taskset: 0-256
  data-hierarchy:
    - region
    - data-source
    - data-class
    - sub-class
  delta-messaging-server: DS_MESSAGING_SERVER:refinery_a
  timezone: UTC
  time-sort: false
```
Note
When defining the host for each process in system.yaml, the hostname must match the domain name referenced in the <DELTACONTROL_HOST> field of delta.profile in order for Refinery processes to boot successfully.
This name can be either the fully qualified domain name or shortened hostname of an environment, as long as there is consistency across the system.
The name must also be present under /etc/hosts within the environment.
Pipeline example¶
This example pipeline defines:
- A real-time data capture pipeline that can be queried via the GW
  - Available via the `demo` data source
- A tickerplant
  - Runs on a static port 32121
  - Publishes downstream every 100ms
  - Triggers rollover at 00:00:00.001 every day
  - Logs to a journal file
- A real-time database
  - Runs on a static port 32122
  - Query timeout of 30 seconds
- An historical database
  - Query timeout of 30 seconds
- A persisting database
  - Writes to disk in 1 minute intervals
  - Does not write based on rows received from upstream
```yaml
pipeline:
  name: DemoPipeline
  type: realtime
  expose-to-gw: true
  taxonomy:
    region: global
    data-source: demo
  proc-layout:
    -
      all: all-processes
  processes:
    tp:
      port: 32121
      pub-mode: timer
      pub-freq-ms: 100
      log-to-journal: true
      rollover-mode: daily-at-time
      rollover-time: "00:00:00.001"
    rdb:
      port: 32122
      timeout: 30
    hdb:
      timeout: 30
    ipdb:
      write-freq: 60000
      write-row-limit: 0
    epdb:
      timeout: 30
    idb:
      timeout: 300
```
Schema example¶
This example schema defines:
- A kdb+ table
  - Associated with any pipeline with a taxonomy of `global` region and `demo` data source
- The primary ID column is `sym`
  - The IPDB will apply the parted attribute to this column
- The primary time column is `time`
  - The IPDB will sort by this column (within the primary ID column)
- The IPDB will store this as a splay intraday, and the EPDB will write a date partition at end of day
The equivalent schema in kdb+ form:
```q
update time:`s#time, sym:`g#sym from flip `time`sym`price`volume!"PSFJ"$\:()
```
```yaml
table:
  name: DemoTable
  id-col: sym
  time-col: time
  intra-persist-type: splay
  end-persist-type: date-partition
  taxonomy:
    -
      region: global
      data-source: demo
  columns:
    -
      name: time
      data-type: timestamp
      attribute: sorted
    -
      name: sym
      data-type: symbol
      attribute: grouped
    -
      name: price
      data-type: float
    -
      name: volume
      data-type: long
```