User-Defined Functions
This page explains how to use user-defined functions in a pipeline.
A User-Defined Function (UDF) in the Stream Processor is a function, contained within a versioned package, that lets a user deploy custom logic to the Stream Processor. UDFs make it possible to deploy logic that was defined and bundled outside the system, and they can replace a user's manually defined apply, filter, map, and merge nodes.
Retrieve a User-Defined Function (UDF) from the package directory defined by KX_PACKAGE_PATH.
.qsp.udf[name;package]
.qsp.udf[name;package;.qsp.use enlist[`version]!enlist version]
Parameters:
name | q type | description |
---|---|---|
name | string | User Defined Function name. |
package | string | Package where the UDF is located. |
options:
name | q type | description | default |
---|---|---|---|
version | string | Version of the package, e.g. "1.2.4". | :: , if this parameter is not supplied the latest package is used. |
params | dict | Parameters associated to the UDF. | ()!() , if this parameter is not supplied an empty dictionary is used as final parameter. |
Returns:
type | description |
---|---|
fn | Returns a User-Defined Function. |
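The fallback described for the version option (use the latest package when no version is supplied) can be illustrated with a small sketch. How the Stream Processor actually resolves versions is not described on this page; the helper names parse_version and resolve_version, the sample version list, and the assumption that versions are dot-separated integers are all hypothetical.

```python
# Illustrative sketch only: not the Stream Processor's implementation.
# Assumes versions are dot-separated integers such as "1.2.4".

def parse_version(version):
    """Turn "1.2.4" into (1, 2, 4) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


def resolve_version(available, requested=None):
    """Return the requested version, or the highest one if none is requested."""
    if requested is None:
        # No version supplied: fall back to the latest available package.
        return max(available, key=parse_version)
    if requested not in available:
        raise LookupError(f"version {requested!r} not found")
    return requested


# Hypothetical list of installed versions of one package.
available = ["1.2.4", "1.10.0", "1.9.3"]
latest = resolve_version(available)
pinned = resolve_version(available, "1.2.4")
```

Comparing parsed tuples rather than raw strings matters here: as strings, "1.9.3" would sort after "1.10.0".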
This allows the user to import custom code from a package directory into
a stream processor worker to be deployed via the various sp operators
that act on functions, for example .qsp.map, .qsp.filter, and .qsp.merge.
To retrieve the UDFs stored in a package directory, KX_PACKAGE_PATH must
be set. This can be done either before starting a stream processor or
in-process via the q command:
setenv[`KX_PACKAGE_PATH;"path/to/packages"]
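Setting the variable before a process starts can be sketched from Python using only the standard library. This is a generic illustration of environment inheritance, not a Stream Processor command: the child process here is a plain Python interpreter standing in for whatever process would read KX_PACKAGE_PATH, and the path /tmp/packages is only a sample value.

```python
import os
import subprocess
import sys

# Copy the current environment and add KX_PACKAGE_PATH for the child process.
env = dict(os.environ, KX_PACKAGE_PATH="/tmp/packages")

# Stand-in child process: it simply reports the value it inherited.
child = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['KX_PACKAGE_PATH'])"],
    env=env,
    capture_output=True,
    text=True,
    check=True,
)
inherited = child.stdout.strip()
```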
The following simple example calls a UDF named mid within the fin
package, taking the latest version, with an empty set of
parameters ()!():
tob:([]bid:1.245 1.245 1.246;ask:1.246 1.247 1.247);
udf:.qsp.udf["mid";"fin"]
udf tob
Returning the output:
mid
------
1.2455
1.246
1.2465
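The body of the mid UDF is not shown on this page. Judging from the output above, it appears to compute the midpoint of bid and ask; the following pure-Python sketch makes that assumption explicit. The function below is a hypothetical stand-in, not the packaged code, and it represents the table as a dictionary of columns purely for illustration.

```python
# Hypothetical stand-in for the packaged "mid" UDF (assumed logic).

def mid(table, params=None):
    """Return a one-column table holding the bid/ask midpoint per row.

    `params` mirrors the empty dictionary that is passed as the final
    argument when no parameters are supplied; it is unused here.
    """
    return {"mid": [(bid + ask) / 2 for bid, ask in zip(table["bid"], table["ask"])]}


# Same top-of-book quotes as the q example above.
tob = {"bid": [1.245, 1.245, 1.246], "ask": [1.246, 1.247, 1.247]}
result = mid(tob, {})
```

Under this assumption the three midpoints agree with the values in the q output above, up to floating-point rounding.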
These UDFs are designed to be deployed within stream processor operators.
In this example we deploy the above UDF via the .qsp.map operator.
pipeline: .qsp.read.fromCallback[`publish]
  .qsp.map[.qsp.udf["mid";"fin";.qsp.use enlist[`version]!enlist "1.0.0"]]
  .qsp.write.toConsole[];
.qsp.run pipeline;
publish ([]bid:1.245 1.245 1.246;ask:1.246 1.247 1.247)
Returning the output:
                             | mid
-----------------------------| ------
2021.10.02D11:54:10.622299200| 1.2455
2021.10.02D11:54:10.622299200| 1.246
2021.10.02D11:54:10.622299200| 1.2465
sp.udf('map_udf', 'test', '1.0.0', {'param':10})
Parameters:
name | q type | description |
---|---|---|
name | string | Name of the UDF that is to be retrieved |
package | string | Name of the package from which to retrieve a UDF |
options:
name | q type | description | default |
---|---|---|---|
version | string | Version of the package from which to retrieve the UDF | :: , if this parameter is not supplied the latest package is used. |
params | dict | Additional optional input information to the UDF passed as the final argument to the function | ()!() , if this parameter is not supplied an empty dictionary is used as final parameter. |
Returns: A function to be used as a UDF within a merge, filter, or map operator.
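The params option is described above as being passed to the UDF as its final argument, defaulting to an empty dictionary. A minimal sketch of that calling convention follows. The helper bind_params and the body of map_udf are hypothetical and written only to illustrate the idea; they are not part of the kxi API, and the real retrieval mechanism may differ.

```python
# Illustrative sketch of "params is passed as the final argument".
# bind_params and this map_udf body are hypothetical, not kxi functions.

def map_udf(data, params):
    """Assumed UDF logic: add params['param'] (default 0) to every value."""
    offset = params.get("param", 0)
    return [value + offset for value in data]


def bind_params(fn, params=None):
    """Return a callable that appends `params` as the last argument to `fn`."""
    bound = {} if params is None else dict(params)

    def wrapped(*args):
        return fn(*args, bound)

    return wrapped


add_ten = bind_params(map_udf, {"param": 10})
unchanged = bind_params(map_udf)  # no params: an empty dictionary is used

shifted = add_ten([0.5, 1.5])
same = unchanged([0.5, 1.5])
```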
Examples: Retrieve a UDF providing all necessary information
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> # Set path for package retrieval
>>> kx.q.setenv('KX_PACKAGE_PATH', b'/tmp/packages')
>>> # Retrieve the UDF function (in this example the UDF adds 10 to input)
>>> udf = sp.udf('map_udf', 'test', '1.0.0', {'param':10})
>>> # Use the UDF within SP operator
>>> sp.run(sp.read.from_callback('publish')
...     | sp.map(udf)
...     | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
...     'x': np.random.rand(5),
...     'x1': np.random.rand(5)
... })
>>> kx.q('publish', data)
x x1
-----------------
10.49318 10.39275
10.57852 10.51709
10.08389 10.51598
10.19599 10.40666
10.37564 10.17808
Retrieve a UDF using the latest package version and no additional parameters
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> # Set path for package retrieval
>>> kx.q.setenv('KX_PACKAGE_PATH', b'/tmp/packages')
>>> # Retrieve the UDF function
>>> udf = sp.udf('filter_udf', 'test')
>>> # Use the UDF within SP operator (this UDF filters data for x<0.5)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.filter(udf)
...     | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
...     'x': np.random.rand(5),
...     'x1': np.random.rand(5)
... })
>>> kx.q('publish', data)
x x1
--------------------
0.07347808 0.7263142
0.3159526 0.9216436
0.3410485 0.1809536
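The filter example above states that the UDF keeps rows where x<0.5, but the UDF body itself is not shown. The pure-Python sketch below illustrates that logic under stated assumptions: the function filter_udf here is a hypothetical stand-in, it is assumed to return one boolean per row, and the helper apply_mask and the input values are invented for illustration only.

```python
# Hypothetical stand-in for the packaged "filter_udf" (assumed logic).

def filter_udf(table, params=None):
    """Return one boolean per row: True where column x is below 0.5."""
    return [value < 0.5 for value in table["x"]]


def apply_mask(table, mask):
    """Keep only the rows whose mask entry is True (illustrative helper)."""
    return {
        column: [value for value, keep in zip(values, mask) if keep]
        for column, values in table.items()
    }


# Illustrative input, not taken from the run shown above.
data = {"x": [0.1, 0.9, 0.3, 0.7], "x1": [0.6, 0.2, 0.8, 0.4]}
mask = filter_udf(data, {})
kept = apply_mask(data, mask)
```

Applying the mask to every column keeps the rows aligned, which is why x1 values above 0.5 can still appear in the filtered output.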