User-defined functions (UDFs)
Introduction
The platform supports user-defined functions (UDFs) within the system. These allow you to perform additional logic on the result sets of multiple API queries (e.g. joining a getTicks result to a getStats result, then filtering) before results are returned to the client.
UDFs come in two varieties: static UDFs and real-time UDFs.
Static UDFs operate as request/reply function calls, in the same manner you would expect a traditional database query to operate: you execute the query (function call), wait whilst it is executed, and the system then returns your result.
Real-time UDFs operate in a streaming fashion. They are called each time new data enters the system (or, if configured, each time a trigger condition is met) and publish their results in real time. They are designed for high-performance real-time analytics and operate on a publish/subscribe ("pub/sub") paradigm.
UDF functional restrictions
To help prevent the execution of user code that could permanently harm the platform, all incoming UDF code is parsed and checked. The following restrictions are placed on UDFs:
- must take exactly one argument
- at execution time, will only accept a dictionary input
- no global variable/function references besides API functions (getStats, getTicks, etc.) and basic q functions (joins, some .Q utilities, some .z references)
- no on-disk operations
- no opening handles between processes (hopen)
- no system calls
- no string parsing (e.g. get"hopen 5346" will fail)
- no "exit" calls
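The following minimal Python sketch shows the kind of screening applied before a UDF is saved. It is a hypothetical, token-based approximation. It covers only the one-argument rule and a few forbidden names (hopen, system, exit, get). The platform's actual checker parses the q code and is not reproduced here. screen_udf and FORBIDDEN_CALLS are illustrative names.

```python
import re

# Hypothetical, simplified screen for a few of the UDF restrictions above.
# The platform parses q code properly; this sketch only scans tokens.
FORBIDDEN_CALLS = ("hopen", "system", "exit", "get")


def screen_udf(code: str) -> list:
    """Return a list of problems found in q UDF source given as a string."""
    problems = []

    # Rule: exactly one argument when an explicit list such as {[params] ...}
    # is given. Implicit-argument lambdas like { x[`a]+x[`b]} are not checked.
    match = re.match(r"\s*\{\s*\[([^\]]*)\]", code)
    if match:
        params = [p for p in match.group(1).split(";") if p.strip()]
        if len(params) != 1:
            problems.append(f"expected 1 argument, found {len(params)}")

    # Rule: no handles, system calls, exits, or string evaluation via get.
    # Whole-word match, so getTicks or a symbol such as `get is not flagged.
    for name in FORBIDDEN_CALLS:
        if re.search(rf"(?<![`.\w]){name}(?!\w)", code):
            problems.append(f"forbidden call: {name}")

    return problems
```

For example, screen_udf('{[d] get "hopen 5346"}') reports both hopen and get. A getTicks wrapper such as '{[dict] getTicks[`symList`dataType!(`;`quote)]}' passes.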
Static UDFs
Static UDFs are inserted into the system with saveUDF, executed with getUDF, and removed with deleteUDF. getUDFInfo retrieves information (function names, function code, and descriptions) about existing UDFs. getUDFDescription returns a UDF's description in a reader-friendly format, which is useful when the description holds commented UDF code. All of these are API calls accessible through both the gateway and query router ingress points.
Administrative users can batch export/import UDFs through the Admin Dashboards; see the Admin Guide section for additional information.
Insert UDF
saveUDF inserts a user-defined function into the platform. Before insertion, the supplied code is checked to ensure that it meets the conditions in the UDF functional restrictions list above.
funcName argument
| Argument ID: | `funcName |
|---|---|
| Argument Type: | Mandatory |
| Format: | `functionName <atom> |
| Datatype: | <symbol> |
The name of the user-defined function.
Note
Reusing a name will overwrite the previous function instance.
func argument
| Argument ID: | `func |
|---|---|
| Argument Type: | Mandatory |
| Datatype: | <string> or <function> |
| Valid Inputs: | "{ x[`a]+x[`b]}" or {[params]a:5+2;`testOutput} |
The q code defining the UDF. This should either be a string parsing to a function or a function object.
Note
If this code defines a function that violates the UDF Functional Restrictions list, an error will be thrown during the saveUDF call and the function will not be saved.
Note
The code must be comment free, as comments cause errors within a UDF. If you need a commented copy of the code, put it in the description argument instead.
Code can be retrieved with getUDFInfo. See the examples section below for example function calls.
description argument
| Argument ID: | `description |
|---|---|
| Argument Type: | Mandatory |
| Format: | "Test description" <string> |
| Datatype: | <string> |
| Valid Inputs: | "Test description" |
The description of the user-defined function; it should detail the UDF's input parameters and output. It can be retrieved with getUDFInfo or getUDFDescription.
Note
If you use the description argument to hold commented UDF code, retrieve it with getUDFDescription, which prints the description in a reader-friendly format.
Execute UDF
getUDF executes a stored UDF against a set of input parameters.
Note
If the input provided is not a single dictionary, then getUDF will throw an error and will not run.
funcName argument
| Argument ID: | `funcName |
|---|---|
| Argument Type: | Mandatory |
| Format: | `functionName <atom> |
| Datatype: | <symbol> |
This is the name of the user-defined function to execute. Referencing an undefined UDF will result in an error. See the examples section below for example function calls.
params argument
| Argument ID: | `params |
|---|---|
| Argument Type: | Mandatory |
| Datatype: | <dictionary> |
| Valid Inputs: | `a`b`c ! ("this is argument 1"; `arg2; 20) |
This is the input to the UDF.
`params must be a dictionary or the getUDF call will return an error.
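The saveUDF/getUDF contract described above can be modelled with a small in-memory Python class. The contract covers three points: a reused name overwrites, params must be a dictionary, and undefined names are an error. This is a hypothetical sketch. UDFStore, save_udf and get_udf are illustrative names, and Python callables stand in for q functions. The real API is called from q, as shown in the examples section.

```python
class UDFStore:
    """Hypothetical in-memory model of the static UDF API contract.

    Python callables stand in for q functions and names are plain strings.
    """

    def __init__(self):
        self._funcs = {}

    def save_udf(self, func_name, func, description):
        # Reusing a name overwrites the previous function instance.
        self._funcs[func_name] = (func, description)

    def get_udf(self, func_name, params):
        # params must be a single dictionary; anything else is an error.
        if not isinstance(params, dict):
            raise TypeError("params must be a dictionary")
        # Referencing an undefined UDF is an error.
        if func_name not in self._funcs:
            raise KeyError(f"undefined UDF: {func_name}")
        func, _description = self._funcs[func_name]
        return func(params)


store = UDFStore()
store.save_udf("addAB", lambda p: p["a"] + p["b"], "Adds params a and b")
print(store.get_udf("addAB", {"a": 1, "b": 2}))  # 3
```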
Delete UDF
deleteUDF removes user-defined function(s) from the platform. The names must be specified explicitly; by design, the empty symbol ` that normally means "all functions" does not work here. The names of all currently defined UDFs can be obtained via getUDFInfo.
funcNames argument
| Argument ID: | `funcNames |
|---|---|
| Argument Type: | Mandatory |
| Format: | `functionName <atom> or `funcName1`funcName2... <symbol list> |
| Datatype: | <symbol> or <symbol list> |
The name(s) of the user-defined function(s) to delete. See the examples section below for example function calls.
Retrieve information on UDFs
getUDFInfo retrieves information on currently defined user-defined functions. This call will return a table with columns:
| Column | Description |
|---|---|
| funcName | Name of the UDF <symbol> |
| funcExists | Whether the given UDF exists in the system <Boolean> |
| funcCode | Code for the UDF (if it exists) <string> |
| description | Description of the UDF (if it exists) <string> |
funcNames argument
| Argument ID: | `funcNames |
|---|---|
| Argument Type: | Mandatory |
| Format: | `functionName <atom> or `funcName1`funcName2... <symbol list> |
| Datatype: | <symbol> or <symbol list> |
The names of the user-defined function(s) about which to retrieve information.
Note
The empty symbol ` can be used to see info on all defined UDFs.
In q, single key dictionaries need to be enlisted, e.g. enlist[`funcNames]!enlist[`myUDF]. See the examples section below for example function calls.
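The following Python sketch contrasts how getUDFInfo and deleteUDF handle the empty symbol. Several things in it are assumptions made for illustration: the registry dict, the names get_udf_info and delete_udf, and the use of the empty string for the q empty symbol `. Silently ignoring unknown names in delete_udf is also an assumption, not documented behaviour.

```python
def get_udf_info(registry, func_names):
    """Hypothetical model of getUDFInfo: one row per requested name.

    registry maps name -> (code, description). The empty string stands in
    for the q empty symbol ` and selects every defined UDF.
    """
    if isinstance(func_names, str):
        func_names = [func_names]  # an atom behaves like a one-item list
    if func_names == [""]:
        func_names = sorted(registry)
    rows = []
    for name in func_names:
        code, description = registry.get(name, ("", ""))
        rows.append({
            "funcName": name,
            "funcExists": name in registry,
            "funcCode": code,
            "description": description,
        })
    return rows


def delete_udf(registry, func_names):
    """Hypothetical model of deleteUDF: names must be given explicitly."""
    if isinstance(func_names, str):
        func_names = [func_names]
    if "" in func_names:
        raise ValueError("deleteUDF does not accept ` as 'all functions'")
    for name in func_names:
        registry.pop(name, None)  # assumption: unknown names are ignored
```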
Retrieve UDF description
getUDFDescription retrieves the description(s) of the named UDF(s) in a reader-friendly format. This is useful for descriptions that contain commented UDF code.
funcNames argument
| Argument ID: | `funcNames |
|---|---|
| Argument Type: | Mandatory |
| Format: | `functionName <atom> or `funcName1`funcName2... <symbol list> |
| Datatype: | <symbol> or <symbol list> |
The name(s) of the UDF(s) to retrieve the description of.
Example UDF calls
saveUDF[`funcName`func`description!(`testGetTicks;"{[dict]sD:dict`sD;eD:dict`eD;sT:dict`sT;eT:dict`eT;t:getTicks[`symList`dataType`assetClass`startDate`endDate`startTime`endTime!(`;`quote;`moneyMarket;sD;eD;sT;eT)]}";"This function is a wrapper for getTicks. It takes 4 inputs, sD, eD, sT and eT, which are startDate, endDate, startTime and endTime respectively.")]
Result: The UDF named testGetTicks is saved to disk, allowing it to be used by Refinery at a later date. A description for testGetTicks is also saved.
getUDF[`funcName`params!(`testGetTicks;(`sD`eD`sT`eT!(2018.08.08;2018.08.08;09:00:00.000;10:00:00.000)))]
Result: The newly created testGetTicks UDF is run using params as the function arguments.
getUDFInfo[(enlist`funcNames)!(enlist `testGetTicks)]
Result: The metadata of testGetTicks is returned to the user.
deleteUDF[(enlist`funcNames)!(enlist `testGetTicks)]
Result: The testGetTicks UDF is deleted from disk.
getUDFDescription[(enlist `funcNames)!(enlist `testGetTicks)]
Result: The description of testGetTicks is returned in a reader-friendly format.
Real-time UDFs
The real-time user-defined function (RTUDF) framework enables Refinery to run custom functions on data in real time. Setting this up involves defining the functions and setting the relevant configuration parameters. Once set up, the results of real-time UDFs flow through a standard tick capture system, where they are available for streaming and persistence.
Historical results are accessible via the getUDF API.
q real-time UDFs
- Real-time UDF functions can take either no arguments or the argument pair (tableName;tableData).
- Real-time UDFs have access to the get* APIs, which operate on a local set of data. Temporal arguments (startDate, endDate, startTime, endTime) are not required.
- The data available to an RTUDF is the set of data received since the RTUDF was last triggered.
- RTUDFs should output a table. If the output is not a table, it is put into a 1x1 table with the column name result.
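The last rule can be modelled as follows. In this hypothetical Python sketch, a table is represented as a dict of equal-length column lists. normalize_output is an illustrative name, not a platform function.

```python
def normalize_output(result):
    """Hypothetical model of RTUDF output handling.

    Tables pass through unchanged; any other value becomes a 1x1 table whose
    single column is named 'result'. A table is modelled as a non-empty dict
    mapping column names to lists of equal length.
    """
    is_table = (
        isinstance(result, dict)
        and len(result) > 0
        and all(isinstance(col, list) for col in result.values())
        and len({len(col) for col in result.values()}) == 1
    )
    if is_table:
        return result
    return {"result": [result]}
```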
q syntax taking arguments - qUDF
{[tabName;data]
  select avg price from data
  }
q syntax, no arguments, using getTicks API - qUDF
{[]
  select avg price from getTicks[`symList`assetClass`dataType!`VOD.L`equity`trade]
  }
Trigger functions
- Trigger functions operate on a table to determine whether a UDF should be run.
- Trigger functions take a single argument: the table listed in the trigTab field.
- Trigger functions must return a boolean: 1b (true) to run the UDF, 0b (false) otherwise.
Example trigger function - qTriggerFunc
{[data]
  if[`mySym in data`sym;
    :1b;
    ];
  0b
  }
In this example, the UDF is executed whenever the trigTab data contains the sym `mySym.
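Whether a real-time UDF runs on a given update depends on two things: the triggered flag in the real-time configuration and the trigger function. The Python model below is illustrative only; real trigger functions are written in q as above. should_run and my_sym_trigger are hypothetical names, and a table is modelled as a dict of column lists. Trigger conditions given via trigCond are not modelled.

```python
def should_run(triggered, trig_func, trig_data):
    """Hypothetical model of whether a real-time UDF runs for one update.

    triggered=False: run on every update of the dataReq table.
    triggered=True: run only when the trigger function returns True.
    """
    if not triggered:
        return True
    return bool(trig_func(trig_data))


def my_sym_trigger(data):
    """Python analogue of qTriggerFunc: True if mySym is in the sym column."""
    return "mySym" in data["sym"]


print(should_run(True, my_sym_trigger, {"sym": ["VOD.L", "mySym"]}))  # True
print(should_run(True, my_sym_trigger, {"sym": ["VOD.L"]}))           # False
```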
Initialization functions
Initialization functions are run on boot of the UDF worker nodes to prepare the process for UDF execution. They are useful for one-time loads of configuration, libraries, or reference data.
- Initialization functions must take no arguments.
Example initialization function - qInitFunc
{[]
  system"l ",getenv[`UDF_LIBS_DIR],"/myDependency.q";
  .myUDF.referenceTable:([]sym:`a`b;num:1 2)
  }
In this example, we load a q script containing a dependency and define a reference table in the .myUDF namespace for later use.
Note
Dependencies can be placed in UDF_LIBS_DIR, which is located in the udfdir. They will then be included when the UDF package is exported via the CLI.
Python real-time UDFs
The real-time UDF framework supports Python 3 integration out of the box. kdb+ tables usually need to be mapped to formats that are more convenient in Python, so Python real-time UDFs support pre- and post-execution functions. These are q functions that operate on the inbound data before execution of the Python RTUDF and on the outbound result after it. A basic use case is converting a q table to a pandas dataframe.
Pre- and post-execution functions are optional. If unconfigured or left blank, no function is run and input/output is handed directly between q and Python.
Parallel execution of Python UDFs
An added benefit of Python UDFs is that their execution can be parallelized. The framework uses Python's multiprocessing module to run a pool of Python processes behind each embedPy worker node.
This means that Python RTUDFs operating on identical datasets can run simultaneously. UDFs are treated as operating on identical datasets when they share a trigger function, data requirement, and pre-execution function.
Note
The Python multiprocessing pool appears as child processes of the q worker node when viewed through ps -ef.
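The grouping rule and the use of a process pool can be sketched with Python's standard multiprocessing module. This is a hypothetical model rather than the framework's code. group_udfs, run_group, mean_price and max_price are illustrative names, configurations are plain dicts, and dataReq is modelled as a tuple of table names.

```python
from collections import defaultdict
from multiprocessing import Pool


# Hypothetical UDFs, defined at module level so multiprocessing can pickle them.
def mean_price(rows):
    return sum(r["price"] for r in rows) / len(rows)


def max_price(rows):
    return max(r["price"] for r in rows)


def group_udfs(udf_configs):
    """Group configs by (trigFunc, dataReq, preExFunc).

    UDFs in the same group see identical input data, so they are the ones
    that can share a pool and run simultaneously.
    """
    groups = defaultdict(list)
    for cfg in udf_configs:
        key = (cfg["trigFunc"], cfg["dataReq"], cfg["preExFunc"])
        groups[key].append(cfg)
    return dict(groups)


def run_group(funcs, data, processes=2):
    """Run every function of one group on the same data in a process pool."""
    with Pool(processes=processes) as pool:
        pending = [pool.apply_async(f, (data,)) for f in funcs]
        return [p.get() for p in pending]
```

For example, run_group([mean_price, max_price], rows) computes both statistics in separate pool processes. Call it under an `if __name__ == "__main__":` guard so that it also works with the spawn start method.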
Defining Python real-time UDFs
Python UDFs can be defined in two formats:
- A simple function definition
- A method of a class
Python function syntax taking argument - pyUDF
def pyUDF(data):
    return data
This function takes a dataframe and returns it unchanged; it simply illustrates data flowing through a Python function.
Note
The function name must match the file name.
Python method syntax taking argument - myMethod
class pyUDF:
    def myMethod(self, data):
        return data
Note
The class name must match the file name.
Note
The class should be instantiated either in the initialization function using the ml.q library, or at the bottom of the Python UDF script.
Note
Python UDF files must have the .p extension (as opposed to .py) to be loaded by embedPy.
Pre execution function - qToDataframePreExFunc
{[t;d] .ml.tab2df d}
This function makes use of the ml.q utility function to convert a q table to a dataframe.
Post execution function - dataframeToQPostExFunc
{[t;d] .ml.df2tab d}
This function makes use of the ml.q utility function to convert a dataframe to a q table.
Configuring pre- and post-execution functions for Python UDFs
The configuration parameter for pre/post-execution functions on Python UDFs is separate from the main real-time config. The parameter is .daas.udf.pythonRTUDFConfig.
| Parameter | Description | Example |
|---|---|---|
| udfName | The name of the Python UDF | pythonUDF |
| preExFunc | Function to run before execution. Leave blank for none. | qToDataframePreExFunc |
| postExFunc | Function to run after execution. Leave blank for none. | dataframeToQPostExFunc |
| method | Method of class to be run, if not using a function. Leave blank if using function | myMethod |
This can be managed via the Refinery CLI.
Dependencies
Required - Python 3.6/3.7, KxEmbedPy (installable .tgz), multiprocessing (Python library).
Recommended - ml.q (Kx machine learning library), pandas (Python library).
The recommended dependencies enable smoother conversion between q tables and Python objects; see the qToDataframePreExFunc example.
Adding new functions
New UDFs can be added to the system and managed using the Refinery CLI. Usage of the refinery udf command is as follows:
--add Add new UDF
--funcName Name of the function
--funcType real-time / static / trigger
--file Location of udf function
[--description] Location of text file with description OR written string description
[--language] To specify the language the udf is written in. Parameter sub
--modify Update a UDF
--funcName
[--file] Location of new udf function
[--description] Location of text file with description OR written string description
[--language] To specify the language the udf is written in. Parameter sub
--delete Delete a UDF
--funcName Name of udf to delete
--info List information on all udfs available, including function type, code and description
[--funcName] Specify function name for information on this function only
--export Package up entire contents of udf directory into an installable .tgz file
[--dir] Directory output package to (default is current working directory)
--import Install a .tgz udf package
--file Location of .tgz file
--reload Force load any underlying udf directory changes into udf processes. Automatically done by CLI operations
For example, to add the real-time UDF qUDF:
refinery udf --add --funcName qUDF --funcType realtime --file qUDF.q
And to add the trigger function that accompanies it:
refinery udf --add --funcName qTriggerFunc --funcType trigger --file qTriggerFunc.q
Note
Initialization funcType is `realtime.
refinery udf --add --funcName qInitFunc --funcType realtime --file qInitFunc.q
Note
UDFs are parsed when they are loaded into the system to verify that they are structured correctly. Comment lines in q files may cause an error, so it is recommended that q files be comment free; the accompanying .txt description file can be used for documentation purposes.
Configuration
The real-time config describes the setup of real-time user-defined functions. It is named .daas.udf.realtimeConfig.
| Parameter | Description | Example |
|---|---|---|
| UDF | Name of function | myRealtimeUDF |
| dataReq | Table(s) required | `test`quote |
| dataTabMeta | Dictionary of metadata for the required tables. Can be left blank if no metadata is needed | `assetClass`region!`index`emea |
| triggered | 1b - runs the UDF when the trigger condition (trigCond) or trigger function (trigFunc) is satisfied. 0b - runs the UDF on every update of the dataReq table | 1b |
| trigTab | Table to run trigger against | metadata |
| trigTabMeta | Dictionary of metadata for the trigger table. Can be left blank if no metadata is needed | `assetClass`region!`index`emea |
| trigCond | kdb where clause parse tree as condition to satisfy on the trigger table. Should be left blank if the trigger function is used | enlist(=;`sym;enlist`VOD.L) |
| trigFunc | Name of the trigger function udf to run on the trigger table | myTrigFunc |
| initFunc | UDF function to be run on boot of udf process. Used to load in any configuration or dependencies once on startup | myInitFunc |
| procNo | Which process to allocate the UDF to. If set to -1 the system will auto allocate for best performance. | -1 |
Configuration management
The realtimeConfig can be managed with the configuration management CLI, using the refinery configuration command with the following syntax:
--list List editable config parameters
[--category] Only display config parameters of a specified category
--show Print out configuration parameter
--param Configuration parameter name
--export Export the configuration parameter to .csv
--param Configuration parameter name
[--dir] Output to specified location
--import Import and overwrite configuration parameter with new file
--param Configuration parameter name
--file Specified .csv file
[--do-not-validate] Ignore validation to force import (WARNING: INCORRECT FORMAT CONFIGURATION MAY CAUSE ISSUES)
--delete-rows Deletes the last row of the configuration parameter
--param Configuration parameter name
[--index] Deletes specified indices
--add-rows Adds rows to configuration parameter
--param Configuration parameter name
[--file] Loads rows from specified .csv file
[--<CONFIG_COLUMN_1>]
[--<CONFIG_COLUMN_2>]
[... ]
[--<CONFIG_COLUMN_N>] For configuration not loaded from file, arguments must be specified with a flag per column
[--do-not-validate] Ignore validation to force import (WARNING: INCORRECT FORMAT CONFIGURATION MAY CAUSE ISSUES)
To see what the current real-time config looks like, use the command:
refinery configuration --show --param .daas.udf.realtimeConfig
You can then replace it with your new copy:
refinery configuration --import --param .daas.udf.realtimeConfig --file myNewConfig.csv
Querying results
Outputs of real-time UDFs can be accessed through the getUDF API call. Parameter inputs are as follows:
| Parameter | Description | Mandatory | Example | Type |
|---|---|---|---|---|
| funcType | Must be set to `realtime | ✓ | `realtime | Symbol |
| funcName | The name of the UDF that produced the output | ✓ | `qUDF | Symbol |
| startTime | Start point of query | | .z.p-01:00:00 | Timestamp |
| endTime | End point of query | | .z.p | Timestamp |
Note
If startTime and endTime are not provided, the most recent value for today for the requested funcName will be returned.
Note
If multiple results fall within the startTime and endTime, they will be joined via uj. If this fails due to incompatible outputs, they will be returned as a list.
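A simplified Python model of this combining step follows. Tables are modelled as lists of row dicts. The union join mimics q's uj: it takes the union of columns and fills missing values with None. The model treats a column whose values have different Python types as "incompatible"; this is an assumption standing in for whatever actually makes uj fail. combine_results is a hypothetical name.

```python
def combine_results(outputs):
    """Hypothetical model of combining several stored RTUDF outputs.

    Each output is a table modelled as a list of row dicts. Tables are
    union-joined like q's uj: the result has the union of all columns and
    missing values become None. If one column holds values of different
    Python types, the outputs are deemed incompatible and returned as a list.
    """
    columns = []
    col_types = {}
    for table in outputs:
        for row in table:
            for col, val in row.items():
                if col not in col_types:
                    col_types[col] = type(val)
                    columns.append(col)
                elif col_types[col] is not type(val):
                    return list(outputs)  # incompatible: fall back to a list
    return [{col: row.get(col) for col in columns}
            for table in outputs for row in table]
```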
Streaming results of real-time UDFs
Streaming subscriptions can be set up for real-time UDFs in the same way as for any other streaming call. The only parameters to pass to getUDF are funcName and funcType.
.stream.regSub[`getUDF;`funcName`funcType!(`qUDF;`realtime);period;callback;`;`user]
- Streaming calls will output in the full results table format, with timestamp, udfName, and result columns.
Framework architecture notes
The framework consists of a group of worker nodes that publish to a standard tick stack. The worker nodes are managed by a single daemon and are dynamically scaled to:
- Minimise memory utilisation
- Maximise CPU parallelization
This means that if two real-time UDFs share a common dataset, they are assigned to the same worker node to reduce data duplication. If they have distinct datasets, they are placed on separate nodes to enable parallel execution. This allocation can be overridden by manually setting the procNo field in the real-time configuration.
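The allocation policy can be sketched as a small Python function. It is a hypothetical simplification: two UDFs are taken to share a dataset only when their dataReq lists contain the same tables. Worker numbers are plain integers, and allocate is an illustrative name. The real daemon may weigh other factors.

```python
def allocate(udfs):
    """Hypothetical model of real-time UDF allocation to worker nodes.

    Each config is a dict with UDF, dataReq (list of tables) and procNo.
    UDFs with the same dataReq share a worker; distinct datasets get
    separate workers. A procNo other than -1 pins the UDF (manual override),
    and auto-allocation avoids workers that are pinned this way.
    """
    assignment = {}
    dataset_worker = {}
    used = {u["procNo"] for u in udfs if u["procNo"] != -1}
    next_worker = 0
    for udf in udfs:
        if udf["procNo"] != -1:
            assignment[udf["UDF"]] = udf["procNo"]
            continue
        key = tuple(sorted(udf["dataReq"]))
        if key not in dataset_worker:
            while next_worker in used:
                next_worker += 1
            dataset_worker[key] = next_worker
            used.add(next_worker)
        assignment[udf["UDF"]] = dataset_worker[key]
    return assignment
```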
Data flushing within worker nodes
Subscribed data is held in memory and 'reserved' by the real-time UDFs that use it. Once a real-time UDF executes, it reserves only data from that time onwards. If no other UDF still reserves the earlier data, that data is flushed from memory.
This means that each real-time UDF only has access to data received since its last execution.
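The reservation and flushing behaviour can be modelled in Python with a per-worker buffer. WorkerBuffer and its methods are hypothetical names. Rows are tagged with increasing sequence numbers in place of timestamps, and flushing after every execution is a simplifying assumption.

```python
class WorkerBuffer:
    """Hypothetical model of data reservation and flushing on a worker node."""

    def __init__(self, udf_names):
        self.rows = []      # list of (seq, row) pairs still held in memory
        self.next_seq = 0
        # For each UDF, the first sequence number it has not yet processed.
        self.last_run = {name: 0 for name in udf_names}

    def append(self, row):
        self.rows.append((self.next_seq, row))
        self.next_seq += 1

    def run(self, name, func):
        """Execute a UDF on the data since its last execution, then flush."""
        since = self.last_run[name]
        data = [row for seq, row in self.rows if seq >= since]
        self.last_run[name] = self.next_seq  # reserve only newer data
        self._flush()
        return func(data)

    def _flush(self):
        # Drop rows that no UDF reserves any more.
        oldest_needed = min(self.last_run.values())
        self.rows = [(seq, row) for seq, row in self.rows if seq >= oldest_needed]
```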