Real-time UDF¶
The real-time user-defined function (RTUDF) framework enables Refinery to run custom functions on data in real time. Setup involves defining functions and setting the appropriate configuration parameters.
Once set up, the results of these real-time UDFs flow through a standard tick capture system, available for streaming and persistence. The data is accessible historically via the getUDF API.
q real-time UDFs¶
- Real-time UDF functions can either take no arguments or the argument set (tableName;tableData).
- Real-time UDFs have access to the get* APIs, which operate on the local set of data. The temporal arguments (startDate, endDate, startTime, endTime) are not required.
- The data available to an RTUDF is the set of data received since the last trigger of that RTUDF.
- RTUDFs should output a table. If the output is not a table, it will be wrapped in a 1x1 table with the column name result.
q syntax taking arguments - qUDF
```q
{[tabName;data]
  select avg price from data
  }
```
q syntax, no arguments, using getTicks API - qUDF
```q
{[]
  select avg price from getTicks[`symList`assetClass`dataType!`VOD.L`equity`trade]
  }
```
Trigger functions¶
- Trigger functions operate on a table to determine whether a UDF should be run.
- Trigger functions take a single argument: the table listed in the trigTab field.
- Trigger functions must output a boolean, where true (1b) means run the UDF and false (0b) means do not run it.
Example trigger function - qTriggerFunc
```q
{[data]
  if[`mySym in data`sym;
    :1b;
  ];
  0b
  }
```
In this example, we would execute our UDF whenever the trigTab table contains the sym `mySym.
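The same semantics can be pictured in Python for intuition (illustrative only; trigger functions themselves are written in q, and this column-dictionary data shape is an assumption):

```python
def trigger_func(data):
    """Run the UDF (True) only when `mySym` appears in the sym column."""
    return "mySym" in data.get("sym", [])

# a batch of ticks represented as a column dictionary
batch = {"sym": ["otherSym", "mySym"], "price": [1.0, 2.0]}
print(trigger_func(batch))            # True
print(trigger_func({"sym": ["x"]}))   # False
```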
Initialization functions¶
Initialization functions are run on boot of the UDF worker nodes to prepare the process for any UDF execution. These are useful for doing any one-time loads of configuration, libraries, or reference data.
- Initialization functions must take no arguments
Example initialization function - qInitFunc
```q
{[]
  system"l ",getenv[`UDF_LIBS_DIR],"/myDependency.q";
  .myUDF.referenceTable:([]sym:`a`b;num:1 2)
  }
```
In this example, we load in a q script containing a dependency and set a table that we can later reference in a namespace.
- Note: dependencies can be placed in the UDF_LIBS_DIR, which is located inside the udf directory. Dependencies will then be included when the UDF package is exported via the CLI.
Python real-time UDFs¶
The real-time UDF framework supports Python 3 integration out of the box. Because kdb tables often need to be mapped to more convenient formats, Python real-time UDFs support pre- and post-execution functions. These are q functions that operate on the inbound data and the outbound result, before and after execution of the Python RTUDF respectively. A basic use case is converting a q table to a pandas dataframe.
Pre- and post-execution functions are optional. If unconfigured or left blank, no function is run and the input/output is handed directly between q and Python.
Python UDFs can be defined in two formats:
- A simple function definition
- A method of a class
Python function syntax taking argument - pyUDF
```python
def pyUDF(data):
    return data
```
This function takes a dataframe and returns it unchanged, representing data flowing through a Python function.
Note
The function name must match the file name.
Python method syntax taking argument - myMethod
```python
class pyUDF:
    def myMethod(self, data):
        return data
```
Note
The class name must match the file name.
Note
The class should be instantiated either in the initialization function using the ml.q library, or at the bottom of the Python UDF script.
Note
Python UDFs must have a .p extension to be loaded by embedPy (as opposed to .py).
Pre execution function - qToDataframePreExFunc
```q
{[t;d] .ml.tab2df d}
```
This function makes use of the ml.q utility function to convert a q table to a dataframe.
Post execution function - dataframeToQPostExFunc
```q
{[t;d] .ml.df2tab .p.wrap d}
```
This function makes use of the ml.q utility function to convert a dataframe to a q table.
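Taken together, the worker chains pre-execution, UDF, and post-execution. A minimal Python sketch of that flow (the function names and data shapes here are illustrative, not the framework's API):

```python
def pre_ex(table):
    # stand-in for qToDataframePreExFunc: convert a q table to a dataframe-like dict
    return {"rows": table}

def py_udf(data):
    # the Python RTUDF itself: average the price column
    prices = [row["price"] for row in data["rows"]]
    return {"avgPrice": sum(prices) / len(prices)}

def post_ex(result):
    # stand-in for dataframeToQPostExFunc: convert the result back to a q-style table
    return [result]

def run_rtudf(table, pre=None, post=None):
    """Apply the optional pre/post execution functions around the Python UDF."""
    data = pre(table) if pre else table
    result = py_udf(data)
    return post(result) if post else result

out = run_rtudf([{"price": 10.0}, {"price": 20.0}], pre=pre_ex, post=post_ex)
print(out)  # [{'avgPrice': 15.0}]
```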
Configuring pre and post execution functions for Python UDFs¶
The configuration parameter for pre/post execution functions on Python UDFs is separate from the main real-time config. The parameter is .daas.udf.pythonRTUDFConfig.
| Parameter | Description | Example |
|---|---|---|
| udfName | The name of the Python UDF | pythonUDF |
| preExFunc | Function to run before execution. Leave blank for none. | qToDataframePreExFunc |
| postExFunc | Function to run after execution. Leave blank for none. | dataframeToQPostExFunc |
| method | Method of class to be run, if not using a function. Leave blank if using function | myMethod |
This can be managed via the Refinery CLI.
Using custom Python library functions inside a UDF¶
A set of Python utility functions can be referenced in a Python UDF if they are imported on initialization. Consider the setup below.
File structure:
```
user@machine:~/data/refinery/refinery/udf> tree
.
├── libraries
│   ├── libfunc.p
├── realtime
│   ├── myLibLoadInitFunc.q
│   ├── myPythonUDF.p
│   ├── myPythonUDF.txt
├── static
└── trigger
```
Refinery UDF configs:
```
/.daas.udf.realtimeConfig
UDF         dataReq  dataTabChannel       triggered trigTab trigTabChannel trigCond trigFunc initFunc          procNo
---------------------------------------------------------------------------------------------------------------------
myPythonUDF "`trade" testPipeline.0.ctp.0 0         ""                                       myLibLoadInitFunc -1

/.daas.udf.pythonRTUDFConfig
udfName     preExFunc postExFunc method
---------------------------------------
myPythonUDF
```
UDF definitions:
```
user@machine:~/data/refinery/refinery/udf> cat libraries/libfunc.p
def libfunc():
    print("libfunc")
    x = 111
    return x
```
```
user@machine:~/data/refinery/refinery/udf> cat realtime/myLibLoadInitFunc.q
{
 system"l ",getenv[`UDF_LIBS_DIR],"/libfunc.p"
 }
```
```
user@machine:~/data/refinery/refinery/udf> cat realtime/myPythonUDF.p
def myPythonUDF(data):
    x = libfunc()
    return x
```
With the above setup, we have a UDF myPythonUDF executing on every trade tick. Its initialization function is set to myLibLoadInitFunc, a simple q script that loads the libfunc.p Python script. This ensures that any function definitions inside libfunc.p are accessible from within the Python UDF.
Dependencies¶
Required - Python 3.6/3.7, KxEmbedPy (installable .tgz), Multiprocessing (Python library).
Recommended - ml.q (KX machine learning library), Pandas (Python library).
Recommended dependencies will enable smoother conversion between q tables and Python. See preExFunc example.
Parallel execution of Python UDFs¶
There are two methods for parallel execution of UDFs, allowing Python RTUDFs that operate on the same data set to run simultaneously. UDFs qualify for this when they share a trigger function, data requirement, and pre-execution function.
Multithreading¶
The most efficient method of parallelization makes use of the peach operator. By setting the realtime-udf-threads variable to the desired number of threads in the system.yaml config, the real-time UDF node can use multithreading.
Note
Global operations cannot be used with multithreading.
Python multiprocessing¶
If the system is restricted in the number of kdb cores available, the alternative method of parallel execution uses the multiprocessing module in Python to run a pool of Python processes behind each embedPy worker node.
Note
The Python multiprocessing pool appears as child processes of the q worker node when viewed through ps -ef.
This can be turned on using the realtime-udf-python-muliprocess=true flag in the system.yaml config file. By default (no flag) this is false. If set to true, peach multithreading will not be used.
Python library management¶
Each Python real-time UDF may require different Python libraries, or different versions of the same libraries. To manage this, UDFs can be stored in a nested tree under udf/realtime alongside requirements.txt files.
A Python UDF inherits the nearest requirements.txt above it in the tree if its own directory does not contain one.
```
user@machineName:~/data/refinery/refinery/udf/realtime> tree
.
├── foo
│   ├── fizz
│   │   ├── pyfunc6.py
│   │   └── requirements.txt
│   ├── buzz
│   │   └── pyfunc5.py
│   ├── pyfunc4.p
│   ├── pyfunc3.p
│   └── requirements.txt
├── bar
│   ├── pyfunc2.p
│   ├── pyfunc1.p
│   └── requirements.txt
├── myInitFunc.q
└── myInitFunc.txt
```
In the above example, pyfunc1 and pyfunc2 share a requirements.txt in bar. pyfunc3, pyfunc4, and pyfunc5 share a different requirements.txt in foo, whereas pyfunc6 has its own, from foo/fizz. This is achieved with the use of venv.
When the UDF directory is set up as above, the UDF daemon will group UDFs by their requirements.txt and run the node under a venv.
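The inheritance rule amounts to a nearest-ancestor lookup. A small Python sketch of the lookup (a hypothetical helper using the example tree above, not daemon code):

```python
from pathlib import PurePosixPath

def nearest_requirements(udf_path, req_dirs):
    """Return the directory whose requirements.txt applies to a UDF:
    the closest ancestor directory (its own directory first) holding one."""
    for parent in PurePosixPath(udf_path).parents:
        if str(parent) in req_dirs:
            return str(parent)
    return None  # no requirements.txt anywhere above this UDF

# directories from the example tree that contain a requirements.txt
req_dirs = {"foo", "foo/fizz", "bar"}

print(nearest_requirements("foo/fizz/pyfunc6.py", req_dirs))  # foo/fizz
print(nearest_requirements("foo/buzz/pyfunc5.py", req_dirs))  # foo
print(nearest_requirements("bar/pyfunc1.p", req_dirs))        # bar
```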
This means that pyfunc1 and pyfunc3 cannot run on the same node, even if they share a data dependency. Therefore, procNo allocation must either be -1 (auto-allocate) or set such that they are on separate nodes. For example, the following allocations are valid:
| UDF | procNo |
|---|---|
| pyfunc1 | -1 |
| pyfunc2 | -1 |
| pyfunc3 | -1 |
| UDF | procNo |
|---|---|
| pyfunc1 | 1 |
| pyfunc2 | 1 |
| pyfunc3 | 2 |
| UDF | procNo |
|---|---|
| pyfunc1 | 1 |
| pyfunc2 | 2 |
| pyfunc3 | 3 |
Whereas the following is not; the UDF daemon will error and refuse to boot.
| UDF | procNo |
|---|---|
| pyfunc1 | 1 |
| pyfunc2 | 2 |
| pyfunc3 | 2 |
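The constraint the daemon enforces can be captured in a few lines (a hypothetical validation helper, not part of the daemon):

```python
def valid_allocation(proc_no, req_group):
    """Check that no two UDFs from different requirements.txt groups are
    pinned to the same procNo (-1 means auto-allocate and is always fine)."""
    seen = {}  # procNo -> requirements group
    for udf, proc in proc_no.items():
        if proc == -1:
            continue
        group = req_group[udf]
        if seen.setdefault(proc, group) != group:
            return False  # two different venvs cannot share one node
    return True

# groups from the example tree: pyfunc1/pyfunc2 use bar's requirements.txt,
# pyfunc3 uses foo's
req_group = {"pyfunc1": "bar", "pyfunc2": "bar", "pyfunc3": "foo"}

print(valid_allocation({"pyfunc1": 1, "pyfunc2": 1, "pyfunc3": 2}, req_group))  # True
print(valid_allocation({"pyfunc1": 1, "pyfunc2": 2, "pyfunc3": 2}, req_group))  # False
```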
venv creation is managed by the UDF daemon on startup. The venvs live under refinery/venv. Any new requirements added to a requirements.txt will be installed or updated on the next startup of the UDF daemon.
Adding new functions¶
New UDFs can be added to the system and managed using the Refinery CLI. Usage of the refinery udf command is as follows:
```
--add       Add new UDF
    --funcName       Name of the function
    --funcType       realtime / static / trigger
    --file           Location of UDF function
    [--description]  Location of text file with description OR written string description
    [--language]     To specify the language the UDF is written in. Parameter sub
--modify    Update a UDF
    --funcName
    [--file]         Location of new UDF function
    [--description]  Location of text file with description OR written string description
    [--language]     To specify the language the UDF is written in. Parameter sub
--delete    Delete a UDF
    --funcName       Name of UDF to delete
--info      List information on all UDFs available, including function type, code and description
    [--funcName]     Specify function name for information on this function only
--export    Package up entire contents of UDF directory into an installable .tgz file
    [--dir]          Directory to output package to (default is current working directory)
--import    Install a .tgz UDF package
    --file           Location of .tgz file
--reload    Force load any underlying UDF directory changes into UDF processes. Automatically done by CLI operations
```
For example, adding my real-time UDF:
```bash
refinery udf --add --funcName qUDF --funcType realtime --file qUDF.q
```
And adding the trigger function to accompany it:
```bash
refinery udf --add --funcName qTriggerFunc --funcType trigger --file qTriggerFunc.q
```
Note
Initialization funcType is `realtime.
```bash
refinery udf --add --funcName qInitFunc --funcType realtime --file qInitFunc.q
```
Note
UDFs are parsed on loading into the system to verify that they are structured correctly. Comment lines in q files may cause an error, so it is suggested that q files be comment-free; the accompanying .txt description file can be used for documentation.
Configuration¶
The realtime config describes the setup of real-time user-defined functions. It is named .daas.udf.realtimeConfig.
| Parameter | Description | Example |
|---|---|---|
| UDF | Name of function | myRealtimeUDF |
| dataReq | Table(s) required | `test`quote |
| dataTabChannel | Channel of the tickerplant that is publishing the required data | refinitivEquity.0.ctp.0 |
| triggered | 1b runs the UDF when the trigger condition/trigger function is satisfied; 0b runs the UDF on every update of the dataReq table | 1b |
| trigTab | Table to run trigger against | metadata |
| trigTabChannel | Channel of the tickerplant that is publishing the trigger table | refinitivEquity.0.ctp.0 |
| trigCond | kdb where clause parse tree as condition to satisfy on the trigger table. Should be left blank if the trigger function is used | enlist(=;`sym;enlist`VOD.L) |
| trigFunc | Name of the trigger function UDF to run on the trigger table | myTrigFunc |
| initFunc | UDF function to be run on boot of UDF process. Used to load in any configuration or dependencies once on startup | myInitFunc |
| procNo | Which process to allocate the UDF to. If set to -1 the system will auto-allocate for best performance. | -1 |
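An illustrative row of this configuration in .csv form (all values are hypothetical examples drawn from earlier sections; columns follow the parameter order above):

```csv
UDF,dataReq,dataTabChannel,triggered,trigTab,trigTabChannel,trigCond,trigFunc,initFunc,procNo
qUDF,`trade,testPipeline.0.ctp.0,1b,metadata,testPipeline.0.ctp.0,,qTriggerFunc,qInitFunc,-1
```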
Configuration management¶
The realtimeConfig can be managed using the configuration management CLI with the Refinery configuration command and following syntax:
```
--list            List editable config parameters
    [--category]         Only display config parameters of a specified category
--show            Print out configuration parameter
    --param              Configuration parameter name
--export          Export the configuration parameter to .csv
    --param              Configuration parameter name
    [--dir]              Output to specified location
--import          Import and overwrite configuration parameter with new file
    --param              Configuration parameter name
    --file               Specified .csv file
    [--do-not-validate]  Ignore validation to force import (WARNING: INCORRECT FORMAT CONFIGURATION MAY CAUSE ISSUES)
--delete-rows     Deletes the last row of the configuration parameter
    --param              Configuration parameter name
    [--index]            Deletes specified indices
--add-rows        Adds rows to configuration parameter
    --param              Configuration parameter name
    [--file]             Loads rows from specified .csv file
    [--<CONFIG_COLUMN_1>]
    [--<CONFIG_COLUMN_2>]
    [...]
    [--<CONFIG_COLUMN_N>]  For configuration not loaded from file, arguments must be specified with a flag per column
    [--do-not-validate]  Ignore validation to force import (WARNING: INCORRECT FORMAT CONFIGURATION MAY CAUSE ISSUES)
```
To see what the current real-time config looks like, use the command:
```bash
refinery configuration --show --param .daas.udf.realtimeConfig
```
You can then replace it with your new copy:
```bash
refinery configuration --import --param .daas.udf.realtimeConfig --file myNewConfig.csv
```
Querying results¶
You can access outputs of real-time UDFs through the getUDF API call. Parameter inputs are as follows:
| Parameter | Description | Mandatory | Example | Type |
|---|---|---|---|---|
| funcType | Must be set to `realtime | ✓ | realtime | Symbol |
| funcName | The name of the UDF that produced the output | ✓ | qUDF | Symbol |
| startTime | Start point of query | | .z.p-01:00:00 | Timestamp |
| endTime | End point of query | | .z.p | Timestamp |
Note
If startTime and endTime are not provided, the most recent value for today for the requested funcName will be returned.
Note
If multiple results fall within startTime and endTime, they will be joined via uj. If this fails due to incompatible outputs, they will be returned as a list.
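The join-or-list behaviour can be sketched in Python (a conceptual model of uj's column union, not the q implementation; rows here are dicts standing in for table records):

```python
def combine_results(results):
    """Union-join a list of result tables (each a list of row dicts),
    filling missing columns with None to mirror q's uj. If the pieces
    are not tables at all, fall back to returning them as a plain list."""
    try:
        cols = []
        for table in results:
            for row in table:
                for c in row:
                    if c not in cols:
                        cols.append(c)
        return [{c: row.get(c) for c in cols} for table in results for row in table]
    except (AttributeError, TypeError):
        return list(results)  # incompatible outputs: hand back as a list

a = [{"avgPrice": 15.0}]
b = [{"avgPrice": 16.0, "count": 3}]
print(combine_results([a, b]))
# [{'avgPrice': 15.0, 'count': None}, {'avgPrice': 16.0, 'count': 3}]
```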
Streaming results of real-time UDFs¶
Streaming subscriptions can be set up for real-time UDFs similarly to any other streaming call. The only parameters to hand to the getUDF call are funcName and funcType.
```q
.stream.regSub[`getUDF;`funcName`funcType!(`qUDF;`realtime);period;callback;`;`user]
```
- Streaming calls will output in the full results table format, with timestamp, udfName, and result columns.
Framework architecture notes¶
The framework consists of a group of worker nodes that publish to a standard tick stack. The worker nodes are managed by a single daemon and are dynamically scaled to:
- Minimize memory utilization
- Maximize CPU parallelization
This means that if two real-time UDFs share a common data set, they will be assigned to the same worker node to reduce data duplication. If they have distinct data sets, they will be on separate nodes to enable parallel execution.
You can override this with manual configuration of the procNo field in the real-time configuration.
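A toy sketch of the auto-allocation grouping (a hypothetical helper; the real daemon also weighs memory and CPU when placing UDFs):

```python
def auto_allocate(data_reqs):
    """Group UDFs so that those sharing a data set land on one worker node
    (one copy of the data) while distinct data sets get separate nodes."""
    nodes = {}
    for udf, req in data_reqs.items():
        nodes.setdefault(frozenset(req), []).append(udf)
    return list(nodes.values())

reqs = {"udfA": ["trade"], "udfB": ["trade"], "udfC": ["quote"]}
print(auto_allocate(reqs))  # [['udfA', 'udfB'], ['udfC']]
```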
Data flushing within worker nodes¶
Subscribed data is held in memory and 'reserved' by a real-time UDF. Once a real-time UDF executes, it only 'reserves' data from that time onwards. If no other UDF has a claim to earlier data, that data is flushed from memory.
This means that each real-time UDF will only have access to data since its last execution.
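The reservation model above can be sketched in a few lines of Python (illustrative only, not the framework's implementation; timestamps are simplified to integers):

```python
class WorkerBuffer:
    """Each UDF 'reserves' data from its last execution onward;
    rows that no UDF still claims are flushed from memory."""

    def __init__(self, udfs):
        self.rows = []                              # (timestamp, row) pairs
        self.reserved_from = {u: 0 for u in udfs}   # per-UDF reservation point

    def append(self, ts, row):
        self.rows.append((ts, row))

    def execute(self, udf, now):
        # hand the UDF everything since its last execution
        start = self.reserved_from[udf]
        data = [r for ts, r in self.rows if start <= ts < now]
        self.reserved_from[udf] = now               # reserve from now on only
        self._flush()
        return data

    def _flush(self):
        # drop rows older than every UDF's reservation point
        oldest = min(self.reserved_from.values())
        self.rows = [(ts, r) for ts, r in self.rows if ts >= oldest]

buf = WorkerBuffer(["udfA", "udfB"])
buf.append(1, "tick1")
buf.append(2, "tick2")
seen_a = buf.execute("udfA", now=3)  # sees both ticks; udfB still claims them
seen_b = buf.execute("udfB", now=3)  # sees both ticks; now both are flushed
print(seen_a, seen_b, buf.rows)
```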