Skip to content

Data Transforms

Stream Processor Transforms

kxi.sp.transform.replace_infinity

Replace infinite values in columns based on previously seen data.

Positive infinity is replaced with the largest previously seen value in its column. Negative infinity is replaced with the smallest previously seen value in its column.

Parameters:

Name Type Description Default
columns Union[str, List[str]]

Column(s) to apply infinity replace to in string format.

None
new_column bool

Boolean indicating whether to add new column (True) which indicates the presence of infinite values for a given row.

False

Returns:

Type Description
Pipeline

A pipeline comprised of a infinity_replace operator, which can be joined to other pipelines.

Examples:

Replaces the infinite values in a batch of data:

>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> import numpy as np
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.replace_infinity(['x', 'x1'])
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.random.randn(10),
        'x1': np.random.randn(10),
        'x2': np.random.randn(10)
    })
>>> data['x'][9] = np.inf
>>> kx.q('publish' , data)
x          x1         x2
--------------------------------
0.1407207  0.6799529  1.376385
0.7520082  0.6998367  2.29289
1.487602   1.733495   0.587773
-1.211405  -0.5740276 -0.7183643
-1.20242   0.09841603 -0.8136622
1.029497   -0.6584508 0.8406603
-0.7056529 0.8348059  0.7559578
0.5293029  -0.1984016 -1.599482
-0.2440444 -0.9557535 0.3108312
1.487602   0.4727299  1.181564
pykx.Identity(pykx.q('::'))

kxi.sp.transform.replace_null

Replace null values with the median of a supplied batch.

Parameters:

Name Type Description Default
columns Union[str, List[str], dict]

Column(s) to apply null replace to in string format, or None replace nulls in all float64 columns within a batch.

None
buffer_size int

Number of data points which must amass before a determination is made of the median value for specified columns and after which null replacement is applied.

0
new_column bool

Boolean indicating whether a new column is to be added to indicate the presence of null values for a given row.

False

Returns:

Type Description
Pipeline

A pipeline comprised of a null_replace operator, which can be joined to other pipelines.

Examples:

Replaces the null values in a batch of data:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.replace_null(['x', 'x1'])
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.random.randn(10),
        'x1': np.random.randn(10),
        'x2': np.random.randn(10)
    })
>>> data['x'][9] = np.nan
>>> kx.q('publish', data)
x           x1         x2
---------------------------------
0.5002494   0.9197352  -1.698508
2.444905    0.5126744  2.35529
0.967191    0.2152119  1.141427
-0.7111563  -3.095967  2.266795
-0.02537276 0.7186668  -1.436224
0.8257473   -1.169573  -0.9088152
-1.88747    -0.8489675 -0.8930248
0.884007    -0.39412   0.6800597
0.4455145   0.7518979  -0.6053683
0.4728819   -0.8396647 0.178036
pykx.Identity(pykx.q('::'))

kxi.sp.transform.time_split

Decompose time/date columns into respective parts (mins/hours etc.)

Parameters:

Name Type Description Default
columns Union[str, List[str]]

Either column(s) to be temporally decomposed in string format, or None to decompose all time/date columns.

None
delete bool

Boolean indicating if the original temporal columns which have been decomposed should be removed from the batch.

True

Returns:

Type Description
Pipeline

A pipeline comprised of a time_split operator, which can be joined to other pipelines.

Examples:

Apply time split to reduce times to seasonal components:

>>> from kxi import sp
>>> import pykx as kx
>>> from datetime import datetime
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.time_split()
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x':[datetime(2000,1,1),datetime(2000,2,2)],
        'x1':['a','b']
    })
>>> kx.q('publish', data)
x1 x_dayOfWeek x_year x_month x_day x_quarter x_weekday x_hour x_minute x_second # noqa: E501
-------------------------------------------------------------------------------- # noqa: E501
a  0           2000   1       1     1         0         0      0        0        # noqa: E501
b  4           2000   2       2     1         1         0      0        0        # noqa: E501
pykx.Identity(pykx.q('::'))