Introduction to Refinery pipelines¶
Pipelines are an integral part of Refinery. They are made up of a set of processes that communicate with each other to ingest, process and store data. Pipelines are defined by a YAML file that describes the pipeline attributes and process attributes.
Pipelines are matched with schema, also defined by a YAML file, which describes the structure of a table that the pipeline can push data to. Pipelines are managed by the Process Manager (PM), which loads these YAML files and starts up and shuts down the pipelines as ordered by the Refinery CLI.
If a pipeline defined in the system does not have at least one schema associated with it, the PM will not start up.
Once Refinery is installed, these YAML files are stored under <DELTADATA_HOME>/refinery/system-config.
The directory structure is as follows:
delta-data/
|_refinery/
  |_system-config/
    |_pipelines/
      |_**.yaml
      |_***.yaml
    |_system/
      |_system.yaml
    |_tables/
      |_**.yaml
      |_***.yaml
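For example, the sample pipeline and table described below might be saved as follows (the file names are illustrative):

delta-data/
|_refinery/
  |_system-config/
    |_pipelines/
      |_DemoPipeline.yaml
    |_system/
      |_system.yaml
    |_tables/
      |_sales.yaml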
Sample pipeline breakdown¶
Below is a YAML definition of a sample pipeline that will be studied in detail. This is an all-purpose pipeline for data capture; further configuration can be added by using parameters described in the pipeline documentation.
pipeline:
  name: DemoPipeline
  type: realtime
  expose-to-gw: true
  taxonomy:
    region: global
    data-source: demo
    data-class: demo
    sub-class: demo
  proc-layout:
    -
      all: all-processes
  processes:
    tp:
      pub-mode: timer
      pub-freq-ms: 100
      log-to-journal: true
      rollover-mode: daily-at-time
      rollover-time: "00:00:00.001"
      subscribe-from-delta-messaging: true
      enable-analyst: true
    rdb:
      timeout: 30
      enable-analyst: true
    hdb:
      timeout: 30
      enable-analyst: true
    ipdb:
      write-freq: 60000
      write-row-limit: 0
      enable-analyst: true
    epdb:
      timeout: 0
Note
The indentation of YAML files is highly important in Refinery, and incorrect indentation may prevent the PM from starting up. It is recommended to use a code editor such as Visual Studio Code with the YAML extension to ensure correct indentation.
Pipeline definition¶
name: DemoPipeline
Each pipeline's name must be unique. This is the name used to start up the pipeline with the refinery pipeline --start command.
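For example, starting the sample pipeline from the CLI might look like the following; the exact argument form for the pipeline name is an assumption here, so check the Refinery CLI reference for the precise syntax:

refinery pipeline --start DemoPipeline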
type: realtime
This indicates that the pipeline is for real-time data capture, the most common type of pipeline. The other pipeline type, entrypoint, is for data query and analysis. It supports different process types.
expose-to-gw: true
This allows the data captured by the pipeline to be queried using the gateway of an entrypoint pipeline that supports the taxonomy of this pipeline.
Taxonomy¶
taxonomy:
  region: global
  data-source: demo
  data-class: demo
  sub-class: demo
The taxonomy parameter classifies the category of data this pipeline will capture. There are no set values for this parameter. For the PM to start up correctly, every pipeline in a Refinery deployment must have at least one table YAML file containing the same taxonomy values.
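For example, the pipeline taxonomy above is matched by any table YAML file whose taxonomy list contains an identical entry, such as the fragment below (taken from the sample table later on this page):

taxonomy:
  -
    region: global
    data-source: demo
    data-class: demo
    sub-class: demo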
Process layout¶
proc-layout:
  -
    all: all-processes
This parameter determines which server the processes in the pipeline will run on. For single-node Refinery deployments, where Refinery is installed on one server and does not interact with a Refinery install on any other server, the default is the value shown above, all-processes.
The values of this parameter must be present in the layout parameter of the system.yaml file.
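For the single-node default above, this means system.yaml must define a layout group named all-processes. A minimal sketch, using an illustrative hostname:

system:
  layout:
    -
      name: all-processes
      nodes:
        -
          host: aaa.host.com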
For the following proc-layout in a pipeline on a cluster deployment:
proc-layout:
  -
    tp: tp-server
    rdb: rdb-server
    hdb: hdb-server
    ipdb: ipdb-server
    epdb: epdb-server
The corresponding system.yaml might look like:
system:
  layout:
    -
      name: tp-server
      nodes:
        -
          host: aaa.host.com
    -
      name: rdb-server
      nodes:
        -
          host: bbb.host.com
    -
      name: hdb-server
      nodes:
        -
          host: ccc.host.com
  default-cpu-taskset: 0-256
  data-hierarchy:
    - region
    - data-source
    - data-class
    - sub-class
  delta-messaging-server: DS_MESSAGING_SERVER:refinery_a
  timezone: UTC
  time-sort: false
To provide more flexibility when defining proc-layout, specify host and instances for a given *process-type*. For example, the following proc-layout defines two rdb processes on the rdb-primary-server and two rdb processes on the rdb-secondary-server:
proc-layout:
  -
    tp: tp-server
    rdb:
      -
        host: rdb-primary-server
        instances: 2
      -
        host: rdb-secondary-server
        instances: 2
    hdb: hdb-server
    ipdb: ipdb-server
    epdb: epdb-server
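As with any proc-layout value, the server groups referenced here must be defined in the layout parameter of system.yaml. A minimal sketch of the two rdb groups, with illustrative hostnames:

system:
  layout:
    -
      name: rdb-primary-server
      nodes:
        -
          host: bbb.host.com
    -
      name: rdb-secondary-server
      nodes:
        -
          host: ccc.host.com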
Processes¶
Here the processes that make up the pipeline are defined; all out-of-the-box process types are listed here.
processes:
  tp:
    pub-mode: timer
    pub-freq-ms: 100
    log-to-journal: true
    rollover-mode: daily-at-time
    rollover-time: "00:00:00.001"
    subscribe-from-delta-messaging: true
    enable-analyst: true
  rdb:
    timeout: 30
    enable-analyst: true
  hdb:
    timeout: 30
    enable-analyst: true
  ipdb:
    write-freq: 60000
    write-row-limit: 0
    enable-analyst: true
  epdb:
    timeout: 0
Ensure that the processes included in the pipeline have the correct configuration.
Note
From Refinery 5.4.0, the pdb has been split into ipdb and epdb. If you are using Refinery 5.4.0 or later, a pipeline definition must contain an ipdb and an epdb or the PM will not start. More information is available here.
If you are including an RTE in the pipeline, an upd function must be implemented for each table matched with the pipeline, defined within a q script following the naming convention rte.*.q. This script must reside in the /src/lib folder of a client package.
Note
Using the rte.*.q naming convention will load the functionality on all RTE processes in the system. You can avoid this by omitting rte from the script name and including the script name in the additional-q-libraries parameter of the RTE in the pipeline YAML file. The script must also reside in the lib folder of a client package.
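A minimal sketch of that configuration, assuming a hypothetical script named distanceUpd.q and assuming the parameter accepts a list of script names:

processes:
  rte:
    # value format is an assumption; see the RTE process documentation
    additional-q-libraries:
      - distanceUpd.q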
Sample RTE upd function¶
For a schema named distanceTravelled, a simple upd function would take the form:
/ upsert the incoming records x into table t
.rte.upds.distanceTravelled:{[t;x]
  t upsert x
  };
Sample table for pipeline¶
Now the sample pipeline needs a schema to match with its taxonomy. Below is a sample schema YAML file that references the same taxonomy as the sample pipeline.
table:
  name: sales
  id-col: sym
  time-col: time
  intra-persist-type: splay
  end-persist-type: date-partition
  taxonomy:
    -
      region: global
      data-source: demo
      data-class: demo
      sub-class: demo
  columns:
    -
      name: time
      data-type: timestamp
      attribute: sorted
    -
      name: sym
      data-type: symbol
      attribute: grouped
    -
      name: price
      data-type: float
    -
      name: volume
      data-type: long
Note that the schema YAML file contains the same taxonomy as the sample pipeline. This is how the table is loaded onto the pipeline. A table can have many taxonomies, and so can be loaded onto many pipelines, but a pipeline can only have one taxonomy.
More information on creating schema is available here.
Sample table with multiple taxonomies¶
Below is another sample pipeline.
pipeline:
  name: stream
  type: realtime
  expose-to-gw: true
  taxonomy:
    region: global
    data-source: stream
    data-class: stream
    sub-class: stream
  proc-layout:
    - all: all-processes
  processes:
    tp:
      pub-mode: timer
      pub-freq-ms: 100
      log-to-journal: true
      rollover-mode: daily-at-time
      rollover-time: "00:00:00.001"
      enable-analyst: true
    rdb:
      timeout: 30
      enable-analyst: true
    hdb:
      timeout: 30
      enable-analyst: true
    ipdb:
      write-freq: 500000
      write-row-limit: 0
      enable-analyst: true
    epdb:
      enable-analyst: true
      timeout: 0
To configure the sample schema to be loaded onto this new sample pipeline as well as the previous sample pipeline, add the new pipeline's taxonomy to the schema's taxonomy.
table:
  name: sales
  id-col: sym
  time-col: time
  intra-persist-type: splay
  end-persist-type: date-partition
  taxonomy:
    -
      region: global
      data-source: demo
      data-class: demo
      sub-class: demo
    -
      region: global
      data-source: stream
      data-class: stream
      sub-class: stream
  columns:
    -
      name: time
      data-type: timestamp
      attribute: sorted
    -
      name: sym
      data-type: symbol
      attribute: grouped
    -
      name: price
      data-type: float
    -
      name: volume
      data-type: long
Now both pipelines can upsert captured data to their own version of the sample table.