Introduction to Refinery pipelines

Pipelines are an integral part of Refinery. They are made up of a set of processes that communicate with each other to ingest, process, and store data. A pipeline is defined by a YAML file that describes the attributes of the pipeline and of its processes.

Pipelines are matched with schemas, also defined by YAML files, which describe the structure of the tables that the pipeline can push data to. Pipelines are managed by the Process Manager (PM), which loads these YAML files and starts up and shuts down pipelines as instructed by the Refinery CLI.

If a pipeline defined in the system does not have at least one schema associated with it, the PM will not start up.

Once Refinery is installed, these YAML files are stored under <DELTADATA_HOME>/refinery/system-config.

The directory structure is as follows:

delta-data/
|_refinery/
   |_system-config/
     |_pipelines/
       |_*.yaml
     |_system/
       |_system.yaml
     |_tables/
       |_*.yaml

Sample pipeline breakdown

Below is a YAML definition of a sample pipeline that will be studied in detail. This is an all-purpose pipeline for data capture; further configuration can be added by using parameters described in the pipeline documentation.

pipeline:
  name: DemoPipeline
  type: realtime

  expose-to-gw: true

  taxonomy:
    region: global
    data-source: demo
    data-class: demo
    sub-class: demo

  proc-layout:
    -
      all: all-processes

  processes:
    tp:
      pub-mode: timer
      pub-freq-ms: 100
      log-to-journal: true
      rollover-mode: daily-at-time
      rollover-time: "00:00:00.001"
      subscribe-from-delta-messaging: true
      enable-analyst: true
    rdb:
      timeout: 30
      enable-analyst: true
    hdb:
      timeout: 30
      enable-analyst: true
    ipdb:
      write-freq: 60000
      write-row-limit: 0
      enable-analyst: true
    epdb:
      timeout: 0

Note

The indentation of YAML files is significant in Refinery, and incorrect indentation may prevent the PM from starting up. It is recommended to use a code editor such as Visual Studio Code with a YAML extension to ensure correct indentation.
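
For example, the meaning of a key depends on its indentation. Below is a minimal illustration using keys from the sample pipeline above:

# correct: pub-mode is an attribute of the tp process
processes:
  tp:
    pub-mode: timer

# incorrect: pub-mode is now a sibling of tp rather than one of its attributes
processes:
  tp:
  pub-mode: timer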

Pipeline definition

  name: DemoPipeline

Each pipeline's name must be unique. This is the name used to start the pipeline with the refinery pipeline --start command.

  type: realtime

This indicates that the pipeline is for real-time data capture, the most common type of pipeline. The other pipeline type, entrypoint, is for data query and analysis and supports a different set of process types.

  expose-to-gw: true

This allows the data captured by the pipeline to be queried through the gateway of an entrypoint pipeline that supports this pipeline's taxonomy.
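
As a rough, hypothetical sketch only (entrypoint pipelines have their own process types and documentation, and the exact shape of their definition may differ), an entrypoint pipeline serving this data would reference the same taxonomy:

pipeline:
  name: DemoEntrypoint   # hypothetical name
  type: entrypoint
  # how an entrypoint pipeline declares the taxonomies it supports is
  # covered in the entrypoint documentation; shown here for illustration only
  taxonomy:
    region: global
    data-source: demo
    data-class: demo
    sub-class: demo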

Taxonomy

  taxonomy:
    region: global
    data-source: demo
    data-class: demo
    sub-class: demo

The taxonomy parameter classifies the category of data the pipeline will capture. There are no set values for this parameter. For the PM to start up correctly, each pipeline in a Refinery deployment must have at least one table YAML file containing the same taxonomy values.
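
For example, a table whose taxonomy list contains the following entry would be matched with this pipeline (a full table definition is shown in the Sample table for pipeline section below):

  taxonomy:
    -
      region: global
      data-source: demo
      data-class: demo
      sub-class: demo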

Process layout

  proc-layout:
    -
      all: all-processes

This parameter determines which server each process in the pipeline runs on. For single-node Refinery deployments, where Refinery is installed on one server and does not interact with a Refinery installation on any other server, the default is the value all-processes shown above.

The values of this parameter must be present in the layout parameter of the system.yaml file.
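
For example, in a single-node deployment using the default all-processes value, the layout section of system.yaml might contain an entry such as the following (a minimal sketch modelled on the cluster example below; the host value is illustrative):

system:
  layout:
    -
      name: all-processes
      nodes:
        -
          host: localhost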

For the following proc-layout in a pipeline on a cluster deployment:

proc-layout:
  -
    tp: tp-server
    rdb: rdb-server
    hdb: hdb-server
    ipdb: ipdb-server
    epdb: epdb-server

The corresponding system.yaml might look like:

system:
  layout:
    -
      name: tp-server
      nodes:
        -
          host: aaa.host.com
    -
      name: rdb-server
      nodes:
        -
          host: bbb.host.com
    -
      name: hdb-server
      nodes:
        -
          host: ccc.host.com
    -
      name: ipdb-server
      nodes:
        -
          host: ddd.host.com
    -
      name: epdb-server
      nodes:
        -
          host: eee.host.com

  default-cpu-taskset: 0-256

  data-hierarchy:
    - region
    - data-source
    - data-class
    - sub-class

  delta-messaging-server: DS_MESSAGING_SERVER:refinery_a

  timezone: UTC

  time-sort: false
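
Note that in this example the data-hierarchy entries mirror the keys used in the pipeline taxonomy parameter.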

To provide more flexibility when defining proc-layout, specify a host and a number of instances for a given process type. For example, the following proc-layout defines two rdb processes on rdb-primary-server and two rdb processes on rdb-secondary-server:

proc-layout:
  -
    tp: tp-server
    rdb:
      -
        host: rdb-primary-server
        instances: 2
      -
        host: rdb-secondary-server
        instances: 2
    hdb: hdb-server
    ipdb: ipdb-server
    epdb: epdb-server
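
As with the previous example, the server names referenced here (tp-server, rdb-primary-server, rdb-secondary-server, and so on) must each be defined in the layout parameter of system.yaml.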

Processes

Here the processes that make up the pipeline are defined; all of the out-of-the-box process types are listed below.

  processes:
    tp:
      pub-mode: timer
      pub-freq-ms: 100
      log-to-journal: true
      rollover-mode: daily-at-time
      rollover-time: "00:00:00.001"
      subscribe-from-delta-messaging: true
      enable-analyst: true
    rdb:
      timeout: 30
      enable-analyst: true
    hdb:
      timeout: 30
      enable-analyst: true
    ipdb:
      write-freq: 60000
      write-row-limit: 0
      enable-analyst: true
    epdb:
      timeout: 0

Ensure that each process included in the pipeline has the correct configuration; the available parameters are described in the pipeline documentation.

Note

From Refinery 5.4.0, the pdb has been split into ipdb and epdb. If you are using Refinery 5.4.0 or later, a pipeline definition must contain both an ipdb and an epdb or the PM will not start. More information is available here.

If you are including an RTE in the pipeline, an upd function must be implemented for each table matched with the pipeline, defined within a q script following the naming convention rte.*.q. This script must reside in the /src/lib folder of a client package.

Note

Using the rte.*.q naming convention will load the functionality on all RTE processes in the system. You can avoid this by omitting rte from the script name and including the script name in the additional-q-libraries parameter of the RTE in the pipeline YAML file. The script must still reside in the /src/lib folder of a client package.
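
For example, assuming a hypothetical script named distanceCalcs.q, the RTE definition in the pipeline YAML file might reference it as follows (this sketch assumes the RTE is declared under processes as rte and that additional-q-libraries takes a list of script names):

  processes:
    rte:
      additional-q-libraries:
        - distanceCalcs   # hypothetical script name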

Sample RTE upd function

For a schema named distanceTravelled, a simple upd function would take the form:

/ upsert the incoming batch of records x into table t
.rte.upds.distanceTravelled:{[t;x]
  t upsert x
  };
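
Here t is the table to update and x is the incoming batch of records; the function is registered in the .rte.upds namespace under the name of its table.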

Sample table for pipeline

Now the sample pipeline needs a schema to match with its taxonomy. Below is a sample schema YAML file that references the same taxonomy as the sample pipeline.

table:
  name: sales
  id-col: sym
  time-col:  time
  intra-persist-type: splay
  end-persist-type: date-partition

  taxonomy:
    -
      region: global
      data-source: demo
      data-class: demo
      sub-class: demo

  columns:
    -
      name: time
      data-type: timestamp
      attribute: sorted
    -
      name: sym
      data-type: symbol
      attribute: grouped
    -
      name: price
      data-type: float
    -
      name: volume
      data-type: long

Note that the schema YAML file contains the same taxonomy as the sample pipeline. This is how the table is loaded onto the pipeline. A table can have many taxonomies, and so can be loaded onto many pipelines, but a pipeline can only have one taxonomy.

More information on creating schemas is available here.

Sample table with multiple taxonomies

Below is another sample pipeline.

pipeline:
  name: stream
  type: realtime

  expose-to-gw: true

  taxonomy:
    region: global
    data-source: stream
    data-class: stream
    sub-class: stream

  proc-layout:
    - all: all-processes

  processes:
    tp:
      pub-mode: timer
      pub-freq-ms: 100
      log-to-journal: true
      rollover-mode: daily-at-time
      rollover-time: "00:00:00.001"
      enable-analyst: true
    rdb:
      timeout: 30
      enable-analyst: true
    hdb:
      timeout: 30
      enable-analyst: true
    ipdb:
      write-freq: 500000
      write-row-limit: 0
      enable-analyst: true
    epdb:
      enable-analyst: true
      timeout: 0

To configure the sample schema to be loaded onto this new sample pipeline as well as the previous sample pipeline, add the new pipeline's taxonomy to the schema's taxonomy.

table:
  name: sales
  id-col: sym
  time-col:  time
  intra-persist-type: splay
  end-persist-type: date-partition

  taxonomy:
    -
      region: global
      data-source: demo
      data-class: demo
      sub-class: demo
    -
      region: global
      data-source: stream
      data-class: stream
      sub-class: stream

  columns:
    -
      name: time
      data-type: timestamp
      attribute: sorted
    -
      name: sym
      data-type: symbol
      attribute: grouped
    -
      name: price
      data-type: float
    -
      name: volume
      data-type: long

Now each pipeline can upsert captured data into its own instance of the sales table.