Machine Learning

Stream Processor Machine Learning Operators.

Machine Learning (ML) operators provide additional functionality to the Stream Processor for data science use-cases. This functionality spans the entire ML workflow from preprocessing to model deployment and validation.

kxi.sp.ml.drop_constant

Drop columns with constant values.

Parameters:

Name Type Description Default
columns Union[str, List[str], dict]

Columns to drop. Either a column name as string to drop a single column, or a list of strings to drop several columns, or a dictionary with column names and expected constant values as the key-value pairs to drop several columns, or None to drop all constant columns. Note: The columns to be dropped will be determined by the first batch and stored for application to subsequent batches.

None
buffer_size int

The number of data points which must amass before columns are dropped.

0

Returns:

Type Description
Pipeline

A pipeline comprised of a 'drop_constant' operator, which can be joined to other pipelines.

Examples:

Drop a constant column from a batch of data:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.drop_constant('x')
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.ones(10),
        'a': np.random.randn(10),
        'b': np.random.randn(10)
    })
>>> kx.q('publish', data)
a           b
----------------------
-0.04631683 0.06053949
2.087667    1.566185
1.284214    -1.854408
-0.6991486  0.8490234
0.1423437   -0.4395799
0.6394896   0.1380445
0.62278     0.3418176
2.258521    0.1042979
0.3900605   1.552235
0.1963714   0.3146865
pykx.Identity(pykx.q('::'))
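
The note on 'columns' says that the columns to drop are determined by the first batch and then reused. The following is a minimal plain-Python sketch of that idea, not the operator's implementation. The class name `ConstantColumnDropper` and the dict-of-lists batch layout are assumptions made for illustration.

```python
class ConstantColumnDropper:
    """Hypothetical helper: learn constant columns once, then reuse the decision."""

    def __init__(self):
        self.to_drop = None  # decided on the first batch only

    def __call__(self, batch):
        # batch is assumed to be a dict mapping column name -> list of values
        if self.to_drop is None:
            self.to_drop = [name for name, values in batch.items()
                            if len(set(values)) <= 1]
        return {name: values for name, values in batch.items()
                if name not in self.to_drop}


drop = ConstantColumnDropper()
first = drop({'x': [1.0, 1.0, 1.0], 'a': [0.2, -0.4, 1.3]})
# Under this reading, 'x' is removed from later batches too,
# because the decision was stored after the first batch.
second = drop({'x': [1.0, 2.0], 'a': [0.5, 0.6]})
```

In the real operator, 'buffer_size' would delay this decision until enough data points have accumulated; the sketch decides on the very first call.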

kxi.sp.ml.fit

Fit a model to a batch of data.

Parameters:

Name Type Description Default
features Union[str, List[str], Callable]

Column name of the predictor variable, or the function required to generate predictors from a batch.

required
targets Union[str, Callable]

Column name of the target variable, or the function required to generate targets from a batch, or None if an unsupervised model is to be trained.

required
untrained Function

Untrained model.

required
model_type ModelType

A ModelType enum value, or the string equivalent.

required
udf Union[str, Callable]

User-defined function used to score the quality of the model or to join predictions into the batch.

required
registry Union[str, dict]

Local/cloud registry to save the model to.

None
experiment Optional[str]

Name of the experiment within the registry to save the current model to.

None
model Optional[str]

Name of the current model to be stored within the registry. If 'model' is omitted or None then the current model will not be saved.

None
data

If provided with 'data', the function will attempt to parse out relevant statistical information associated with the data for use within model deployment.

None
requirements Union[bool, List[str], str]

Option to add Python requirements information associated with a model. Either the boolean value True to use the output of pip freeze, or a string giving the path to a requirements.txt file, or a list of strings defining the requirements to be added.

None
major bool

Boolean value indicating whether the model version is incremented as a 'major' version, i.e. from 1 0 to 2 0 (True), rather than from 1 0 to 1 1, which is the default (False).

False
major_version Optional[int]

The major version to be incremented. By default the function will increment major versions based on the current maximum model version present within the registry. However, 'major_version' allows users to define which major version of a model they wish to be incremented.

None
code Union[str, List[str]]

Reference to the location of any *.py, *.p, *.k or *.q files. These files are then automatically loaded on retrieval of the models using the *.get.* functionality.

None
axis bool

Boolean value indicating the required data format for model training, i.e. should the data have value flip (True) or flip value flip (False) applied before training.

False
supervise Optional[List[str]]

List of metrics to be used for supervised monitoring of the model.

None
model_args Optional[List]

List of arguments to pass to the model after X and y. If there is only a single argument, it must be enlisted.

None
buffer_size int

Number of records to buffer before training the model, or 0 to fit on the first batch. When the buffer size is exceeded, any additional records are also included when training.

0

Returns:

Type Description
Pipeline

A pipeline comprised of a fit operator, which can be joined to other pipelines.

Examples:

Fit a model on a batch of data and view the model in the model store:

>>> from kxi import sp, ml
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> import sklearn.linear_model as sk
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.fit('x',
                    'y',
                    sk.LogisticRegression(),
                    'q',
                    'yhat',
                    registry='/tmp',
                    model='qLC',
                    buffer_size=10)
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': sorted(np.random.randn(10)),
        'y': 5*[False]+5*[True]
    })
>>> kx.q('publish', data)
x          y yhat
-----------------
-0.6874269 0 0
-0.4545067 0 0
-0.3909506 0 0
-0.2482032 0 0
-0.2073246 0 0
-0.2059224 1 0
0.02122773 1 0
0.1623181  1 1
1.539742   1 1
2.056621   1 1
pykx.Identity(pykx.q('::'))
>>> ml.registry.get.model_store("/tmp")
               registrationTime experimentName  ... version description
0 2022-02-04 09:58:38.738060800   b'undefined'  ...  [1, 0]         b''

kxi.sp.ml.fresh_create

Apply FRESH feature creation to a batch of data.

Parameters:

Name Type Description Default
columns Union[str, List[str]]

Columns on which FRESH feature creation is to be applied, or None to apply to all columns within a batch.

None
features Union[str, List[str]]

Functions to be applied to each column in a batch as either a string naming the function to be applied, or a list of strings naming the functions to be applied, or None to apply all applicable functions defined within 'kx.q.ml.fresh.params'. Note: A number of bespoke string values are supported, including:
1. 'regression': Apply any functions appropriate for application on floating point data.
2. 'classification': Apply any functions appropriate for application on columns which contain 'classes'.
3. 'noHyperparameters': Apply any underlying functions which do not use configurable parameters.

None

Returns:

Type Description
Pipeline

A pipeline comprised of a fresh_create operator, which can be joined to other pipelines.

Examples:

Set up a stream to compute three features, with a window to batch the data:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.window.timer(np.timedelta64(10, 's'), count_trigger = 1)
        | sp.ml.fresh_create('x', ['min', 'max', 'absEnergy'])
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({'x': np.random.randn(10)})
>>> kx.q('publish', data)
x_absEnergy x_max    x_min
------------------------------
14.30635    0.917339 -2.102371
pykx.Identity(pykx.q('::'))
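
To make the three features in the example concrete, here is a plain-Python sketch of what they compute for one column. It assumes the usual FRESH definition of absolute energy as the sum of squared values; the helper name `fresh_sketch` is hypothetical.

```python
def fresh_sketch(name, values):
    """Hypothetical helper: three FRESH-style aggregates for a single column."""
    return {
        f'{name}_absEnergy': sum(v * v for v in values),  # sum of squared values
        f'{name}_max': max(values),
        f'{name}_min': min(values),
    }


features = fresh_sketch('x', [1.0, -2.0, 0.5])
```

Each batch collapses to a single row of features, which is why the example above places a window before the operator.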

kxi.sp.ml.label_encode

Encode symbolic values as a numerical integer representation.

Parameters:

Name Type Description Default
columns Union[str, List[str], dict]

Name of the column to be encoded as a string, or a list of strings containing the names of the columns to be encoded, or a dictionary with column names and their expected string/symbol values as the key-value pairs, or None to encode all symbolic columns.

None
buffer_size int

Number of data points which must amass before label encoding is applied.

0

Returns:

Type Description
Pipeline

A pipeline comprised of a label_encode operator, which can be joined to other pipelines.

Examples:

Label encode two columns with labels given by a dictionary:

>>> from kxi import sp
>>> import pykx as kx
>>> import pandas as pd
>>> import numpy as np
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.label_encode({'x': ['a', 'b', 'c'], 'x1': ['d', 'e', 'f']})
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': ['a','a','b','c','c','a','a','b','c','c'],
        'x1': ['e','d','e','f','e','d','e','f','f','f'],
        'x2': np.random.randn(10)
    })
>>> kx.q('publish', data)
x x1 x2
----------------
0 1  0.5395699
0 0  1.725395
1 1  0.2059994
2 2  -0.01809984
2 1  -0.04201411
0 0  0.116805
0 1  0.6533205
1 2  1.394677
2 2  0.08629806
2 2  -0.6235565
pykx.Identity(pykx.q('::'))
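
The dictionary form can be read as "each value is replaced by its position in the expected-value list". A minimal sketch of that mapping in plain Python follows; the function `label_encode_sketch` and the dict-of-lists batch are assumptions for illustration, not the operator itself.

```python
def label_encode_sketch(batch, labels):
    """Hypothetical helper: replace each value with its index in the expected list."""
    encoded = dict(batch)  # columns not named in `labels` pass through unchanged
    for column, expected in labels.items():
        index = {value: i for i, value in enumerate(expected)}
        # A value missing from `expected` raises KeyError in this sketch
        encoded[column] = [index[value] for value in batch[column]]
    return encoded


batch = {'x': ['a', 'a', 'b', 'c'], 'x1': ['e', 'd', 'e', 'f']}
encoded = label_encode_sketch(batch, {'x': ['a', 'b', 'c'], 'x1': ['d', 'e', 'f']})
```

The inputs are the first four rows of the example above, and the resulting integers agree with the first four rows of its output.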

kxi.sp.ml.linear_regression

Apply an online linear regression model to a supplied dataset.

Parameters:

Name Type Description Default
features Union[str, List[str], Callable]

Columns to be extracted as features. Either the individual column name as a string, or the list of column names as strings, or a callable function to extract the relevant features.

required
labels Union[str, Callable]

Column to be used as label data. String column name, or a callable function.

required
udf Union[str, Callable]

A user-defined function as a string, or callable function used to append predictions to a batch of data.

required
trend bool

Boolean indicating if the model has a trend coefficient.

True
alpha float

Learning rate applied.

0.01
max_iter int

Maximum possible number of iterations before the run is terminated. This does not guarantee convergence.

100
g_tol float

Gradient tolerance, below which the run is terminated.

1e-05
theta Optional[List[float]]

Initial starting weights.

None
kk Optional[int]

Number of batches used or random points chosen each iteration.

None
seed Optional[int]

Random seed.

None
batch_type BatchType

A BatchType enum value, or the string equivalent.

<BatchType.shuffle: 'shuffle'>
penalty Penalty

Regularization term as a Penalty enum value, or the string equivalent.

<Penalty.l2: 'l2'>
reg_lambda float

Regularization coefficient.

0.001
l1_ratio float

Elastic net mixing parameter. This is only used if penalty type 'elasticNet' is applied.

0.5
decay float

Decay coefficient.

0.0
p float

Momentum coefficient.

0.0
verbose bool

Boolean indicating if information about the fitting process is to be printed after every epoch.

False
buffer_size int

Integer value which defines the number of data points which must amass before linear regression is applied.

0

Returns:

Type Description
Pipeline

A pipeline comprised of a 'linear_regression' operator, which can be joined to other pipelines.

Examples:

Fit, predict, and update an online linear regression model on a stream:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.linear_regression('x', 'y', 'yHat', trend=True, buffer_size=10)
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': sorted(np.random.randn(10)),
        'y': sorted(np.random.randn(10))
    })
>>> kx.q('publish', data)
x          y          yHat
---------------------------------
-1.65824   -1.451782  -1.403835
-1.122569  -1.024654  -1.078429
-0.7118562 -0.7974354 -0.8289319
-0.4585539 -0.6012899 -0.6750576
-0.301018  -0.5226072 -0.5793588
-0.1893682 -0.5012154 -0.5115346
0.05424706 -0.4776461 -0.3635449
0.4934012  -0.2861974 -0.09677065
1.237137   0.49091    0.3550285
1.468013   0.4976716  0.4952795
pykx.Identity(pykx.q('::'))
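
The roles of 'alpha', 'max_iter' and 'g_tol' can be illustrated with a small gradient descent loop. This is a simplified sketch under stated assumptions: it uses full-batch gradient descent on one feature with a trend term, and it omits the penalty, decay, momentum and batching options. The function name `fit_linear` is hypothetical.

```python
def fit_linear(xs, ys, alpha=0.01, max_iter=100, g_tol=1e-5):
    """Hypothetical helper: fit y ~ theta[0] + theta[1] * x by gradient descent."""
    theta = [0.0, 0.0]  # [trend (intercept), slope]
    n = len(xs)
    for _ in range(max_iter):
        errors = [theta[0] + theta[1] * x - y for x, y in zip(xs, ys)]
        grad = [sum(errors) / n,
                sum(e * x for e, x in zip(errors, xs)) / n]
        if max(abs(g) for g in grad) < g_tol:
            break  # gradient below tolerance: stop early
        theta = [t - alpha * g for t, g in zip(theta, grad)]
    return theta


xs = [0.0, 1.0, 2.0, 3.0]
ys = [1.0, 3.0, 5.0, 7.0]  # exactly y = 1 + 2x
theta = fit_linear(xs, ys, alpha=0.1, max_iter=5000)
```

Reaching 'max_iter' ends the loop whether or not the gradient tolerance was met, which is why the parameter description notes that it does not guarantee convergence.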

kxi.sp.ml.log_classifier

Apply an online logistic classifier model to a supplied dataset.

Parameters:

Name Type Description Default
features Union[str, List[str], Callable]

Columns to be extracted as features. Either the individual column name as a string, or the list of column names as strings, or a callable function to extract the relevant features.

required
labels Union[str, Callable]

Column to be used as label data. String column name, or a callable function.

required
udf Union[str, Callable]

A user-defined function as a string, or callable function used to append predictions to a batch of data.

required
trend bool

Boolean indicating if the model has a trend coefficient.

True
alpha float

Learning rate applied.

0.01
max_iter int

Maximum possible number of iterations before the run is terminated. This does not guarantee convergence.

100
g_tol float

Gradient tolerance, below which the run is terminated.

1e-05
theta Optional[List[float]]

Initial starting weights.

None
kk Optional[int]

Number of batches used or random points chosen each iteration.

None
seed Optional[int]

Random seed.

None
batch_type BatchType

A BatchType enum value, or the string equivalent.

<BatchType.shuffle: 'shuffle'>
penalty Penalty

Regularization term as a Penalty enum value, or the string equivalent.

<Penalty.l2: 'l2'>
reg_lambda float

Regularization coefficient.

0.001
l1_ratio float

Elastic net mixing parameter. This is only used if penalty type 'elasticNet' is applied.

0.5
decay float

Decay coefficient.

0.0
p float

Momentum coefficient.

0.0
verbose bool

Boolean indicating if information about the fitting process is to be printed after every epoch.

False
buffer_size int

Integer value which defines the number of data points which must amass before the logistic classifier is applied.

0

Returns:

Type Description
Pipeline

A pipeline comprised of a 'log_classifier' operator, which can be joined to other pipelines.

Examples:

Fit, predict, and update an online log classifier model on a stream:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.log_classifier('x', 'y', 'yHat', trend=True, buffer_size=10)
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': sorted(np.random.randn(10)),
        'y': 5*[False]+5*[True]
    })
>>> kx.q('publish', data)
x          y yHat
-----------------
-1.053813  0 0
-0.1715366 0 0
-0.149147  0 0
0.06220506 0 0
0.3122359  0 0
0.8119877  1 1
0.9594937  1 1
1.093567   1 1
1.117492   1 1
1.778675   1 1
pykx.Identity(pykx.q('::'))
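
A logistic classifier differs from the linear model mainly in passing the linear score through a sigmoid and thresholding the result. The sketch below shows that step with a basic gradient descent fit on one feature. It is an illustration only: the function names are hypothetical, the 0.5 threshold is an assumption, and regularization, decay and momentum are left out.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def fit_logistic(xs, ys, alpha=0.01, max_iter=100):
    """Hypothetical helper: gradient descent on the logistic loss, one feature."""
    theta = [0.0, 0.0]  # [trend (intercept), slope]
    n = len(xs)
    for _ in range(max_iter):
        errors = [sigmoid(theta[0] + theta[1] * x) - y for x, y in zip(xs, ys)]
        theta[0] -= alpha * sum(errors) / n
        theta[1] -= alpha * sum(e * x for e, x in zip(errors, xs)) / n
    return theta


def predict(theta, xs):
    # Assumed decision rule: class 1 when the predicted probability is >= 0.5
    return [int(sigmoid(theta[0] + theta[1] * x) >= 0.5) for x in xs]


xs = [-2.0, -1.0, -0.5, 0.5, 1.0, 2.0]
ys = [0, 0, 0, 1, 1, 1]
theta = fit_logistic(xs, ys, alpha=0.5, max_iter=200)
labels = predict(theta, xs)
```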

kxi.sp.ml.min_max_scaler

Apply min-max scaling to columns in a supplied dataset.

Parameters:

Name Type Description Default
columns Union[str, List[str], dict]

Column(s) to be min-max scaled in string format, or a dictionary with column names and (min, max) tuples as the key-value pairs to min-max scale several columns, or None to min-max scale all columns.

None
buffer_size int

Number of data points which must amass before min-max scaling is applied.

0
range_error bool

Boolean indicating whether an error should be raised if new data falls outside the min-max range used for scaling.

False

Returns:

Type Description
Pipeline

A pipeline comprised of a min_max_scaler operator, which can be joined to other pipelines.

Examples:

Performs min-max scaling on a column of a batch of data:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.min_max_scaler('x')
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.random.randn(10),
        'x1': np.random.randn(10)
    })
>>> kx.q('publish', data)
x         x1
---------------------
0.1424131 -2.641588
1         -2.180433
0.6399939 -0.05007621
0.1463686 -1.386112
0         2.358236
0.1877115 0.2539063
0.1577937 -0.7334433
0.5413414 0.3316929
0.7087663 0.09127055
0.3445104 1.343761
pykx.Identity(pykx.q('::'))

kxi.sp.ml.one_hot_encode

Encode symbolic/string values as a boolean numeric representation.

Parameters:

Name Type Description Default
columns Union[str, List[str], dict]

Column(s) to be one-hot encoded in string format, or a dictionary with column names and expected symbol/string values as the key-value pairs to be encoded, or None to encode all symbolic columns.

None
buffer_size int

The number of data points which must amass before symbolic data is encoded.

0

Returns:

Type Description
Pipeline

A pipeline comprised of a one_hot_encode operator, which can be joined to other pipelines.

Examples:

Performs one-hot encoding on a batch of data:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.one_hot_encode()
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': ['a','a','b','c','c','a','b','c','a','b'],
        'x1': ['a','b','b','c','b','a','b','c','b','b'],
        'x2': np.random.randn(10),
        'x3': 5*["abc","def"]
    })
>>> kx.q('publish', data)
x2         x_a x_b x_c x1_a x1_b x1_c x3_abc x3_def
---------------------------------------------------
0.3340958  1   0   0   1    0    0    1      0
-1.71888   1   0   0   0    1    0    0      1
-0.8172669 0   1   0   0    1    0    1      0
-0.606363  0   0   1   0    0    1    0      1
1.246069   0   0   1   0    1    0    1      0
-0.1144463 1   0   0   1    0    0    0      1
-0.5357472 0   1   0   0    1    0    1      0
-1.103191  0   0   1   0    0    1    0      1
-1.013031  1   0   0   0    1    0    1      0
0.4447144  0   1   0   0    1    0    0      1
pykx.Identity(pykx.q('::'))
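
The output columns in the example follow a `<column>_<value>` naming pattern, with a 1 where the row holds that value. A plain-Python sketch of this encoding for one column is shown below; the function `one_hot_sketch` is hypothetical, and sorting the distinct values is an assumption about column order.

```python
def one_hot_sketch(name, values, categories=None):
    """Hypothetical helper: one indicator column per distinct value."""
    if categories is None:
        categories = sorted(set(values))  # assumed ordering of output columns
    return {f'{name}_{c}': [int(v == c) for v in values] for c in categories}


encoded = one_hot_sketch('x', ['a', 'a', 'b', 'c'])
```

Passing `categories` explicitly plays the role of the dictionary form of 'columns', where the expected values are known up front.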

kxi.sp.ml.predict

Predicts a target variable using a model from the registry.

Parameters:

Name Type Description Default
features Union[str, List[str], Callable]

Column name of the predictor variable, or the function required to generate predictors from a batch.

required
udf Union[str, Callable]

User-defined function for integrating the predictions into the batch, or a column name to join predictions to the table.

required
registry Union[str, dict]

Registry to load models from.

None
experiment Optional[str]

Experiment within the registry to load models from.

None
model Optional[str]

Model within the registry to use for prediction.

None
version Optional[List[int]]

Model version to load in for prediction.

None

Returns:

Type Description
Pipeline

A pipeline comprised of a predict operator, which can be joined to other pipelines.

Examples:

Save a model to registry then retrieve it to serve predictions:

>>> from kxi import sp, ml
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> import sklearn.linear_model as sk
>>> data = pd.DataFrame({
        'x': np.random.randn(500),
        'x1': sorted(np.random.randn(500)),
        'x2': np.random.randn(500),
        'y': 250*[False]+250*[True]
    })
>>> X=data[['x','x1','x2']].to_numpy()
>>> y=data['y'].to_numpy()
>>> model=sk.LogisticRegression().fit(X,y)
>>> ml.registry.set.model(model,"skLC",'sklearn',folder_path="/tmp")
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.predict(['x','x1','x2'],
                        'yhat',
                        registry='/tmp',
                        model='skLC')
        | sp.write.to_console(timestamp='none'))
>>> kx.q('publish', data[['x','x1','x2']].iloc[:10])
x           x1        x2         yhat
-------------------------------------
1.327591    -3.220784 0.5501495  0
0.1169892   -2.989278 -0.4238209 0
0.2993643   -2.960673 -0.4045395 0
-0.06935565 -2.777971 -0.4855451 0
1.270259    -2.600012 -0.5126426 0
-0.04783622 -2.387729 0.1639452  0
2.075141    -2.307207 0.3014842  0
-0.3411683  -2.127389 0.3295938  0
-0.7522204  -2.08618  -0.9008618 0
-0.6811135  -2.065622 0.8068994  0
pykx.Identity(pykx.q('::'))

kxi.sp.ml.score

Evaluate the quality of predictions produced in a pipeline.

Parameters:

Name Type Description Default
y_true Union[str, Callable]

Column containing the true values as a string, or a callable function to retrieve the true values from the batch.

required
y_pred Union[str, Callable]

Column containing the predicted values as a string, or a callable function to retrieve the predicted values from the batch.

required
metric Metric

A Metric enum value, or the string equivalent.

required

Returns:

Type Description
Pipeline

A pipeline comprised of a score operator, which can be joined to other pipelines.

Examples:

Computes the mean squared error between true and predicted results:

>>> from kxi import sp, ml
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> import sklearn.linear_model as sk
>>> data = pd.DataFrame({
        'x' :np.random.randn(500),
        'x1':sorted(np.random.randn(500)),
        'x2':np.random.randn(500),
        'y' :250*[False] + 250*[True]
    })
>>> training = data.iloc[:450].reset_index(drop=True)
>>> testing = data.iloc[450:].reset_index(drop=True)
>>> X = training[['x','x1','x2']].to_numpy()
>>> y = training['y'].to_numpy()
>>> model = sk.LogisticRegression().fit(X,y)
>>> ml.registry.set.model(model,"skLC",'sklearn',folder_path="/tmp")
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.predict(['x', 'x1', 'x2'], 'yhat', registry='/tmp', model='skLC')
        | sp.ml.score('y', 'yhat', 'mse')
        | sp.write.to_console(timestamp='none'))
>>> kx.q('publish', testing)
0f
pykx.Identity(pykx.q('::'))

kxi.sp.ml.sequential_k_means

Apply sequential K-Means to columns in a supplied dataset.

Parameters:

Name Type Description Default
columns Union[str, List[str], Callable]

Columns to extract from data for clustering.

None
distance_function Distance

Similarity measure used for clustering points; a Distance enum value, or the string equivalent.

<Distance.e2dist: 'e2dist'>
centroids int

Number of cluster centers.

3
init_centers Optional[List[List[float]]]

Centers used for initialization of algorithm.

None
init bool

Boolean indicating how to initialize cluster centroids - K-means++ (True) or randomized (False).

True
a float

Learning rate value between 0 and 1 used to define how much past centroid information to retain.

0.1
forgetful bool

Apply forgetful (True) or normal sequential K-Means (False).

True
buffer_size int

Integer value defining the number of data points which must amass before initial clustering is applied.

0

Returns:

Type Description
Pipeline

A pipeline comprised of a 'sequential_k_means' operator, which can be joined to other pipelines.

Examples:

Discover clusters in a batch of data:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.sequential_k_means(['x', 'x1', 'x2'], buffer_size=10)
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': np.random.randn(10),
        'x1': np.random.randn(10),
        'x2': np.random.randn(10)
    })
>>> kx.q('publish', data)
x           x1         x2         cluster
-----------------------------------------
0.3682323   1.062062   -0.3968269 0
-0.02565208 0.686225   0.7747558  2
-0.6600282  -2.173931  1.659348   1
-1.488911   -0.5495659 1.058788   1
1.003232    1.376033   0.7314441  2
-1.171932   0.02992996 0.3506971  0
0.4667136   1.436048   -0.5011791 0
0.4810127   0.6619446  -0.7347636 0
-0.6200231  0.03087721 -0.1960447 0
0.5987606   0.6250107  -0.4962533 0
pykx.Identity(pykx.q('::'))
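
The difference between the forgetful and normal variants comes down to the step size used when a centroid moves toward a new point. The sketch below shows a single update under the textbook formulation: a constant rate 'a' when forgetful, and 1/count otherwise. The function names and the explicit `counts` list are assumptions for illustration, and initialization (K-means++ or random) is not shown.

```python
def e2dist(p, q):
    # Squared Euclidean distance between two points
    return sum((a - b) ** 2 for a, b in zip(p, q))


def sequential_update(centroids, counts, point, a=0.1, forgetful=True):
    """Hypothetical helper: assign `point` and move its nearest centroid toward it."""
    k = min(range(len(centroids)), key=lambda i: e2dist(centroids[i], point))
    counts[k] += 1
    # Forgetful: constant rate, so old points fade; normal: running mean
    rate = a if forgetful else 1.0 / counts[k]
    centroids[k] = [c + rate * (x - c) for c, x in zip(centroids[k], point)]
    return k


centroids = [[0.0, 0.0], [10.0, 10.0]]
counts = [1, 1]
cluster = sequential_update(centroids, counts, [1.0, 1.0], a=0.5)
```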

kxi.sp.ml.standardize

Apply standard scaling to columns in a supplied dataset.

Parameters:

Name Type Description Default
columns Union[str, List[str]]

Either column(s) to be standard scaled in string format, or None to standard scale all columns.

None
buffer_size int

The number of data points which must amass before standard scaling is applied.

0

Returns:

Type Description
Pipeline

A pipeline comprised of a standardize operator, which can be joined to other pipelines.

Examples:

Apply standard scaling to a batch of data:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
        | sp.ml.standardize()
        | sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
        'x': 10*np.random.randn(10),
        'x1': 100*np.random.randn(10),
        'x2': 20*np.random.randn(10)
    })
>>> kx.q('publish', data)
x          x1         x2
--------------------------------
0.1535412  -1.69486   -0.9795338
-0.1140848 0.2757027  -0.6737665
0.9148957  1.232703   -0.3103389
0.5907271  0.8553826  -0.9874558
1.594768   0.07883309 -0.6399559
-1.289598  -1.63561   0.6606353
-0.3946993 -0.7768724 1.251814
-1.970569  1.105309   2.120371
0.7044533  0.01792667 0.2833964
-0.1894337 0.5414847  -0.725166
pykx.Identity(pykx.q('::'))
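
Standard scaling subtracts the column mean and divides by the column standard deviation. The plain-Python sketch below assumes the population standard deviation (dividing by n); whether the operator uses the population or sample form is not stated here, so treat that choice as an assumption. The function name `standardize_sketch` is hypothetical.

```python
import math


def standardize_sketch(values):
    """Hypothetical helper: z-score a single column."""
    n = len(values)
    mean = sum(values) / n
    # Population standard deviation (assumption, see the note above)
    std = math.sqrt(sum((v - mean) ** 2 for v in values) / n)
    return [(v - mean) / std for v in values]


scaled = standardize_sketch([2.0, 4.0, 6.0])
```

After scaling, the column has mean zero and unit standard deviation, so columns with very different ranges become comparable.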