kxi.sp.transform

Stream Processor Transforms

fill

@Transformer
def fill(defaults: dict, mode: str = "static") -> Transformer

Fill null values in streaming data.

Arguments:

  • defaults - Dictionary mapping column names to fill values. Each key names a column whose null values are to be replaced; the associated value is the default fill for that column.
  • mode - The approach to use when filling. One of static, down, or up. In static mode, every null value in a column is replaced with the column's default. In up mode, an up (backward) fill is performed; if the last entry in the current column is null, it is replaced with the column's default. In down mode, a down (forward) fill is performed; for the first table passed in, any column whose first entry is null has that entry replaced with the column's default.

Returns:

A fill transformer, which can be joined to other operators or pipelines.
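
The three modes can be easier to compare side by side. The following is a minimal plain-Python sketch of the semantics described above, applied to a single column of a single batch. It does not use kxi.sp: `fill_column` is a hypothetical helper, a list stands in for a column, and `None` stands in for a null. How the real operator carries values across successive batches is not modelled here.

```python
def fill_column(values, default, mode="static"):
    """Fill None entries in one column of one batch (illustrative only)."""
    if mode == "static":
        # Every null is replaced with the default.
        return [default if v is None else v for v in values]
    if mode == "down":
        # Forward fill: carry the last non-null value forward. Leading
        # nulls have nothing before them, so they take the default.
        filled, last = [], default
        for v in values:
            if v is not None:
                last = v
            filled.append(last)
        return filled
    if mode == "up":
        # Backward fill: the mirror image of a forward fill, so trailing
        # nulls take the default.
        return fill_column(values[::-1], default, "down")[::-1]
    raise ValueError(f"unknown mode: {mode!r}")


column = [None, 1.0, None, 3.0, None]
static_filled = fill_column(column, 0.0, "static")  # [0.0, 1.0, 0.0, 3.0, 0.0]
down_filled = fill_column(column, 0.0, "down")      # [0.0, 1.0, 1.0, 3.0, 3.0]
up_filled = fill_column(column, 0.0, "up")          # [1.0, 1.0, 3.0, 3.0, 0.0]
```

In a pipeline, the equivalent choice would be made through the `mode` argument of `fill`, with `defaults` supplying one fill value per column.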

replace_infinity

@Transformer
def replace_infinity(columns: Optional[Union[str, List[str]]] = None,
                     *,
                     new_column: bool = False) -> Transformer

Replace infinite values in columns based on previously seen data.

Positive infinity is replaced with the largest previously seen value in its column. Negative infinity is replaced with the smallest previously seen value in its column.

Arguments:

  • columns - Name(s) of the column(s), given as a string or a list of strings, to apply infinity replacement to.
  • new_column - Boolean indicating whether to add a new column (True) that flags the presence of infinite values in a given row.

Returns:

A replace_infinity transformer, which can be joined to other operators or pipelines.

Examples:

Replaces the infinite values in a batch of data:

```python
from kxi import sp
import pykx as kx
import pandas as pd
import numpy as np

sp.run(sp.read.from_callback('publish')
    | sp.transform.replace_infinity(['x', 'x1'])
    | sp.write.to_console(timestamp='none'))

data = pd.DataFrame({
    'x': np.random.randn(10),
    'x1': np.random.randn(10),
    'x2': np.random.randn(10)
})
# Plant an infinite value in the last row of column 'x'
# (.loc avoids pandas chained assignment)
data.loc[9, 'x'] = np.inf

kx.q('publish', data)
```
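
To make the "previously seen" behaviour concrete, here is a rough plain-Python sketch of the replacement rule for one column across two batches. It is not the library's implementation: `InfinityReplacer` is a hypothetical class, and two details are assumptions of this sketch, namely that finite values in the current batch count as "seen" before replacement, and that infinities are left untouched when no finite value has been seen yet.

```python
import math


class InfinityReplacer:
    """Hypothetical stand-in that tracks running extremes for one column."""

    def __init__(self):
        self.largest = None   # largest finite value seen so far
        self.smallest = None  # smallest finite value seen so far

    def process(self, batch):
        # Update the running extremes from this batch's finite values.
        finite = [v for v in batch if math.isfinite(v)]
        if finite:
            hi, lo = max(finite), min(finite)
            self.largest = hi if self.largest is None else max(self.largest, hi)
            self.smallest = lo if self.smallest is None else min(self.smallest, lo)
        # Swap each infinity for the matching running extreme, if one exists.
        out = []
        for v in batch:
            if v == math.inf and self.largest is not None:
                out.append(self.largest)
            elif v == -math.inf and self.smallest is not None:
                out.append(self.smallest)
            else:
                out.append(v)
        return out


replacer = InfinityReplacer()
first = replacer.process([1.0, 5.0, -2.0])             # no infinities: unchanged
second = replacer.process([math.inf, 0.5, -math.inf])  # [5.0, 0.5, -2.0]
```

The second batch shows why state matters: its only finite value is 0.5, yet the infinities are replaced with 5.0 and -2.0 from the first batch.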

replace_null

@Transformer
def replace_null(columns: Optional[Union[str, List[str], dict]] = None,
                 *,
                 buffer_size: int = 0,
                 new_column: bool = False) -> Transformer

Replace null values with the median of a supplied batch.

Arguments:

  • columns - Name(s) of the column(s) to apply null replacement to, given as strings, or None to replace nulls in all float64 columns within a batch.
  • buffer_size - Number of data points that must accumulate before the median is determined for the specified columns. Null replacement is applied only once this threshold is reached.
  • new_column - Boolean indicating whether a new column is to be added to indicate the presence of null values for a given row.

Returns:

A replace_null transformer, which can be joined to other operators or pipelines.

Examples:

Replaces the null values in a batch of data:

```python
from kxi import sp
import pykx as kx
import numpy as np
import pandas as pd

sp.run(sp.read.from_callback('publish')
    | sp.transform.replace_null(['x', 'x1'])
    | sp.write.to_console(timestamp='none'))

data = pd.DataFrame({
    'x': np.random.randn(10),
    'x1': np.random.randn(10),
    'x2': np.random.randn(10)
})
# Plant a null value in the last row of column 'x'
# (.loc avoids pandas chained assignment)
data.loc[9, 'x'] = np.nan

kx.q('publish', data)
```
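
As a rough illustration of median replacement together with the indicator that `new_column` describes, the sketch below works on one column of one batch in plain Python. It is a simplified assumption rather than the library's implementation: `replace_null_with_median` is a hypothetical helper, `None` stands in for a null, the median is taken over the non-null values of the same batch, and the buffering controlled by `buffer_size` is not modelled.

```python
from statistics import median


def replace_null_with_median(values):
    """Return (filled column, null flags) for one column of one batch."""
    present = [v for v in values if v is not None]
    flags = [v is None for v in values]  # True where a null was found
    if not present:
        # Nothing to compute a median from; leave the column as it is.
        return list(values), flags
    fill = median(present)
    return [fill if v is None else v for v in values], flags


filled, was_null = replace_null_with_median([4.0, None, 1.0, 3.0, None])
# filled   -> [4.0, 3.0, 1.0, 3.0, 3.0]  (median of 4.0, 1.0, 3.0 is 3.0)
# was_null -> [False, True, False, False, True]
```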

time_split

@Transformer
def time_split(columns: Optional[Union[str, List[str]]] = None,
               *,
               delete: bool = True) -> Transformer

Decompose date/time columns into their respective parts (minutes, hours, days, etc.).

Arguments:

  • columns - The names of the columns to be temporally decomposed, or None to decompose all date/time columns.
  • delete - Whether the original temporal columns which have been decomposed should be removed from the batch.

Returns:

A time_split transformer, which can be joined to other operators or pipelines.

Examples:

Apply time split to reduce times to seasonal components:

>>> from kxi import sp
>>> import pykx as kx
>>> from datetime import datetime
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
...     | sp.transform.time_split()
...     | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
...     'x': [datetime(2000, 1, 1), datetime(2000, 2, 2)],
...     'x1': ['a', 'b']
... })
>>> kx.q('publish', data)
x1 x_dayOfWeek x_year x_month x_day x_quarter x_weekday x_hour x_minute x_second
--------------------------------------------------------------------------------
a  0           2000   1       1     1         0         0      0        0
b  4           2000   2       2     1         1         0      0        0
pykx.Identity(pykx.q('::'))
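
The derived columns in the output above can be approximated with the standard library alone. The sketch below is hypothetical: `split_timestamp` is not part of kxi.sp, and the meanings of `dayOfWeek` (counted from Saturday = 0) and `weekday` (1 on Monday to Friday, 0 at the weekend) are inferred from the two sample rows rather than taken from a specification.

```python
from datetime import datetime


def split_timestamp(name, ts):
    """Decompose one timestamp into parts keyed like the sample output."""
    # Python counts Monday = 0; shift so that Saturday = 0 (assumed convention).
    day_of_week = (ts.weekday() + 2) % 7
    return {
        f"{name}_dayOfWeek": day_of_week,
        f"{name}_year": ts.year,
        f"{name}_month": ts.month,
        f"{name}_day": ts.day,
        f"{name}_quarter": (ts.month - 1) // 3 + 1,
        # Assumed: 0 for Saturday/Sunday, 1 for Monday to Friday.
        f"{name}_weekday": int(day_of_week > 1),
        f"{name}_hour": ts.hour,
        f"{name}_minute": ts.minute,
        f"{name}_second": ts.second,
    }


rows = [
    split_timestamp("x", datetime(2000, 1, 1)),  # a Saturday
    split_timestamp("x", datetime(2000, 2, 2)),  # a Wednesday
]
```

Under these assumptions, the two rows reproduce the values shown for `x_dayOfWeek`, `x_quarter`, and `x_weekday` in the sample output.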