User-defined functions (UDFs)

Introduction

The platform supports user-defined functions (UDFs). These allow you to perform additional logic on multiple API query result sets (e.g. joining a getTicks result to a getStats result, then filtering) before the results are returned to the client.

UDFs come in two varieties: static UDFs and real-time UDFs.

Static UDFs operate as 'request/reply' function calls or, from another perspective, in the manner you would expect a traditional database query to operate. You execute the query (function call), you wait whilst that query is executed, and then the system returns your result.

Real-time UDFs operate in a streaming fashion: they run as new data enters the system (on every update, or when a configured trigger condition is met) and publish their results in real time. They are designed for high-performance real-time analytics and operate on a publish/subscribe ("pub/sub") paradigm.

UDF functional restrictions

To help prevent the execution of user code that could permanently harm the platform, all incoming UDF code is parsed and checked. The following restrictions are placed on UDFs:

  • must take 1 argument
  • at execution time, will only accept a dictionary input
  • no global variable/function references besides API functions (getStats, getTicks, etc.) and basic q functions (joins, some .Q utilities, some .z references)
  • no on-disk operations
  • no opening handles between processes (hopen)
  • no system calls
  • no string parsing (e.g. get"hopen 5346" will fail)
  • no "exit" calls
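The following Python sketch makes the restrictions above concrete with a token-level pre-check. It is not the platform's checker, which parses the q code properly. The function name check_udf, the rule table and the messages are all hypothetical. Simple pattern matching like this will miss obfuscated calls and can flag harmless text.

```python
import re

# Hypothetical, simplified pre-check modelled on the UDF restrictions.
# It pattern-matches q source text; it does not parse q.
RULES = [
    (re.compile(r"\bhopen\b"), "no opening handles between processes (hopen)"),
    (re.compile(r"\bsystem\b"), "no system calls"),
    (re.compile(r"\bexit\b"), "no exit calls"),
    (re.compile(r'\b(?:get|value)\s*"'), "no string parsing"),
    (re.compile(r"`:"), "no on-disk operations (file handle)"),
]


def check_udf(code: str) -> list:
    """Return the restriction violations found in a q function string."""
    violations = []
    # An explicit parameter list such as {[dict] ...} must name exactly one argument.
    params = re.match(r"\s*\{\s*\[([^\]]*)\]", code)
    if params is not None:
        names = [n for n in params.group(1).split(";") if n.strip()]
        if len(names) != 1:
            violations.append("must take 1 argument")
    violations += [message for pattern, message in RULES if pattern.search(code)]
    return violations
```

For example, `check_udf('{[d] get"hopen 5346"}')` reports both the hopen rule and the string-parsing rule. A plain wrapper around getTicks reports nothing.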

Static UDFs

UDFs may be inserted into the system with saveUDF, removed with deleteUDF, and executed with getUDF. getUDFInfo retrieves information (function names, function code, and descriptions) about existing UDFs. getUDFDescription returns a UDF's description in a reader-friendly format, which is useful when the description holds commented UDF code. All of these analytics are API calls accessible through both the gateway and query router ingress points.

Administrative users have the ability to batch export/import UDFs through the Admin Dashboards; see the Admin Guide section for additional information.

Insert UDF

saveUDF inserts a user-defined function into the platform. Before insertion, the supplied function code is checked to ensure that it meets the UDF functional restrictions listed above.

funcName argument

Argument ID: `funcName
Argument Type: Mandatory
Format: `functionName <atom>
Datatype: <symbol>

The name of the user-defined function.

Note

Reusing a name will overwrite the previous function instance.

func argument

Argument ID: `func
Argument Type: Mandatory
Datatype: <string> or <function>
Valid Inputs: "{ x[`a]+x[`b]}" or {[params]a:5+2;`testOutput}

The q code defining the UDF. This should be either a string that parses to a function, or a function object.

Note

If this code defines a function that violates the UDF Functional Restrictions list, an error will be thrown during the saveUDF call and the function will not be saved.

Note

The code must be comment-free, as comments cause errors within a UDF. If commented code is needed, it is recommended to put the commented code in the description argument.

Code can be retrieved with getUDFInfo. See the examples section below for example function calls.

description argument

Argument ID: `description
Argument Type: Mandatory
Format: "Test description" <string>
Datatype: <string>
Valid Inputs: "Test description"

The description of the user-defined function; this should detail the input parameters and output of the UDF. It can be retrieved with getUDFInfo or getUDFDescription.

Note

If the description argument is used to comment the UDF code, it is recommended to retrieve it with getUDFDescription, as this prints the argument in a reader-friendly format.

Execute UDF

getUDF executes a stored UDF against a set of input parameters.

Note

If the input provided is not a single dictionary, then getUDF will throw an error and will not run.

funcName argument

Argument ID: `funcName
Argument Type: Mandatory
Format: `functionName <atom>
Datatype: <symbol>

This is the name of the user-defined function to execute. Referencing an undefined UDF will result in an error. See the examples section below for example function calls.

params argument

Argument ID: `params
Argument Type: Mandatory
Datatype: <dictionary>
Valid Inputs: `a`b`c ! ("this is argument 1"; `arg2; 20)

This is the input to the UDF.

`params must be a dictionary or the getUDF call will return an error.

Delete UDF

deleteUDF removes user-defined function(s) from the platform. The names must be specified explicitly; by design, the empty symbol ` (the standard reference for "all functions") does not work here. The names of all currently defined UDFs can be obtained via getUDFInfo.

funcNames argument

Argument ID: `funcNames
Argument Type: Mandatory
Format: `functionName <atom> or `funcName1`funcName2... <symbol list>
Datatype: <symbol> or <symbol list>

The name(s) of the user-defined function(s) to delete. See the examples section below for example function calls.

Retrieve information on UDFs

getUDFInfo retrieves information on currently defined user-defined functions. This call will return a table with columns:

funcName: Name of the UDF <symbol>
funcExists: whether the given UDF exists in the system <Boolean>
funcCode: code for the UDF (if it exists) <string>
description: description of the UDF (if it exists) <string>

funcNames argument

Argument ID: `funcNames
Argument Type: Mandatory
Format: `functionName <atom> or `funcName1`funcName2... <symbol list>
Datatype: <symbol> or <symbol list>

The names of the user-defined function(s) about which to retrieve information.

Note

The empty symbol ` can be used to see info on all defined UDFs.

In q, single key dictionaries need to be enlisted, e.g. enlist[`funcNames]!enlist[`myUDF]. See the examples section below for example function calls.

Retrieve UDF description

getUDFDescription retrieves the description(s) of the given UDF name(s) and returns them in a reader-friendly format. This is useful for UDF descriptions that contain commented UDF code.

funcNames argument

Argument ID: `funcNames
Argument Type: Mandatory
Format: `functionName <atom> or `funcName1`funcName2... <symbol list>
Datatype: <symbol> or <symbol list>

The name(s) of the UDF(s) to retrieve the description of.
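The static UDF calls above can be summarized with a small in-memory model. This Python sketch is illustrative only:

- The class UDFRegistry and its method names are hypothetical stand-ins for saveUDF, getUDF, deleteUDF and getUDFInfo.
- The empty string plays the role of the empty symbol `.
- funcCode is omitted.
- Silently ignoring unknown names in delete is an assumption.

```python
class UDFRegistry:
    """Hypothetical in-memory model of the static UDF API semantics."""

    def __init__(self):
        self._udfs = {}  # funcName -> (callable, description)

    def save(self, name, func, description):
        # saveUDF: reusing a name overwrites the previous instance.
        self._udfs[name] = (func, description)

    def get(self, name, params):
        # getUDF: params must be a single dictionary.
        if not isinstance(params, dict):
            raise TypeError("params must be a dictionary")
        if name not in self._udfs:
            raise KeyError(f"undefined UDF: {name}")
        func, _ = self._udfs[name]
        return func(params)

    def delete(self, names):
        # deleteUDF: names must be explicit; "" (the empty symbol) is rejected.
        names = [names] if isinstance(names, str) else list(names)
        if "" in names:
            raise ValueError("UDF names must be specified explicitly")
        for name in names:
            self._udfs.pop(name, None)  # assumption: unknown names are ignored

    def info(self, names):
        # getUDFInfo: "" (the empty symbol) means every defined UDF.
        names = [names] if isinstance(names, str) else list(names)
        if names == [""]:
            names = sorted(self._udfs)
        return [
            {
                "funcName": name,
                "funcExists": name in self._udfs,
                "description": self._udfs[name][1] if name in self._udfs else "",
            }
            for name in names
        ]
```

Saving twice under the same name keeps only the second function, mirroring the overwrite note for the funcName argument.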

Example UDF calls

saveUDF[`funcName`func`description!(`testGetTicks;"{[dict]sD:dict`sD;eD:dict`eD;sT:dict`sT;eT:dict`eT;t:getTicks[`symList`dataType`assetClass`startDate`endDate`startTime`endTime!(`;`quote;`moneyMarket;sD;eD;sT;eT)]}";"This function is a wrapper for getTicks. It has 4 inputs, sD, eD, sT and eT, which are startDate, endDate, startTime and endTime respectively.")]

Result: The UDF named testGetTicks is saved to disk, allowing it to be used by Refinery at a later date. A description for testGetTicks is also saved.


getUDF[`funcName`params!(`testGetTicks;(`sD`eD`sT`eT!(2018.08.08;2018.08.08;09:00:00.000;10:00:00.000)))]

Result: The newly created testGetTicks UDF is run using params as the function arguments.


getUDFInfo[(enlist`funcNames)!(enlist `testGetTicks)]

Result: The metadata of testGetTicks is returned to the user.


deleteUDF[(enlist`funcNames)!(enlist `testGetTicks)]

Result: The testGetTicks UDF is deleted from disk.

Note

In q, single key dictionaries need to be enlisted, e.g. enlist[`funcNames]!enlist[`myUDF].

getUDFDescription[(enlist `funcNames)!(enlist `testGetTicks)]

Result: The description of testGetTicks is returned in a reader-friendly format.

Real-time UDFs

The real-time user-defined function (RTUDF) framework enables Refinery to run custom functions on data in real time. Setting it up involves defining the functions and setting the configuration parameters appropriately. Once set up, the results of these real-time UDFs flow through a standard tick capture system, where they are available for streaming and persistence.

The results are also accessible historically via the getUDF API.

q Real-time UDFs

  • Real-time UDFs can take either no arguments or the two arguments (tableName;tableData).

  • Real-time UDFs have access to the get* APIs, which operate on a local set of data. Temporal arguments (startDate, endDate, startTime, endTime) are not required.

  • The data available to an RTUDF is the data received since the RTUDF was last triggered.

  • RTUDFs should output a table. If the output is not a table, it is put into a 1x1 table with the column name result.
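The output rule in the last bullet can be sketched in Python as follows. This model is hypothetical: it represents a q table as a dict of equal-length column lists, and the function name to_result_table is illustrative.

```python
def to_result_table(output):
    """Hypothetical model of the RTUDF output rule.

    A table is modelled as a non-empty dict of equal-length column lists.
    Anything else is wrapped in a 1x1 table with the single column "result".
    """
    is_table = (
        isinstance(output, dict)
        and len(output) > 0
        and all(isinstance(col, list) for col in output.values())
        and len({len(col) for col in output.values()}) == 1
    )
    return output if is_table else {"result": [output]}
```

A scalar such as an average price therefore comes back as a one-row, one-column table. A proper table passes through untouched.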

q syntax taking arguments - qUDF

{[tabName;data]
    select avg price from data
    }

q syntax, no arguments, using getTicks api - qUDF

{[]
    select avg price from getTicks[`symList`assetClass`dataType!`VOD.L`equity`trade]
    }

Trigger functions

  • Trigger functions operate on a table to determine whether a UDF should be run.

  • Trigger functions take a single argument: the data of the table listed in the trigTab field.

  • Trigger functions must return a boolean: 1b to run the UDF, 0b not to run it.

Example trigger function - qTriggerFunc

{[data]
    if[`mySym in data`sym;
        :1b;
        ];
    0b
    }

In this example, the UDF is executed whenever the trigTab data contains the sym `mySym.

Initialization functions

Initialization functions are run on boot of the UDF worker nodes to prepare the process for UDF execution. They are useful for one-time loading of configuration, libraries, or reference data.

  • Initialization functions must take no arguments

Example initialization function - qInitFunc

{[]
    system"l ",getenv[`UDF_LIBS_DIR],"/myDependency.q";

    .myUDF.referenceTable:([]sym:`a`b;num:1 2)
    }

In this example, we load a q script containing a dependency and define a reference table in the .myUDF namespace for later use.

  • Note: dependencies can be placed in the UDF_LIBS_DIR, which is located in the udfdir. Dependencies placed there are included when the UDF package is exported via the CLI.

Python real-time UDFs

The real-time UDF framework supports Python 3 integration out of the box. Because kdb+ tables usually need to be converted into formats better suited to Python, Python real-time UDFs support pre- and post-execution functions. These are q functions that operate on the inbound data before, and on the outbound result after, execution of the Python RTUDF. A basic use case is converting a q table to a pandas DataFrame.

Pre- and post-execution functions are optional. If unconfigured or left blank, no function is run and the input/output is handed directly between q and Python.
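The execution order described above can be modelled as below. It runs the pre-execution function, then the Python RTUDF, then the post-execution function; a blank function means pass-through. This Python sketch is hypothetical. In Refinery the pre/post functions are q functions taking (tableName;data); here Python callables with the same two arguments stand in for them. The name run_python_rtudf is illustrative.

```python
from typing import Any, Callable, Optional

PrePost = Optional[Callable[[str, Any], Any]]


def run_python_rtudf(table_name: str, data: Any, udf: Callable[[Any], Any],
                     pre_ex: PrePost = None, post_ex: PrePost = None) -> Any:
    """Hypothetical model: preExFunc -> Python UDF -> postExFunc.

    An unconfigured (None) pre/post function means the data is handed
    over unchanged, as with a blank configuration entry.
    """
    if pre_ex is not None:
        data = pre_ex(table_name, data)       # e.g. q table -> DataFrame
    result = udf(data)
    if post_ex is not None:
        result = post_ex(table_name, result)  # e.g. DataFrame -> q table
    return result
```

In a real deployment, the conversion steps would typically be q functions such as the .ml.tab2df and .ml.df2tab wrappers shown in this section.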

Parallel execution of Python UDFs

An added benefit of Python UDFs is the ability to parallelize the execution. The framework makes use of the multiprocessing module in Python to have a pool of Python processes running behind each embedPy worker node.

This means that Python RTUDFs that operate on identical datasets can run simultaneously. UDFs are treated as operating on identical datasets when they share a trigger function, data requirement, and pre-execution function.
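The grouping rule can be sketched as a key lookup. This hypothetical Python model groups Python RTUDFs whose trigger function, data requirement and pre-execution function all match. Each resulting group is a candidate set for simultaneous execution on the multiprocessing pool. The dictionary keys mirror the configuration column names, but merging both configs into one record is an assumption of the sketch.

```python
from collections import defaultdict


def parallel_groups(udfs):
    """Hypothetical model: group Python RTUDFs that can run simultaneously.

    Each record merges the fields the rule depends on: trigFunc and dataReq
    (real-time config) and preExFunc (Python RTUDF config).
    """
    groups = defaultdict(list)
    for u in udfs:
        # Same trigger, same data requirement and same pre-execution step
        # means the UDFs consume an identical dataset.
        key = (u["trigFunc"], tuple(sorted(u["dataReq"])), u["preExFunc"])
        groups[key].append(u["udfName"])
    return dict(groups)
```

The sketch only computes the groups. Dispatching each group's members to worker processes is left to the framework.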

Note

The Python multiprocessing pool appears as child processes of the q worker node when viewed through ps -ef.

Defining Python real-time UDFs

Python UDFs can be defined in two formats:

  • A simple function definition
  • A method of a class

Python function syntax taking argument - pyUDF

def pyUDF(data):
    return data

This function simply returns the data frame it receives; it illustrates data flowing through a Python function unchanged.

Note

The function name must match the file name.

Python method syntax taking argument - myMethod

class pyUDF:

    def myMethod(self,data):
        return data

Note

The class name must match the file name.

Note

The class should be instantiated either in the initialization function using the ml.q library, or at the bottom of the Python UDF script.

Note

Python UDFs must have a .p extension (as opposed to .py) to be loaded by embedPy.

Pre execution function - qToDataframePreExFunc

{[t;d] .ml.tab2df d}

This function makes use of the ml.q utility function to convert a q table to a dataframe.

Post execution function - dataframeToQPostExFunc

{[t;d] .ml.df2tab d}

This function makes use of the ml.q utility function to convert a dataframe to a q table.

Configuring pre and post execution functions for Python UDFs

The configuration parameter for pre/post-execution functions on Python UDFs is separate from the main real-time config. The parameter is .daas.udf.pythonRTUDFConfig.

Parameter   Description                                              Example
udfName     The name of the Python UDF                               pythonUDF
preExFunc   Function to run before execution. Leave blank for none.  qToDataframePreExFunc
postExFunc  Function to run after execution. Leave blank for none.   dataframeToQPostExFunc
method      Method of class to be run, if not using a function.      myMethod
            Leave blank if using a function.

This can be managed via the Refinery CLI.

Dependencies

Required - Python 3.6/3.7, KxEmbedPy (installable .tgz), Multiprocessing (python library).

Recommended - ml.q (Kx machine learning library), Pandas (python library).

The recommended dependencies enable smoother conversion between q tables and Python objects; see the pre-execution function example above.


Adding new functions

New UDFs can be added to the system and managed using the Refinery CLI. Usage of the refinery udf command is as follows:

    --add                                       Add new UDF
        --funcName                              Name of the function
        --funcType                              realtime / static / trigger
        --file                                  Location of udf function
        [--description]                         Location of text file with description OR written string description
        [--language]                            To specify the language the udf is written in. Parameter sub

    --modify                                    Update a UDF
        --funcName
        [--file]                                Location of new udf function
        [--description]                         Location of text file with description OR written string description
        [--language]                            To specify the language the udf is written in. Parameter sub

    --delete                                    Delete a UDF
        --funcName                              Name of udf to delete

    --info                                      List information on all udfs available, including function type, code and description
        [--funcName]                            Specify function name for information on this function only

    --export                                    Package up entire contents of udf directory into an installable .tgz file
        [--dir]                                 Directory output package to (default is current working directory)

    --import                                    Install a .tgz udf package
        --file                                  Location of .tgz file

    --reload                                    Force load any underlying udf directory changes into udf processes. Done automatically by other CLI operations

For example, to add the real-time UDF qUDF:

refinery udf --add --funcName qUDF --funcType realtime --file qUDF.q

And adding the trigger function to accompany it:

refinery udf --add --funcName qTriggerFunc --funcType trigger --file qTriggerFunc.q

Note

Initialization functions are added with funcType realtime, for example:

refinery udf --add --funcName qInitFunc --funcType realtime --file qInitFunc.q

Note

UDFs are parsed on loading into the system to verify that they are structured correctly. Comment lines in q files may cause an error, so it is recommended that q files be comment-free; the accompanying .txt description file can be used for documentation instead.

Configuration

The real-time config describes the setup of real-time user-defined functions. It is named .daas.udf.realtimeConfig.

Parameter    Description                                                              Example
UDF          Name of function                                                         myRealtimeUDF
dataReq      Table(s) required                                                        `test`quote
dataTabMeta  Dictionary of metadata for the required tables. Can be left blank if    `assetClass`region!`index`emea
             no metadata is needed
triggered    1b - runs the UDF when the trigger condition/trigFunc is satisfied.      1b
             0b - runs the UDF on every update of the dataReq table
trigTab      Table to run trigger against                                             metadata
trigTabMeta  Dictionary of metadata for the trigger table. Can be left blank if no   `assetClass`region!`index`emea
             metadata is needed
trigCond     kdb+ where clause parse tree as condition to satisfy on the trigger      enlist(=;`sym;enlist`VOD.L)
             table. Should be left blank if the trigger function is used
trigFunc     Name of the trigger function UDF to run on the trigger table             myTrigFunc
initFunc     UDF to be run on boot of the UDF process. Used to load in any            myInitFunc
             configuration or dependencies once on startup
procNo       Which process to allocate the UDF to. If set to -1, the system will      -1
             auto-allocate for best performance.
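The interaction between the triggered, trigTab, trigCond and trigFunc fields can be sketched as a single decision function. In this hypothetical Python model, trigCond is simplified to a list of (column, value) equality pairs instead of a kdb+ parse tree. Treating a condition as satisfied when at least one row of the trigger-table update matches is also an assumption.

```python
def should_run(cfg, table, rows, trigger_funcs):
    """Hypothetical model of the real-time config trigger fields.

    cfg: one config row (dict); table: name of the table that just updated;
    rows: the update as a list of row dicts; trigger_funcs: name -> callable.
    """
    if not cfg["triggered"]:
        # 0b: run on every update of a dataReq table.
        return table in cfg["dataReq"]
    if table != cfg["trigTab"]:
        return False
    if cfg.get("trigFunc"):
        # The trigger function receives the trigger-table data and returns a boolean.
        return bool(trigger_funcs[cfg["trigFunc"]](rows))
    # Simplified trigCond: every (column, value) pair must match on some row.
    return any(all(row.get(col) == val for col, val in cfg["trigCond"]) for row in rows)
```

The trigFunc branch is checked before trigCond because the configuration asks for trigCond to be left blank when a trigger function is used.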

Configuration management

The realtimeConfig can be managed with the refinery configuration command, using the following syntax:

    --list                          List editable config parameters
        [--category]                Only display config parameters of a specified category

    --show                          Print out configuration parameter
        --param                     Configuration parameter name

    --export                        Export the configuration parameter to .csv
        --param                     Configuration parameter name
        [--dir]                     Output to specified location

    --import                        Import and overwrite configuration parameter with new file
        --param                     Configuration parameter name
        --file                      Specified .csv file
        [--do-not-validate]         Ignore validation to force import (WARNING: INCORRECT FORMAT CONFIGURATION MAY CAUSE ISSUES)

    --delete-rows                   Deletes the last row of the configuration parameter
        --param                     Configuration parameter name
        [--index]                   Deletes specified indices

    --add-rows                      Adds rows to configuration parameter
        --param                     Configuration parameter name
        [--file]                    Loads rows from specified .csv file
        [--<CONFIG_COLUMN_1>]
        [--<CONFIG_COLUMN_2>]
        [...                ]
        [--<CONFIG_COLUMN_N>]       For configuration not loaded from file, arguments must be specified with a flag per column
        [--do-not-validate]         Ignore validation to force import (WARNING: INCORRECT FORMAT CONFIGURATION MAY CAUSE ISSUES)

To see what the current real-time config looks like, use the command:

refinery configuration --show --param .daas.udf.realtimeConfig

You can then replace it with your new copy:

refinery configuration --import --param .daas.udf.realtimeConfig --file myNewConfig.csv

Querying results

Outputs of real-time UDFs can be accessed through the getUDF API call. Parameter inputs are as follows:

Parameter  Description                                   Mandatory  Example        Type
funcType   Must be set to `realtime                      Yes        `realtime      symbol
funcName   The name of the UDF that produced the output  Yes        `qUDF          symbol
startTime  Start point of query                          No         .z.p-01:00:00  timestamp
endTime    End point of query                            No         .z.p           timestamp

Note

If startTime and endTime are not provided, the most recent value for today for the requested funcName will be returned.

Note

If multiple results fall within the startTime and endTime, they will be joined via uj. If this fails due to incompatible outputs, they will be returned as a list.
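For readers less familiar with uj, the following Python sketch models its behaviour on unkeyed tables. Rows are appended, and the result has the union of all columns, with None standing in for q nulls. The fallback condition is simplified: a result counts as incompatible only if it is not a table (a list of row dicts). This is an assumption rather than the platform's exact rule.

```python
def uj(*tables):
    """Model of q's uj on unkeyed tables (lists of row dicts)."""
    columns = []
    for table in tables:
        for row in table:
            for col in row:
                if col not in columns:
                    columns.append(col)  # keep first-seen column order
    return [{col: row.get(col) for col in columns} for table in tables for row in table]


def is_table(obj):
    return isinstance(obj, list) and all(isinstance(row, dict) for row in obj)


def combine_results(results):
    """Join all results with uj, or fall back to returning them as a list."""
    if all(is_table(r) for r in results):
        return uj(*results)
    return list(results)
```

A UDF whose output gains a column between runs therefore still produces one table, with the missing values null in the earlier rows.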

Streaming results of real-time UDFs

Streaming subscriptions can be set up for real-time UDFs, similarly to any other streaming call. The only parameters to hand to the getUDF call are funcName and funcType.

.stream.regSub[`getUDF;`funcName`funcType!(`qUDF;`realtime);period;callback;`;`user]
  • Streaming calls will output in the full results table format, with timestamp, udfName, and result columns.

Framework architecture notes

The framework consists of a group of worker nodes that publish to a standard tick stack. The worker nodes are managed by a single daemon and are scaled dynamically to:

  1. Minimise memory utilisation
  2. Maximise CPU parallelization

This means that if two real-time UDFs share a common dataset, they are assigned to the same worker node to reduce data duplication. If they have distinct datasets, they are placed on separate nodes to enable parallel execution. This can be overridden by manually setting the procNo field in the real-time configuration.
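The allocation rules can be sketched with a union-find over table names. UDFs whose dataReq tables overlap land on the same node, disjoint ones get separate nodes, and a procNo other than -1 is honoured as-is. This is a hypothetical Python model, not the daemon's algorithm. It ignores memory and CPU measurements entirely and assumes every dataReq list is non-empty.

```python
def allocate(configs):
    """Map each real-time UDF to a worker-node number (hypothetical model).

    configs: list of dicts with keys "UDF", "dataReq" (non-empty list of
    table names) and "procNo" (-1 for automatic allocation).
    """
    parent = {}  # union-find over table names

    def find(table):
        parent.setdefault(table, table)
        while parent[table] != table:
            parent[table] = parent[parent[table]]  # path halving
            table = parent[table]
        return table

    # Tables required by the same UDF join one dataset group, so UDFs
    # sharing any table are co-located (minimises data duplication).
    for c in configs:
        if c["procNo"] == -1:
            first = c["dataReq"][0]
            for table in c["dataReq"][1:]:
                parent[find(table)] = find(first)

    # Manual procNo values are honoured and reserved.
    used = {c["procNo"] for c in configs if c["procNo"] != -1}
    node_for_group = {}
    next_node = 0
    allocation = {}
    for c in configs:
        if c["procNo"] != -1:
            allocation[c["UDF"]] = c["procNo"]
            continue
        group = find(c["dataReq"][0])
        if group not in node_for_group:
            # Distinct datasets get separate nodes (parallel execution).
            while next_node in used:
                next_node += 1
            node_for_group[group] = next_node
            used.add(next_node)
        allocation[c["UDF"]] = node_for_group[group]
    return allocation
```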

Data flushing within worker nodes

Subscribed data is held in memory and 'reserved' by a real-time UDF. Once a real-time UDF is executed, it will then only 'reserve' data from that time onwards. If no other UDF has claim to any previous data, it will be flushed from memory.

This means that each real-time UDF only has access to the data received since its last execution.
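The reservation and flushing behaviour can be modelled as follows. In this hypothetical Python sketch:

- Each UDF's claim starts at its last execution time.
- A UDF sees only rows newer than its claim.
- Rows older than every UDF's claim are dropped.

The class and method names, and the use of integer timestamps, are illustrative assumptions.

```python
class WorkerBuffer:
    """Hypothetical model of per-worker data reservation and flushing."""

    def __init__(self, udf_names, start=0):
        self.rows = []  # (time, row) pairs in arrival order
        self.reserved_from = {name: start for name in udf_names}

    def publish(self, time, row):
        self.rows.append((time, row))

    def execute(self, name, now):
        # A UDF only sees data received since its last execution.
        since = self.reserved_from[name]
        visible = [row for t, row in self.rows if since < t <= now]
        self.reserved_from[name] = now  # reserve only from now onwards
        self._flush()
        return visible

    def _flush(self):
        # Drop data that no UDF still has a claim on.
        oldest_claim = min(self.reserved_from.values())
        self.rows = [(t, row) for t, row in self.rows if t > oldest_claim]
```

In this model, a rarely triggered UDF keeps older data in memory for every UDF on the same worker. This is one reason co-locating UDFs with very different trigger rates can raise memory use.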