Machine Learning
Stream Processor Machine Learning Operators.
Machine Learning (ML) operators provide additional functionality to the Stream Processor for data science use-cases. This functionality spans the entire ML workflow from preprocessing to model deployment and validation.
kxi.sp.ml.drop_constant
    Drop columns with constant values.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| columns | Union[str, List[str], dict] | Columns to drop. Either a column name as string to drop a single column, or a list of strings to drop several columns, or a dictionary with column names and expected constant values as the key-value pairs to drop several columns, or  | None | 
| buffer_size | int | The number of data points which must amass before columns are dropped. | 0 | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | A pipeline comprised of a 'drop_constant' operator, which can be joined to other pipelines. | 
Examples:
Drop a constant column from a batch of data:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.drop_constant('x')
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.ones(10),
        'a': np.random.randn(10),
        'b': np.random.randn(10)
    })
>>> kx.q('publish', data)
a           b
----------------------
-0.04631683 0.06053949
2.087667    1.566185
1.284214    -1.854408
-0.6991486  0.8490234
0.1423437   -0.4395799
0.6394896   0.1380445
0.62278     0.3418176
2.258521    0.1042979
0.3900605   1.552235
0.1963714   0.3146865
pykx.Identity(pykx.q('::'))
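To make the operator's effect concrete, the following is a minimal plain-Python sketch of dropping constant columns from a single batch. It is not the operator's implementation: the table is modelled as a dictionary of lists, and the helper name `drop_constant_columns` is hypothetical.

```python
def drop_constant_columns(table):
    """Return a copy of `table` (column name -> list of values) without constant columns."""
    return {
        name: values
        for name, values in table.items()
        # Keep a column only if at least one value differs from the first one.
        if any(v != values[0] for v in values)
    }


batch = {
    "x": [1.0, 1.0, 1.0],   # constant, so it is removed
    "a": [0.3, -1.2, 0.8],
    "b": [2.0, 2.5, 2.0],
}
kept = drop_constant_columns(batch)
print(sorted(kept))
```

Only `a` and `b` survive here, mirroring the console output of the pipeline example above.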
kxi.sp.ml.fit
    Fit a model to a batch of data.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| features | Union[str, List[str], Callable] | Column name of the predictor variable, or the function required to generate predictors from a batch. | required | 
| targets | Union[str, Callable] | Column name of the target variable, or the function required to generate targets from a batch, or  | required | 
| untrained | Function | Untrained model. | required | 
| model_type | ModelType | A  | required | 
| udf | Union[str, Callable] | User-defined function used to score the quality of the model or to join predictions into the batch. | required | 
| registry | Union[str, dict] | Local/cloud registry to save the model to. | None | 
| experiment | Optional[str] | Name of the experiment within the registry to save the current model to. | None | 
| model | Optional[str] | Name of the current model to be stored within the registry. If 'model' is omitted or  | None | 
| data |  | If provided with 'data', the function will attempt to parse out relevant statistical information associated with the data for use within model deployment. | None | 
| requirements | Union[bool, List[str], str] | Option to add Python requirements information associated with a model. Either a boolean value  | None | 
| major | bool | Boolean value indicating if model version is to be incremented as a 'major' version i.e. should the model be incremented from  | False | 
| major_version | Optional[int] | The major version to be incremented. By default the function will increment major versions based on the current maximum model version present within the registry. However, 'major_version' allows users to define which major version of a model they wish to be incremented. | None | 
| code | Union[str, List[str]] | Reference to the location of any  | None | 
| axis | bool | Boolean value indicating the required data format for model training, i.e. should the data have  | False | 
| supervise | Optional[List[str]] | List of metrics to be used for supervised monitoring of the model. | None | 
| model_args | Optional[List] | List of arguments to pass to the model after X and y. If there is only a single argument, it must be enlisted. | None | 
| buffer_size | int | Number of records to buffer before training the model, or False to fit on the first batch. When the batch size is exceeded, any additional records will also be included when training. | 0 | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | A pipeline comprised of a 'fit' operator, which can be joined to other pipelines. | 
Examples:
Fit a model on a batch of data and view the model in the model store:
>>> from kxi import sp, ml
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> import sklearn.linear_model as sk
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.fit('x',
                    'y',
                    sk.LogisticRegression(),
                    'q',
                    'yhat',
                    registry='/tmp',
                    model='qLC',
                    buffer_size=10)
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': sorted(np.random.randn(10)),
        'y': 5*[False]+5*[True]
    })
>>> kx.q('publish', data)
x          y yhat
-----------------
-0.6874269 0 0
-0.4545067 0 0
-0.3909506 0 0
-0.2482032 0 0
-0.2073246 0 0
-0.2059224 1 0
0.02122773 1 0
0.1623181  1 1
1.539742   1 1
2.056621   1 1
pykx.Identity(pykx.q('::'))
>>> ml.registry.get.model_store("/tmp")
               registrationTime experimentName  ... version description
0 2022-02-04 09:58:38.738060800   b'undefined'  ...  [1, 0]         b''
kxi.sp.ml.fresh_create
    Apply FRESH feature creation to a batch of data.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| columns | Union[str, List[str]] | Columns on which FRESH feature creation is to be applied, or  | None | 
| features | Union[str, List[str]] | Functions to be applied to each column in a batch as either a string naming the function to be applied, or list of strings naming the functions to be applied, or  | None | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | A pipeline comprised of a 'fresh_create' operator, which can be joined to other pipelines. | 
Examples:
Setting up a stream to compute three features, with a window to batch the data:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.window.timer(np.timedelta64(10, 's'), count_trigger = 1)
        | sp.ml.fresh_create('x', ['min', 'max', 'absEnergy'])
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({'x': np.random.randn(10)})
>>> kx.q('publish', data)
x_absEnergy x_max    x_min
------------------------------
14.30635    0.917339 -2.102371
pykx.Identity(pykx.q('::'))
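As a rough illustration of what the three features in the example measure, here is a plain-Python sketch. It assumes `absEnergy` is the sum of squared values, as in the usual FRESH definition; the helper `fresh_features` is hypothetical and is not part of `kxi.sp`.

```python
def fresh_features(column_name, values):
    """Compute three FRESH-style summary features for one column of a batch."""
    return {
        # Assumption: absolute energy is the sum of squared values.
        f"{column_name}_absEnergy": sum(v * v for v in values),
        f"{column_name}_max": max(values),
        f"{column_name}_min": min(values),
    }


features = fresh_features("x", [1.0, -2.0, 3.0])
print(features)
```

Each window of data collapses to a single row of features, which is why the pipeline example above prints one row for ten input records.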
kxi.sp.ml.label_encode
    Encode symbolic values as a numerical integer representation.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| columns | Union[str, List[str], dict] | String column name of column to be symbol encoded, or a list of strings containing the names of the columns to be encoded, or a dictionary with column names and their expected string/symbol values as the key-value pairs, or  | None | 
| buffer_size | int | Number of data points which must amass before label encoding is applied. | 0 | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | A pipeline comprised of a 'label_encode' operator, which can be joined to other pipelines. | 
Examples:
Label encode two columns with labels given by a dictionary:
>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> import numpy as np
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.label_encode({'x': ['a', 'b', 'c'], 'x1': ['d', 'e', 'f']})
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': ['a','a','b','c','c','a','a','b','c','c'],
        'x1': ['e','d','e','f','e','d','e','f','f','f'],
        'x2': np.random.randn(10)
    })
>>> kx.q('publish', data)
x x1 x2
----------------
0 1  0.5395699
0 0  1.725395
1 1  0.2059994
2 2  -0.01809984
2 1  -0.04201411
0 0  0.116805
0 1  0.6533205
1 2  1.394677
2 2  0.08629806
2 2  -0.6235565
pykx.Identity(pykx.q('::'))
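The mapping applied in the example can be sketched in plain Python. This is an illustration only: it assumes that integer codes follow the order of the supplied labels (which is consistent with the output above), and that sorted distinct values are used when no labels are given. The helper `label_encode` below is hypothetical and unrelated to the operator's internals.

```python
def label_encode(values, labels=None):
    """Map each value to the integer position of its label."""
    if labels is None:
        # Assumption: without explicit labels, use the sorted distinct values.
        labels = sorted(set(values))
    codes = {label: index for index, label in enumerate(labels)}
    return [codes[v] for v in values]


encoded = label_encode(["e", "d", "e", "f"], labels=["d", "e", "f"])
print(encoded)
```

With the labels `['d', 'e', 'f']`, the values `e, d, e, f` become `1, 0, 1, 2`, matching the first four rows of column `x1` in the output above.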
kxi.sp.ml.linear_regression
    Apply an online linear regression model to a supplied dataset.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| features | Union[str, List[str], Callable] | Columns to be extracted as features. Either the individual column name as a string, or the list of column names as strings, or a callable function to extract the relevant features. | required | 
| labels | Union[str, Callable] | Column to be used as label data. String column name, or a callable function. | required | 
| udf | Union[str, Callable] | A user-defined function as a string, or callable function used to append predictions to a batch of data. | required | 
| trend | bool | Boolean indicating if the model has a trend coefficient. | True | 
| alpha | float | Learning rate applied. | 0.01 | 
| max_iter | int | Maximum possible number of iterations before the run is terminated. This does not guarantee convergence. | 100 | 
| g_tol | float | Gradient tolerance, below which the run is terminated. | 1e-05 | 
| theta | Optional[List[float]] | Initial starting weights. | None | 
| kk | Optional[int] | Number of batches used or random points chosen each iteration. | None | 
| seed | Optional[int] | Random seed. | None | 
| batch_type | BatchType | A  | <BatchType.shuffle: 'shuffle'> | 
| penalty | Penalty | Regularization term as a  | <Penalty.l2: 'l2'> | 
| reg_lambda | float | Regularization coefficient. | 0.001 | 
| l1_ratio | float | Elastic net mixing parameter. This is only used if penalty type  | 0.5 | 
| decay | float | Decay coefficient. | 0.0 | 
| p | float | Momentum coefficient. | 0.0 | 
| verbose | bool | Boolean indicating if information about the fitting process is to be printed after every epoch. | False | 
| buffer_size | int | Integer value which defines the number of data points which must amass before linear regression is applied. | 0 | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | A pipeline comprised of a 'linear_regression' operator, which can be joined to other pipelines. | 
Examples:
Fit, predict, and update an online linear regression model on a stream:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.linear_regression('x', 'y', 'yHat', trend=True, buffer_size=10)
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': sorted(np.random.randn(10)),
        'y': sorted(np.random.randn(10))
    })
>>> kx.q('publish', data)
x          y          yHat
---------------------------------
-1.65824   -1.451782  -1.403835
-1.122569  -1.024654  -1.078429
-0.7118562 -0.7974354 -0.8289319
-0.4585539 -0.6012899 -0.6750576
-0.301018  -0.5226072 -0.5793588
-0.1893682 -0.5012154 -0.5115346
0.05424706 -0.4776461 -0.3635449
0.4934012  -0.2861974 -0.09677065
1.237137   0.49091    0.3550285
1.468013   0.4976716  0.4952795
pykx.Identity(pykx.q('::'))
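The idea of fitting on one batch and then updating on the next can be sketched with plain gradient descent. This is a simplified illustration under stated assumptions, not the operator's algorithm: it covers a single feature with a trend (intercept) term, ignores the penalty, decay, momentum and batching options, and the helper `fit_batch` is hypothetical. The names `alpha`, `max_iter` and `g_tol` mirror the parameters above, but their values here are chosen for the toy data.

```python
def fit_batch(theta, xs, ys, alpha=0.05, max_iter=2000, g_tol=1e-5):
    """Refine [intercept, slope] on one batch by gradient descent on the mean squared error."""
    intercept, slope = theta
    n = len(xs)
    for _ in range(max_iter):
        errors = [intercept + slope * x - y for x, y in zip(xs, ys)]
        grad_intercept = 2 * sum(errors) / n
        grad_slope = 2 * sum(e * x for e, x in zip(errors, xs)) / n
        # Stop early once the gradient falls below the tolerance.
        if max(abs(grad_intercept), abs(grad_slope)) < g_tol:
            break
        intercept -= alpha * grad_intercept
        slope -= alpha * grad_slope
    return [intercept, slope]


# Both batches are drawn from y = 2x + 1; the second call starts from
# the weights learned on the first, which is the "online" part.
theta = [0.0, 0.0]
theta = fit_batch(theta, [0.0, 1.0, 2.0, 3.0], [1.0, 3.0, 5.0, 7.0])
theta = fit_batch(theta, [0.5, 1.5, 2.5], [2.0, 4.0, 6.0])
predictions = [theta[0] + theta[1] * x for x in [4.0, 5.0]]
print(theta, predictions)
```

Passing the previous weights into the next call plays the role that the `theta` parameter plays for the initial weights in the table above.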
kxi.sp.ml.log_classifier
    Apply an online logistic classifier model to a supplied dataset.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| features | Union[str, List[str], Callable] | Columns to be extracted as features. Either the individual column name as a string, or the list of column names as strings, or a callable function to extract the relevant features. | required | 
| labels | Union[str, Callable] | Column to be used as label data. String column name, or a callable function. | required | 
| udf | Union[str, Callable] | A user-defined function as a string, or callable function used to append predictions to a batch of data. | required | 
| trend | bool | Boolean indicating if the model has a trend coefficient. | True | 
| alpha | float | Learning rate applied. | 0.01 | 
| max_iter | int | Maximum possible number of iterations before the run is terminated. This does not guarantee convergence. | 100 | 
| g_tol | float | Gradient tolerance, below which the run is terminated. | 1e-05 | 
| theta | Optional[List[float]] | Initial starting weights. | None | 
| kk | Optional[int] | Number of batches used or random points chosen each iteration. | None | 
| seed | Optional[int] | Random seed. | None | 
| batch_type | BatchType | A  | <BatchType.shuffle: 'shuffle'> | 
| penalty | Penalty | Regularization term as a  | <Penalty.l2: 'l2'> | 
| reg_lambda | float | Regularization coefficient. | 0.001 | 
| l1_ratio | float | Elastic net mixing parameter. This is only used if penalty type  | 0.5 | 
| decay | float | Decay coefficient. | 0.0 | 
| p | float | Momentum coefficient. | 0.0 | 
| verbose | bool | Boolean indicating if information about the fitting process is to be printed after every epoch. | False | 
| buffer_size | int | Integer value which defines the number of data points which must amass before the logistic classifier is applied. | 0 | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | A pipeline comprised of a 'log_classifier' operator, which can be joined to other pipelines. | 
Examples:
Fit, predict, and update an online log classifier model on a stream:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.log_classifier('x', 'y', 'yHat', trend=True, buffer_size=10)
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': sorted(np.random.randn(10)),
        'y': 5*[False]+5*[True]
    })
>>> kx.q('publish', data)
x          y yHat
-----------------
-1.053813  0 0
-0.1715366 0 0
-0.149147  0 0
0.06220506 0 0
0.3122359  0 0
0.8119877  1 1
0.9594937  1 1
1.093567   1 1
1.117492   1 1
1.778675   1 1
pykx.Identity(pykx.q('::'))
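For intuition, a logistic classifier of this kind can be sketched as gradient descent on the log loss. The code below is a minimal single-feature illustration, not the operator's implementation: regularization, momentum and batching are omitted, the 0.5 decision threshold is an assumption, and the helpers `sigmoid`, `fit_batch` and `predict` are hypothetical.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def fit_batch(theta, xs, ys, alpha=0.5, max_iter=200):
    """Refine [intercept, slope] on one batch by gradient descent on the log loss."""
    intercept, slope = theta
    n = len(xs)
    for _ in range(max_iter):
        # Residual between the predicted probability and the 0/1 label.
        residuals = [sigmoid(intercept + slope * x) - y for x, y in zip(xs, ys)]
        intercept -= alpha * sum(residuals) / n
        slope -= alpha * sum(r * x for r, x in zip(residuals, xs)) / n
    return [intercept, slope]


def predict(theta, xs):
    """Assign class 1 when the predicted probability is at least 0.5 (assumed threshold)."""
    return [int(sigmoid(theta[0] + theta[1] * x) >= 0.5) for x in xs]


xs = [-2.0, -1.0, -0.5, 0.5, 1.0, 2.0]
ys = [0, 0, 0, 1, 1, 1]
theta = fit_batch([0.0, 0.0], xs, ys)
predicted = predict(theta, xs)
print(predicted)
```

As in the pipeline example, the labels are ordered so that larger `x` values belong to the positive class, and the fitted model reproduces them.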
kxi.sp.ml.min_max_scaler
    Apply min-max scaling to columns in a supplied dataset.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| columns | Union[str, List[str], dict] | Column(s) to be min-max scaled in string format, or a dictionary with column names and  | None | 
| buffer_size | int | Number of data points which must amass before min-max scaling is applied. | 0 | 
| range_error | bool | Boolean indicating whether an error should be raised if new data falls outside the min-max range used for scaling. | False | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | A pipeline comprised of a 'min_max_scaler' operator, which can be joined to other pipelines. | 
Examples:
Performs min-max scaling on a column of a batch of data:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.min_max_scaler('x')
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.random.randn(10),
        'x1': np.random.randn(10)
    })
>>> kx.q('publish', data)
x         x1
---------------------
0.1424131 -2.641588
1         -2.180433
0.6399939 -0.05007621
0.1463686 -1.386112
0         2.358236
0.1877115 0.2539063
0.1577937 -0.7334433
0.5413414 0.3316929
0.7087663 0.09127055
0.3445104 1.343761
pykx.Identity(pykx.q('::'))
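The scaling itself is the usual `(x - min) / (max - min)` transformation. The plain-Python sketch below illustrates it together with one possible reading of `range_error`; how the operator treats out-of-range values when `range_error` is False is an assumption here (they are simply scaled beyond the 0 to 1 interval). The helpers `fit_range` and `min_max_scale` are hypothetical.

```python
def fit_range(values):
    """Record the minimum and maximum seen in the fitting data."""
    return min(values), max(values)


def min_max_scale(values, low, high, range_error=False):
    """Scale values to [0, 1] relative to a previously fitted range."""
    if range_error and any(v < low or v > high for v in values):
        raise ValueError("value outside the fitted min-max range")
    span = high - low
    # A zero span means the fitted column was constant; map everything to 0.0.
    return [(v - low) / span if span else 0.0 for v in values]


low, high = fit_range([2.0, 4.0, 6.0])
scaled = min_max_scale([2.0, 4.0, 6.0], low, high)
later = min_max_scale([8.0], low, high)  # outside the fitted range
print(scaled, later)
```

The fitted minimum maps to 0 and the fitted maximum to 1, which is why column `x` in the output above contains exactly one `0` and one `1`.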
kxi.sp.ml.one_hot_encode
    Encode symbolic/string values as a boolean (0/1) numeric representation.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| columns | Union[str, List[str], dict] | Column(s) to be one-hot encoded in string format, or a dictionary with column names and expected symbol/string values as the key-value pairs to be encoded, or  | None | 
| buffer_size | int | The number of data points which must amass before symbolic data is encoded. | 0 | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | A pipeline comprised of a 'one_hot_encode' operator, which can be joined to other pipelines. | 
Examples:
Performs one-hot encoding on a batch of data:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.one_hot_encode()
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': ['a','a','b','c','c','a','b','c','a','b'],
        'x1': ['a','b','b','c','b','a','b','c','b','b'],
        'x2': np.random.randn(10),
        'x3': 5*["abc","def"]
    })
>>> kx.q('publish', data)
x2         x_a x_b x_c x1_a x1_b x1_c x3_abc x3_def
---------------------------------------------------
0.3340958  1   0   0   1    0    0    1      0
-1.71888   1   0   0   0    1    0    0      1
-0.8172669 0   1   0   0    1    0    1      0
-0.606363  0   0   1   0    0    1    0      1
1.246069   0   0   1   0    1    0    1      0
-0.1144463 1   0   0   1    0    0    0      1
-0.5357472 0   1   0   0    1    0    1      0
-1.103191  0   0   1   0    0    1    0      1
-1.013031  1   0   0   0    1    0    1      0
0.4447144  0   1   0   0    1    0    0      1
pykx.Identity(pykx.q('::'))
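The column layout in the output above (`x_a`, `x_b`, `x_c`, and so on) can be reproduced with a short plain-Python sketch. It is illustrative only: the `<column>_<value>` naming is taken from the example output, the use of sorted distinct values when no categories are supplied is an assumption, and the helper `one_hot_encode` is hypothetical.

```python
def one_hot_encode(column_name, values, categories=None):
    """Expand one column into a 0/1 indicator column per category."""
    if categories is None:
        # Assumption: without explicit categories, use the sorted distinct values.
        categories = sorted(set(values))
    return {
        f"{column_name}_{category}": [int(v == category) for v in values]
        for category in categories
    }


encoded = one_hot_encode("x", ["a", "a", "b", "c"])
print(encoded)
```

Each row has exactly one `1` across the indicator columns generated from a given source column.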
kxi.sp.ml.predict
    Predicts a target variable using a model from the registry.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| features | Union[str, List[str], Callable] | Column name of the predictor variable, or the function required to generate predictors from a batch. | required | 
| udf | Union[str, Callable] | User-defined function for integrating the predictions into the batch, or a column name to join predictions to the table. | required | 
| registry | Union[str, dict] | Registry to load models from. | None | 
| experiment | Optional[str] | Experiment within the registry to load models from. | None | 
| model | Optional[str] | Model within the registry to use for prediction. | None | 
| version | Optional[List[int]] | Model version to load in for prediction. | None | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | A pipeline comprised of a 'predict' operator, which can be joined to other pipelines. | 
Examples:
Save a model to registry then retrieve it to serve predictions:
>>> from kxi import sp, ml
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> import sklearn.linear_model as sk
>>> data = pd.DataFrame({
        'x': np.random.randn(500),
        'x1': sorted(np.random.randn(500)),
        'x2': np.random.randn(500),
        'y': 250*[False]+250*[True]
    })
>>> X=data[['x','x1','x2']].to_numpy()
>>> y=data['y'].to_numpy()
>>> model=sk.LogisticRegression().fit(X,y)
>>> ml.registry.set.model(model,"skLC",'sklearn',folder_path="/tmp")
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.predict(['x','x1','x2'],
                        'yhat',
                        registry='/tmp',
                        model='skLC')
        | sp.write.to_console(timestamp='none'))
>>> kx.q('publish', data[['x','x1','x2']].iloc[:10])
x           x1        x2         yhat
-------------------------------------
1.327591    -3.220784 0.5501495  0
0.1169892   -2.989278 -0.4238209 0
0.2993643   -2.960673 -0.4045395 0
-0.06935565 -2.777971 -0.4855451 0
1.270259    -2.600012 -0.5126426 0
-0.04783622 -2.387729 0.1639452  0
2.075141    -2.307207 0.3014842  0
-0.3411683  -2.127389 0.3295938  0
-0.7522204  -2.08618  -0.9008618 0
-0.6811135  -2.065622 0.8068994  0
pykx.Identity(pykx.q('::'))
kxi.sp.ml.score
    Evaluate the quality of predictions produced in a pipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| y_true | Union[str, Callable] | Column containing the true values as a string, or a callable function to retrieve the true values from the batch. | required | 
| y_pred | Union[str, Callable] | Column containing the predicted values as a string, or a callable function to retrieve the predicted values from the batch. | required | 
| metric | Metric | A  | required | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | A pipeline comprised of a 'score' operator, which can be joined to other pipelines. | 
Examples:
Computes the mean squared error between true and predicted results:
>>> from kxi import sp, ml
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> import sklearn.linear_model as sk
>>> data = pd.DataFrame({
        'x' :np.random.randn(500),
        'x1':sorted(np.random.randn(500)),
        'x2':np.random.randn(500),
        'y' :250*[False] + 250*[True]
    })
>>> training = data.iloc[:450].reset_index(drop=True)
>>> testing = data.iloc[450:].reset_index(drop=True)
>>> X = training[['x','x1','x2']].to_numpy()
>>> y = training['y'].to_numpy()
>>> model = sk.LogisticRegression().fit(X,y)
>>> ml.registry.set.model(model,"skLC",'sklearn',folder_path="/tmp")
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.predict(['x', 'x1', 'x2'], 'yhat', registry='/tmp', model='skLC')
        | sp.ml.score('y', 'yhat', 'mse')
        | sp.write.to_console(timestamp='none'))
>>> kx.q('publish', testing)
0f
pykx.Identity(pykx.q('::'))
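The `0f` printed above is a q float zero, meaning every prediction in the test batch matched its true label. For reference, the mean squared error itself can be sketched in plain Python as follows; the helper name `mean_squared_error` is hypothetical and this is not the operator's code.

```python
def mean_squared_error(y_true, y_pred):
    """Average of the squared differences between true and predicted values."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


perfect = mean_squared_error([1, 0, 1], [1, 0, 1])
half_wrong = mean_squared_error([1, 0, 1, 1], [1, 1, 1, 0])
print(perfect, half_wrong)
```

For 0/1 class labels the mean squared error equals the fraction of misclassified records, so two errors out of four give 0.5.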
kxi.sp.ml.sequential_k_means
    Apply sequential K-Means to columns in a supplied dataset.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| columns | Union[str, List[str], Callable] | Columns to extract from data for clustering. | None | 
| distance_function | Distance | Similarity measure used for clustering points; a  | <Distance.e2dist: 'e2dist'> | 
| centroids | int | Number of cluster centers. | 3 | 
| init_centers | Optional[List[List[float]]] | Centers used for initialization of algorithm. | None | 
| init | bool | Boolean indicating how to initialize cluster centroids - K-means++ ( | True | 
| a | float | Learning rate value between 0 and 1 used to define how much past centroid information to retain. | 0.1 | 
| forgetful | bool | Apply forgetful ( | True | 
| buffer_size | int | Integer value defining the number of data points which must amass before initial clustering is applied. | 0 | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | A pipeline comprised of a 'sequential_k_means' operator, which can be joined to other pipelines. | 
Examples:
Discover clusters in a batch of data:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.sequential_k_means(['x', 'x1', 'x2'], buffer_size=10)
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.random.randn(10),
        'x1': np.random.randn(10),
        'x2': np.random.randn(10)
    })
>>> kx.q('publish', data)
x           x1         x2         cluster
-----------------------------------------
0.3682323   1.062062   -0.3968269 0
-0.02565208 0.686225   0.7747558  2
-0.6600282  -2.173931  1.659348   1
-1.488911   -0.5495659 1.058788   1
1.003232    1.376033   0.7314441  2
-1.171932   0.02992996 0.3506971  0
0.4667136   1.436048   -0.5011791 0
0.4810127   0.6619446  -0.7347636 0
-0.6200231  0.03087721 -0.1960447 0
0.5987606   0.6250107  -0.4962533 0
pykx.Identity(pykx.q('::'))
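A common formulation of sequential K-Means assigns each new point to its nearest centroid and then nudges that centroid toward the point. The plain-Python sketch below follows that formulation under explicit assumptions: in "forgetful" mode the centroid moves by a fixed fraction `a` of the difference, otherwise by `1 / count`, which yields a running mean. Whether the operator uses exactly these update rules is not confirmed by the table above, and the helpers shown are hypothetical.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))


def sequential_k_means_step(centroids, counts, point, a=0.1, forgetful=True):
    """Assign `point` to its nearest centroid and update that centroid in place."""
    nearest = min(
        range(len(centroids)),
        key=lambda i: squared_distance(centroids[i], point),
    )
    counts[nearest] += 1
    # Assumption: fixed rate when forgetful, running-mean rate otherwise.
    rate = a if forgetful else 1.0 / counts[nearest]
    centroids[nearest] = [c + rate * (x - c) for c, x in zip(centroids[nearest], point)]
    return nearest


centroids = [[0.0, 0.0], [10.0, 10.0]]
counts = [1, 1]
first = sequential_k_means_step(centroids, counts, [1.0, 1.0])
second = sequential_k_means_step(centroids, counts, [9.0, 11.0], forgetful=False)
print(first, second, centroids)
```

With a fixed rate, older points are gradually discounted, which suits streams whose clusters drift over time; the running-mean variant weights all points equally.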
kxi.sp.ml.standardize
    Apply standard scaling to columns in a supplied dataset.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| columns | Union[str, List[str]] | Either column(s) to be standard scaled in string format, or  | None | 
| buffer_size | int | The number of data points which must amass before standard scaling is applied. | 0 | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | A pipeline comprised of a 'standardize' operator, which can be joined to other pipelines. | 
Examples:
Apply standard scaling to a batch of data:
>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.standardize()
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': 10*np.random.randn(10),
        'x1': 100*np.random.randn(10),
        'x2': 20*np.random.randn(10)
    })
>>> kx.q('publish', data)
x          x1         x2
--------------------------------
0.1535412  -1.69486   -0.9795338
-0.1140848 0.2757027  -0.6737665
0.9148957  1.232703   -0.3103389
0.5907271  0.8553826  -0.9874558
1.594768   0.07883309 -0.6399559
-1.289598  -1.63561   0.6606353
-0.3946993 -0.7768724 1.251814
-1.970569  1.105309   2.120371
0.7044533  0.01792667 0.2833964
-0.1894337 0.5414847  -0.725166
pykx.Identity(pykx.q('::'))
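Standard scaling subtracts the column mean and divides by the column standard deviation. The plain-Python sketch below shows the calculation for one column; it assumes the population standard deviation is used, which the parameter table above does not state, and the helper `standardize` is hypothetical.

```python
import statistics


def standardize(values):
    """Rescale values to zero mean and unit (population) standard deviation."""
    mean = statistics.fmean(values)
    # Assumption: population standard deviation; a sample estimate would differ slightly.
    std = statistics.pstdev(values)
    # A zero deviation means the column is constant; map everything to 0.0.
    return [(v - mean) / std if std else 0.0 for v in values]


scaled = standardize([1.0, 2.0, 3.0])
print(scaled)
```

After scaling, columns with very different magnitudes (such as `x`, `x1` and `x2` in the example above) share a comparable range.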