Data Transformations
.qsp.transform
name | description |
---|---|
replaceInfinity | replaces infinite values with min/max values |
replaceNull | replaces null values with the median value |
timeSplit | decomposes time columns into subdivisions of mins/hours etc. |
fill (Beta) | fill in null values in a table |
schema | transforms data to match a provided schema |
.qsp.transform.fill
(Beta Feature) Fill null values in table columns
Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.
.qsp.transform.fill[defaults]
.qsp.transform.fill[defaults;mode]
Parameters:
name | type | description | default |
---|---|---|---|
defaults | dict | Keys specify the columns to replace null values for. The value associated with a key is what's used as the default fill for the corresponding column. | Required |
mode | symbol | Specifies the approach to use when filling. One of static, down, or up. | static |
For all common arguments, refer to configuring operators
This operator fills null values in a table either statically, up, or down, and utilizes specified defaults.
static - every null value in a column is replaced with its corresponding default.
up - up (backward) fill, where if the last entry in the current column is null it is replaced with its corresponding default.
down - down (forward) fill, where for the first table passed in, any column whose first entry is null has it replaced with the column's corresponding default. Plugin state is managed to have down-fill carry over across batches.
Currently, only filling vector columns is supported, and all default values to fill with must be atomic. The type of elements in a vector column should match the type of the atomic fill value, otherwise the column is implicitly cast to match the type of the fill value.
Using fill-static on a table with null entries:
input: ([] val1: 0N 1 2 0N 3; val2: "a b c"; val3: 0N 5 0N 5 0N)
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.fill[`val1`val2`val3!(-1;"_"; -10)]
  .qsp.write.toVariable[`output];
publish input
output
val1 val2 val3
--------------
-1 a -10
1 _ 5
2 b -10
-1 _ 5
3 c -10
Using fill-down on a table with null entries:
input: ([] val1: 0N 1 2 0N 3; val2: "a b c"; val3: 0N 5 0N 5 0N)
dict: `val1`val2`val3!(-1;"_"; -10)
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.fill[dict;`down]
  .qsp.write.toVariable[`output];
publish input
output
val1 val2 val3
--------------
-1 a -10
1 a 5
2 b 5
2 b 5
3 c 5
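The three modes can be approximated for a single batch with plain q primitives. The sketch below is only an illustration of the semantics described above: `fillBatch` is a hypothetical helper, not part of the `.qsp` API, and it does not carry state across batches or cast column types the way the operator is described as doing.

```q
/ Hypothetical helper, not part of the .qsp API: fills one batch in plain q.
/ d is the defaults dictionary, mode is `static, `down or `up, t is a table.
fillBatch:{[d;mode;t]
  fs:`static`down`up!({[v;x] v^x};{[v;x] v^fills x};{[v;x] v^reverse fills reverse x});
  f:fs mode;
  {[f;d;t;c] @[t;c;f d c]}[f;d]/[t;key d]}

input:([] val1:0N 1 2 0N 3; val2:"a b c"; val3:0N 5 0N 5 0N)
dict:`val1`val2`val3!(-1;"_";-10)

/ up fill: each null takes the next non-null value below it; a null in the
/ last position has nothing to take from, so it falls back to the default
r:fillBatch[dict;`up;input]
r`val3     / 5 5 5 5 -10
```

Each mode is a fill of the column followed by a final `^` with the default, which only affects entries the directional fill could not reach.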
.qsp.transform.replaceInfinity
Replaces positive/negative infinite values with the max/min values from each column
.qsp.transform.replaceInfinity[X]
.qsp.transform.replaceInfinity[X; .qsp.use enlist[`newCol]!enlist newCol]
Parameters:
name | type | description |
---|---|---|
X | symbol or symbol[] | Symbol or list of symbols indicating the columns to act on. |
options:
name | type | description | default |
---|---|---|---|
newCol | boolean | Boolean indicating if additional columns are to be added indicating which entries were infinities. 1b for yes, 0b for no. | 0b |
For all common arguments, refer to configuring operators
This operator replaces infinite values in the specified columns. This allows the ± infinities to be replaced by the running maximum/minimum for the column.
Operating restrictions
If the first value received for a column is infinite, an error will be thrown as there is no value to replace it with.
This pipeline replaces infinities in the columns x and x1.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.replaceInfinity[`x`x1; .qsp.use ``newCol!11b]
  .qsp.write.toConsole[];
publish ([] x: 1 3 4 0w; x1: 1 -0W 0 -0W; x2: til 4);
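As a rough illustration of the replacement rule, the sketch below swaps infinities in a single float vector for the largest and smallest finite values in that vector. `replaceInf` is a hypothetical helper, not part of the `.qsp` API. It assumes float input and works on one batch only, whereas the operator is described as using a running maximum/minimum.

```q
/ Hypothetical helper, not part of the .qsp API.
/ x is a float vector; infinities are replaced using the finite values in x.
replaceInf:{[x]
  isPos:x=0w;
  isNeg:x=neg 0w;
  fin:x where not isPos or isNeg;
  ?[isPos;max fin;?[isNeg;min fin;x]]}

replaceInf 1 3 4 0w        / 1 3 4 4f
replaceInf 2 -0w 5 0w 1    / 2 1 5 5 1f
```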
.qsp.transform.replaceNull
Replaces null values using the median from each of the columns
.qsp.transform.replaceNull[X]
.qsp.transform.replaceNull[X; .qsp.use (!) . flip (
    (`newCol    ; newCol);
    (`bufferSize; bufferSize))]
Parameters:
name | type | description |
---|---|---|
X | symbol or symbol[] or dictionary | Symbol or list of symbols indicating the columns to act on, or a dictionary of column names and replacement values. |
options:
name | type | description | default |
---|---|---|---|
newCol | boolean | Boolean indicating if additional columns are to be added indicating which entries were null. 1b for yes, 0b for no. | 0b |
bufferSize | long | Number of data points that must amass before calculating the median. | 0 |
For all common arguments, refer to configuring operators
This operator replaces null values in the selected columns.
If X is a symbol or list of symbols, nulls will be replaced by the median for the column, as calculated from the buffered data, or from the first batch if bufferSize is not specified. Initially, the batches are collected in a buffer until the required size is exceeded. If this buffer contains only null values for the column, an error will be logged, and null values will not be replaced.
If X is a dictionary of column names and replacement values, nulls will be replaced by the value given for the column. When replacement values are given, no buffering is done.
This pipeline replaces nulls in columns x and y with the median values.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.replaceNull[`x`y;.qsp.use`bufferSize`newCol!(10;1b)]
  .qsp.write.toConsole[];
publish ([] x: 0n,10?1f; y: 11?1f; z: til 11);
This pipeline replaces nulls in columns x and x1 with specific values.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.replaceNull[`x`x1!0 .5;.qsp.use ``newCol!11b]
  .qsp.write.toConsole[];
publish ([] x: 0n , 10?1f; x1: 0n 0n , (8?1f) , 0n);
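The two replacement strategies can be sketched on a single float vector in plain q. `fillMedian` and `fillValue` are hypothetical helpers, not part of the `.qsp` API, and the sketch ignores buffering entirely: the median is taken from whatever vector is passed in.

```q
/ Hypothetical helpers, not part of the .qsp API.
/ Nulls are removed before taking the median so they cannot affect it.
fillMedian:{[x] (med x where not null x)^x}
/ Replacement with a fixed value, as in the dictionary form of X.
fillValue:{[v;x] v^x}

fillMedian 0n 1 2 3 4f     / 2.5 1 2 3 4
fillValue[0.5;0n 0.2 0n]   / 0.5 0.2 0.5
```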
.qsp.transform.schema
Apply a table schema to data passing through the operator
.qsp.transform.schema[schema]
.qsp.transform.schema[schema; .qsp.use (!) . flip (
    (`inputType  ; inputType);
    (`schemaType ; schemaType);
    (`parse      ; parse))]
Parameters:
name | type | description | default |
---|---|---|---|
schema | symbol or table or dictionary | A schema to apply to the incoming data. This can be a table name represented in a mounted Insights assembly, an empty kdb+ table, or a map of column names to column types. | Required |
options:
name | type | description | default |
---|---|---|---|
inputType | symbol | The data type of the stream (one of arrays, table or auto). If this is fixed for all events, specifying it allows the SP to make optimizations at compilation time. | auto |
schemaType | symbol | How to interpret the provided schema object. By default the schema is treated as the desired literal output. Alternatively, this can be set to schema, and a special table of ([] name: `$(); datatype: `short$(); tokenize: `$()) can be provided describing the desired output. | literal |
parse | symbol | Indicates whether parsing of input string data into other datatypes is required. One of auto, on or off. If on or off, optimization of processing can be done. | auto |
For all common arguments, refer to configuring operators
This applies a given schema to the data passing through the operator.
Input Considerations:
For events containing table data:
Columns in incoming messages that are not in the target schema will be dropped. Columns in the target schema that are missing will be added with nulls. If no common columns are found, or incoming data cannot be cast to the target datatype, a null event will be returned and an error will be logged to stdout with the corresponding message offset, if available.
For events containing array data:
Incoming array data must have the same length as the number of columns in the target schema. Additionally, each element of the array can be a typed list or a scalar that can be scalar-extended to the length of the outbound table data. The schema will be applied to the incoming data in the column order specified in the schema configuration (whether it's a table or a dictionary). If the schema cannot be applied to the incoming data due to array length or invalid type conversions the operator will push a null event further down the pipeline and log an error with message offset, if available.
Specifying tokenization in the schema object passed in (schemaType=schema):
All entries in the tokenization column must be one of auto, on, or off.
auto → In each batch of data, check if the corresponding column needs to be tokenized, and if so have this done.
on → Always tokenize the corresponding column in the incoming data, without doing any checks. Offers performance improvements over auto.
off → Never tokenize the corresponding column in the incoming data. Offers performance improvements over auto.
Note: the parse option is ignored when a schema object is passed in.
Specifying tokenization using the parse option:
The parse option is only used when a schema literal is passed in (schemaType=literal). It defines the same tokenization option for all input columns.
parse=auto → In each batch of data, detect and parse columns that require tokenization.
parse=on → In each batch of data, parse all columns without doing any checks. Offers performance improvements over auto.
parse=off → No parsing is done for input data. Offers performance improvements over auto.
Note: Providing parse as a boolean is deprecated. For backwards-compatibility, 1b maps to auto and 0b maps to off.
Providing a symbol as the input schema:
If a symbol is provided as the input schema to this operator, the table name must exist in a mounted assembly in the SP Worker. When the Stream Processor is deployed inside of an assembly, that information will be automatically mounted so any table in the assembly in which the SP is also deployed can be referenced. If the Stream Processor is deployed outside of an assembly, an assembly configuration map must be mounted using the kubeConfig parameter of our deployment REST API.
Convert an input stream to the correct datatypes:
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.decode.json[]
  .qsp.transform.schema[([] date: `date$(); price: `float$(); sym:`$()); .qsp.use``parse!(::;`auto)]
  .qsp.write.toConsole[]
publish enlist "[{\"date\":\"2020-05-22\",\"price\":\"178.12\",\"sym\":\"AAPL\"}]"
| date price sym
-----------------------------| ----------------------
2021.11.25D17:46:03.169434200| 2020.05.22 178.12 AAPL
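The kind of string parsing that tokenization performs can be sketched in plain q with the parsing form of `$`. This is only an illustration and makes no claim about how the operator is implemented. The table `raw` and the variable names are made up for the example, and the sketch assumes every schema column is present in the input as a list of strings.

```q
/ Illustrative only: parse string columns into the types of an empty schema table.
raw:([] date:("2020-05-22";"2020-05-23"); price:("178.12";"180.01"); sym:("AAPL";"MSFT"); extra:1 2)
schema:([] date:`date$(); price:`float$(); sym:`$())

/ Upper-case type characters make $ parse strings instead of casting them
types:upper exec t from meta schema

/ Keep only the schema's columns, in schema order, parsing each one
parsed:flip cols[schema]!types$'raw cols schema
meta parsed
```

The `extra` column is dropped because only the columns named in `schema` are selected, which mirrors the behavior described above for columns that are not in the target schema.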
.qsp.transform.timeSplit
Decompose temporal columns into constituent elements
.qsp.transform.timeSplit[X]
.qsp.transform.timeSplit[X; .qsp.use enlist[`delete]!enlist delete]
Parameters:
name | type | description |
---|---|---|
X | symbol or symbol[] or :: | The columns to act on. Alternatively use :: to convert all temporal columns. |
options:
name | type | description | default |
---|---|---|---|
delete | boolean | Boolean indicating if the original temporal column is to be deleted post conversion. This is true by default as many ML algorithms cannot discern the meaning of temporal types. | 1b |
For all common arguments, refer to configuring operators
This operator is used to decompose temporal columns within kdb+ tables (date/month/timestamp etc.) into a set of constituent elements which can be used to represent this data. This functionality is described here, and is intended to allow the information in temporal data to be better utilized within ML algorithms by providing contextual information such as day of the week/quarter information alongside information such as month/day/hour/second etc.
The operator can be configured to convert specific columns or all temporal columns. By default the original column is deleted. Set the delete option to 0b to retain the original column if the use case requires it.
This pipeline replaces all temporal columns (date/month) with decomposed representations.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.timeSplit[::]
  .qsp.write.toConsole[];
publish ([] month: asc "m"$10?5; date: asc "d"$10?60; number: 10?1f)
This pipeline replaces a specified column with the decomposed representation.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.timeSplit[`x]
  .qsp.write.toConsole[];
publish ([] x: 10?2020.01.01D00:00:00; y: 10?00:00:00)
This pipeline adds decomposed representations of the specified columns, retaining the original columns.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.timeSplit[`x`y; .qsp.use ``delete!00b]
  .qsp.write.toConsole[];
publish ([] x: 10?00:00; y: 10?00:00; z: til 10)
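To give a feel for what decomposing a temporal column means, the sketch below extracts a few constituent elements from a timestamp column using standard q casts. The output column names (`xYear`, `xMonth` and so on) are made up for this example and are not claimed to match the names the operator produces. The set of elements is also only a plausible subset.

```q
/ Illustrative only: manual decomposition of a timestamp column x.
t:([] x:2020.01.01D09:30:15 2021.06.15D23:59:59; y:1 2)

/ x is omitted from the select list, mirroring delete=1b
res:select y,
  xYear:`year$x, xMonth:`mm$x, xDay:`dd$x,
  xQuarter:1+(-1+`mm$x) div 3,
  xDow:(`date$x) mod 7,
  xHour:`hh$x, xMinute:`uu$x, xSecond:`ss$x
  from t
res
```

Here `xDow` counts days from Saturday (0 = Saturday), since q dates are measured from 2000.01.01, which was a Saturday.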