User-Defined Functions

This page explains how to use user-defined functions in a pipeline.

A User-Defined Function (UDF) is custom logic, contained in a versioned package, that a user deploys to the Stream Processor. UDFs let a user deploy logic that was defined and bundled outside the system, and they can replace a user's manually defined apply, filter, map, and merge nodes.

User-Defined Functions

Retrieve a User-Defined Function (UDF) from the package directory defined by KX_PACKAGE_PATH.

.qsp.udf[name;package]
.qsp.udf[name;package;.qsp.use enlist[`version]!enlist version]

Parameters:

name    | q type | description
--------|--------|----------------------------------
name    | string | User-Defined Function name.
package | string | Package where the UDF is located.

options:

name    | q type | description                           | default
--------|--------|---------------------------------------|--------
version | string | Version of the package, e.g. "1.2.4". | :: (if not supplied, the latest package is used)
params  | dict   | Parameters associated with the UDF.   | ()!() (if not supplied, an empty dictionary is passed as the final parameter)

Returns:

type | description
-----|--------------------------------
fn   | Returns a User-Defined Function.

This allows the user to import custom code from a package directory into a Stream Processor worker and deploy it via the operators that act on functions, for example .qsp.map, .qsp.filter, and .qsp.merge.
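
The version option falls back to the latest package when it is not supplied. How the Stream Processor determines "latest" is not described on this page; the sketch below only illustrates one plausible rule, assuming dotted numeric version strings such as "1.2.4". The helper names parse_version and resolve_version are hypothetical and are not part of the Stream Processor API.

```python
# Hypothetical helpers, not the Stream Processor's own resolution logic.
# Assumes versions are dotted numeric strings such as "1.2.4".

def parse_version(version):
    """Turn "1.10.0" into (1, 10, 0) so versions compare numerically, not as text."""
    return tuple(int(part) for part in version.split("."))


def resolve_version(available, requested=None):
    """Return the requested version if it exists, else the highest available one."""
    if requested is not None:
        if requested not in available:
            raise ValueError(f"version {requested} not found")
        return requested
    return max(available, key=parse_version)


versions = ["1.0.0", "1.2.4", "1.10.0"]
latest = resolve_version(versions)            # no version supplied
pinned = resolve_version(versions, "1.2.4")   # explicit version
print(latest, pinned)
```

Comparing parsed tuples rather than raw strings matters here: as text, "1.2.4" sorts after "1.10.0", which would select the wrong package.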

To retrieve the UDFs stored in a package directory, KX_PACKAGE_PATH must be set. This can be done either before starting the Stream Processor or in-process with the q command:

setenv[`KX_PACKAGE_PATH;"path/to/packages"]
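
For the first option, setting the variable before the process starts, the following is a minimal sketch of passing KX_PACKAGE_PATH to a child process from Python. The child here is a stand-in Python one-liner that echoes the variable, not a real Stream Processor worker; how a worker is actually launched depends on the deployment.

```python
import os
import subprocess
import sys

# Copy the current environment and add the package path for the child process.
env = dict(os.environ, KX_PACKAGE_PATH="path/to/packages")

# Stand-in child process (assumption): it only echoes the variable it inherited.
child = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['KX_PACKAGE_PATH'])"],
    env=env,
    capture_output=True,
    text=True,
    check=True,
)
inherited = child.stdout.strip()
print(inherited)
```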

The following simple example calls a UDF named mid within the fin package, taking the latest version, with an empty set of parameters ()!():

tob:([]bid:1.245 1.245 1.246;ask:1.246 1.247 1.247);
udf:.qsp.udf["mid";"fin"]
udf tob

Returning the output:

mid
------
1.2455
1.246
1.2465
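
The source of the mid UDF is not shown on this page. The output is consistent with the midpoint of bid and ask, so the sketch below assumes that definition and reproduces the calculation in plain Python, with a list of dictionaries standing in for the q table.

```python
import math

# Assumption: "mid" is the midpoint of bid and ask; the fin package source is not shown.
def mid(rows):
    """Return one {'mid': value} row per input row of bid/ask quotes."""
    return [{"mid": (row["bid"] + row["ask"]) / 2} for row in rows]


# Same quotes as the tob table in the example.
tob = [
    {"bid": 1.245, "ask": 1.246},
    {"bid": 1.245, "ask": 1.247},
    {"bid": 1.246, "ask": 1.247},
]

result = mid(tob)
for row in result:
    print(round(row["mid"], 4))

# Compare with the documented output, allowing for floating-point rounding.
expected = [1.2455, 1.246, 1.2465]
matches = all(math.isclose(r["mid"], e) for r, e in zip(result, expected))
```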

These UDFs are designed to be deployed within Stream Processor operators. In this example, the above UDF is deployed via the .qsp.map operator.

pipeline: .qsp.read.fromCallback[`publish]
          .qsp.map[.qsp.udf["mid";"fin";.qsp.use enlist[`version]!enlist "1.0.0"]]
          .qsp.write.toConsole[];

.qsp.run pipeline;

publish ([]bid:1.245 1.245 1.246;ask:1.246 1.247 1.247)

Returning the output:

                             | mid 
-----------------------------| ------
2021.10.02D11:54:10.622299200| 1.2455
2021.10.02D11:54:10.622299200| 1.246
2021.10.02D11:54:10.622299200| 1.2465

sp.udf('map_udf', 'test', '1.0.0', {'param':10})

Parameters:

name    | q type | description
--------|--------|--------------------------------------------------
name    | string | Name of the UDF that is to be retrieved
package | string | Name of the package from which to retrieve a UDF

options:

name    | q type | description                                                                             | default
--------|--------|-----------------------------------------------------------------------------------------|--------
version | string | Version of the package from which to retrieve the UDF                                   | :: (if not supplied, the latest package is used)
params  | dict   | Additional optional input information to the UDF, passed as the final argument to the function | ()!() (if not supplied, an empty dictionary is passed as the final parameter)

Returns: A function to be used as a UDF within a merge, filter, or map operator.
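
The params option is described as being passed as the final argument to the function, with an empty dictionary used when it is not supplied. As a rough illustration of that calling convention only, the sketch below binds params with a closure. make_udf and add_param are hypothetical names; this is not how sp.udf is implemented, and plain lists stand in for table data.

```python
def make_udf(fn, params=None):
    """Bind params so the returned function takes only the incoming data."""
    # Default to an empty dictionary when no params are supplied.
    bound = {} if params is None else dict(params)

    def udf(data):
        # params is always supplied as the final argument.
        return fn(data, bound)

    return udf


def add_param(data, params):
    """Hypothetical map-style UDF: add params['param'] (default 0) to each value."""
    return [value + params.get("param", 0) for value in data]


with_params = make_udf(add_param, {"param": 10})
defaults = make_udf(add_param)

print(with_params([1, 2, 3]))
print(defaults([1, 2, 3]))
```

Because the operator only ever sees a one-argument function, the same packaged logic can be reused with different parameter sets without changing the pipeline definition.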

Examples:

Retrieve a UDF, providing all necessary information:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> # Set path for package retrieval
>>> kx.q.setenv('KX_PACKAGE_PATH', b'/tmp/packages')
>>> # Retrieve the UDF function (in this example the UDF adds 10 to input)
>>> udf = sp.udf('map_udf', 'test', '1.0.0', {'param':10})
>>> # Use the UDF within SP operator
>>> sp.run(sp.read.from_callback('publish')
...         | sp.map(udf)
...         | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
...         'x': np.random.rand(5),
...         'x1': np.random.rand(5)
...     })
>>> kx.q('publish', data)
x        x1
-----------------
10.49318 10.39275
10.57852 10.51709
10.08389 10.51598
10.19599 10.40666
10.37564 10.17808

Retrieve a UDF using the latest package and default parameters:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> # Set path for package retrieval
>>> kx.q.setenv('KX_PACKAGE_PATH', b'/tmp/packages')
>>> # Retrieve the UDF function
>>> udf = sp.udf('filter_udf', 'test')
>>> # Use the UDF within SP operator (this UDF filters data for x<0.5)
>>> sp.run(sp.read.from_callback('publish')
...         | sp.filter(udf)
...         | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
...         'x': np.random.rand(5),
...         'x1': np.random.rand(5)
...     })
>>> kx.q('publish', data)
x          x1
--------------------
0.07347808 0.7263142
0.3159526  0.9216436
0.3410485  0.1809536
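
The body of filter_udf is not shown here. As a sketch of the behavior described in the comment above (keeping rows where x < 0.5), the following plain-Python version assumes the UDF returns one boolean per row and that the filter operator keeps the rows marked True. apply_filter is a hypothetical stand-in for the operator, and a list of dictionaries stands in for the table.

```python
def filter_udf(rows, params):
    """Hypothetical filter-style UDF: one boolean per row, True where x < 0.5."""
    return [row["x"] < 0.5 for row in rows]


def apply_filter(rows, predicate):
    """Stand-in for a filter operator: keep rows whose mask entry is True."""
    mask = predicate(rows, {})  # empty params, matching the default
    return [row for row, keep in zip(rows, mask) if keep]


batch = [
    {"x": 0.07, "x1": 0.73},
    {"x": 0.81, "x1": 0.12},
    {"x": 0.34, "x1": 0.18},
]
kept = apply_filter(batch, filter_udf)
print(kept)
```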