My first realtimeUDF¶
This topic walks you step by step through creating a realtimeUDF.
Step 1: creating the relevant files¶
Using the examples below, create the following files.
Pipeline YAML¶
This is a simple pipeline YAML that we will use to demonstrate basic data flowing through the system.
example.pipeline.yaml
pipeline:
  name: "examplePipeline"
  type: "realtime"
  proc-layout:
    - all: all-processes
  journal-replay-on-restart: false
  expose-to-gw: true
  enable-monitoring: true
  startup-timeout: 60
  taxonomy:
    region: global
    data-source: ex
  processes:
    tp:
      pub-mode: timer
      pub-freq-ms: 500
      log-to-journal: false
      rollover-mode: daily-at-time
      rollover-time: 00:00:00.001
      subscribe-from-delta-messaging: true
      port: 6004
    ipdb:
      write-row-limit: 0
      write-freq: 60000
      port: 6005
    epdb:
      timeout: 0
    rdb:
      timeout: 20
      port: 6003
    hdb:
      timeout: 30
      port: 6000
    idb:
      timeout: 30
      instances: 1
    ctp:
      pub-mode: direct
      log-to-journal: false
      rollover-mode: passthrough
      slow-consumer-disconnect: true
      slow-consumer-disconnect-bytes: 1000000
      publish-to-delta-messaging: true
Table/schema¶
This is the table YAML that we will use to store our dummy data.
example.table.yaml
---
table:
  name: exTab
  id-col: id
  time-col: time
  intra-persist-type: splay
  end-persist-type: date-partition
  taxonomy:
    -
      region: global
      data-source: ex
  columns:
    -
      name: time
      data-type: timestamp
      attribute: sorted
    -
      name: id
      data-type: symbol
      attribute: grouped
    -
      name: number
      data-type: long
RealtimeUDF¶
This is a simple realtimeUDF that averages the number column of exTab when triggered.
exampleRealtimeUDF.q
{select avg number from `exTab}
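To see what this computes, you can exercise the UDF body in a plain q session against a dummy in-memory exTab. The sample rows below are illustrative and not part of the pipeline:

```
exTab:([]time:3#.z.p;id:3#`a;number:10 20 30)  / illustrative stand-in for exTab
udf:{select avg number from `exTab}
udf[]                                          / one-row table: number = 20f (avg of 10 20 30)
```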
UDF trigger¶
The trigger function checks whether the maximum value in the number column exceeds 100. If it does, the function returns 1b, triggering the realtimeUDF.
exTrigger.q
{100<first (exec num:max number from `exTab)`num}
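The dictionary indirection above (exec with a named column, then indexing by `num`) can be hard to read. Assuming only the maximum of number matters, an equivalent and simpler trigger is:

```
{100<exec max number from `exTab}
```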
realtimeConfig¶
The realtime config describes the setup of realtime user-defined functions.
realtimeConfig.csv
UDF,dataReq,dataTabChannel,triggered,trigTab,trigTabChannel,trigCond,trigFunc,initFunc,procNo
exampleRealtimeUDF,`exTab,examplePipeline.0.ctp.0,1,exTab,examplePipeline.0.ctp.0,,exTrigger,,0
Pub q¶
pub.q
tp:hopen 6004    / open a connection to our TP

gen:{[n;p]       / generate n rows of exTab-shaped data, with p in the number column
  ([]id:n#`a;time:n#.z.p;number:n#p)
  }

pub:{[t]         / send the data t to the TP, which updates exTab in the RDB
  tp(`upd;`exTab;t)
  }

pubData:{[]      / publish regular data
  pub gen[10;5]
  }

pubTrigger:{[]   / publish a row that fires our RTUDF trigger
  pub gen[1;1000]
  }
Step 2: uploading files to their respective locations¶
YAML files¶
Add the pipeline YAML file to the ../refinery/system-config/pipelines directory. Then add the table YAML file to the ../refinery/system-config/tables directory.
q files¶
Add the exampleRealtimeUDF.q, exTrigger.q, and pub.q files to the [HOME] directory.
Note
You can place your files anywhere, as long as the path you pass to the --file flag resolves from the directory you are working in when you call the CLI.
CSV files¶
Add the realtimeConfig file to the [HOME] directory.
Step 3: starting up pipelines and workflows¶
Start up Refinery:
refinery application --start-control
refinery application --start-daemon
refinery process-manager --start --wait
refinery workflow --start REFINERY_CORE_A
refinery workflow --start REFINERY_ENTRYPOINT_0_a
sleep 2
refinery pipeline --start DefaultEntrypoint
refinery pipeline --start RealtimeUDF
refinery pipeline --start examplePipeline
sleep 2
refinery workflow --start REALTIME_UDF_A
sleep 2
refinery service-class --start-template refinery-gw-client
Step 4: setting up the RTUDF¶
With Refinery up and running, upload the realtimeUDF example into the system using the Refinery CLI by running the following commands.
refinery udf --add --funcName exampleRealtimeUDF --funcType realtime --file exampleRealtimeUDF.q --description "example realtime udf"
refinery udf --add --funcName exTrigger --funcType trigger --file exTrigger.q --description "example trigger"
These commands upload our exampleRealtimeUDF and our exTrigger into the system. To list them:
refinery udf --info
Next, we upload realtimeConfig.csv into the KX Control parameter .daas.udf.realtimeConfig.
refinery configuration --import --param .daas.udf.realtimeConfig --file realtimeConfig.csv
To confirm it has been uploaded, run the following:
refinery configuration --show --param .daas.udf.realtimeConfig
Step 5: publishing data and activating the trigger¶
With setup complete, it's time to publish some data. Run the pub.q script:
q pub.q
This connects to the Tickerplant and opens a q session where you can call the functions that publish data to the TP: pubData[] publishes regular data to the system, and pubTrigger[] publishes data that triggers the RTUDF.
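A typical session might look like this (the return values of the sync calls are elided):

```
q pub.q
q)pubData[]     / 10 rows with number=5 land in exTab
q)pubTrigger[]  / 1 row with number=1000 exceeds the 100 threshold and fires the RTUDF
```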
Note
In a real system, pub.q would be replaced by a real-time feed. We use it here only to publish data into the system in a controlled way.
Step 6: realtimeUDF created¶
Now that data has been sent to the TP and the trigger activated, let's check that everything is working. Connect to the gateway client and query the table the data has been sent to.
.gwClient.query.sync[`getTicks; `dataSource`dataType`startDate`endDate`idList!(`ex; `exTab; .z.d; .z.d; `a)]
This displays the data in the exTab table. To see where the RTUDF triggered, query the rtudfres table:
.gwClient.query.sync[`getTicks; `dataSource`dataType`startDate`endDate`idList!(`rtudf; `rtudfres; .z.d; .z.d; `)]
To extract the table produced by the RTUDF, raze the result column:
raze .gwClient.query.sync[`getTicks; `dataSource`dataType`startDate`endDate`idList!(`rtudf; `rtudfres; .z.d; .z.d; `)]`result
Alternatively, use the getUDF function:
.gwClient.query.sync[`getUDF; `funcType`funcName`startTime`endTime!(`realtime;`exampleRealtimeUDF;.z.p-1D;.z.p)]
getUDF applies uj (union join) across the results of the given UDF, whereas getTicks returns raw rows from the realtime UDF results table.