User-Defined Functions
A User-Defined Function (UDF) in the Stream Processor is a function, contained in a versioned package, that lets a user deploy custom logic to the Stream Processor. The logic is defined and bundled outside the system, and a UDF can replace a user's manually defined custom map, filter or merge nodes.
.qsp.udf
A variadic API for retrieving User-Defined Functions.
.qsp.udf[name;package]
.qsp.udf[name;package;.qsp.use enlist[`version]!enlist version]
Parameters:
name | q type | description |
---|---|---|
name | string | User Defined Function name. |
package | string | Package where the UDF is located. |
options:
name | q type | description | default |
---|---|---|---|
version | string | Version of package e.g. "1.2.4". | ::, if this parameter is not supplied the latest package is used. |
params | dict | Parameters passed to the UDF. | ()!(), if this parameter is not supplied an empty dictionary is used as the final parameter. |
Returns:
type | description |
---|---|
fn | Returns a User-Defined Function. |
This allows the user to import custom code from a package directory into
a stream processor worker to be deployed via the various sp operators
that act on functions, for example .qsp.map, .qsp.filter, and .qsp.merge.
To retrieve the UDFs stored in a package directory, the KX_PACKAGE_PATH
environment variable must be set. This can be done either before starting
a stream processor or in process via the q command:
setenv[`KX_PACKAGE_PATH;"path/to/packages"]
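As a quick sanity check, the variable can be read back with getenv before any UDF is requested. This is a minimal sketch that assumes the path shown is a placeholder for a real package directory:

```q
// Placeholder path; substitute the directory that holds your packages
setenv[`KX_PACKAGE_PATH;"path/to/packages"];

// getenv returns the current value of the variable as a string
getenv`KX_PACKAGE_PATH
```

If the returned string is empty, the variable has not been set in this process.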
Here we show a simple example of calling a UDF named mid within the fin
package, taking the latest version, with an empty set of parameters ()!():
tob:([]bid:1.245 1.245 1.246;ask:1.246 1.247 1.247);
udf:.qsp.udf["mid";"fin"]
udf tob
Returning the output:
mid
------
1.2455
1.246
1.2465
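The contents of the fin package are not shown here. As a rough sketch only, assuming mid computes the midpoint of bid and ask and receives the parameter dictionary as its final argument, a plain q function with comparable behaviour could look like the following. The name midSketch is hypothetical and is not part of the package:

```q
// Hypothetical stand-in for the packaged UDF: data first, params dictionary last
midSketch:{[data;params] select mid:0.5*bid+ask from data};

tob:([]bid:1.245 1.245 1.246;ask:1.246 1.247 1.247);

// Pass the empty dictionary explicitly, mirroring the documented default
midSketch[tob;()!()]
```

Under these assumptions the call should produce a single mid column matching the values shown in the output above.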
These UDFs are designed to be deployed within stream processor operators as
follows. In this example we deploy the above UDF via the .qsp.map operator.
pipeline: .qsp.read.fromCallback[`publish]
  .qsp.map[.qsp.udf["mid";"fin";.qsp.use enlist[`version]!enlist "1.0.0"]]
  .qsp.write.toConsole[];
.qsp.run pipeline;
publish ([]bid:1.245 1.245 1.246;ask:1.246 1.247 1.247)
Returning the output:
                             | mid
-----------------------------| ------
2021.10.02D11:54:10.622299200| 1.2455
2021.10.02D11:54:10.622299200| 1.246
2021.10.02D11:54:10.622299200| 1.2465
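The params option listed in the options table can be supplied in the same .qsp.use dictionary as version. The following is a sketch only: the scale key is a hypothetical parameter invented for illustration, and whether mid in the fin package reads any parameters is an assumption. It also needs a Stream Processor worker with KX_PACKAGE_PATH set in order to run.

```q
// Both option keys are optional; `scale is a made-up parameter for illustration
udf:.qsp.udf["mid";"fin";.qsp.use `version`params!("1.0.0";enlist[`scale]!enlist 2f)]
```

As described in the options table, the params dictionary is passed to the UDF as its final parameter, so a UDF that uses it would be written to accept that extra argument.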