My first realtimeUDF

This topic takes you step by step through the process of creating a realtimeUDF.

Step 1: creating the relevant files

Using the examples below, create the following files.

Pipeline YAML

This is a simple pipeline YAML that we will use to demonstrate basic data flowing through the system.

example.pipeline.yaml

pipeline:
  name: "examplePipeline"
  type: "realtime"

  proc-layout:
    - all: all-processes

  journal-replay-on-restart: false
  expose-to-gw: true
  enable-monitoring: true
  startup-timeout: 60

  taxonomy:
    region: global
    data-source: ex

  processes:
    tp:
      pub-mode: timer
      pub-freq-ms: 500
      log-to-journal: false
      rollover-mode: daily-at-time
      rollover-time: 00:00:00.001
      subscribe-from-delta-messaging: true
      port: 6004
    ipdb:
      write-row-limit: 0
      write-freq: 60000
      port: 6005
    epdb:
      timeout: 0
    rdb:
      timeout: 20
      port: 6003
    hdb:
      timeout: 30
      port: 6000
    idb:
      timeout: 30
      instances: 1
    ctp:
      pub-mode: direct
      log-to-journal: false
      rollover-mode: passthrough
      slow-consumer-disconnect: true
      slow-consumer-disconnect-bytes: 1000000
      publish-to-delta-messaging: true

Table/schema

This is the table YAML that we will use to store our dummy data.

example.table.yaml

---
  table: 
    name: exTab
    id-col: id
    time-col: time
    intra-persist-type: splay
    end-persist-type: date-partition
    taxonomy:
      -
        region: global
        data-source: ex
    columns: 
      - 
        name: time
        data-type: timestamp
        attribute: sorted
      - 
        name: id
        data-type: symbol
        attribute: grouped
      - 
        name: number
        data-type: long

RealtimeUDF

This is a simple realtimeUDF that aggregates the number column of our exTab when triggered.

exampleRealtimeUDF.q

{select avg number from `exTab}
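To see what this body computes in isolation, here is a standalone sketch that builds a small sample exTab in a plain q session and applies the same aggregation (the table contents here are made up for illustration and are not the live pipeline data):

```q
/ standalone sketch: sample data, not the live pipeline table
exTab:([]time:3#.z.p;id:3#`a;number:10 20 30);
udf:{select avg number from `exTab};
udf[]           / one-row table with column number = 20f
```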

UDF trigger

Our trigger function checks whether any value in the number column exceeds 100, by testing the column's maximum. If so, it returns 1b, triggering the realtimeUDF.

exTrigger.q

{100<first (exec num:max number from `exTab)`num}
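To see how the trigger behaves, here is a standalone sketch that evaluates the same expression against a small sample exTab in a plain q session (sample values only, outside the pipeline):

```q
/ standalone sketch of the trigger logic, outside the pipeline
exTab:([]time:2#.z.p;id:2#`a;number:5 7);
trig:{100<first (exec num:max number from `exTab)`num};
trig[]                        / 0b: max number is 7, no trigger
`exTab insert (.z.p;`a;1000);
trig[]                        / 1b: max number is now 1000, the RTUDF fires
```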

realtimeConfig

The realtime config describes the setup of realtime user-defined functions. Judging from the column headers, each row maps a UDF to the table data it requires and its channel, whether it is triggered, the trigger table and channel, an optional trigger condition, the trigger function, an optional init function, and the process number it runs on.

realtimeConfig.csv

UDF,dataReq,dataTabChannel,triggered,trigTab,trigTabChannel,trigCond,trigFunc,initFunc,procNo
exampleRealtimeUDF,`exTab,examplePipeline.0.ctp.0,1,exTab,examplePipeline.0.ctp.0,,exTrigger,,0

Pub q

pub.q

tp:hopen 6004 / open a connection to our TP

gen:{[n;p] / generate n rows of exTab data; column order matches the table schema (time, id, number)
    ([]time:n#.z.p;id:n#`a;number:n#p)
    }

pub:{[t] / send table t to the TP, which updates the exTab table in the RDB
    tp(`upd;`exTab;t)
    }

pubData:{[] / publish regular data to the TP
    pub[gen[10;5]]
    }

pubTrigger:{[] / publish a row that triggers our RTUDF
    pub[gen[1;1000]]
    }

Step 2: uploading files to their respective locations

YAML files

Add the pipeline YAML file to the ../refinery/system-config/pipelines directory. Then add the table YAML file to the ../refinery/system-config/tables directory.

q files

Add the exampleRealtimeUDF.q, exTrigger.q, and pub.q files to the [HOME] directory.

Note

The files don't need to live in [HOME] specifically; what matters is that the path you pass to the --file flag resolves from the directory you're working in when you call the CLI.

CSV files

Add the realtimeConfig file to the [HOME] directory.

Step 3: starting pipelines and workflows

Start up Refinery

refinery application --start-control
refinery application --start-daemon

refinery process-manager --start --wait

refinery workflow --start REFINERY_CORE_A
refinery workflow --start REFINERY_ENTRYPOINT_0_a

sleep 2
refinery pipeline --start DefaultEntrypoint
refinery pipeline --start RealtimeUDF
refinery pipeline --start examplePipeline

sleep 2

refinery workflow --start REALTIME_UDF_A

sleep 2

refinery service-class --start-template refinery-gw-client

Step 4: setting up the RTUDF

With Refinery up and running, we can upload the realtimeUDF example into the system using the Refinery CLI. Run the following commands.

refinery udf --add --funcName exampleRealtimeUDF --funcType realtime --file exampleRealtimeUDF.q --description "example realtime udf"
refinery udf --add --funcName exTrigger --funcType trigger --file exTrigger.q --description "example trigger"

These commands upload our exampleRealtimeUDF and our exTrigger into the system for use. You can list them with:

refinery udf --info

Next, we upload realtimeConfig.csv into the KX Control parameter .daas.udf.realtimeConfig.

refinery configuration --import --param .daas.udf.realtimeConfig --file realtimeConfig.csv

To confirm it was uploaded, run the following:

refinery configuration --show --param .daas.udf.realtimeConfig

Step 5: publishing data and activating the trigger

With the setup complete, it's time to publish some data. Run the pub.q file:

q pub.q

This connects to the tickerplant and opens a q session where you can call the publishing functions: pubData[] publishes regular data to the system, and pubTrigger[] publishes the data that will trigger the RTUDF.
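Assuming the pipeline is running and the TP is listening on port 6004, a session might look like this (both functions are defined in pub.q above):

```q
q)pubData[]      / publishes 10 rows of regular data (number = 5)
q)pubTrigger[]   / publishes 1 row with number = 1000, exceeding the 100 threshold
```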

Note

In a real system, this pub.q script would be replaced by the real-time feed. We are using it here as a controlled way to demonstrate data flowing into the system.

Step 6: checking the realtimeUDF results

Now that data has been sent to the TP and the trigger activated, let's check that everything is working. Connect to the gateway client and query the table the data has been sent to.

.gwClient.query.sync[`getTicks; `dataSource`dataType`startDate`endDate`idList!(`ex; `exTab; .z.d; .z.d; `a)]

This displays the data in the exTab table. To see where the RTUDF triggered, query the rtudfres table.

.gwClient.query.sync[`getTicks; `dataSource`dataType`startDate`endDate`idList!(`rtudf; `rtudfres; .z.d; .z.d; `)]

To extract the table produced by the RTUDF, raze the result column:

raze .gwClient.query.sync[`getTicks; `dataSource`dataType`startDate`endDate`idList!(`rtudf; `rtudfres; .z.d; .z.d; `)]`result

or use the getUDF function:

.gwClient.query.sync[`getUDF; `funcType`funcName`startTime`endTime!(`realtime;`exampleRealtimeUDF;.z.p-1D;.z.p)]

getUDF performs a uj (union join) across the results of the given UDF, whereas getTicks extracts raw data from the realtime UDF results table.