Data Transforms

Stream Processor Transforms

kxi.sp.transform.ReplaceInfinityTransformer (Transformer)

kxi.sp.transform.ReplaceNullTransformer (Transformer)

kxi.sp.transform.TimeSplitTransformer (Transformer)

kxi.sp.transform.Transformer (Operator)

kxi.sp.transform.replace_infinity

Replace infinite values in columns based on previously seen data.

Positive infinity is replaced with the largest previously seen value in its column. Negative infinity is replaced with the smallest previously seen value in its column.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
columns | Union[str, List[str]] | Name(s) of the column(s) to apply infinity replacement to, given as strings. | None
new_column | bool | Whether to add a new column (True) that indicates the presence of infinite values in each row. | False

Returns:

Type Description
Transformer

A replace_infinity transformer, which can be joined to other operators or pipelines.

Examples:

Replaces the infinite values in a batch of data:

>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> import numpy as np
>>> sp.run(sp.read.from_callback('publish')
...        | sp.transform.replace_infinity(['x', 'x1'])
...        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
...     'x': np.random.randn(10),
...     'x1': np.random.randn(10),
...     'x2': np.random.randn(10)
... })
>>> # Set the last value of x to positive infinity
>>> data.loc[9, 'x'] = np.inf
>>> kx.q('publish', data)
x          x1         x2
--------------------------------
0.1407207  0.6799529  1.376385
0.7520082  0.6998367  2.29289
1.487602   1.733495   0.587773
-1.211405  -0.5740276 -0.7183643
-1.20242   0.09841603 -0.8136622
1.029497   -0.6584508 0.8406603
-0.7056529 0.8348059  0.7559578
0.5293029  -0.1984016 -1.599482
-0.2440444 -0.9557535 0.3108312
1.487602   0.4727299  1.181564
pykx.Identity(pykx.q('::'))
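
The replacement rule described for replace_infinity can be illustrated with a small pure-Python sketch. This is not the operator's implementation: the class name `InfinityReplacer` and its `process` method are hypothetical. The sketch assumes that finite values in the current batch count as previously seen, which is consistent with the example above, where the infinity in x is replaced by the largest value of x in the same batch. It also assumes an infinity is left unchanged if no finite value has been seen yet. The returned flags mirror what new_column=True is described as reporting.

```python
import math


class InfinityReplacer:
    """Illustrative stand-in for one column; not the kxi implementation."""

    def __init__(self):
        self.largest = None   # largest finite value seen so far
        self.smallest = None  # smallest finite value seen so far

    def process(self, batch):
        # Assumption: fold the batch's finite values into the running
        # extremes before replacing anything in that batch.
        finite = [v for v in batch if math.isfinite(v)]
        if finite:
            hi, lo = max(finite), min(finite)
            self.largest = hi if self.largest is None else max(self.largest, hi)
            self.smallest = lo if self.smallest is None else min(self.smallest, lo)

        out, flags = [], []
        for v in batch:
            is_inf = math.isinf(v)
            flags.append(is_inf)
            if is_inf and self.largest is not None:
                # +inf -> largest seen value, -inf -> smallest seen value
                out.append(self.largest if v > 0 else self.smallest)
            else:
                out.append(v)
        return out, flags


replacer = InfinityReplacer()
first, first_flags = replacer.process([0.5, -1.2, math.inf])
second, second_flags = replacer.process([0.1, -math.inf, math.inf])
print(first, first_flags)
print(second, second_flags)
```

Because the extremes persist on the object, the second batch is filled from values seen in the first batch as well as its own.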

kxi.sp.transform.replace_null

Replace null values with the median of a supplied batch.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
columns | Union[str, List[str], dict] | Name(s) of the column(s) to apply null replacement to, given as strings, or None to replace nulls in all float64 columns within a batch. | None
buffer_size | int | Number of data points that must accumulate before the median is determined for the specified columns; null replacement is applied from that point on. | 0
new_column | bool | Whether to add a new column that indicates the presence of null values in each row. | False

Returns:

Type Description
Transformer

A replace_null transformer, which can be joined to other operators or pipelines.

Examples:

Replaces the null values in a batch of data:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
...        | sp.transform.replace_null(['x', 'x1'])
...        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
...     'x': np.random.randn(10),
...     'x1': np.random.randn(10),
...     'x2': np.random.randn(10)
... })
>>> # Set the last value of x to null
>>> data.loc[9, 'x'] = np.nan
>>> kx.q('publish', data)
x           x1         x2
---------------------------------
0.5002494   0.9197352  -1.698508
2.444905    0.5126744  2.35529
0.967191    0.2152119  1.141427
-0.7111563  -3.095967  2.266795
-0.02537276 0.7186668  -1.436224
0.8257473   -1.169573  -0.9088152
-1.88747    -0.8489675 -0.8930248
0.884007    -0.39412   0.6800597
0.4455145   0.7518979  -0.6053683
0.4728819   -0.8396647 0.178036
pykx.Identity(pykx.q('::'))
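
As a rough illustration of median-based null replacement, the following pure-Python sketch fills the nulls in a single column and returns a per-row flag of the kind new_column is described as adding. The function name `fill_nulls_with_median` is hypothetical, and the text above does not state how nulls are treated when the median is computed. This sketch excludes them, so the operator's results may differ. Buffering via buffer_size is not modelled.

```python
import math
import statistics


def fill_nulls_with_median(values):
    """Return (filled values, null flags) for one column of a batch."""
    flags = [math.isnan(v) for v in values]
    # Assumption: the median is taken over the non-null values only.
    present = [v for v, is_null in zip(values, flags) if not is_null]
    if not present:
        # Nothing to derive a median from; leave the column unchanged.
        return list(values), flags
    median = statistics.median(present)
    filled = [median if is_null else v for v, is_null in zip(values, flags)]
    return filled, flags


filled, flags = fill_nulls_with_median([1.0, float("nan"), 3.0, 8.0])
print(filled)
print(flags)
```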

kxi.sp.transform.time_split

Decompose date/time columns into their constituent parts (minutes, hours, days, etc.).

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
columns | Union[str, List[str]] | Names of the columns to be temporally decomposed, or None to decompose all date/time columns. | None
delete | bool | Whether to remove the original temporal columns from the batch once they have been decomposed. | True

Returns:

Type Description
Transformer

A time_split transformer, which can be joined to other operators or pipelines.

Examples:

Apply time split to reduce times to seasonal components:

>>> from kxi import sp
>>> import pykx as kx
>>> from datetime import datetime
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
...        | sp.transform.time_split()
...        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
...     'x': [datetime(2000, 1, 1), datetime(2000, 2, 2)],
...     'x1': ['a', 'b']
... })
>>> kx.q('publish', data)
x1 x_dayOfWeek x_year x_month x_day x_quarter x_weekday x_hour x_minute x_second
--------------------------------------------------------------------------------
a  0           2000   1       1     1         0         0      0        0
b  4           2000   2       2     1         1         0      0        0
pykx.Identity(pykx.q('::'))
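
The derived columns in the output above can be approximated with the standard library alone. The sketch below is an illustration rather than the operator's implementation: the helper name `split_time` is hypothetical, and the numbering conventions (x_dayOfWeek counting from Saturday as 0, x_weekday as 1 for Monday to Friday and 0 otherwise) are inferred from the two example rows rather than stated in the text.

```python
from datetime import datetime


def split_time(name, ts):
    """Decompose one timestamp into the parts shown in the example output."""
    return {
        # Assumption: day of week counts from Saturday = 0 (inferred from the
        # example); Python's weekday() counts from Monday = 0.
        f"{name}_dayOfWeek": (ts.weekday() + 2) % 7,
        f"{name}_year": ts.year,
        f"{name}_month": ts.month,
        f"{name}_day": ts.day,
        f"{name}_quarter": (ts.month - 1) // 3 + 1,
        # Assumption: 1 for Monday to Friday, 0 for Saturday and Sunday.
        f"{name}_weekday": int(ts.weekday() < 5),
        f"{name}_hour": ts.hour,
        f"{name}_minute": ts.minute,
        f"{name}_second": ts.second,
    }


rows = [
    split_time("x", datetime(2000, 1, 1)),
    split_time("x", datetime(2000, 2, 2)),
]
for row in rows:
    print(row)
```

With these conventions, the two timestamps from the example give the same values as the x_ columns shown above.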