Skip to content

Data Transformations

.qsp.transform.fill

(Beta Feature) Fill null values in table columns

Beta Features

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

.qsp.transform.fill[defaults]
.qsp.transform.fill[defaults;mode]

Parameters:

name type description default
defaults dict Keys specify the columns to replace null values for. The value associated with a key is what's used as the default fill for the corresponding column. Required
mode symbol Specifies the approach to use when filling. One of static, down, or up. static

For all common arguments, refer to configuring operators

This operator fills null values in a table either statically, up, or down, and utilizes specified defaults.

  • static - every null value in a column is replaced with its corresponding default.
  • up - up (backward) fill, where if the last entry in the current column is null it is replaced with its corresponding default.
  • down - down (forward) fill, where for the first table passed in, any column whose first entry is null has it replaced with the column's corresponding default. Plugin state is managed to have down-fill carry over across batches. Currently, only filling vector columns is supported, and all default values to fill with must be atomic. The type of elements in a vector column should match the type of the atomic fill value, otherwise the column is implicitly cast to match the type of the fill value.

Using fill-static on a table with null entries:

input: ([] val1: 0N 1 2 0N 3; val2: "a b c"; val3:  0N 5 0N 5 0N)

.qsp.run
   .qsp.read.fromCallback[`publish]
   .qsp.transform.fill[`val1`val2`val3!(-1;"_"; -10)]
   .qsp.write.toVariable[`output];

publish input
output
val1 val2 val3
--------------
-1   a    -10
1    _    5
2    b    -10
-1   _    5
3    c    -10

Using fill-down on a table with null entries:

input: ([] val1: 0N 1 2 0N 3; val2: "a b c"; val3:  0N 5 0N 5 0N)
dict: `val1`val2`val3!(-1;"_"; -10)

.qsp.run
   .qsp.read.fromCallback[`publish]
   .qsp.transform.fill[dict;`down]
   .qsp.write.toVariable[`output];

publish input
output
val1 val2 val3
--------------
-1   a    -10
1    a    5
2    b    5
2    b    5
3    c    5

.qsp.transform.renameColumns

Renames columns in the incoming data

.qsp.transform.renameColumns[nameScheme]

Parameters:

name type description default
nameScheme dictionary A dictionary mapping current column names to what they should be renamed to Required

For all common arguments, refer to configuring operators

This example transforms data with column names "a" and "b" to have columns "c" and "d"

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.transform.renameColumns[`a`b!`c`d]
    .qsp.write.toVariable[`output];

publish ([] a: til 5; b: til 5);
output
c d
---
0 0
1 1
2 2
3 3
4 4

.qsp.transform.replaceInfinity

Replaces positive/negative infinite values with the max/min values from each columns

.qsp.transform.replaceInfinity[X]
.qsp.transform.replaceInfinity[X; .qsp.use enlist[`newCol]!enlist newCol]

Parameters:

name type description
X symbol or symbol[] Symbol or list of symbols indicating the columns to act on.

options:

name type description default
newCol boolean Boolean indicating if additional columns are to be added indicating which entries were infinities. 1b for yes 0b for no. 0b

For all common arguments, refer to configuring operators

This operator replaces infinite values in the specified columns. This allows the ± infinities to be replaced by the running maximum/minimum for the column.

Operating restrictions

If the first value received for a column is infinite, an error will be thrown as there is no value to replace it with.

This pipeline replaces infinities in the columns x and x1

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.replaceInfinity[`x`x1; .qsp.use ``newCol!11b]
  .qsp.write.toConsole[];

publish ([] x: 1 3 4 0w; x1: 1 -0W 0 -0W; x2: til 4);

.qsp.transform.replaceNull

Replaces null values using the median from each of the columns

.qsp.transform.replaceNull[X]
.qsp.transform.replaceNull[X; .qsp.use (!) . flip (
    (`newCol    ; newCol);
    (`bufferSize; bufferSize))]

Parameters:

name type description
X symbol or symbol[] or dictionary Symbol or list of symbols indicating the columns to act on, or a dictionary of column names and replacement values.

options:

name type description default
newCol boolean Boolean indicating if additional columns are to be added indicating which entries were null. 1b for yes 0b for no. 0b
bufferSize long Number of data points that must amass before calculating the median. 0

For all common arguments, refer to configuring operators

This operator replace null values in the selected columns. If X is a symbol or list of symbols, nulls will be replaced by the median for the column, as calculated from the buffered data, or from the first batch if bufferSize is not specified. Initially, the batches are collected in a buffer until the required size is exceeded. If this buffer contains only null values for the column, an error will be logged, and null values will not be replaced. If X is a dictionary of column names and replacement values, nulls will be replaced by the value given for the column. When replacement values are given, no buffering is done.

This pipeline replaces nulls columns x and y with the median values.

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.replaceNull[`x`y;.qsp.use`bufferSize`newCol!(10;1b)]
  .qsp.write.toConsole[];

publish ([] x: 0n,10?1f; y: 11?1f; z: til 11);

This pipeline replaces nulls in columns x and x1 with specific values.

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.replaceNull[`x`x1!0 .5;.qsp.use ``newCol!11b]
  .qsp.write.toConsole[];

publish ([] x: 0n , 10?1f; x1: 0n 0n , (8?1f) , 0n);

.qsp.transform.schema

Apply a table schema to data passing through the operator

.qsp.transform.schema[schema]
.qsp.transform.schema[schema; .qsp.use (!) . flip (
    (`inputType  ; inputType);
    (`schemaType ; schemaType);
    (`parse      ; parse))]

Parameters:

name type description default
schema symbol or table or dictionary A schema to apply to the incoming data. This can be a table name represented in a mounted Insights assembly, an empty kdb+ table, or a map of column names to column types. Required

options:

name type description default
inputType symbol The data type of the stream (one of arrays, table or auto), if this is fixed for all events, this allows for the SP to make optimization at compilation time. auto
schemaType symbol How to interpret the provided schema object. By default the schema is treated as the desired literal output, alternatively, this can be set to schema. When using schema the provided schema object should be in a special table format of ([] name: $(); datatype:short$()). literal
parse symbol Indicates whether parsing of input string data into other datatypes is required. One of auto, on or off. If on or off, optimization of processing can be done. auto

For all common arguments, refer to configuring operators

This applies a given schema to the data passing through the operator.

Input Considerations

Events containing table data

Columns in incoming messages that are not in the target schema will be dropped. Columns in the target schema that are missing will be added with nulls. If no common columns are found, or incoming data cannot be cast to the to the target datatype, a null event will be returned and an error will be logged to stdout with the corresponding message offset, if available.

Events containing array data

Incoming array data must have the same length as the number of columns in the target schema. Additionally, each element of the array can be a typed list or a scalar that can be scalar-extended to the length of the outbound table data. The schema will be applied to the incoming data in the column order specified in the schema configuration (whether it's a table or a dictionary). If the schema cannot be applied to the incoming data due to array length or invalid type conversions the operator will push a null event further down the pipeline and log an error with message offset, if available.

Expected type formats

The parse option allows for string representations to be converted to typed values. For numeric values to be parsed correctly, they must be provided in the expected format. String values in unexpected formats may be processed incorrectly.

  • Strings representing bytes are expected as exactly two base 16 digits, e.g. "ff"
  • Strings representing integers are expected in base 10, e.g. "255"
  • Strings representing boolean values have a number of supported options, e.g. "t", "1"
  • More information on the available formats.
Specifying tokenization using the schema object (schemaType=schema)

All entries in the tokenization column must be one of auto, on, or off.

  • auto - In each batch of data, check if the corresponding column needs to be tokenized, and if so have this done.
  • on - Always tokenize the corresponding column in the incoming data, without doing any checks. Offers performance improvements over auto.
  • off - Never tokenize the corresponding column in the incoming data. Offers performance improvements over auto.

Note: the parse option is ignored when a schema object is passed in.

Specifying tokenization using the parse option

The parse option is only used when a schema literal is passed in (schemaType=literal). It defines the same tokenization option for all input columns:

  • auto - In each batch of data, detect and parse columns that require tokenization.
  • parse - In each batch of data, parse all columns without doing any checks. Offers performance improvements over auto.
  • off - No parsing is done for input data. Offers performance improvements over auto.

Note: Providing parse as a boolean is deprecated. For backwards-compatibility, 1b maps to auto and 0b maps to off.

Providing a symbol as the input schema

If a symbol is provided as the input schema to this operator, the table name must exist in a mounted assembly in the SP Worker. When the Stream Processor is deployed inside of an assembly, that information will be automatically mounted so any table in the assembly in which the SP is also deployed can be referenced. If the Stream Processor is deployed outside of an assembly, an assembly configuration map must be mounted using the kubeConfig parameter of our deployment REST API.

Convert an input stream to the correct datatypes:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.decode.json[]
  .qsp.transform.schema[([] date: `date$(); price: `float$(); sym:`$()); .qsp.use``parse!(::;1b)]
  .qsp.write.toConsole[]

publish enlist "[{\"date\":\"2020-05-22\",\"price\":\"178.12\",\"sym\":\"AAPL\"}]"
                             | date       price  sym
-----------------------------| ----------------------
2021.11.25D17:46:03.169434200| 2020.05.22 178.12 AAPL

.qsp.transform.timeSplit

Decompose temporal columns into constituent elements

.qsp.transform.timeSplit[X]
.qsp.transform.timeSplit[X; .qsp.use enlist[`delete]!enlist delete]

Parameters:

name type description
X symbol or symbol[] or :: The columns to act on. Alternatively use :: to convert all temporal columns.

options:

name type description default
delete boolean Boolean indicating if the original temporal column is to be deleted post conversion. This is true by default as many ML algorithms cannot discern the meaning of temporal types. 1b

For all common arguments, refer to configuring operators

This operator is used to decompose temporal columns within kdb+ tables (date/month/timestamp etc.) into a set of constituent elements which can be used to represent this data. This functionality is described here, and is intended to allow the information in temporal data to be better utilized within ML algorithms by providing contextual information such as day of the week/quarter information alongside information such as month/day/hour/second etc.

The operator can be configured to convert data on a per column(s) basis or used on all temporal columns. By default the original column is deleted, this can be modified to allow the original column to be maintained if required by the use-case.

This pipeline replaces all temporal columns (date/month) with decomposed representations.

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.timeSplit[::]
  .qsp.write.toConsole[];

publish ([] month: asc "m"$10?5; date: asc "d"$10?60; number: 10?1f)

This pipeline replaces a specified column with the decomposed representation.

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.timeSplit[`x]
  .qsp.write.toConsole[];

publish ([] x: 10?2020.01.01D00:00:00; y: 10?00:00:00)

This pipeline replaces specified columns with decomposed representation, retaining the originals column.

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.timeSplit[`x`y; .qsp.use ``delete!00b]
  .qsp.write.toConsole[];

publish ([] x: 10?00:00; y: 10?00:00; z: til 10)