kxi.sp.transform

Stream Processor Transforms

InputType Objects

class InputType(AutoNameEnum)

Input type of data to schema plugin.

arrays

Data is always of array type.

table

Data is always of table type.

auto

Data can be of mixed type.

Parse Objects

class Parse(AutoNameEnum)

Parse string data to other types.

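on

String data is always parsed into other types.

off

String data is never parsed.

auto

Whether string data needs parsing is determined at runtime.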

schema

@Transformer
def schema(table: dict,
           *,
           input_type: InputType = InputType.auto,
           parse: Parse = Parse.auto) -> Transformer

Apply schema to the incoming data, imposing the outgoing table format.

Arguments:

  • table - Dictionary defining the schema. Keys specify the column names; the value associated with a key is the pykx type of the column, e.g. kx.FloatAtom.
  • input_type - The data type of the stream: one of arrays, table or auto. Choosing arrays or table allows evaluation to be optimized at compile time.
  • parse - Indicates whether input string data needs to be parsed into other datatypes: one of auto, on or off. Choosing on or off allows processing to be optimized.

Returns:

A schema transformer, which can be joined to other operators or pipelines.
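The effect of the table argument can be pictured with a pure-Python model. This is a minimal sketch, not the Stream Processor implementation: the hypothetical helpers to_symbol and to_date stand in for the pykx types kx.SymbolAtom and kx.DateAtom, records arrive as a list of dicts (as in the example below), and two behaviours are assumptions rather than documented facts: columns not named in the schema are dropped, and missing values become None.

```python
from datetime import date, datetime
from typing import Any, Callable, Dict, List


# Hypothetical stand-ins for pykx types: plain Python converters.
def to_symbol(v: Any) -> str:
    return str(v)


def to_date(v: Any) -> date:
    # datetime is a subclass of date, so check it first and truncate to the day
    return v.date() if isinstance(v, datetime) else v


def apply_schema(rows: List[Dict[str, Any]],
                 schema: Dict[str, Callable[[Any], Any]]) -> Dict[str, List[Any]]:
    """Build a column-oriented table with exactly the schema's columns, in schema order."""
    table: Dict[str, List[Any]] = {name: [] for name in schema}
    for row in rows:
        for name, convert in schema.items():
            value = row.get(name)
            # Assumption: a missing value is kept as None rather than raising.
            table[name].append(None if value is None else convert(value))
    return table


rows = [{'name': 'Apple', 'date': datetime(2023, 1, 4)},
        {'name': 'Google', 'date': datetime(2023, 1, 4)}]
print(apply_schema(rows, {'name': to_symbol, 'date': to_date}))
```

Fixing the schema up front is what lets input_type and parse settings be resolved before any data arrives.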

Notes:

The parse option allows for string representations to be converted to typed values. For numeric values to be parsed correctly, they must be provided in the expected format. String values in unexpected formats may be processed incorrectly.

  • Strings representing bytes are expected as exactly two base 16 digits, e.g. "00" to "ff"
  • Strings representing integers are expected in base 10, e.g. "255"
  • Strings representing boolean values have a number of supported options, e.g. "t", "1"
  • More information on the available formats.
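The byte and integer rules above can be illustrated in plain Python. This is a sketch of the documented input formats only; parse_byte and parse_int are hypothetical helpers, not part of kxi or pykx. It shows why the same text means different things depending on the target type.

```python
import string


def parse_byte(s: str) -> int:
    """Parse a byte given as exactly two base-16 digits, "00" to "ff"."""
    if len(s) != 2 or any(c not in string.hexdigits for c in s):
        raise ValueError(f"expected two hex digits, got {s!r}")
    return int(s, 16)


def parse_int(s: str) -> int:
    """Parse an integer written in base 10, e.g. "255"."""
    return int(s, 10)


print(parse_byte("ff"))  # 255
print(parse_int("255"))  # 255
# parse_byte("255") and parse_int("ff") both raise ValueError:
# each string is only valid in the format its target type expects.
```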

>>> from kxi import sp
>>> import pykx as kx
>>> from datetime import datetime

>>> sp.run(sp.read.from_callback('publish')
            | sp.transform.schema({'name': kx.SymbolAtom, 'date': kx.DateAtom})
            | sp.write.to_variable('out'))
>>> kx.q('publish', [{'name': 'Apple', 'date': datetime(2023,1,4)},
        {'name': 'Google', 'date': datetime(2023,1,4)}])
>>> kx.q('out')
name   date
-----------------
Apple  2023.01.04
Google 2023.01.04

fill

@Transformer
def fill(defaults: dict, mode: str = "static") -> Transformer

(Beta Feature) Fill null values in streaming data.

Notes:

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

Arguments:

  • defaults - Dictionary of default values used to replace nulls. Keys specify the columns in which nulls are replaced; the value associated with a key is the default fill for that column.
  • mode - The approach to use when filling nulls: one of static, down or up. In static mode, every null value in a column is replaced with its corresponding default. In down mode, a down (forward) fill is performed, replacing each null with the preceding non-null value; for the first table passed in, if a column's first entry is null it is replaced with the column's default. In up mode, an up (backward) fill is performed, replacing each null with the following non-null value; if the last entry in a column is null it is replaced with the column's default.

Returns:

A fill transformer, which can be joined to other operators or pipelines.
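The three modes can be modelled in plain Python. This is a minimal sketch, not the Stream Processor implementation: NullFiller is a hypothetical class operating on column-oriented dict batches, nulls are taken to be None or float NaN, and down mode is assumed to carry the last non-null value forward from one batch into the next.

```python
import math
from typing import Dict, List


def is_null(v: object) -> bool:
    return v is None or (isinstance(v, float) and math.isnan(v))


class NullFiller:
    """Hypothetical model of fill for column-oriented batches."""

    def __init__(self, defaults: Dict[str, object], mode: str = "static") -> None:
        if mode not in ("static", "down", "up"):
            raise ValueError(f"unknown mode {mode!r}")
        self.defaults = defaults
        self.mode = mode
        # Down mode state: last non-null value per column, seeded with the defaults
        self.last: Dict[str, object] = dict(defaults)

    def process(self, batch: Dict[str, List[object]]) -> Dict[str, List[object]]:
        out = dict(batch)
        for col, default in self.defaults.items():
            values = list(batch[col])
            if self.mode == "static":
                values = [default if is_null(v) else v for v in values]
            elif self.mode == "down":
                prev = self.last[col]
                for i, v in enumerate(values):
                    if is_null(v):
                        values[i] = prev
                    else:
                        prev = v
                self.last[col] = prev  # carried into the next batch
            else:  # up: walk backwards, trailing nulls take the default
                nxt = default
                for i in range(len(values) - 1, -1, -1):
                    if is_null(values[i]):
                        values[i] = nxt
                    else:
                        nxt = values[i]
            out[col] = values
        return out


print(NullFiller({'x': 1}).process({'x': [None, 2, None]}))  # {'x': [1, 2, 1]}
```

The static case reproduces the example output below.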

>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd

>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.fill({'x': 1})
        | sp.write.to_variable('out'))
>>> kx.q("publish", pd.DataFrame({'x': [None, 2, None]}))
>>> kx.q('out')
x
-
1
2
1

rename_columns

@Transformer
def rename_columns(nameScheme: dict) -> Transformer

Rename columns according to the dictionary provided.

Arguments:

  • nameScheme - A dictionary defining the columns to be renamed and their new names.

Returns:

A rename transformer, which can be joined to other operators or pipelines.
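On a column-oriented table, the renaming amounts to a dictionary lookup per column name. This sketch uses a hypothetical rename_model function and plain dicts rather than kxi or pykx objects; it assumes, consistent with the example below, that columns absent from the dictionary keep their names and that column order is preserved.

```python
from typing import Dict, List


def rename_model(table: Dict[str, List[float]],
                 name_scheme: Dict[str, str]) -> Dict[str, List[float]]:
    """Hypothetical model of rename_columns on a column-oriented table."""
    # Columns absent from name_scheme keep their names; column order is preserved.
    return {name_scheme.get(col, col): values for col, values in table.items()}


print(rename_model({'a': [1.0], 'b': [2.0], 'z': [3.0]}, {'a': 'x', 'b': 'y'}))
# {'x': [1.0], 'y': [2.0], 'z': [3.0]}
```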

Examples:

Rename the columns in a batch of data (the batch must be a table):

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd

>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.rename_columns({'a': 'x', 'b': 'y'})
        | sp.write.to_variable('out'))
>>> data = pd.DataFrame({
        'a': np.random.randn(10),
        'b': np.random.randn(10),
        'z': np.random.randn(10)
    })
>>> kx.q('publish', data)
>>> kx.q('out')
x          y          z
---------------------------------
-0.8346907 -0.6609932 -1.807997
0.9907844  -0.2233735 0.7030451
2.021636   0.8098854  0.3018809
0.2035373  1.373164   0.8300498
-0.1250729 1.424175   -0.07679308
2.156653   -0.2182297 -0.458493
-0.2572752 -2.013144  1.027509
2.010659   1.397499   -0.7732002
-1.143055  -1.739128  0.8732918
-0.4201101 0.635765   0.5426539

replace_infinity

@Transformer
def replace_infinity(columns: Optional[Union[str, List[str]]] = None,
                     *,
                     new_column: bool = False) -> Transformer

Replace infinite values in columns based on previously seen data.

Positive infinity is replaced with the largest previously seen value in its column. Negative infinity is replaced with the smallest previously seen value in its column.
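The stateful replacement described above can be sketched for a single column. InfinityReplacer is a hypothetical class, not the Stream Processor's code, and it makes two assumptions: every finite value in the current batch counts as "previously seen", and an infinity arriving before any finite value is left unchanged. The new_column flag is not modelled.

```python
import math
from typing import List, Optional


class InfinityReplacer:
    """Hypothetical model of replace_infinity for a single float column."""

    def __init__(self) -> None:
        self.lo: Optional[float] = None  # smallest finite value seen so far
        self.hi: Optional[float] = None  # largest finite value seen so far

    def process(self, values: List[float]) -> List[float]:
        # Assumption: finite values in the current batch count as "seen".
        finite = [v for v in values if math.isfinite(v)]
        if finite:
            batch_lo, batch_hi = min(finite), max(finite)
            self.lo = batch_lo if self.lo is None else min(self.lo, batch_lo)
            self.hi = batch_hi if self.hi is None else max(self.hi, batch_hi)
        out = []
        for v in values:
            if v == math.inf and self.hi is not None:
                out.append(self.hi)
            elif v == -math.inf and self.lo is not None:
                out.append(self.lo)
            else:
                out.append(v)  # finite values, NaN, and infinities with no history pass through
        return out


r = InfinityReplacer()
print(r.process([1.0, 3.0, math.inf, -math.inf]))  # [1.0, 3.0, 3.0, 1.0]
print(r.process([math.inf, 2.0]))                  # [3.0, 2.0]
```

Keeping only a running minimum and maximum means the state stays constant-size however much data has streamed through.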

Arguments:

  • columns - Name(s) of the column(s) in which to replace infinite values.
  • new_column - Whether to add a new column (True) indicating the presence of infinite values in each row.

Returns:

An infinity_replace transformer, which can be joined to other operators or pipelines.

Examples:

Replaces the infinite values in a batch of data:

>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> import numpy as np
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.replace_infinity(['x', 'x1'])
        | sp.write.to_variable('out'))
>>> data = pd.DataFrame({
        'x': np.random.randn(10),
        'x1': np.random.randn(10),
        'x2': np.random.randn(10)
    })
>>> data.loc[9, 'x'] = np.inf
>>> kx.q('publish', data)
>>> kx.q('out')
x          x1         x2
---------------------------------
1.122557   1.091329   -0.9192559
-0.2272307 0.1437045  -1.314342
1.852872   -0.8079581 -0.04237484
-1.311179  -0.4326425 1.383893
-0.7943273 -1.13575   -0.437631
0.1900052  -1.223518  -0.9580604
0.2486013  -1.104549  0.8776486
1.109844   0.380195   0.898798
0.9993286  0.4093832  -1.477295
1.852872   0.7642802  0.4787892

replace_null

@Transformer
def replace_null(columns: Optional[Union[str, List[str], dict]] = None,
                 *,
                 buffer_size: int = 0,
                 new_column: bool = False) -> Transformer

Replace null values with the median of a supplied batch.

Arguments:

  • columns - Name(s) of the column(s) in which to replace nulls, or None to replace nulls in all float64 columns within a batch.
  • buffer_size - Number of data points that must accumulate before the median of each specified column is determined; null replacement is applied from then on.
  • new_column - Boolean indicating whether a new column is to be added to indicate the presence of null values for a given row.

Returns:

A null_replace transformer, which can be joined to other operators or pipelines.
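The interaction between buffer_size and the median can be modelled for a single float column. NullReplacer is a hypothetical class, and its behaviour rests on assumptions not stated above: buffered rows are withheld and released together once the median is fixed, the median is not recomputed afterwards, and buffering continues while every buffered value is null.

```python
import math
from statistics import median
from typing import List, Optional


class NullReplacer:
    """Hypothetical model of replace_null for a single float column."""

    def __init__(self, buffer_size: int = 0) -> None:
        self.buffer_size = buffer_size
        self.buffer: List[float] = []
        self.fill: Optional[float] = None  # median, fixed once determined

    def process(self, batch: List[float]) -> List[float]:
        if self.fill is None:
            self.buffer.extend(batch)
            non_null = [v for v in self.buffer if not math.isnan(v)]
            if len(self.buffer) < self.buffer_size or not non_null:
                return []  # still buffering: nothing is emitted yet
            self.fill = median(non_null)
            batch, self.buffer = self.buffer, []  # release the buffered rows
        return [self.fill if math.isnan(v) else v for v in batch]


nan = float('nan')
print(NullReplacer().process([1.0, nan, 3.0]))  # [1.0, 2.0, 3.0]
```

With the default buffer_size of 0, the first batch alone determines the median, matching "the median of a supplied batch".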

Examples:

Replaces the null values in a batch of data:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd

>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.replace_null(['x', 'x1'])
        | sp.write.to_variable('out'))
>>> data = pd.DataFrame({
        'x': np.random.randn(10),
        'x1': np.random.randn(10),
        'x2': np.random.randn(10)
    })
>>> data.loc[9, 'x'] = np.nan
>>> kx.q('publish', data)
>>> kx.q('out')
x          x1         x2
--------------------------------
0.9570144  0.3184851  1.184641
-2.361744  -0.2467937 -0.84709
0.483128   0.1582659  0.3794346
-0.9834509 0.3171456  0.5043843
2.389175   0.118148   -0.5278073
0.9906622  -1.987247  -0.831912
0.7275034  0.05414338 0.9702141
0.2681021  1.785524   1.305275
0.4028538  1.23407    0.2663236
0.483128   -1.194168  1.019275

time_split

@Transformer
def time_split(columns: Optional[Union[str, List[str]]] = None,
               *,
               delete: bool = True) -> Transformer

Decompose date/time columns into their respective parts (minutes, hours, days, etc.).

Arguments:

  • columns - The names of the columns to be temporally decomposed, or None to decompose all date/time columns.
  • delete - Whether the original temporal columns which have been decomposed should be removed from the batch.

Returns:

A time_split transformer, which can be joined to other operators or pipelines.
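The decomposition of a single timestamp can be sketched with Python's datetime. The split_time function is hypothetical; the column names follow the {column}_{part} pattern in the example output below, and the day-of-week conventions (dayOfWeek 0 = Saturday, weekday 1 for Monday to Friday) are inferred from that output rather than from any specification.

```python
from datetime import datetime
from typing import Dict


def split_time(name: str, ts: datetime) -> Dict[str, int]:
    """Hypothetical decomposition of one timestamp into named parts."""
    return {
        f"{name}_dayOfWeek": (ts.weekday() + 2) % 7,   # 0 = Saturday, 4 = Wednesday
        f"{name}_year": ts.year,
        f"{name}_month": ts.month,
        f"{name}_day": ts.day,
        f"{name}_quarter": (ts.month - 1) // 3 + 1,
        f"{name}_weekday": int(ts.weekday() < 5),      # 1 for Monday to Friday, 0 at weekends
        f"{name}_hour": ts.hour,
        f"{name}_minute": ts.minute,
        f"{name}_second": ts.second,
    }


print(split_time('x', datetime(2000, 2, 2)))
```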

Examples:

Apply time split to reduce times to seasonal components:

>>> from kxi import sp
>>> import pykx as kx
>>> from datetime import datetime
>>> import pandas as pd

>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.time_split()
        | sp.write.to_variable('out'))
>>> data = pd.DataFrame({
        'x':[datetime(2000,1,1), datetime(2000,2,2)],
        'x1':['a', 'b']
    })
>>> kx.q('publish', data)
>>> kx.q('out')
x1 x_dayOfWeek x_year x_month x_day x_quarter x_weekday x_hour x_minute x_second
--------------------------------------------------------------------------------
a  0           2000   1       1     1         0         0      0        0
b  4           2000   2       2     1         1         0      0        0