Skip to content

User-Defined Functions

A User-Defined Function within the stream processor is a function providing a user the ability to deploy custom logic to the Stream Processor which is contained within a versioned package. This functionality provides an option for a user to deploy logic to the Stream Processor which has been defined and bundled externally to the system and which can replace a users manually defined custom map, filter or merge nodes.

.qsp.udf

A variadic api for retrieving user defined functions

.qsp.udf[name;package]
.qsp.udf[name;package;.qsp.use enlist[`version]!enlist version]

Parameters:

name q type description
name string User Defined Function name.
package string Package where the UDF is located.

options:

name q type description default
version string Version of package e.g. "1.2.4". ::, if this parameter is not supplied the latest package is used.
params dict Parameters associated to the UDF. ()!(), if this parameter is not supplied an empty dictionary is used as final parameter.

Returns:

type description
fn Returns a User-Defined Function.

This allows the user to import custom code from a package directory into a stream processor worker to be deployed via the various sp operators that act on functions. For example, .qsp.map, .qsp.filter, and .qsp.merge.

In order to retrieve the UDFs stored in a package directory we must specify the KX_PACKAGE_PATH this can be done either prior to starting a stream processor or in process via the q command

setenv[`KX_PACKAGE_PATH;"path/to/packages"]

Here we show a simple example, of calling a UDF named mid within the fin package, taking the latest version, with an empty set of parameters ()!():

tob:([]bid:1.245 1.245 1.246;ask:1.246 1.247 1.247);
udf:.qsp.udf["mid";"fin"]
udf tob

Returning the output:

mid
------
1.2455
1.246
1.2465

These UDFs are designed to be deployed within stream processor operators as follows. In this example we deploy the above UDF via the .qsp.map operator.

pipeline: .qsp.read.fromCallback[`publish]
          .qsp.map[.qsp.udf["mid";"fin";.qsp.use enlist[`version]!enlist "1.0.0"]]
          .qsp.write.toConsole[];

.qsp.run pipeline;

publish ([]bid:1.245 1.245 1.246;ask:1.246 1.247 1.247)

Returning the output:

                             | mid 
-----------------------------| ------
2021.10.02D11:54:10.622299200| 1.2455
2021.10.02D11:54:10.622299200| 1.246
2021.10.02D11:54:10.622299200| 1.2465