Creating pipelines

When modifying the YAML files for Refinery, we highly recommend using Visual Studio Code with the YAML extension, configured to reference the Refinery YAML schemas. These can be found as part of the installed Refinery package at [DELTA_HOME]/kxinstall/refinery/packages/DaaSCore_*/yaml-schemas.

System

This configuration defines system-level properties of the application. A single YAML file is supported and should be stored at $DELTADATA_HOME/refinery/system-config/system/system.yaml.

Supported parameters:

| Parameter name | Description |
| --- | --- |
| layout | Array of tag names and the associated servers for each |
| data-hierarchy | Array of routing parameters to uniquely identify pipelines within the system |
| delta-messaging-server | The configuration parameter for the Delta Messaging Server to use |
| time-sort | If true, all table results will be sorted by their primary time column |
| timezone | The default time zone that all data is stored in |
| default-cpu-taskset | A default setting for the CPU cores that all Refinery processes should run on |

Pipeline

This configuration defines a single data capture stack of processes. Each YAML file should contain a single pipeline and be stored within $DELTADATA_HOME/refinery/system-config/pipelines.

Info

The link between the pipelines and schemas is defined by the taxonomy parameter.

Supported parameters:

| Parameter name | Description | Supported values |
| --- | --- | --- |
| name | Unique name for the pipeline | |
| type | The type of pipeline | realtime, entrypoint |
| environment | Environment variable to parse procLayout / processes (if defined) (see below) | |
| proc-layout | The layout of the pipeline based on the system server layout (see below) | |
| enable-auto-data-copy | If automatic data copy between pipeline instances should be enabled | true, false |
| custom-data-copy | Additional configuration for data copy (see below) | |
| enable-monitoring | Enable Refinery Monitoring within this pipeline | true, false |
| taxonomy | The taxonomy of this pipeline based on the system data hierarchy | |
| startup-timeout | How long to wait for the pipeline to start up before timing out (seconds) | |
| additional-q-libraries | A list of additional q libraries to load into all processes on boot | |
| processes | The processes that should be included in the pipeline (type-specific) | |

To run multiple pipeline instances (for high availability), multiple entries in the proc-layout object must be defined.

Environment

Note

This is an optional pipeline parameter and is only necessary if maintaining multiple environments.
See Process layout by environment and Process configuration by environment.

To utilise the environment parameter within the procLayout and/or processes, you must first define an environment variable on the system running Refinery. This can be done in one of two ways:

  1. Add the environment variable to $DELTABIN_HOME/delta.profile (Recommended)
  2. Export the environment variable via the command-line:

export KX_REFINERY_ENV=prod

For either approach, after defining the environment variable you must restart the Delta Control and Refinery processes so they pick it up. See the Quick Start Guide for restarting processes.

Process layout

proc-layout defines how the processes within a pipeline are physically laid out on the servers available. Servers are "tagged" in the system configuration, and these tags are used in this element for layout.

Each element of the array is an object that defines an instance of the pipeline. Running multiple instances allows for redundancy in the data capture.

The supported keys of the object are:

  • all: All processes in the pipeline
  • *process-type*: All processes of the specified type in the pipeline
  • *process-type*.*process-instance*: The specified instance of the process type in the pipeline

Within each object, the additional supported keys are:

  • host: Process host for *process-type*
  • instances: Number of processes for *process-type*

Note

host and instances must be supplied together when defined in proc-layout for each *process-type*
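For example, a sketch of the combined host/instances form for a single process type; server tag names are illustrative, and this assumes the host/instances form is also valid outside the environment-based layout shown later on this page:

```yaml
proc-layout:
  -
    tp: tp-server           # illustrative server tag
    rdb:
      -
        host: rdb-server-1  # host and instances supplied together
        instances: 2
      -
        host: rdb-server-2
        instances: 2
```

This would run the tp on tp-server and four rdb instances, two on each of the rdb servers.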

all example

proc-layout:
  -
    all: all-processes

*process-type* example

proc-layout:
  -
    tp: tp-server
    rdb: rdb-server
    hdb: disk-processes-server
    ipdb: disk-processes-server
    epdb: disk-processes-server

*process-type*.*process-instance* example

proc-layout:
  -
    tp: tp-server
    rdb.0: rdb-server-1
    rdb.1: rdb-server-2
    hdb: disk-processes-server
    ipdb: disk-processes-server
    epdb: disk-processes-server

Process layout by environment

Note

This is an optional additional level of defining process layouts based on an environment variable.

proc-layout:
  -
    all-environments:
      all: default-server
    dev:
      tp: tp-server
      rdb: rdb-server
      hdb: disk-processes-server
      ipdb: disk-processes-server
      epdb: disk-processes-server
    prod:
      tp: tp-server
      rdb:
        -
          host: rdb-server
          instances: 2
        -
          host: rdb-server2
          instances: 2
      hdb: disk-processes-server
      ipdb: disk-processes-server
      epdb: disk-processes-server

In this example, there will be a total of 1 rdb instance for dev, on rdb-server. There will be 4 rdb instances for prod: 2 on rdb-server and 2 on rdb-server2. This example assigns all of the processes to the same cluster-number of 0.

See Creating environment-based pipelines for more information.

Warning

It is not possible to mix and match the default proc-layout with environment-based tags; within a given pipeline.yaml, only one layout format is supported. For example, the following is an invalid template.

proc-layout:
  -
    all: default-server
    tp: tp-server
    rdb: rdb-server
    hdb: disk-processes-server
    ipdb: disk-processes-server
    epdb: disk-processes-server
  -
    dev:
      tp: tp-server
      rdb: rdb-server
      hdb: disk-processes-server
      ipdb: disk-processes-server
      epdb: disk-processes-server

Pipeline naming structure

Within any pipeline naming structure, you will always have 4 sections:

  • Pipeline name
  • Cluster number
  • Process type
  • Process instance

These are combined to make the unique pipeline process names. Using the default entrypoint gateway as an example, DefaultEntrypoint.0.gw.0: the pipeline-name is DefaultEntrypoint, the cluster-number is 0, the process-type is gw, and the process-instance is 0.

This carries through when using a pipeline in a hot-hot system, where you are looking for fault tolerance. If you assign multiple hosts/servers to your pipeline's proc-layout, as seen in this example, then you'll see pipeline process names of pipeline-name.0.process-type.process-instance and pipeline-name.1.process-type.process-instance, as there are two different servers in this example.
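As an illustrative sketch (server tags hypothetical), a proc-layout with two instance entries:

```yaml
proc-layout:
  -                 # becomes cluster-number 0
    all: server-a
  -                 # becomes cluster-number 1
    all: server-b
```

For a pipeline named DemoPipeline, this would yield process names such as DemoPipeline.0.tp.0 and DemoPipeline.1.tp.0.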

Real-time pipeline configuration

If type is realtime, these additional parameters are supported:

| Parameter name | Description | Supported values |
| --- | --- | --- |
| journal-replay-on-restart | Should all TP clients replay the journal if they restart intra-day | true, false |
| expose-to-gw | Should the data processes in this pipeline be available for GW queries | true, false |
| routing | Additional routing configuration for the GW | |
| allow-non-timeseries-data | Configuration to allow non-time series data to be loaded into the system | true, false |
| timezone | Assigns individual pipelines to different time zones | Supported time zones |
| gw-rerouting-procs | Re-routes gateway queries to another available instance if a process instance fails | true, false |
| rollover-exclude-today | Excludes today's data from the end-of-day rollover to the HDB | true, false |

The following processes are supported in a real-time pipeline (additional custom processes can be defined as required):

  • tp: TickerPlant (required)
  • ipdb: Intraday Persisting Database
  • epdb: End of day Persisting Database
  • idb: Intraday Database
  • hdb: Historical Database
  • rdb: Real-time Database
  • ctp: Chained TickerPlant
  • rte: Real-Time Processing Engine
  • ldidb: Late Data Intraday Database

Re-routing gateway queries

The gw-rerouting-procs configuration allows gateway queries to be rerouted if one of your database process instances fails. For example, when RDB, HDB, or RTE instances (0, 1, and 2) are distributed across three different servers and one server becomes unavailable, queries are rerouted to the remaining available instances.
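As a sketch, enabling rerouting is a single pipeline-level flag (pipeline name illustrative):

```yaml
pipeline:
  name: DemoPipeline        # illustrative
  type: realtime
  expose-to-gw: true
  gw-rerouting-procs: true  # reroute GW queries on instance failure
```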

Exclude today end-of-day rollover

The rollover-exclude-today configuration allows the pipeline to exclude today's data from the end-of-day (EOD) rollover to the HDB. This means that at EOD (00:00:00), any data written to the RDB after midnight remains in the RDB and is not transferred to the HDB. As a result, .z.D-1 represents the 'current' day, and any data older than this is considered late data.
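A minimal sketch of enabling this behaviour (pipeline name illustrative):

```yaml
pipeline:
  name: DemoPipeline            # illustrative
  type: realtime
  rollover-exclude-today: true  # keep today's data in the RDB at EOD
```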

Entrypoint pipeline configuration

If type is entrypoint, these additional parameters are supported:

| Parameter name | Description | Supported values |
| --- | --- | --- |
| enable-legacy-routing | If true, use the legacy-routes state configuration file | true, false |
| expose-to-qr | If true, allow routing via the QR / QP mechanism | true, false |

The following processes are supported in an entrypoint pipeline:

  • gw: Refinery Gateway
  • udfp: UDF processor

gw

| Parameter name | Description | Supported values |
| --- | --- | --- |
| multi-pipe-route | If true, queries to the gateway can target multiple pipelines with the same schema, allowing further data sharding | true, false |
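A minimal entrypoint pipeline sketch combining these parameters (other required settings omitted for brevity):

```yaml
pipeline:
  name: DefaultEntrypoint   # name taken from the naming example above
  type: entrypoint
  expose-to-qr: true
  processes:
    gw:
      multi-pipe-route: true  # allow queries to span pipelines sharing a schema
```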

Process configuration

The following process types have no additional configuration other than the common configuration described below:

  • hdb
  • gw
  • udfp

Common

ALL processes have a common set of properties that can be set:

| Parameter name | Description | Supported values |
| --- | --- | --- |
| console-size | Console size (\c) setting | |
| tls-server-mode | Sets the TLS mode for the process | 0 = plain, 1 = mixed, 2 = TLS only |
| garbage-collection | Sets the garbage collection mode | 0 = deferred, 1 = immediate |
| timeout | Timeout in seconds for client queries (\T) | |
| secondary-threads | Number of secondary threads to run the process with (-s) | |
| cpu-taskset | The CPU cores to allow this process to run on | |
| port | The port to listen on; a random port is selected if not specified | |
| instances | The number of instances of this process type to run | |
| enable-analyst | Allow analyst connections to this process type | true, false |
| additional-q-libraries | A list of additional q libraries to load into this process type on boot | |

TP consumers

The following process types have no additional configuration other than the TP Consumer configuration defined here:

  • rdb
  • rte

| Parameter name | Description | Supported values |
| --- | --- | --- |
| receive-mode | Defines how the data is received from the upstream TickerPlants | all, sharded |

If receive-mode is sharded:

| Parameter name | Description | Supported values |
| --- | --- | --- |
| shard-type | The shard type to use (custom types must be added to the 'shard' library) | md5, firstLetter |
| shard-col | Column used as the argument to the sharding function | |
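A sketch of a sharded rdb consumer (the shard column name is illustrative):

```yaml
processes:
  rdb:
    receive-mode: sharded
    shard-type: md5   # hash-based sharding
    shard-col: sym    # illustrative: shard on the sym column
```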

tp

| Parameter name | Description | Supported values |
| --- | --- | --- |
| pub-mode | The downstream publication method | direct, timer, timer-with-row-batch |
| log-to-journal | Should the TP log every update to a local file | true, false |
| rollover-mode | How the pipeline rollover should be initiated | passthrough, daily-at-time |
| subscribe-from-delta-messaging | Allow updates to be received via DMS; the topic name will be the process name | true, false |

If pub-mode is timer:

| Parameter name | Description | Supported values |
| --- | --- | --- |
| pub-freq-ms | Frequency in milliseconds to publish batches | |

If pub-mode is timer-with-row-batch:

| Parameter name | Description | Supported values |
| --- | --- | --- |
| pub-freq-ms | Frequency in milliseconds to publish batches | |
| pub-row-limit | Row limit that triggers a batch to be published | |

If rollover-mode is daily-at-time:

| Parameter name | Description | Supported values |
| --- | --- | --- |
| rollover-time | The time of day to initiate a rollover | |
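Putting the tp parameters together, a sketch of a batching TickerPlant with a daily rollover (values illustrative):

```yaml
processes:
  tp:
    pub-mode: timer-with-row-batch
    pub-freq-ms: 100              # publish at least every 100 ms...
    pub-row-limit: 10000          # ...or sooner once 10,000 rows are buffered (illustrative)
    log-to-journal: true
    rollover-mode: daily-at-time
    rollover-time: "00:00:00.001"
```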

ipdb

The IPDB uses all the "TP Consumers" configuration above as well as:

| Parameter name | Description | Supported values |
| --- | --- | --- |
| write-freq | The frequency in milliseconds to write to disk | |
| write-row-limit | The number of rows that, once received, triggers a write to disk | 0 = disabled |

ctp

| Parameter name | Description | Supported values |
| --- | --- | --- |
| slow-consumer-disconnect | Should consumers be disconnected if they cannot process updates quickly enough | true, false |
| slow-consumer-disconnect-bytes | The number of pending bytes outstanding that causes a disconnect | |
| publish-to-delta-messaging | Allow updates to be sent via DMS; the topic name will be the process name | true, false |
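A sketch of a ctp configuration (the byte threshold is illustrative):

```yaml
processes:
  ctp:
    slow-consumer-disconnect: true
    slow-consumer-disconnect-bytes: 104857600   # 100 MiB pending, illustrative
    publish-to-delta-messaging: true
```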

Process configuration by environment

Note

This is an optional additional level of defining process setting overrides based on an environment variable

processes:
    all-environments:
      tp:
        pub-mode: timer
        pub-freq-ms: 100
        log-to-journal: true
        rollover-mode: daily-at-time
        rollover-time: "00:00:00.001"
        enable-analyst: true
        subscribe-from-delta-messaging: true
      rdb:
        timeout: 30
        enable-analyst: true
        instances: 2
      hdb:
        timeout: 30
        enable-analyst: true
      ipdb:
        write-freq: 30000
        write-row-limit: 0
      epdb:
        timeout: 0
    dev:
      ipdb:
        write-freq: 20000
    prod:
      ipdb:
        write-freq: 60000

More information on Setting environment based process settings

Schema

This configuration defines a kdb+ table within the system, the pipelines it should be available in, and how the table should be persisted to disk. Each YAML file should contain a single table and be stored within $DELTADATA_HOME/refinery/system-config/tables.

Supported parameters:

| Parameter name | Description | Supported values |
| --- | --- | --- |
| name | The name of the table within the system (API parameter dataType) | |
| columns | Array of columns (see below) | |
| id-col | The primary identifier column of the table (API parameter idList and parted column for persistence) | |
| time-col | The primary time column of the table (sorted on query and persistence) | |
| taxonomy | Array of taxonomies used to determine which pipelines will have this table available | |
| intra-persist-type | The intra-day persistence type; for most tables, we recommend splay | none, splay, parted |
| end-persist-type | The persistence type into the HDB | date-partition, month-partition, year-partition, root-splay |
| end-of-day-sort | Optional sorting of intra-day data at EOD; defaults to true if not specified | true, false |
| is-reference-data | Defines that this table contains reference data; defaults to false if not specified | true, false |

End persist type

Date partitions vs year/month partitions

Date partitions should be your default partitioning method. They store your data in partitions by date (e.g. 2023.01.01), and are best suited to a large number of data points.

Month partitions store your data in partitions by month (e.g. 2023.01). Year partitions store your data in partitions by year (e.g. 2023). These should only be used with a low number of data points; otherwise query times will be affected.

The only configuration required to apply date-partition is adding it as the end-persist-type parameter.

Note

Within a single pipeline, your tables must all be of the same partition type: date, month, or year.

Columns

Each element of the columns array defines a column in the table. Supported parameters are:

| Parameter name | Description | Supported values |
| --- | --- | --- |
| name | The name of the column | |
| data-type | The type of the column; for complex typed columns, do not specify | boolean, guid, byte, short, int, long, real, float, char, symbol, timestamp, month, date, datetime, timespan, minute, second, time |
| attribute | The attribute to apply to the column | sorted, grouped, unique, parted |
| exchangeColumn | Optional definition that a column is an exchange column; defaults to false | true, false |

Exchange column

Exchange columns are timestamp-only columns that are NOT converted during time zone conversions, as they are assumed to have a time zone associated with the local exchange. These columns are for specific timestamp values, typically ingested from stock exchanges.
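A sketch of declaring an exchange column within a schema (column name hypothetical):

```yaml
columns:
  -
    name: exchTime          # hypothetical column name
    data-type: timestamp    # exchange columns are timestamp-only
    exchangeColumn: true    # skip time zone conversion for this column
```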

Is-reference-data

This parameter instructs the HDB to treat the table as part of an HDB-only pipeline, where no process boundary exists.

You can pair is-reference-data tables with the non-timeseries parameter to enable tables without a time column to function within Refinery.

These tables cannot be queried directly using getTicks or getStats. However, you can reference them through a kdb foreign key link.
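A minimal sketch of a reference-data table definition (table and column names hypothetical; other required schema parameters omitted):

```yaml
table:
  name: InstrumentRef       # hypothetical
  id-col: sym               # hypothetical
  is-reference-data: true   # treat as part of an HDB-only pipeline
```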

Runtime

The Process Manager will look for all YAML configuration within $DELTADATA_HOME/refinery/system-config:

  • system: System-wide configuration
  • pipelines: Pipeline definition
  • tables: Table schema and persistence definition

Currently, any change to any of the YAML configuration files within this folder requires the Process Manager to be restarted.

Examples

System example

system:
  layout:
    -
      name: all-processes
      nodes:
        -
          host: localhost

  default-cpu-taskset: 0-256

  data-hierarchy:
    - region
    - data-source
    - data-class
    - sub-class

  delta-messaging-server: DS_MESSAGING_SERVER:refinery_a

  timezone: UTC

  time-sort: false

Note

When defining the host for each process in system.yaml, the hostname must match the domain name referenced in the <DELTACONTROL_HOST> field of delta.profile in order for Refinery processes to boot successfully. This name can be either the fully qualified domain name or shortened hostname of an environment, as long as there is consistency across the system. The name must also be present under /etc/hosts within the environment.

Pipeline example

This example pipeline defines:

  1. A real-time data capture pipeline that can be queried via the GW
    • Available via demo data source
  2. A tickerplant
    • Runs on a static port 32121
    • Publishes downstream every 100ms
    • Triggers rollover at 00:00:00.001 every day
    • Logs to a journal file
  3. A real-time database
    • Runs on a static port 32122
    • Query timeout of 30 seconds
  4. An historical database
    • Query timeout of 30 seconds
  5. A persisting database
    • Writes to disk in 1 minute intervals
    • Does not write based on rows received from upstream

pipeline:
  name: DemoPipeline
  type: realtime

  expose-to-gw: true

  taxonomy:
    region: global
    data-source: demo

  proc-layout:
    -
      all: all-processes

  processes:
    tp:
      port: 32121
      pub-mode: timer
      pub-freq-ms: 100
      log-to-journal: true
      rollover-mode: daily-at-time
      rollover-time: "00:00:00.001"
    rdb:
      port: 32122
      timeout: 30
    hdb:
      timeout: 30
    ipdb:
      write-freq: 60000
      write-row-limit: 0
    epdb:
      timeout: 30
    idb:
      timeout: 300

Schema example

This example schema defines:

  1. A kdb+ table
  2. Will be associated with any pipeline with a taxonomy of global region and demo data source
  3. The primary ID column in sym
    • The IPDB will apply the parted attribute to this column
  4. The primary time column is time
    • The IPDB will sort by this column (within the primary ID column)
  5. The IPDB will store this in a splay intraday and EPDB date partition at end of day

The equivalent schema in kdb+ form:

update time:`s#time, sym:`g#sym from flip `time`sym`price`volume!"PSFJ"$\:()

table:
  name: DemoTable
  id-col: sym
  time-col:  time
  intra-persist-type: splay
  end-persist-type: date-partition

  taxonomy:
    -
      region: global
      data-source: demo

  columns:
    -
      name: time
      data-type: timestamp
      attribute: sorted
    -
      name: sym
      data-type: symbol
      attribute: grouped
    -
      name: price
      data-type: float
    -
      name: volume
      data-type: long