Skip to content

Data Transformations

.qsp.transform.replaceInfinity

Replaces positive/negative infinite values with the max/min values from each columns

.qsp.transform.replaceInfinity[X]
.qsp.transform.replaceInfinity[X; .qsp.use enlist[`newCol]!enlist newCol]

Parameters:

name type description
X symbol or symbol[] Symbol or list of symbols indicating the columns to act on.

options:

name type description default
newCol boolean Boolean indicating if additional columns are to be added indicating which entries were infinities. 1b for yes 0b for no. 0b

For all common arguments, refer to configuring operators

This operator replaces infinite values in the specified columns. This allows the ± infinities to be replaced by the running maximum/minimum for the column.

Operating restrictions

If the first value received for a column is infinite, an error will be thrown as there is no value to replace it with.

This pipeline replaces infinities in the columns x and x1

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.replaceInfinity[`x`x1; .qsp.use ``newCol!11b]
  .qsp.write.toConsole[];

publish ([] x: 1 3 4 0w; x1: 1 -0W 0 -0W; x2: til 4);

.qsp.transform.replaceNull

Replaces null values using the median from each of the columns

.qsp.transform.replaceNull[X]
.qsp.transform.replaceNull[X; .qsp.use (!) . flip (
    (`newCol    ; newCol);
    (`bufferSize; bufferSize))]

Parameters:

name type description
X symbol or symbol[] or dictionary Symbol or list of symbols indicating the columns to act on, or a dictionary of column names and replacement values.

options:

name type description default
newCol boolean Boolean indicating if additional columns are to be added indicating which entries were null. 1b for yes 0b for no. 0b
bufferSize long Number of data points that must amass before calculating the median. 0

For all common arguments, refer to configuring operators

This operator replace null values in the selected columns. If X is a symbol or list of symbols, nulls will be replaced by the median for the column, as calculated from the buffered data, or from the first batch if bufferSize is not specified. Initially, the batches are collected in a buffer until the required size is exceeded. If this buffer contains only null values for the column, an error will be logged, and null values will not be replaced. If X is a dictionary of column names and replacement values, nulls will be replaced by the value given for the column. When replacement values are given, no buffering is done.

This pipeline replaces nulls columns x and y with the median values.

.qsp.run 
  .qsp.read.fromCallback[`publish]
  .qsp.transform.replaceNull[`x`y;.qsp.use`bufferSize`newCol!(10;1b)]
  .qsp.write.toConsole[];

publish ([] x: 0n,10?1f; y: 11?1f; z: til 11);

This pipeline replaces nulls in columns x and x1 with specific values.

.qsp.run 
  .qsp.read.fromCallback[`publish]
  .qsp.transform.replaceNull[`x`x1!0 .5;.qsp.use ``newCol!11b]
  .qsp.write.toConsole[];

publish ([] x: 0n , 10?1f; x1: 0n 0n , (8?1f) , 0n);

.qsp.transform.schema

Apply a table schema to data passing through the operator

.qsp.transform.schema[schema]
.qsp.transform.schema[schema; .qsp.use (!) . flip (
    (`inputType  ; inputType);
    (`schemaType ; schemaType);
    (`parse      ; parse))]

Parameters:

name type description default
schema symbol or table or dictionary A schema to apply to the incoming data. This can be a table name represented in a mounted Insights assembly, an empty kdb+ table, or a map of column names to column types. Required

options:

name type description default
inputType symbol The data type of the stream (one of arrays, table or auto), if this is fixed for all events, this allows for the SP to make optimization at compilation time. auto
schemaType symbol How to interpret the provided schema object. By default the schema is treated as the desired literal output. Alternatively, this can be set to be schema and a special table of ([] name: `$(); datatype: `short$(); tokenize: `boolean$()) can be provided describing the desired output. literal
parse boolean Indicates whether parsing of input string data into other datatypes is required. When this is not required, optimization of processing can be made. 0b

For all common arguments, refer to configuring operators

This applies a given schema to the data passing through the operator.

Input Considerations:

For events containing table data: Columns in incoming messages that are not in the target schema will be dropped. Columns in the target schema that are missing will be added with nulls. If no common columns are found, or incoming data cannot be cast to the to the target datatype, a null event will be returned and an error will be logged to stdout with the corresponding message offset, if available.

For events containing array data:

Incoming array data must have the same length as the number of columns in the target schema. Additionally, each element of the array can be a typed list or a scalar that can be scalar-extended to the length of the outbound table data. The schema will be applied to the incoming data in the column order specified in the schema configuration (whether its a table or a dictionary). If the schema cannot be applied to the incoming data due to array length or invalid type conversions the operator will push a null event further down the pipeline and log an error with message offset, if available.

If a symbol is provided as the input schema to this operator, the table name must exist in a mounted assembly in the SP Worker. When the Stream Processor is deployed inside of an assembly, that information will be automatically mounted so any table in the assembly in which the SP is also deployed can be referenced. If the Stream Processor is deployed outside of an assembly, an assembly configuration map must be mounted using the kubeConfig parameter of our deployment REST API.

Convert an input stream to the correct datatypes:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.decode.json[]
  .qsp.transform.schema[([] date: `date$(); price: `float$(); sym:`$()); .qsp.use``parse!(::;1b)]
  .qsp.write.toConsole[]

publish enlist "[{\"date\":\"2020-05-22\",\"price\":\"178.12\",\"sym\":\"AAPL\"}]"
                             | date       price  sym
-----------------------------| ----------------------
2021.11.25D17:46:03.169434200| 2020.05.22 178.12 AAPL

.qsp.transform.timeSplit

Decompose temporal columns into constituent elements

.qsp.transform.timeSplit[X]
.qsp.transform.timeSplit[X; .qsp.use enlist[`delete]!enlist delete]

Parameters:

name type description
X symbol or symbol[] or :: The columns to act on. Alternatively use :: to convert all temporal columns.

options:

name type description default
delete boolean Boolean indicating if the original temporal column is to be deleted post conversion. This is true by default as many ML algorithms cannot discern the meaning of temporal types. 1b

For all common arguments, refer to configuring operators

This operator is used to decompose temporal columns within kdb+ tables (date/month/timestamp etc.) into a set of constituent elements which can be used to represent this data. This functionality is described here, and is intended to allow the information in temporal data to be better utilized within ML algorithms by providing contextual information such as day of the week/quarter information alongside information such as month/day/hour/second etc.

The operator can be configured to convert data on a per column(s) basis or used on all temporal columns. By default the original column is deleted, this can be modified to allow the original column to be maintained if required by the use-case.

This pipeline replaces all temporal columns (date/month) with decomposed representations.

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.timeSplit[::]
  .qsp.write.toConsole[];

publish ([] month: asc "m"$10?5; date: asc "d"$10?60; number: 10?1f)

This pipeline replaces a specified column with the decomposed representation.

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.timeSplit[`x]
  .qsp.write.toConsole[];

publish ([] x: 10?2020.01.01D00:00:00; y: 10?00:00:00)

This pipeline replaces specified columns with decomposed representation, retaining the originals column.

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.transform.timeSplit[`x`y; .qsp.use ``delete!00b]
  .qsp.write.toConsole[];

publish ([] x: 10?00:00; y: 10?00:00; z: til 10)