kxi.sp.transform
Stream Processor Transforms
InputType Objects
class InputType(AutoNameEnum)
Input type of the data passed to the schema plugin.
arrays
data is always array type
table
data is always table type
auto
data can be of mixed type
Parse Objects
class Parse(AutoNameEnum)
Parse string data to other types.
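on
parsing of string data is enabled
off
parsing of string data is disabled
auto
parsing is applied automatically where needed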
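The base class AutoNameEnum is not defined in this section. Judging by its name, it likely follows the auto-naming recipe from the Python enum documentation, where each member's value is its own name. The sketch below is a stand-in built on that assumption, not the library's code; only the class and member names are taken from this page.

```python
import enum


class AutoNameEnum(enum.Enum):
    """Assumed stand-in: enum.auto() yields the member's own name."""

    @staticmethod
    def _generate_next_value_(name, start, count, last_values):
        # Called by enum.auto(); returning the name makes value == name.
        return name


class InputType(AutoNameEnum):
    arrays = enum.auto()
    table = enum.auto()
    auto = enum.auto()


class Parse(AutoNameEnum):
    on = enum.auto()
    off = enum.auto()
    auto = enum.auto()


# Members can be looked up by attribute or by their string value.
parse_mode = Parse("off")
input_names = [member.name for member in InputType]
```

Under this assumption, a member and its string name are interchangeable for lookup, which would explain why each member is documented by its own name.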
schema
@Transformer
def schema(table: dict,
           *,
           input_type: InputType = InputType.auto,
           parse: Parse = Parse.auto) -> Transformer
Apply schema to the incoming data, imposing the outgoing table format.
Arguments:
- table - Dictionary of keys and values defining the schema. Keys specify the column names. The value associated with a key is the pykx type of the column, e.g. kx.FloatAtom.
- input_type - The data type of the stream (one of arrays, table or auto). Choosing arrays or table allows evaluation to be optimized at compile time.
- parse - Indicates whether input string data needs to be parsed into other datatypes. One of auto, on or off. Choosing on or off allows processing to be optimized.
Returns:
A schema transformer, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> from datetime import datetime
>>> sp.run(sp.read.from_callback('publish')
            | sp.transform.schema({'name': kx.SymbolAtom, 'date': kx.DateAtom})
            | sp.write.to_variable('out'))
>>> kx.q('publish', [{'name': 'Apple', 'date': datetime(2023,1,4)},
        {'name': 'Google', 'date': datetime(2023,1,4)}])
>>> kx.q('out')
name   date
-----------------
Apple  2023.01.04
Google 2023.01.04
fill
@Transformer
def fill(defaults: dict, mode: str = "static") -> Transformer
(Beta Feature) Fill null values in streaming data.
Notes:
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.
Arguments:
- defaults - Dictionary of keys and values used to replace nulls. Keys specify the columns in which to replace null values. The value associated with a key is the default fill for the corresponding column.
- mode - The approach to use when filling. One of static, down, or up. In static mode, every null value in a column is replaced with its corresponding default. In up mode, an up (backward) fill is performed; if the last entry in a column is null, it is replaced with the column's default. In down mode, a down (forward) fill is performed; for the first table passed in, any column whose first entry is null has that entry replaced with the column's default.
Returns:
A fill transformer, which can be joined to other operators or pipelines.
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.fill({'x': 1})
        | sp.write.to_variable('out'))
>>> df = pd.DataFrame({'x': [None, 2, None]})
>>> kx.q('publish', df)
>>> kx.q('out')
x
-
1
2
1
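The three fill modes can be illustrated on a single column with plain Python. This is a minimal sketch of the behavior described under Arguments, not the operator's implementation: fill_column is a hypothetical helper, None stands in for null, and only one batch is handled, so any state carried between batches of a stream is not modelled.

```python
def fill_column(values, default, mode="static"):
    """Fill None entries of one column within a single batch (hypothetical helper)."""
    if mode == "static":
        # Every null is replaced by the column's default.
        return [default if v is None else v for v in values]
    if mode == "down":
        # Forward fill: carry the last non-null value down the column.
        # A null in first position has nothing above it, so it gets the default.
        filled, last = [], default
        for v in values:
            if v is not None:
                last = v
            filled.append(last)
        return filled
    if mode == "up":
        # Backward fill is a forward fill over the reversed column, so a null
        # in last position gets the default.
        return fill_column(values[::-1], default, "down")[::-1]
    raise ValueError(f"unknown mode: {mode!r}")


column = [None, 2, None]
static_filled = fill_column(column, 1)        # [1, 2, 1]
down_filled = fill_column(column, 1, "down")  # [1, 2, 2]
up_filled = fill_column(column, 1, "up")      # [2, 2, 1]
```

The static result matches the output shown in the example above.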
rename_columns
@Transformer
def rename_columns(nameScheme: dict) -> Transformer
Rename columns according to the dictionary provided.
Arguments:
- nameScheme - A dictionary mapping the columns to be renamed to their new names.
Returns:
A rename transformer, which can be joined to other operators or pipelines.
Examples:
Rename the columns in a batch of data (which must be a table):
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.rename_columns({'a': 'x', 'b': 'y'})
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'a': np.random.randn(10),
        'b': np.random.randn(10),
        'z': np.random.randn(10)
    })
>>> kx.q('publish', data)
x          y          z
---------------------------------
-0.8346907 -0.6609932 -1.807997
0.9907844  -0.2233735 0.7030451
2.021636   0.8098854  0.3018809
0.2035373  1.373164   0.8300498
-0.1250729 1.424175   -0.07679308
2.156653   -0.2182297 -0.458493
-0.2572752 -2.013144  1.027509
2.010659   1.397499   -0.7732002
-1.143055  -1.739128  0.8732918
-0.4201101 0.635765   0.5426539
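The renaming in the example above leaves column z untouched because it has no entry in the dictionary. A minimal plain-Python sketch of that mapping, assuming a table represented as a dict of column lists (rename_table_columns is a hypothetical helper, not part of kxi):

```python
def rename_table_columns(table, name_scheme):
    """Return a copy of the table with columns renamed per name_scheme.

    Columns without an entry in name_scheme keep their names, and the
    original column order is preserved.
    """
    return {name_scheme.get(col, col): values for col, values in table.items()}


table = {"a": [1, 2], "b": [3, 4], "z": [5, 6]}
renamed = rename_table_columns(table, {"a": "x", "b": "y"})
renamed_columns = list(renamed)  # ['x', 'y', 'z']
```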
replace_infinity
@Transformer
def replace_infinity(columns: Optional[Union[str, List[str]]] = None,
                     *,
                     new_column: bool = False) -> Transformer
Replace infinite values in columns based on previously seen data.
Positive infinity is replaced with the largest previously seen value in its column. Negative infinity is replaced with the smallest previously seen value in its column.
Arguments:
- columns - Column name(s), as a string or list of strings, in which to replace infinite values.
- new_column - Boolean indicating whether to add a new column (True) that flags the presence of infinite values in a given row.
Returns:
An infinity_replace transformer, which can be joined to other operators or pipelines.
Examples:
Replaces the infinite values in a batch of data:
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> import numpy as np
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.replace_infinity(['x', 'x1'])
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.random.randn(10),
        'x1': np.random.randn(10),
        'x2': np.random.randn(10)
    })
>>> data.loc[9, 'x'] = np.inf
>>> kx.q('publish', data)
x          x1         x2
---------------------------------
1.122557   1.091329   -0.9192559
-0.2272307 0.1437045  -1.314342
1.852872   -0.8079581 -0.04237484
-1.311179  -0.4326425 1.383893
-0.7943273 -1.13575   -0.437631
0.1900052  -1.223518  -0.9580604
0.2486013  -1.104549  0.8776486
1.109844   0.380195   0.898798
0.9993286  0.4093832  -1.477295
1.852872   0.7642802  0.4787892
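Because replacement depends on previously seen data, the operator must keep a running minimum and maximum per column. The sketch below shows one way to do that in plain Python. InfinityReplacer is a hypothetical class, not the library's implementation; it assumes that finite values in the current batch count as "seen", which is consistent with the example output above, where the infinity in x is replaced by the largest value of the same batch.

```python
import math


class InfinityReplacer:
    """Replace +/-inf using per-column extremes remembered across batches."""

    def __init__(self):
        self.highest = {}  # column -> largest finite value seen so far
        self.lowest = {}   # column -> smallest finite value seen so far

    def _swap(self, col, value):
        if value == math.inf:
            return self.highest.get(col, value)
        if value == -math.inf:
            return self.lowest.get(col, value)
        return value

    def __call__(self, batch, columns):
        out = dict(batch)  # columns not listed pass through unchanged
        for col in columns:
            finite = [v for v in batch[col] if math.isfinite(v)]
            if finite:
                # Update the running extremes before replacing.
                self.highest[col] = max(self.highest.get(col, -math.inf), max(finite))
                self.lowest[col] = min(self.lowest.get(col, math.inf), min(finite))
            out[col] = [self._swap(col, v) for v in batch[col]]
        return out


replacer = InfinityReplacer()
first = replacer({"x": [1.0, 3.0, math.inf], "x1": [0.5, -math.inf, 2.0]}, ["x", "x1"])
second = replacer({"x": [-math.inf, 2.0], "x1": [math.inf, 1.0]}, ["x", "x1"])
```

In the second batch the replacements come from the state built up by the first: the negative infinity in x becomes 1.0 and the positive infinity in x1 becomes 2.0.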
replace_null
@Transformer
def replace_null(columns: Optional[Union[str, List[str], dict]] = None,
                 *,
                 buffer_size: int = 0,
                 new_column: bool = False) -> Transformer
Replace null values with the median of a supplied batch.
Arguments:
- columns - Name(s) of the column(s) in which to replace nulls, or None to replace nulls in all float64 columns within a batch.
- buffer_size - Number of data points that must accumulate before the median is determined for the specified columns, after which null replacement is applied.
- new_column - Boolean indicating whether to add a new column that flags the presence of null values in a given row.
Returns:
A null_replace transformer, which can be joined to other operators or pipelines.
Examples:
Replaces the null values in a batch of data:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.replace_null(['x', 'x1'])
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.random.randn(10),
        'x1': np.random.randn(10),
        'x2': np.random.randn(10)
    })
>>> data.loc[9, 'x'] = np.nan
>>> kx.q('publish', data)
x          x1         x2
--------------------------------
0.9570144  0.3184851  1.184641
-2.361744  -0.2467937 -0.84709
0.483128   0.1582659  0.3794346
-0.9834509 0.3171456  0.5043843
2.389175   0.118148   -0.5278073
0.9906622  -1.987247  -0.831912
0.7275034  0.05414338 0.9702141
0.2681021  1.785524   1.305275
0.4028538  1.23407    0.2663236
0.483128   -1.194168  1.019275
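The replacement value in the example above can be checked by hand: the last entry of x is the median of the nine non-null values in that column. A minimal plain-Python sketch of median replacement for one column in one batch follows. replace_null_with_median is a hypothetical helper, not the library's code, and the buffering controlled by buffer_size is not modelled.

```python
import math
import statistics


def is_null(value):
    """Treat None and NaN as null."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def replace_null_with_median(values):
    """Replace nulls in one column with the median of its non-null values."""
    present = [v for v in values if not is_null(v)]
    if not present:
        return list(values)  # nothing to compute a median from
    median = statistics.median(present)
    return [median if is_null(v) else v for v in values]


# Column x from the example above, with its last entry set to NaN.
x = [0.9570144, -2.361744, 0.483128, -0.9834509, 2.389175,
     0.9906622, 0.7275034, 0.2681021, 0.4028538, math.nan]
filled = replace_null_with_median(x)
last_value = filled[-1]  # 0.483128, as in the printed table
```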
time_split
@Transformer
def time_split(columns: Optional[Union[str, List[str]]] = None,
               *,
               delete: bool = True) -> Transformer
Decompose date/time columns into their respective parts (minutes, hours, days, etc.).
Arguments:
- columns - The names of the columns to be temporally decomposed, or None to decompose all date/time columns.
- delete - Whether the original temporal columns which have been decomposed should be removed from the batch.
Returns:
A time_split transformer, which can be joined to other operators or pipelines.
Examples:
Apply time split to reduce times to seasonal components:
>>> from kxi import sp
>>> import pykx as kx
>>> from datetime import datetime
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.transform.time_split()
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x':[datetime(2000,1,1), datetime(2000,2,2)],
        'x1':['a', 'b']
    })
>>> kx.q('publish', data)
a 0 2000 1 1 1 0 0 0 0
b 4 2000 2 2 1 1 0 0 0
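The nine numbers that follow the x1 value in each output row can be reproduced with the standard library. The sketch below is an interpretation inferred from the two sample rows only: the field names are hypothetical, the day-of-week numbering (Saturday as 0) and the weekday flag are assumptions that happen to match both rows, and split_timestamp is not part of kxi.

```python
from datetime import datetime


def split_timestamp(ts):
    """Decompose a datetime into parts, in the order seen in the sample output."""
    return {
        "dow": (ts.weekday() + 2) % 7,        # assumed: Saturday=0 ... Friday=6
        "year": ts.year,
        "month": ts.month,
        "day": ts.day,
        "quarter": (ts.month - 1) // 3 + 1,
        "weekday": int(ts.weekday() < 5),     # assumed: 1 for Monday to Friday
        "hour": ts.hour,
        "minute": ts.minute,
        "second": ts.second,
    }


first_row = list(split_timestamp(datetime(2000, 1, 1)).values())
second_row = list(split_timestamp(datetime(2000, 2, 2)).values())
# first_row  -> [0, 2000, 1, 1, 1, 0, 0, 0, 0]
# second_row -> [4, 2000, 2, 2, 1, 1, 0, 0, 0]
```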