Transform
This page describes the pre-defined data transformation operators available. For custom transformations, a Map Node can be used.
Fill
(Beta Feature) Fills null values in table columns.
Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true. See Beta Feature Usage Terms.
.qsp.transform.fill[defaults]
.qsp.transform.fill[defaults;mode]
Parameters:
name | type | description | default |
---|---|---|---|
defaults | dict | Keys specify the columns to replace null values in. The value associated with a key is used as the default fill for the corresponding column. | Required |
mode | symbol | The approach to use when filling: one of `static`, `down`, or `up`. | `static` |
For all common arguments, refer to configuring operators
This operator fills null values in a table either statically, up, or down, using the specified defaults.

- `static`: every null value in a column is replaced with its corresponding default.
- `up`: up (backward) fill, where if the last entry in the current column is null it is replaced with its corresponding default.
- `down`: down (forward) fill, where for the first table passed in, any column whose first entry is null has it replaced with the column's corresponding default. Plugin state is managed so that down-fill carries over across batches.

Currently, only filling vector columns is supported, and all default values to fill with must be atomic. The type of elements in a vector column should match the type of the atomic fill value; otherwise the column is implicitly cast to the type of the fill value.
Using fill-static on a table with null entries:
input: ([] val1: 0N 1 2 0N 3; val2: "a b c"; val3: 0N 5 0N 5 0N)
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.fill[`val1`val2`val3!(-1;"_"; -10)]
  .qsp.write.toVariable[`output];
publish input
output
val1 val2 val3
--------------
-1   a    -10
1    _    5
2    b    -10
-1   _    5
3    c    -10
Using fill-down on a table with null entries:
input: ([] val1: 0N 1 2 0N 3; val2: "a b c"; val3: 0N 5 0N 5 0N)
dict: `val1`val2`val3!(-1;"_"; -10)
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.fill[dict;`down]
  .qsp.write.toVariable[`output];
publish input
output
val1 val2 val3
--------------
-1   a    -10
1    a    5
2    b    5
2    b    5
3    c    5
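Using fill-up on a table with null entries (a sketch based on the mode description above; the output shown is the expected backward-fill result, with the trailing null in val3 taking its default):
input: ([] val1: 0N 1 2 0N 3; val2: "a b c"; val3: 0N 5 0N 5 0N)
dict: `val1`val2`val3!(-1;"_"; -10)
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.fill[dict;`up]
  .qsp.write.toVariable[`output];
publish input
output
val1 val2 val3
--------------
1    a    5
1    b    5
2    b    5
3    c    5
3    c    -10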
sp.transform.fill({'price': 0.0})
Parameters:
name | type | description | default |
---|---|---|---|
defaults | dict | Dictionary of keys and values to replace nulls. Keys specify the columns to replace null values in; the value associated with a key is used as the default fill for the corresponding column. | Required |
mode | symbol | The approach to use when filling: one of `static`, `down`, or `up`. In `static` mode, every null value in a column is replaced with its corresponding default. In `up` mode, an up (backward) fill is done, where if the last entry in a column is null it is replaced with its corresponding default. In `down` mode, a down (forward) fill is done, where for the first table passed in, any column whose first entry is null has it replaced with the column's corresponding default. | `static` |
Returns:
A `fill` transformer, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.fill({'x': 1})
        | sp.write.to_variable('out'))
>>> kx.q('publish', pd.DataFrame({'x': [None, 2, None]}))
>>> kx.q('out')
x
-
1
2
1
Rename Columns
Renames columns in the incoming data
.qsp.transform.renameColumns[nameScheme]
Parameters:
name | type | description | default |
---|---|---|---|
nameScheme | dictionary | A dictionary mapping current column names to what they should be renamed to. | Required |
For all common arguments, refer to configuring operators
This example transforms data with column names "a" and "b" to have columns "c" and "d":
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.renameColumns[`a`b!`c`d]
  .qsp.write.toVariable[`output];
publish ([] a: til 5; b: til 5);
output
c d
---
0 0
1 1
2 2
3 3
4 4
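For reference, the rename is equivalent to q's dictionary form of xcol (a minimal sketch outside the pipeline; newer kdb+ versions accept a dictionary left argument to xcol):
(`a`b!`c`d) xcol ([] a: til 5; b: til 5)   / renames only the listed columns, leaving others as-is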
sp.transform.rename_columns({'price': 'x', 'quantity': 'y'})
Parameters:
name | type | description | default |
---|---|---|---|
nameScheme | dictionary | A dictionary defining the columns to be renamed and their new names. | Required |
Returns:
A `rename` transformer, which can be joined to other operators or pipelines.
Examples: Renames the columns in a batch of data (must be a table)
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.rename_columns({'a': 'x', 'b': 'y'})
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'a': np.random.randn(10),
        'b': np.random.randn(10),
        'z': np.random.randn(10)
    })
>>> kx.q('publish', data)
x          y          z
---------------------------------
-0.8346907 -0.6609932 -1.807997
0.9907844  -0.2233735 0.7030451
2.021636   0.8098854  0.3018809
0.2035373  1.373164   0.8300498
-0.1250729 1.424175   -0.07679308
2.156653   -0.2182297 -0.458493
-0.2572752 -2.013144  1.027509
2.010659   1.397499   -0.7732002
-1.143055  -1.739128  0.8732918
-0.4201101 0.635765   0.5426539
Replace Infinity
Replaces positive/negative infinite values with the running maximum/minimum values from each column.
.qsp.transform.replaceInfinity[X]
.qsp.transform.replaceInfinity[X; .qsp.use enlist[`newCol]!enlist newCol]
Parameters:
name | type | description |
---|---|---|
X | symbol or symbol[] | Symbol or list of symbols indicating the columns to act on. |
options:
name | type | description | default |
---|---|---|---|
newCol | boolean | Boolean indicating whether additional columns are to be added to indicate which entries were infinities: 1b for yes, 0b for no. | 0b |
For all common arguments, refer to configuring operators
This operator replaces infinite values in the specified columns, allowing ± infinities to be replaced by the running maximum/minimum for the column.
Operating restrictions
If the first value received for a column is infinite, an error will be thrown as there is no value to replace it with.
This pipeline replaces infinities in the columns x and x1:
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.replaceInfinity[`x`x1; .qsp.use ``newCol!11b]
  .qsp.write.toConsole[];
publish ([] x: 1 3 4 0w; x1: 1 -0W 0 -0W; x2: til 4);
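The replacement rule itself can be sketched in plain q, outside the operator (a minimal illustration of the running max/min logic within a single batch; fillPos and fillNeg are hypothetical helpers, shown for float columns):
fillPos: {m: x = 0w; @[x; where m; :; (maxs ?[m;-0w;x]) where m]}   / +0w becomes the running max of prior values
fillNeg: {m: x = -0w; @[x; where m; :; (mins ?[m;0w;x]) where m]}   / -0w becomes the running min of prior values
fillPos 1 3 4 0w      / 1 3 4 4f
fillNeg 1 -0w 0 -0w   / 1 1 0 0f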
sp.transform.replace_infinity(['price', 'quantity'])
Positive infinity is replaced with the largest previously seen value in its column. Negative infinity is replaced with the smallest previously seen value in its column.
Parameters:
name | type | description |
---|---|---|
columns | symbol or symbol[] | Column(s) to apply infinity replacement to, given as strings. |
options:
name | type | description | default |
---|---|---|---|
new_column | boolean | Boolean indicating whether to add new columns (True) indicating the presence of infinite values for a given row. | False |
Returns:
An `infinity_replace` transformer, which can be joined to other operators or pipelines.
Examples: Replaces the infinite values in a batch of data:
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> import numpy as np
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.replace_infinity(['x', 'x1'])
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.random.randn(10),
        'x1': np.random.randn(10),
        'x2': np.random.randn(10)
    })
>>> data['x'][9] = np.inf
>>> kx.q('publish', data)
x          x1         x2
---------------------------------
1.122557   1.091329   -0.9192559
-0.2272307 0.1437045  -1.314342
1.852872   -0.8079581 -0.04237484
-1.311179  -0.4326425 1.383893
-0.7943273 -1.13575   -0.437631
0.1900052  -1.223518  -0.9580604
0.2486013  -1.104549  0.8776486
1.109844   0.380195   0.898798
0.9993286  0.4093832  -1.477295
1.852872   0.7642802  0.4787892
Replace Null
Replaces null values using the median of each column.
.qsp.transform.replaceNull[X]
.qsp.transform.replaceNull[X; .qsp.use (!) . flip (
(`newCol ; newCol);
(`bufferSize; bufferSize))]
Parameters:
name | type | description |
---|---|---|
X | symbol or symbol[] or dictionary | Symbol or list of symbols indicating the columns to act on, or a dictionary of column names and replacement values. |
options:
name | type | description | default |
---|---|---|---|
newCol | boolean | Boolean indicating whether additional columns are to be added to indicate which entries were null: 1b for yes, 0b for no. | 0b |
bufferSize | long | Number of data points that must amass before calculating the median. | 0 |
For all common arguments, refer to configuring operators
This operator replaces null values in the selected columns.
If X is a symbol or list of symbols, nulls will be replaced by the median for the column, as calculated from the buffered data, or from the first batch if bufferSize is not specified. Initially, batches are collected in a buffer until the required size is exceeded. If this buffer contains only null values for a column, an error will be logged and null values in that column will not be replaced.
If X is a dictionary of column names and replacement values, nulls will be replaced by the value given for the column. When replacement values are given, no buffering is done.
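The median-based rule can be sketched in plain q for a single column (a minimal illustration only; fillMed is a hypothetical helper, and the operator additionally buffers across batches):
fillMed: {(med x where not null x)^x}   / replace nulls with the median of the non-null values
fillMed 0n 1 2 0n 3                     / 2 1 2 2 3f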
This pipeline replaces nulls in columns x and y with the median values:
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.replaceNull[`x`y; .qsp.use `bufferSize`newCol!(10;1b)]
  .qsp.write.toConsole[];
publish ([] x: 0n,10?1f; y: 11?1f; z: til 11);
This pipeline replaces nulls in columns x and x1 with specific values:
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.replaceNull[`x`x1!0 .5; .qsp.use ``newCol!11b]
  .qsp.write.toConsole[];
publish ([] x: 0n , 10?1f; x1: 0n 0n , (8?1f) , 0n);
sp.transform.replace_null(['price', 'quantity'])
Parameters:
name | type | description |
---|---|---|
columns | symbol or symbol[] or dictionary | Column(s) to apply null replacement to, given as strings, or None to replace nulls in all float64 columns within a batch. |
options:
name | type | description | default |
---|---|---|---|
buffer_size | long | Number of data points which must amass before the median value for the specified columns is determined, after which null replacement is applied. | 0 |
new_column | boolean | Boolean indicating whether a new column is to be added to indicate the presence of null values for a given row. | False |
Returns:
A `null_replace` transformer, which can be joined to other operators or pipelines.
Examples: Replaces the null values in a batch of data:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.replace_null(['x', 'x1'])
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.random.randn(10),
        'x1': np.random.randn(10),
        'x2': np.random.randn(10)
    })
>>> data['x'][9] = np.nan
>>> kx.q('publish', data)
x          x1         x2
--------------------------------
0.9570144  0.3184851  1.184641
-2.361744  -0.2467937 -0.84709
0.483128   0.1582659  0.3794346
-0.9834509 0.3171456  0.5043843
2.389175   0.118148   -0.5278073
0.9906622  -1.987247  -0.831912
0.7275034  0.05414338 0.9702141
0.2681021  1.785524   1.305275
0.4028538  1.23407    0.2663236
0.483128   -1.194168  1.019275
Schema
Applies a table schema to data passing through the operator.
.qsp.transform.schema[schema]
.qsp.transform.schema[schema; .qsp.use (!) . flip (
(`inputType ; inputType);
(`schemaType ; schemaType);
(`parse ; parse))]
Parameters:
name | type | description | default |
---|---|---|---|
schema | symbol or table or dictionary | A schema to apply to the incoming data. This can be the name of a table in a mounted Insights assembly, an empty kdb+ table, or a map of column names to column types. | Required |
options:
name | type | description | default |
---|---|---|---|
inputType | symbol | The data type of the stream (one of `arrays`, `table` or `auto`). If this is fixed for all events, specifying it allows the SP to optimize at compile time. | `auto` |
schemaType | symbol | How to interpret the provided schema object. By default the schema is treated as the desired literal output; alternatively, this can be set to `schema`. When using `schema`, the provided schema object should be a table of the form ([] name: `$(); datatype: `short$()). | `literal` |
parse | symbol | Indicates whether parsing of input string data into other datatypes is required. One of `auto`, `on` or `off`. If `on` or `off`, processing can be optimized. | `auto` |
For all common arguments, refer to configuring operators
This applies a given schema to the data passing through the operator.
Input Considerations
Events containing table data
Columns in incoming messages that are not in the target schema will be dropped. Columns in the target schema that are missing will be added with nulls. If no common columns are found, or incoming data cannot be cast to the target datatype, a null event will be returned and an error will be logged to stdout with the corresponding message offset, if available.
Events containing array data
Incoming array data must have the same length as the number of columns in the target schema. Additionally, each element of the array can be a typed list or a scalar that can be scalar-extended to the length of the outbound table data. The schema will be applied to the incoming data in the column order specified in the schema configuration (whether it's a table or a dictionary). If the schema cannot be applied to the incoming data due to array length or invalid type conversions the operator will push a null event further down the pipeline and log an error with message offset, if available.
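For instance, a hedged sketch of array-style input for the schema used below (note the scalar sym is extended to the batch length):
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.schema[([] date: `date$(); price: `float$(); sym: `$()); .qsp.use enlist[`inputType]!enlist `arrays]
  .qsp.write.toVariable[`output];
publish (2020.05.22 2020.05.23; 178.12 179.5; `AAPL)
output
date       price  sym
----------------------
2020.05.22 178.12 AAPL
2020.05.23 179.5  AAPL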
Expected type formats
The parse option allows for string representations to be converted to typed values. For numeric values to be parsed correctly, they must be provided in the expected format. String values in unexpected formats may be processed incorrectly.

- Strings representing bytes are expected as exactly two base-16 digits, e.g. "ff"
- Strings representing integers are expected in base 10, e.g. "255"
- Strings representing boolean values have a number of supported options, e.g. "t", "1"
- More information on the available formats.
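These formats correspond to q's tok casts; for example (a sketch, independent of the operator):
"J"$"255"          / 255        base-10 long
"B"$"t"            / 1b         boolean; "1" also parses to 1b
"D"$"2020-05-22"   / 2020.05.22 date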
Specifying tokenization using the schema object (schemaType=schema)
All entries in the tokenization column must be one of `auto`, `on`, or `off`.

- `auto`: in each batch of data, check whether the corresponding column needs to be tokenized, and if so tokenize it.
- `on`: always tokenize the corresponding column in the incoming data, without doing any checks. Offers performance improvements over `auto`.
- `off`: never tokenize the corresponding column in the incoming data. Offers performance improvements over `auto`.

Note: the parse option is ignored when a schema object is passed in.
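A hedged sketch of passing a schema object (the name and datatype columns follow the format above; the tokenization column is assumed from the description, and type shorts 14, 9 and 11 denote date, float and symbol lists):
schema: ([] name: `date`price`sym; datatype: 14 9 11h; tokenization: `auto`auto`auto)
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.schema[schema; .qsp.use enlist[`schemaType]!enlist `schema]
  .qsp.write.toConsole[];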
Specifying tokenization using the parse option
The parse option is only used when a schema literal is passed in (schemaType=literal). It defines the same tokenization option for all input columns:

- `auto`: in each batch of data, detect and parse columns that require tokenization.
- `on`: in each batch of data, parse all columns without doing any checks. Offers performance improvements over `auto`.
- `off`: no parsing is done for input data. Offers performance improvements over `auto`.

Note: providing parse as a boolean is deprecated. For backwards compatibility, 1b maps to `auto` and 0b maps to `off`.
Providing a symbol as the input schema
If a symbol is provided as the input schema to this operator, the table name must exist in a mounted assembly in the SP Worker. When the Stream Processor is deployed inside an assembly, that information is automatically mounted, so any table in the assembly in which the SP is deployed can be referenced. If the Stream Processor is deployed outside of an assembly, an assembly configuration map must be mounted using the kubeConfig parameter of the deployment REST API.
Convert an input stream to the correct datatypes:
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.decode.json[]
  .qsp.transform.schema[([] date: `date$(); price: `float$(); sym: `$()); .qsp.use ``parse!(::;1b)]
  .qsp.write.toConsole[]
publish enlist "[{\"date\":\"2020-05-22\",\"price\":\"178.12\",\"sym\":\"AAPL\"}]"
                             | date       price  sym
-----------------------------| ----------------------
2021.11.25D17:46:03.169434200| 2020.05.22 178.12 AAPL
sp.transform.schema({'name': kx.SymbolAtom, 'date': kx.DateAtom})
Parameters:
name | type | description | default |
---|---|---|---|
table | symbol or table or dictionary | Dictionary of keys and values defining the schema. Keys specify the column names; the value associated with a key is the pykx type of the column, e.g. kx.FloatAtom. | Required |
options:
name | type | description | default |
---|---|---|---|
input_type | symbol | The data type of the stream (one of `arrays`, `table` or `auto`). Choosing `arrays` or `table` allows evaluation to be optimized at compile time. | `auto` |
parse | symbol | Indicates whether parsing of input string data into other datatypes is required. One of `auto`, `on` or `off`. If `on` or `off`, processing can be optimized. | `auto` |
Returns:
A `schema` transformer, which can be joined to other operators or pipelines.
parse option
The parse option allows for string representations to be converted to typed values. For numeric values to be parsed correctly, they must be provided in the expected format. String values in unexpected formats may be processed incorrectly.

- Strings representing bytes are expected as exactly two base-16 digits, e.g. "00" to "ff"
- Strings representing integers are expected in base 10, e.g. "255"
- Strings representing boolean values have a number of supported options, e.g. "t", "1"
- More information on the available formats.
Examples:
>>> from kxi import sp
>>> import pykx as kx
>>> from datetime import datetime
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.schema({'name': kx.SymbolAtom, 'date': kx.DateAtom})
        | sp.write.to_variable('out'))
>>> kx.q('publish', [{'name': 'Apple', 'date': datetime(2023,1,4)},
                     {'name': 'Google', 'date': datetime(2023,1,4)}])
>>> kx.q('out')
name   date
-----------------
Apple  2023.01.04
Google 2023.01.04
Time Split
Decompose temporal (e.g. date/time) columns into constituent parts (e.g. minutes, hours, days etc.).
.qsp.transform.timeSplit[X]
.qsp.transform.timeSplit[X; .qsp.use enlist[`delete]!enlist delete]
Parameters:
name | type | description |
---|---|---|
X | symbol or symbol[] or :: | The columns to act on. Alternatively, use :: to convert all temporal columns. |
options:
name | type | description | default |
---|---|---|---|
delete | boolean | Boolean indicating whether the original temporal column is to be deleted after conversion. This is true by default, as many ML algorithms cannot discern the meaning of temporal types. | 1b |
For all common arguments, refer to configuring operators
This operator decomposes temporal columns within kdb+ tables (date/month/timestamp etc.) into a set of constituent elements. This functionality is described here, and is intended to allow the information in temporal data to be better utilized within ML algorithms by providing contextual information, such as day of the week and quarter, alongside elements such as month, day, hour, and second.
The operator can be configured to convert data on a per-column basis, or applied to all temporal columns. By default the original column is deleted; this can be changed to retain the original column if required by the use case.
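The decomposition itself amounts to q temporal casts; for a single date (a minimal sketch, not the operator's exact output columns):
d: 2020.05.22
`year`mm`dd$\:d   / 2020 5 22i: year, month and day parts; the operator adds derived fields such as day of week and quarter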
This pipeline replaces all temporal columns (date/month) with decomposed representations.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.timeSplit[::]
  .qsp.write.toConsole[];
publish ([] month: asc "m"$10?5; date: asc "d"$10?60; number: 10?1f)
This pipeline replaces a specified column with the decomposed representation.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.timeSplit[`x]
  .qsp.write.toConsole[];
publish ([] x: 10?2020.01.01D00:00:00; y: 10?00:00:00)
This pipeline replaces specified columns with decomposed representations, retaining the original columns.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.timeSplit[`x`y; .qsp.use ``delete!00b]
  .qsp.write.toConsole[];
publish ([] x: 10?00:00; y: 10?00:00; z: til 10)
sp.transform.time_split()
Parameters:
name | type | description |
---|---|---|
columns | symbol or symbol[] or :: | The names of the columns to be temporally decomposed, or None to decompose all date/time columns. |
options:
name | type | description | default |
---|---|---|---|
delete | boolean | Whether the original temporal columns which have been decomposed should be removed from the batch. | True |
Returns:
A `time_split` transformer, which can be joined to other operators or pipelines.
Examples: Apply time split to reduce times to seasonal components:
>>> from kxi import sp
>>> import pykx as kx
>>> from datetime import datetime
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.time_split()
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': [datetime(2000,1,1), datetime(2000,2,2)],
        'x1': ['a', 'b']
    })
>>> kx.q('publish', data)
x1 x_dayOfWeek x_year x_month x_day x_quarter x_weekday x_hour x_minute x_second
--------------------------------------------------------------------------------
a  0           2000   1       1    1         0         0      0        0
b  4           2000   2       2    1         1         0      0        0