Real-time UDF

The real-time user-defined function (RTUDF) framework enables Refinery to run custom functions on data in real time. Setup involves defining the functions and setting the appropriate configuration parameters.

Once set up, the results of these real-time UDFs flow through a standard tick capture system and are available for streaming and persistence. The data is accessible historically via the getUDF API.


q real-time UDFs


  • Real-time UDF functions take either no arguments or the argument pair (tableName;tableData).

  • Real-time UDFs have access to the get* APIs, which operate on a local set of data. Temporal arguments (startDate, endDate, startTime, endTime) are not required.

  • The data available to an RTUDF is the set of data received since the last trigger of the RTUDF.

  • RTUDFs should output a table. If the output is not a table, it is put into a 1x1 table with column name result.
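
The output rule in the last bullet can be modeled in Python (a sketch of the behavior, not the framework's own code; a dict of column lists stands in for a q table):

```python
def normalize_udf_output(output):
    """Model of the RTUDF output rule: table-like outputs pass through
    unchanged; anything else is wrapped into a 1x1 table whose single
    column is named 'result'."""
    if isinstance(output, dict):        # stand-in for "already a table"
        return output
    return {"result": [output]}
```

For example, a UDF returning the scalar 101.75 would surface as a one-row table with a single result column.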

q syntax taking arguments - qUDF

{[tabName;data]
    select avg price from data
    }

q syntax, no arguments, using getTicks API - qUDF

{[]
    select avg price from getTicks[`symList`assetClass`dataType!`VOD.L`equity`trade]
    }

Trigger functions

  • Trigger functions operate on a table to determine whether a UDF should be run.

  • Trigger functions take a singular argument, which is the table listed in the trigTab field.

  • Trigger functions must output a boolean: true means run the UDF, false means do not.

Example trigger function - qTriggerFunc

{[data]
    if[`mySym in data`sym;
        :1b;
        ];
    0b
    }

In this example, the UDF executes whenever the trigTab table contains the sym `mySym.

Initialization functions

Initialization functions are run on boot of the UDF worker nodes to prepare the process for any UDF execution. These are useful for doing any one-time loads of configuration, libraries, or reference data.

  • Initialization functions must take no arguments

Example initialization function - qInitFunc

{[]
    system"l ",getenv[`UDF_LIBS_DIR],"/myDependency.q";

    .myUDF.referenceTable:([]sym:`a`b;num:1 2)
    }

In this example, we load in a q script containing a dependency and set a table that we can later reference in a namespace.

  • Note: dependencies can be placed in the UDF_LIBS_DIR, which is located in the udfdir. Dependencies will then be included when the UDF package is exported via the CLI.

Python real-time UDFs


The real-time UDF framework supports Python 3 integration out of the box. Due to the need to map kdb tables to more useful formats, Python real-time UDFs support pre and post execution functions. These are q functions that operate on the inbound data (before execution) and the outbound result (after execution) of the Python RTUDF. A basic use case is converting a q table to a pandas dataframe.

Pre and post execution functions are optional. If unconfigured or left blank, no function is run and the input/output is handed directly between q and Python.

Python UDFs can be defined in two formats:

  • A simple function definition
  • A method of a class

Python function syntax taking argument - pyUDF

def pyUDF(data):
    return data

This function takes a dataframe and returns it unchanged, illustrating data flowing through a Python function.

Note

The function name must match the file name.

Python method syntax taking argument - myMethod

class pyUDF:

    def myMethod(self,data):
        return data

Note

The class name must match the file name.

Note

The class should be instantiated either in the initialization function using the ml.q library, or at the bottom of the Python UDF script.
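
For example, a minimal class-based UDF instantiated at the bottom of the script (names are illustrative):

```python
class pyUDF:
    """Class-based real-time UDF; the configured method is myMethod."""

    def myMethod(self, data):
        # pass the data straight through, as in the example above
        return data

# Instantiate at the bottom of the script so the configured method can be
# called on a ready instance (the alternative is to instantiate from the
# q initialization function using the ml.q library).
udf_instance = pyUDF()
```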

Note

Python UDFs must have a .p extension to be loaded by embedPy (as opposed to .py).

Pre execution function - qToDataframePreExFunc

{[t;d] .ml.tab2df d}

This function makes use of the ml.q utility function to convert a q table to a dataframe.

Post execution function - dataframeToQPostExFunc

{[t;d] .ml.df2tab .p.wrap d}

This function makes use of the ml.q utility function to convert a dataframe to a q table.

Configuring pre and post execution functions for Python UDFs

The configuration parameter for pre/post execution functions on Python UDFs is separate from the main real-time config. The parameter is .daas.udf.pythonRTUDFConfig.

  • udfName: the name of the Python UDF, e.g. pythonUDF
  • preExFunc: function to run before execution, e.g. qToDataframePreExFunc. Leave blank for none.
  • postExFunc: function to run after execution, e.g. dataframeToQPostExFunc. Leave blank for none.
  • method: method of the class to be run, if not using a plain function, e.g. myMethod. Leave blank if using a function.

This can be managed via the Refinery CLI.

Using custom Python library functions inside a UDF

A set of Python utility functions can be referenced in a Python UDF if they are imported on initialization. Consider the setup below.

File structure:

user@machine:~/data/refinery/refinery/udf> tree
.
├── libraries
│   ├── libfunc.p
├── realtime
│   ├── myLibLoadInitFunc.q
│   ├── myPythonUDF.p
│   ├── myPythonUDF.txt
├── static
└── trigger

Refinery UDF configs:

/.daas.udf.realtimeConfig

UDF         dataReq  dataTabChannel       triggered trigTab trigTabChannel trigCond trigFunc initFunc          procNo
---------------------------------------------------------------------------------------------------------------------
myPythonUDF "`trade" testPipeline.0.ctp.0 0                                ""                myLibLoadInitFunc -1    

/.daas.udf.pythonRTUDFConfig

udfName     preExFunc postExFunc method
---------------------------------------
myPythonUDF                            

UDF definitions:

user@machine:~/data/refinery/refinery/udf> cat libraries/libfunc.p
def libfunc ():
        print("libfunc")
        x = 111
        return x


user@machine:~/data/refinery/refinery/udf> cat realtime/myLibLoadInitFunc.q
{
    system"l ",getenv[`UDF_LIBS_DIR],"/libfunc.p"
    }


user@machine:~/data/refinery/refinery/udf> cat realtime/myPythonUDF.p
def myPythonUDF (data):
        x = libfunc()
        return x

With the above setup, we have a UDF myPythonUDF executing on every trade tick. Its initialization function is set to myLibLoadInitFunc, a simple q function that loads the libfunc.p Python script. This ensures that any function definitions inside libfunc.p are accessible from within the Python UDF.

Dependencies

Required - Python 3.6/3.7, KxEmbedPy (installable .tgz), Multiprocessing (Python library).

Recommended - ml.q (KX machine learning library), Pandas (Python library).

Recommended dependencies will enable smoother conversion between q tables and Python. See preExFunc example.

Parallel execution of Python UDFs

There are two methods for parallel execution of UDFs, meaning that Python RTUDFs that operate on the same data set can be run simultaneously. UDFs are eligible for parallel execution when they share a trigger function, data requirement, and pre-execution function.

Multithreading

The optimal method of parallelization makes use of the peach operator. Setting the realtime-udf-threads variable in the system.yaml config to the desired number of threads allows the real-time UDF node to use multithreading.

Note

Global operations cannot be used with multithreading.

Python multiprocessing

If the system is limited on kdb cores, the alternative method of parallel execution uses the multiprocessing module in Python to run a pool of Python processes behind each embedPy worker node.

Note

The Python multiprocessing pool appears as child processes of the q worker node when viewed through ps -ef.

This can be turned on using the realtime-udf-python-multiprocess=true flag in the system.yaml config file. By default (no flag) it is false. If set to true, peach multithreading will not be used.
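
The multiprocessing approach can be illustrated with a standalone sketch (a model of a worker pool running independent UDFs on a shared data set, not Refinery's internal code; the UDF names here are hypothetical):

```python
from multiprocessing import Pool

def mean_price(prices):
    # hypothetical UDF: average price over the batch
    return sum(prices) / len(prices)

def max_price(prices):
    # hypothetical UDF: maximum price over the batch
    return max(prices)

def run_udf(job):
    # unpack a (udf, data) pair and execute it in the pool worker
    udf, data = job
    return udf(data)

if __name__ == "__main__":
    shared = [101.5, 102.0, 100.5, 103.0]   # same data set for both UDFs
    with Pool(processes=2) as pool:
        # each UDF runs in its own Python process behind the worker node
        results = pool.map(run_udf, [(mean_price, shared), (max_price, shared)])
    print(results)                           # [101.75, 103.0]
```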

Python library management

Each Python real-time UDF may require different Python libraries, or different versions of the same libraries. To manage this, UDFs can be stored in a nested tree under udf/realtime alongside requirements.txt files.

A Python UDF uses the nearest requirements.txt: the one in its own directory if present, otherwise the closest one further up the tree.

user@machineName:~/data/refinery/refinery/udf/realtime> tree
.
├── foo
│   ├── fizz
│   │   ├── pyfunc6.py
│   │   └── requirements.txt
│   ├── buzz
│   │   └── pyfunc5.py
│   ├── pyfunc4.p
│   ├── pyfunc3.p
│   └── requirements.txt
├── bar
│   ├── pyfunc2.p
│   ├── pyfunc1.p
│   └── requirements.txt
├── myInitFunc.q
└── myInitFunc.txt


In the above example, pyfunc1 and pyfunc2 will share a requirements.txt in bar. pyfunc3, pyfunc4 and pyfunc5 will share a different requirements.txt in foo, whereas pyfunc6 will have its own, from foo/fizz. This is achieved with the use of venv.
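
The nearest-requirements rule can be sketched as a pure function (illustrative only; the set of files is passed in rather than read from disk):

```python
from pathlib import PurePosixPath

def nearest_requirements(udf_path, root, existing):
    """Walk from the UDF's own directory up toward the realtime root and
    return the first requirements.txt found in `existing` (a set of known
    file paths); None if no requirements.txt governs this UDF."""
    directory = PurePosixPath(udf_path).parent
    root = PurePosixPath(root)
    while True:
        candidate = str(directory / "requirements.txt")
        if candidate in existing:
            return candidate
        if directory == root:
            return None
        directory = directory.parent

files = {
    "udf/realtime/foo/requirements.txt",
    "udf/realtime/foo/fizz/requirements.txt",
    "udf/realtime/bar/requirements.txt",
}
# pyfunc5 has no requirements.txt in foo/buzz, so it inherits foo's
assert nearest_requirements("udf/realtime/foo/buzz/pyfunc5.py",
                            "udf/realtime", files) == "udf/realtime/foo/requirements.txt"
```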

When the UDF directory is set up as above, the UDF daemon will group UDFs by their requirements.txt and run the node under a venv.

This means that pyfunc1 and pyfunc3 cannot run on the same node, even if they share a data dependency. Therefore, procNo allocation must either be -1 (auto-allocate) or set such that they are on separate nodes. For example, the following allocations are valid:

UDF      procNo
pyfunc1  -1
pyfunc2  -1
pyfunc3  -1

UDF      procNo
pyfunc1  1
pyfunc2  1
pyfunc3  2

UDF      procNo
pyfunc1  1
pyfunc2  2
pyfunc3  3

Whereas, the following is not. The UDF daemon will error and refuse to boot.

UDF      procNo
pyfunc1  1
pyfunc2  2
pyfunc3  2
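
The daemon's constraint can be modeled as a small validation function (a sketch of the rule, not the daemon's code):

```python
def valid_allocation(proc_no, req_group):
    """proc_no: UDF name -> procNo (-1 means auto-allocate).
    req_group: UDF name -> identifier of its governing requirements.txt.
    Manually pinned UDFs may share a node only if they share a
    requirements.txt group, since each node runs under a single venv."""
    node_group = {}
    for udf, proc in proc_no.items():
        if proc == -1:                      # auto-allocation is always safe
            continue
        group = req_group[udf]
        if node_group.setdefault(proc, group) != group:
            return False                    # two venvs would collide on one node
    return True

groups = {"pyfunc1": "bar", "pyfunc2": "bar", "pyfunc3": "foo"}
assert valid_allocation({"pyfunc1": 1, "pyfunc2": 1, "pyfunc3": 2}, groups)
assert not valid_allocation({"pyfunc1": 1, "pyfunc2": 2, "pyfunc3": 2}, groups)
```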

venv creation is managed by the UDF daemon on startup. The venvs live under refinery/venv. Any new requirements added to a requirements.txt will be installed/updated on the next startup of the UDF daemon.


Adding new functions


New UDFs can be added to the system and managed using the Refinery CLI. Usage of the refinery udf command is as follows:

    --add                                       Add new UDF
        --funcName                              Name of the function
        --funcType                              realtime / static / trigger
        --file                                  Location of UDF function
        [--description]                         Location of text file with description OR written string description
        [--language]                            To specify the language the UDF is written in. Parameter sub

    --modify                                    Update a UDF
        --funcName
        [--file]                                Location of new UDF function
        [--description]                         Location of text file with description OR written string description
        [--language]                            To specify the language the UDF is written in. Parameter sub

    --delete                                    Delete a UDF
        --funcName                              Name of UDF to delete

    --info                                      List information on all UDFs available, including function type, code and description
        [--funcName]                            Specify function name for information on this function only

    --export                                    Package up entire contents of UDF directory into an installable .tgz file
        [--dir]                                 Directory output package to (default is current working directory)

    --import                                    Install a .tgz UDF package
        --file                                  Location of .tgz file

    --reload                                    Force load any underlying UDF directory changes into UDF processes. Automatically done by CLI operations

For example, adding my real-time UDF:

refinery udf --add --funcName qUDF --funcType realtime --file qUDF.q

And adding the trigger function to accompany it:

refinery udf --add --funcName qTriggerFunc --funcType trigger --file qTriggerFunc.q

Note

Initialization funcType is `realtime.

refinery udf --add --funcName qInitFunc --funcType realtime --file qInitFunc.q

Note

UDFs are parsed on loading into the system to verify that they are structured correctly. Comment lines in q files may cause an error, so it is suggested that q files be comment-free; the accompanying .txt description file can be used for documentation.


Configuration


The realtime config describes the setup of real-time user-defined functions. It is named .daas.udf.realtimeConfig.

  • UDF: name of the function, e.g. myRealtimeUDF
  • dataReq: table(s) required, e.g. `test`quote
  • dataTabChannel: channel of the tickerplant that is publishing the required data, e.g. refinitivEquity.0.ctp.0
  • triggered: 1b runs the UDF when the trigger condition/trigger function is satisfied; 0b runs the UDF on every update of the dataReq table. Example: 1b
  • trigTab: table to run the trigger against, e.g. metadata
  • trigTabChannel: channel of the tickerplant that is publishing the trigger table, e.g. refinitivEquity.0.ctp.0
  • trigCond: kdb where-clause parse tree as the condition to satisfy on the trigger table; leave blank if the trigger function is used. Example: enlist(=;`sym;enlist`VOD.L)
  • trigFunc: name of the trigger function UDF to run on the trigger table, e.g. myTrigFunc
  • initFunc: UDF function to be run on boot of the UDF process; used to load any configuration or dependencies once on startup, e.g. myInitFunc
  • procNo: which process to allocate the UDF to; if set to -1, the system will auto-allocate for best performance. Example: -1

Configuration management

The realtimeConfig can be managed using the configuration management CLI with the Refinery configuration command and the following syntax:

    --list                          List editable config parameters
        [--category]                Only display config parameters of a specified category

    --show                          Print out configuration parameter
        --param                     Configuration parameter name

    --export                        Export the configuration parameter to .csv
        --param                     Configuration parameter name
        [--dir]                     Output to specified location

    --import                        Import and overwrite configuration parameter with new file
        --param                     Configuration parameter name
        --file                      Specified .csv file
        [--do-not-validate]         Ignore validation to force import (WARNING: INCORRECT FORMAT CONFIGURATION MAY CAUSE ISSUES)

    --delete-rows                   Deletes the last row of the configuration parameter
        --param                     Configuration parameter name
        [--index]                   Deletes specified indices

    --add-rows                      Adds rows to configuration parameter
        --param                     Configuration parameter name
        [--file]                    Loads rows from specified .csv file
        [--<CONFIG_COLUMN_1>]
        [--<CONFIG_COLUMN_2>]
        [...                ]
        [--<CONFIG_COLUMN_N>]       For configuration not loaded from file, arguments must be specified with a flag per column
        [--do-not-validate]         Ignore validation to force import (WARNING: INCORRECT FORMAT CONFIGURATION MAY CAUSE ISSUES)

To see what the current real-time config looks like, use the command:

refinery configuration --show --param .daas.udf.realtimeConfig

You can then replace it with your new copy:

refinery configuration --import --param .daas.udf.realtimeConfig --file myNewConfig.csv

Querying results


You can access outputs of real-time UDFs through the getUDF API call. Parameter inputs are as follows:

  • funcType (symbol, mandatory): must be set to `realtime
  • funcName (symbol, mandatory): the name of the UDF that produced the output, e.g. qUDF
  • startTime (timestamp, optional): start point of the query, e.g. .z.p-01:00:00
  • endTime (timestamp, optional): end point of the query, e.g. .z.p

Note

If startTime and endTime are not provided, the most recent value for today for the requested funcName will be returned.

Note

If multiple results fall within the startTime and endTime, they will be joined via uj. If this fails due to incompatible outputs, they will be returned as a list.

Streaming results of real-time UDFs

Streaming subscriptions can be set up for real-time UDFs similarly to any other streaming call. The only parameters to hand to the getUDF call are funcName and funcType.

.stream.regSub[`getUDF;`funcName`funcType!(`qUDF;`realtime);period;callback;`;`user]

  • Streaming calls will output in the full results table format, with timestamp, udfName, and result columns.

Framework architecture notes

The framework consists of a group of worker nodes that publish to a standard tick stack, managed by a single daemon. The group of worker nodes is dynamically scaled to:

  1. Minimize memory utilization
  2. Maximize CPU parallelization

This means that if two real-time UDFs share a common data set, they will be assigned to the same worker node to reduce data duplication. If they have distinct data sets, they will be on separate nodes to enable parallel execution.

You can override this with manual configuration of the procNo field in the real-time configuration.

Data flushing within worker nodes

Subscribed data is held in memory and 'reserved' by a real-time UDF. Once a real-time UDF is executed, it will then only 'reserve' data from that time onwards. If no other UDF has claim to any previous data, it will be flushed from memory.

This means that each real-time UDF will only have access to data since its last execution.
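
The reservation-and-flush behavior can be sketched as follows (an illustrative model; timestamps are plain numbers here, and names are hypothetical):

```python
class WorkerBuffer:
    """Model of the worker-node flush rule: each UDF reserves data from its
    last execution onward, and rows older than every reservation are flushed."""

    def __init__(self, udf_names):
        self.rows = []                                  # (timestamp, payload)
        self.reserved_from = {u: 0 for u in udf_names}  # per-UDF watermark

    def append(self, ts, payload):
        self.rows.append((ts, payload))

    def data_for(self, udf, now):
        """Hand a UDF its batch since last execution, then advance its mark."""
        since = self.reserved_from[udf]
        batch = [p for ts, p in self.rows if ts >= since]
        self.reserved_from[udf] = now
        return batch

    def flush(self):
        """Drop rows no UDF still has a claim to."""
        oldest = min(self.reserved_from.values())
        self.rows = [(ts, p) for ts, p in self.rows if ts >= oldest]
```

Once every UDF sharing the node has executed past a row's timestamp, flush discards that row; until then it stays in memory because some UDF still reserves it.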