kxi.sp.ml

Stream Processor Machine Learning Operators.

Machine Learning (ML) operators provide additional functionality to the Stream Processor for data science use-cases. This functionality spans the entire ML workflow from preprocessing to model deployment and validation.

BatchType Objects

class BatchType(AutoNameEnum)

Enum for the type of batches to use when training a model.

single

Single batch of k data points.

shuffle

Shuffle the dataset and split it into k batches.

shuffle_rep

Shuffle the dataset and create k batches with potential repeated data points.

non_shuffle

Keep the natural order of the dataset and take k batches.

no_batch

Take the whole dataset in its natural order.
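
The batch types above can be illustrated in plain Python. This is a sketch of one possible reading of the descriptions, not the Stream Processor's implementation: the helper `make_batches`, its parameters, the random sampling used for `single`, and sampling with replacement for `shuffle_rep` are all assumptions made for illustration.

```python
import random

def make_batches(data, k, batch_type, seed=0):
    """Hypothetical helper: split `data` according to a BatchType-like name."""
    rng = random.Random(seed)
    n = len(data)
    if batch_type == "no_batch":
        # Whole dataset, natural order
        return [list(data)]
    if batch_type == "single":
        # One batch of k points (random choice is an assumption)
        return [rng.sample(list(data), k)]
    if batch_type == "shuffle_rep":
        # k batches drawn with replacement, so points may repeat
        size = max(1, n // k)
        return [[rng.choice(data) for _ in range(size)] for _ in range(k)]
    items = list(data)
    if batch_type == "shuffle":
        rng.shuffle(items)
    elif batch_type != "non_shuffle":
        raise ValueError(f"unknown batch type: {batch_type}")
    # Split into k contiguous batches of near-equal size
    bounds = [round(i * n / k) for i in range(k + 1)]
    return [items[bounds[i]:bounds[i + 1]] for i in range(k)]

data = list(range(10))
ordered = make_batches(data, 2, "non_shuffle")
shuffled = make_batches(data, 2, "shuffle")
```

In this sketch `shuffle` and `non_shuffle` both cover every data point exactly once; only the order differs.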

Distance Objects

class Distance(AutoNameEnum)

Enum to specify the distance metric to use with a model.

edist

Euclidean distance.

e2dist

Squared Euclidean distance.
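
The two metrics differ only by a square root. A minimal plain-Python sketch follows; the function names mirror the enum members, but the code is illustrative and is not taken from the library.

```python
import math

def e2dist(a, b):
    """Squared Euclidean distance between two equal-length points."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

def edist(a, b):
    """Euclidean distance: the square root of e2dist."""
    return math.sqrt(e2dist(a, b))

p, q = (0.0, 0.0), (3.0, 4.0)
print(edist(p, q), e2dist(p, q))  # 5.0 25.0
```

Because the square root is monotonic, both metrics rank candidate points in the same order; the squared form simply avoids the extra `sqrt` call.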

Metric Objects

class Metric(AutoNameEnum)

Enum to specify the metric to use to evaluate a model.

f1

F1 score.

accuracy

Accuracy score.

mse

Mean squared error.

rmse

Root mean squared error.
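
For reference, the four metrics can be written in a few lines of plain Python. This sketch uses the standard textbook definitions, with F1 computed for a binary problem where True is the positive class; it is not the library's implementation.

```python
import math

def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    return sum(1 for t, p in zip(y_true, y_pred) if t == p) / len(y_true)

def f1(y_true, y_pred):
    """Harmonic mean of precision and recall for boolean labels."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t and p)
    fp = sum(1 for t, p in zip(y_true, y_pred) if not t and p)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t and not p)
    denom = 2 * tp + fp + fn
    return 2 * tp / denom if denom else 0.0

def mse(y_true, y_pred):
    """Mean of the squared differences."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)

def rmse(y_true, y_pred):
    """Square root of the mean squared error."""
    return math.sqrt(mse(y_true, y_pred))

labels = [True, True, False, False]
preds = [True, False, False, True]
truth = [1.0, 2.0, 3.0, 4.0]
guess = [1.0, 2.0, 3.0, 6.0]
```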

ModelType Objects

class ModelType(AutoNameEnum)

Enum to specify the type of ML model.

q

Q native model.

sklearn

Scikit-learn model.

Penalty Objects

class Penalty(AutoNameEnum)

Enum to specify the penalty/regularization to use when training a model.

l1

L1 regularization.

l2

L2 regularization.

elastic_net

Elastic Net regularization.
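
As a rough illustration of how the three penalties differ, the sketch below computes the term each one adds to a loss. The helper `penalty_term` is hypothetical, and the scaling is an assumption: libraries differ on constant factors (for example a factor of 1/2 on the L2 term), so treat the exact formula as illustrative only.

```python
def penalty_term(weights, penalty, lambda_=0.001, l1_ratio=0.5):
    """Hypothetical helper: regularization term added to a loss."""
    l1 = sum(abs(w) for w in weights)   # sum of absolute values
    l2 = sum(w * w for w in weights)    # sum of squares
    if penalty == "l1":
        return lambda_ * l1
    if penalty == "l2":
        return lambda_ * l2
    if penalty == "elastic_net":
        # Mix of L1 and L2, weighted by l1_ratio
        return lambda_ * (l1_ratio * l1 + (1 - l1_ratio) * l2)
    raise ValueError(f"unknown penalty: {penalty}")

weights = [1.0, -2.0]
terms = {name: penalty_term(weights, name, lambda_=0.1)
         for name in ("l1", "l2", "elastic_net")}
```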

format_dict_string

def format_dict_string(folder_path: Union[str, dict] = None)

Format a folder_path into an appropriate representation.

Arguments:

  • folder_path - The folder_path as a str or appropriate dict.

Returns:

The dict or str formatted as an appropriate k object.

format_string

def format_string(name: str = None)

Convert a str object to a q character vector.

Arguments:

  • name - The name of the item to be converted.

Returns:

The name object converted from a str to a pykx.CharVector, or returned unchanged.

drop_constant

@MLOperator
def drop_constant(columns: Optional[Union[str, List[str], dict]] = None,
                  *,
                  buffer_size: int = 0) -> MLOperator

Drop columns with constant values.

Arguments:

  • columns - Columns to drop. Either a column name as string to drop a single column, or a list of strings to drop several columns, or a dictionary with column names and expected constant values as the key-value pairs to drop several columns, or None to drop all constant columns. Note: The columns to be dropped will be determined by the first batch and stored for application to subsequent batches.
  • buffer_size - The number of data points which must amass before columns are dropped.

Returns:

A pipeline comprised of a 'drop_constant' operator, which can be joined to other pipelines.

Examples:

Drop a constant column from a batch of data:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
| sp.ml.drop_constant('x')
| sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({
'x': np.ones(10),
'a': np.random.randn(10),
'b': np.random.randn(10)
})
>>> kx.q('publish', data)
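
The note on `columns` says the columns to drop are decided on the first batch and then reused. A plain-Python sketch of that behaviour follows; the `DropConstant` class and its helper are hypothetical stand-ins, not the operator's code, and batches are modelled as simple dicts of lists.

```python
def find_constant_columns(batch):
    """Return names of columns whose values are all identical."""
    return [name for name, values in batch.items() if len(set(values)) <= 1]

class DropConstant:
    """Hypothetical stand-in for the operator's first-batch behaviour."""

    def __init__(self):
        self.to_drop = None  # decided once, on the first batch

    def __call__(self, batch):
        if self.to_drop is None:
            self.to_drop = find_constant_columns(batch)
        return {k: v for k, v in batch.items() if k not in self.to_drop}

op = DropConstant()
first = op({"x": [1, 1, 1], "a": [0.2, 0.5, 0.1]})
# 'x' varies and 'a' is constant here, but the first-batch decision still applies
second = op({"x": [1, 2, 3], "a": [0.7, 0.7, 0.7]})
```

Under this reading, a column that only becomes constant in a later batch is kept, and a dropped column stays dropped even if it later varies.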

fit

@MLOperator
def fit(features: Union[str, List[str], Callable],
        targets: Union[str, Callable],
        untrained: kx.Function,
        model_type: ModelType,
        prediction: Union[str, Callable],
        *,
        udf: Union[str, Callable] = None,
        buffer_size: int = 0,
        model_args: Optional[List] = None,
        model: Optional[str] = None,
        registry: Optional[Union[str, dict]] = None,
        experiment: Optional[str] = None,
        data: Optional[pd.DataFrame] = None,
        requirements: Optional[Union[bool, List[str], str]] = None,
        major: bool = False,
        major_version: Optional[int] = None,
        code: Optional[Union[str, List[str]]] = None,
        axis: bool = False,
        supervise: Optional[List[str]] = None) -> MLOperator

Fit a model to a batch of data.

Arguments:

  • features - Column name of the predictor variable, or the function required to generate predictors from a batch.
  • targets - Column name of the target variable, or the function required to generate targets from a batch, or None if an unsupervised model is to be trained.
  • untrained - Untrained model.
  • model_type - A ModelType enum value, or the string equivalent.
  • prediction - Function used to score the quality of the model or to join predictions into the batch.
  • udf - Deprecated, replaced by prediction.
  • buffer_size - Number of records to buffer before training the model, or 0 to fit on the first batch. When the buffer size is exceeded, any additional records are also included when training.
  • model_args - List of arguments to pass to the model after X and y. If there is only a single argument, it must be enlisted.
  • model - Name of the current model to be stored within the registry. If 'model' is omitted or None then the current model will not be saved.
  • registry - Local/cloud registry to save the model to.
  • experiment - Name of the experiment within the registry to save the current model to.
  • data - If provided with 'data', the function will attempt to parse out relevant statistical information associated with the data for use within model deployment.
  • requirements - Option to add Python requirements information associated with a model. Either a boolean value True indicating the use of pip freeze, or a symbol indicating the path to a requirements.txt file, or a list of strings defining the requirements to be added.
  • major - Boolean value indicating if model version is to be incremented as a 'major' version i.e. should the model be incremented from 1 0 to 2 0 (True) rather than 1 0 to 1 1 as is default (False).
  • major_version - The major version to be incremented. By default the function will increment major versions based on the current maximum model version present within the registry. However, 'major_version' allows users to define which major version of a model they wish to be incremented.
  • code - Reference to the location of any *.py, *.p, *.k or *.q files. These files are then automatically loaded on retrieval of the models using the *.get.* functionality.
  • axis - Boolean value indicating the required data format for model training, i.e. should the data have value flip (True) or flip value flip (False) applied before training.
  • supervise - List of metrics to be used for supervised monitoring of the model.

Returns:

A fit operator, which can be joined to other operators or pipelines.

Examples:

Fit a model on a batch of data and view model in model store:

```python
from kxi import sp, ml
import pykx as kx
import numpy as np
import pandas as pd
import sklearn.linear_model as sk

# Fit once 10 records are buffered and save the model as 'qLC' in /tmp
sp.run(sp.read.from_callback('publish')
    | sp.ml.fit('x', 'y', sk.LogisticRegression(), 'q', 'yhat',
                registry='/tmp', model='qLC', buffer_size=10)
    | sp.write.to_console(timestamp='none'))

data = pd.DataFrame({
    'x': sorted(np.random.randn(10)),
    'y': 5*[False] + 5*[True]
})
kx.q('publish', data)

# List the models stored in the registry
ml.registry.get.model_store("/tmp")
#                 registrationTime experimentName  ...  version description
# 0  2022-02-04 09:58:38.738060800   b'undefined'  ...   [1, 0]         b''
```
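
The `major` and `major_version` arguments describe how the stored version advances. The sketch below is one interpretation of those descriptions in plain Python; `next_version` is a hypothetical helper, and the registry's actual logic may differ.

```python
def next_version(existing, major=False, major_version=None):
    """Hypothetical helper: pick the next [major, minor] version."""
    if not existing:
        return [1, 0]  # first model saved under this name
    if major:
        # Start a new major version above the current maximum
        return [max(v[0] for v in existing) + 1, 0]
    # Otherwise bump the minor version of the chosen (or latest) major
    target = major_version if major_version is not None else max(v[0] for v in existing)
    minors = [v[1] for v in existing if v[0] == target]
    return [target, max(minors) + 1] if minors else [target, 0]

stored = [[1, 0], [1, 1], [2, 0]]
```

With these stored versions, the default call yields `[2, 1]`, `major=True` yields `[3, 0]`, and `major_version=1` yields `[1, 2]`.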

fresh_create

@MLOperator
def fresh_create(
        columns: Optional[Union[str, List[str]]] = None,
        features: Optional[Union[str, List[str]]] = None) -> MLOperator

Apply FRESH feature creation to a batch of data.

Arguments:

  • columns - Columns on which FRESH feature creation is to be applied, or None to apply to all columns within a batch.
  • features - Functions to be applied to each column in a batch as either a string naming the function to be applied, or list of strings naming the functions to be applied, or None to apply all applicable functions defined within 'kx.q.ml.fresh.params'.
    Note: A number of bespoke string values are supported, including:
      • 'regression': Apply any functions appropriate for application on floating point data.
      • 'classification': Apply any functions appropriate for application on columns which contain 'classes'.
      • 'noHyperparameters': Apply any underlying functions which do not use configurable parameters.

Returns:

A fresh_create operator, which can be joined to other operators or pipelines.

Examples:

Setting up a stream to compute three features, with a window to batch the data:

>>> from kxi import sp
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> sp.run(sp.read.from_callback('publish')
| sp.window.timer(np.timedelta64(10, 's'), count_trigger = 1)
| sp.ml.fresh_create('x', ['min', 'max', 'absEnergy'])
| sp.write.to_console(timestamp='none'))
>>> data = pd.DataFrame({'x': np.random.randn(10)})
>>> kx.q('publish', data)
x_absEnergy x_max    x_min
------------------------------
14.30635    0.917339 -2.102371
pykx.Identity(pykx.q('::'))
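
To make the output columns above concrete, the following plain-Python sketch computes the same three features for one column. The `fresh_features` helper is hypothetical; `absEnergy` is taken here to be the sum of squared values, which is the usual definition of absolute energy in FRESH-style feature extraction.

```python
def fresh_features(column, values, features):
    """Hypothetical helper: compute named features for one column."""
    funcs = {
        "min": min,
        "max": max,
        "absEnergy": lambda v: sum(x * x for x in v),  # sum of squares
    }
    # Output names follow the '<column>_<feature>' pattern seen in the console output
    return {f"{column}_{name}": funcs[name](values) for name in features}

row = fresh_features("x", [1.0, -2.0, 3.0], ["min", "max", "absEnergy"])
print(row)  # {'x_min': -2.0, 'x_max': 3.0, 'x_absEnergy': 14.0}
```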

label_encode

@MLOperator
def label_encode(
        columns: Optional[Union[str, List[str], dict]] = None) -> MLOperator

Encode symbolic values as a numerical integer representation.

Arguments:

  • columns - String column name of column to be symbol encoded, or a list of strings containing the names of the columns to be encoded, or a dictionary with column names and their expected string/symbol values as the key-value pairs, or None to encode all symbolic columns.

Returns:

A label_encode operator, which can be joined to other operators or pipelines.

Examples:

Label encode two columns with labels given by a dictionary:

```python
from kxi import sp
import pykx as kx
import pandas as pd
import numpy as np

# Encode 'x' and 'x1' using the expected labels given for each column
sp.run(sp.read.from_callback('publish')
    | sp.ml.label_encode({'x': ['a', 'b', 'c'], 'x1': ['d', 'e', 'f']})
    | sp.write.to_console(timestamp='none'))

data = pd.DataFrame({
    'x': ['a','a','b','c','c','a','a','b','c','c'],
    'x1': ['e','d','e','f','e','d','e','f','f','f'],
    'x2': np.random.randn(10)
})
kx.q('publish', data)
```
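
Conceptually, label encoding replaces each symbol with an integer code. The sketch below shows the idea in plain Python; the helper is hypothetical, and both the zero-based codes and the use of list position as the code are assumptions rather than documented behaviour.

```python
def label_encode(values, expected=None):
    """Hypothetical helper: map each value to an integer code."""
    if expected is None:
        # No labels supplied: derive them from the data
        expected = sorted(set(values))
    codes = {label: i for i, label in enumerate(expected)}
    return [codes[v] for v in values]

x_codes = label_encode(['a', 'a', 'b', 'c'], ['a', 'b', 'c'])
x1_codes = label_encode(['e', 'd', 'f'], ['d', 'e', 'f'])
```

In this sketch an unseen label raises a `KeyError`; how the operator treats unseen labels is not specified here.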

linear_regression

@MLOperator
def linear_regression(features: Union[str, List[str], Callable],
                      labels: Union[str, Callable],
                      prediction: Union[str, Callable],
                      *,
                      udf: Union[str, Callable] = None,
                      trend: bool = True,
                      alpha: float = 0.01,
                      max_iter: int = 100,
                      g_tol: float = 1e-5,
                      seed: Optional[int] = None,
                      penalty: Penalty = Penalty.l2,
                      lambda_: float = 0.001,
                      l1_ratio: float = 0.5,
                      decay: float = 0.0,
                      p: float = 0.0,
                      buffer_size: int = 0) -> MLOperator

Apply online linear regression model to a supplied dataset.

Arguments:

  • features - Columns to be extracted as features. Either the individual column name as a string, or the list of column names as strings, or a callable function to extract the relevant features.
  • labels - Column to be used as label data. String column name, or a callable function.
  • prediction - Function as a string, or callable function used to append predictions to a batch of data.
  • udf - Deprecated, replaced by prediction.
  • trend - Boolean indicating if the model has a trend coefficient.
  • alpha - Learning rate applied.
  • max_iter - Maximum possible number of iterations before the run is terminated. This does not guarantee convergence.
  • g_tol - Gradient tolerance, below which the run is terminated.
  • seed - Random seed.
  • penalty - Regularization term as a Penalty enum value, or the string equivalent.
  • lambda_ - Regularization coefficient.
  • l1_ratio - Elastic net mixing parameter. This is only used if the elastic net penalty (Penalty.elastic_net) is applied.
  • decay - Decay coefficient.
  • p - Momentum coefficient.
  • buffer_size - Integer value which defines the number of data points which must amass before linear regression is applied.

Returns:

A pipeline comprised of a 'linear_regression' operator, which can be joined to other pipelines.

Examples:

Fit, predict, and update an online linear regression model on a stream:

```python
from kxi import sp
import pykx as kx
import numpy as np
import pandas as pd

# Fit once 10 records are buffered, then append predictions as 'yHat'
sp.run(sp.read.from_callback('publish')
    | sp.ml.linear_regression('x', 'y', 'yHat', trend=True, buffer_size=10)
    | sp.write.to_console(timestamp='none'))

data = pd.DataFrame({
    'x': sorted(np.random.randn(10)),
    'y': sorted(np.random.randn(10))
})
kx.q('publish', data)
```
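
The roles of `alpha`, `max_iter`, `g_tol`, and `lambda_` can be seen in a small gradient-descent loop. This is a generic sketch, not the operator's algorithm: `fit_linear` is a hypothetical name, the penalty is plain L2 applied to both coefficients, and the step size and iteration limit are chosen for this toy data rather than being the operator's defaults.

```python
import math

def fit_linear(xs, ys, alpha=0.05, max_iter=5000, g_tol=1e-6, lambda_=0.0):
    """Gradient descent for y ~ w0 + w1*x with an optional L2 penalty."""
    w0, w1 = 0.0, 0.0  # w0 plays the role of the trend (intercept) term
    n = len(xs)
    for _ in range(max_iter):
        errors = [w0 + w1 * x - y for x, y in zip(xs, ys)]
        g0 = sum(errors) / n + lambda_ * w0
        g1 = sum(e * x for e, x in zip(errors, xs)) / n + lambda_ * w1
        if math.hypot(g0, g1) < g_tol:
            break  # gradient below tolerance: stop early
        w0 -= alpha * g0
        w1 -= alpha * g1
    return w0, w1

xs = [0.0, 1.0, 2.0, 3.0, 4.0]
ys = [2.0 * x + 1.0 for x in xs]
intercept, slope = fit_linear(xs, ys)
```

Hitting `max_iter` before the gradient falls below `g_tol` returns the current coefficients, which is why the iteration limit alone does not guarantee convergence.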

log_classifier

@MLOperator
def log_classifier(features: Union[str, List[str], Callable],
                   labels: Union[str, Callable],
                   prediction: Union[str, Callable],
                   *,
                   udf: Union[str, Callable] = None,
                   trend: bool = True,
                   alpha: float = 0.01,
                   max_iter: int = 100,
                   g_tol: float = 1e-5,
                   seed: Optional[int] = None,
                   penalty: Penalty = Penalty.l2,
                   lambda_: float = 0.001,
                   l1_ratio: float = 0.5,
                   decay: float = 0.0,
                   p: float = 0.0,
                   buffer_size: int = 0) -> MLOperator

Apply online logistic classifier model to a supplied dataset.

Arguments:

  • features - Columns to be extracted as features. Either the individual column name as a string, or the list of column names as strings, or a callable function to extract the relevant features.
  • labels - Column to be used as label data. String column name, or a callable function.
  • prediction - A function as a string, or callable function used to append predictions to a batch of data.
  • udf - Deprecated, replaced by prediction.
  • trend - Boolean indicating if the model has a trend coefficient.
  • alpha - Learning rate applied.
  • max_iter - Maximum possible number of iterations before the run is terminated. This does not guarantee convergence.
  • g_tol - Gradient tolerance, below which the run is terminated.
  • seed - Random seed.
  • penalty - Regularization term as a Penalty enum value, or the string equivalent.
  • lambda_ - Regularization coefficient.
  • l1_ratio - Elastic net mixing parameter. This is only used if the elastic net penalty (Penalty.elastic_net) is applied.
  • decay - Decay coefficient.
  • p - Momentum coefficient.
  • buffer_size - Integer value which defines the number of data points which must amass before the logistic classifier is applied.

Returns:

A pipeline comprised of a 'log_classifier' operator, which can be joined to other pipelines.

Examples:

Fit, predict, and update an online log classifier model on a stream:

```python
from kxi import sp
import pykx as kx
import numpy as np
import pandas as pd

# Fit once 10 records are buffered, then append predictions as 'yHat'
sp.run(sp.read.from_callback('publish')
    | sp.ml.log_classifier('x', 'y', 'yHat', trend=True, buffer_size=10)
    | sp.write.to_console(timestamp='none'))

data = pd.DataFrame({
    'x': sorted(np.random.randn(10)),
    'y': 5*[False] + 5*[True]
})
kx.q('publish', data)
```
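
For intuition, a logistic classifier passes a linear score through a sigmoid and updates its coefficients by gradient steps. The sketch below is a generic plain-Python version with hypothetical names and no regularization; it is not the operator's implementation, and its step size and iteration count are chosen for this toy data.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def fit_logistic(xs, ys, alpha=0.5, max_iter=200):
    """Gradient descent on the log loss for p = sigmoid(w0 + w1*x)."""
    w0, w1 = 0.0, 0.0
    n = len(xs)
    for _ in range(max_iter):
        # Booleans act as 0/1 in arithmetic
        errors = [sigmoid(w0 + w1 * x) - y for x, y in zip(xs, ys)]
        w0 -= alpha * sum(errors) / n
        w1 -= alpha * sum(e * x for e, x in zip(errors, xs)) / n
    return w0, w1

def predict(w0, w1, xs):
    """Classify as True when the predicted probability is at least 0.5."""
    return [sigmoid(w0 + w1 * x) >= 0.5 for x in xs]

xs = [-2.0, -1.5, -1.0, -0.5, 0.5, 1.0, 1.5, 2.0]
ys = 4 * [False] + 4 * [True]
w0, w1 = fit_logistic(xs, ys)
```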

min_max_scaler

@MLOperator
def min_max_scaler(columns: Optional[Union[str, List[str], dict]] = None,
                   *,
                   buffer_size: int = 0,
                   range_error: bool = False) -> MLOperator

Apply min-max scaling to columns in a supplied dataset.

Arguments:

  • columns - Column(s) to be min-max scaled in string format, or a dictionary with column names and (min, max) tuples as the key-value pairs to min-max scale several columns, or None to min-max scale all columns.
  • buffer_size - Number of data points which must amass before min-max scaling is applied.
  • range_error - Boolean indicating whether an error should be raised if new data falls outside the min-max range used for scaling.

Returns:

A min_max_scaler operator, which can be joined to other operators or pipelines.

Examples:

Performs min-max scaling on a column of a batch of data:

```python
from kxi import sp
import pykx as kx
import numpy as np
import pandas as pd

# Scale only column 'x'; 'x1' passes through unchanged
sp.run(sp.read.from_callback('publish')
    | sp.ml.min_max_scaler('x')
    | sp.write.to_console(timestamp='none'))

data = pd.DataFrame({
    'x': np.random.randn(10),
    'x1': np.random.randn(10)
})
kx.q('publish', data)
```
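
Min-max scaling maps each value to (value - min) / (max - min). The sketch below illustrates this together with the `range_error` flag; the `MinMaxScaler` class is hypothetical, and the choice to fix the range on the first batch and reuse it afterwards is an assumption made for illustration.

```python
class MinMaxScaler:
    """Hypothetical stand-in: learn a range once, then reuse it."""

    def __init__(self, range_error=False):
        self.lo = self.hi = None
        self.range_error = range_error

    def __call__(self, values):
        if self.lo is None:
            # First batch fixes the range (assumed behaviour)
            self.lo, self.hi = min(values), max(values)
        if self.range_error and any(v < self.lo or v > self.hi for v in values):
            raise ValueError("value outside the fitted min-max range")
        span = self.hi - self.lo
        return [(v - self.lo) / span if span else 0.0 for v in values]

scaler = MinMaxScaler()
first = scaler([2.0, 4.0, 6.0])   # range learned here: 2.0 to 6.0
later = scaler([8.0])             # outside the range, so the result exceeds 1
```

With `range_error=True` the second call would raise instead of returning a value above 1.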

one_hot_encode

@MLOperator
def one_hot_encode(columns: Optional[Union[str, List[str], dict]] = None,
                   *,
                   buffer_size: int = 0) -> MLOperator

Encode symbolic/string values as a numeric (one-hot) representation.

Arguments:

  • columns - Column(s) to be one-hot encoded in string format, or a dictionary with column names and expected symbol/string values as the key-value pairs to be encoded, or None to encode all symbolic columns.
  • buffer_size - The number of data points which must amass before symbolic data is encoded.

Returns:

A one_hot_encode operator, which can be joined to other operators or pipelines.

Examples:

Performs one-hot encoding on a batch of data:

```python
from kxi import sp
import pykx as kx
import numpy as np
import pandas as pd

# With no arguments, all symbolic columns are encoded
sp.run(sp.read.from_callback('publish')
    | sp.ml.one_hot_encode()
    | sp.write.to_console(timestamp='none'))

data = pd.DataFrame({
    'x': ['a','a','b','c','c','a','b','c','a','b'],
    'x1': ['a','b','b','c','b','a','b','c','b','b'],
    'x2': np.random.randn(10),
    'x3': 5*["abc","def"]
})
kx.q('publish', data)
```
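
One-hot encoding turns one symbolic column into one indicator column per distinct value. A minimal plain-Python sketch follows; the helper is hypothetical, and the `<column>_<value>` naming of the output columns is an assumption, not documented operator behaviour.

```python
def one_hot_encode(column, values, expected=None):
    """Hypothetical helper: one 0/1 indicator column per label."""
    if expected is None:
        # No labels supplied: derive them from the data
        expected = sorted(set(values))
    return {
        f"{column}_{label}": [int(v == label) for v in values]
        for label in expected
    }

encoded = one_hot_encode("x", ["a", "b", "a"])
```

Here `encoded` holds two columns, one marking the rows equal to 'a' and one marking the rows equal to 'b'.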

predict

@MLOperator
def predict(features: Union[str, List[str], Callable],
            prediction: Union[str, Callable],
            *,
            udf: Union[str, Callable] = None,
            model: Optional[str] = None,
            registry: Optional[Union[str, dict]] = None,
            experiment: Optional[str] = None,
            version: Optional[List[int]] = None) -> MLOperator

Predicts a target variable using a model from the registry.

Arguments:

  • features - Column name of the predictor variable, or the function required to generate predictors from a batch.
  • prediction - Function for integrating the predictions into the batch, or a column name to join predictions to the table.
  • udf - Deprecated, replaced by prediction.
  • model - Model within the registry to use for prediction.
  • registry - Registry to load models from.
  • experiment - Experiment within the registry to load models from.
  • version - Model version to load in for prediction.

Returns:

A predict operator, which can be joined to other operators or pipelines.

Examples:

Save a model to registry then retrieve it to serve predictions:

```python
from kxi import sp, ml
import pykx as kx
import numpy as np
import pandas as pd
import sklearn.linear_model as sk

data = pd.DataFrame({
    'x': np.random.randn(500),
    'x1': sorted(np.random.randn(500)),
    'x2': np.random.randn(500),
    'y': 250*[False] + 250*[True]
})

# Train a scikit-learn model and save it to the registry in /tmp
X = data[['x','x1','x2']].to_numpy()
y = data['y'].to_numpy()
model = sk.LogisticRegression().fit(X, y)
ml.registry.set.model(model, "skLC", 'sklearn', folder_path="/tmp")

# Load the saved model and join predictions to each batch as 'yhat'
sp.run(sp.read.from_callback('publish')
    | sp.ml.predict(['x','x1','x2'], 'yhat', registry='/tmp', model='skLC')
    | sp.write.to_console(timestamp='none'))

kx.q('publish', data[['x','x1','x2']].iloc[:10])
```

score

@MLOperator
def score(y_true: Union[str, Callable], y_pred: Union[str, Callable],
          metric: Metric) -> MLOperator

Evaluate the quality of predictions produced in a pipeline.

Arguments:

  • y_true - Column containing the true values as a string, or a callable function to retrieve the true values from the batch.
  • y_pred - Column containing the predicted values as a string, or a callable function to retrieve the predicted values from the batch.
  • metric - A Metric enum value, or the string equivalent.

Returns:

A score operator, which can be joined to other operators or pipelines.

Examples:

Computes the mean squared error between true and predicted results:

>>> from kxi import sp, ml
>>> import pykx as kx
>>> import numpy as np
>>> import pandas as pd
>>> import sklearn.linear_model as sk
>>> data = pd.DataFrame({
'x' :np.random.randn(500),
'x1':sorted(np.random.randn(500)),
'x2':np.random.randn(500),
'y' :250*[False] + 250*[True]
})
>>> training = data.iloc[:450].reset_index(drop=True)
>>> testing = data.iloc[450:].reset_index(drop=True)
>>> X = training[['x','x1','x2']].to_numpy()
>>> y = training['y'].to_numpy()
>>> model = sk.LogisticRegression().fit(X,y)
>>> ml.registry.set.model(model,"skLC",'sklearn',folder_path="/tmp")
>>> sp.run(sp.read.from_callback('publish')
| sp.ml.predict(['x', 'x1', 'x2'], 'yhat', registry='/tmp', model='skLC')
| sp.ml.score('y', 'yhat', 'mse')
| sp.write.to_console(timestamp='none'))
>>> kx.q('publish', testing)
0f
pykx.Identity(pykx.q('::'))

sequential_k_means

@MLOperator
def sequential_k_means(columns: Optional[Union[str, List[str],
                                               Callable]] = None,
                       cluster: Optional[str] = None,
                       *,
                       udf: Optional[str] = None,
                       distance_function: Distance = Distance.e2dist,
                       centroids: int = 3,
                       init_centers: Optional[List[List[float]]] = None,
                       init: bool = True,
                       alpha: float = 0.1,
                       forgetful: bool = True,
                       buffer_size: int = 0) -> MLOperator

Apply sequential K-Means to columns in a supplied dataset.

Arguments:

  • columns - Columns to extract from data for clustering.
  • cluster - Column name for integrating the predictions into the batch.
  • udf - Deprecated, replaced by cluster.
  • distance_function - Similarity measure used for clustering points; a Distance enum value, or the string equivalent.
  • centroids - Number of cluster centers.
  • init_centers - Centers used for initialization of algorithm.
  • init - Boolean indicating how to initialize cluster centroids - K-means++ (True) or randomized (False).
  • alpha - Learning rate value between 0-1 used to define how much past centroid information to retain.
  • forgetful - Apply forgetful (True) or normal sequential K-Means (False).
  • buffer_size - Integer value defining the number of data points which must amass before initial clustering is applied.

Returns:

A pipeline comprised of a 'sequential_k_means' operator, which can be joined to other pipelines.

Examples:

Discover clusters in a batch of data:

```python
from kxi import sp
import pykx as kx
import numpy as np
import pandas as pd

# Cluster on three columns and write the cluster labels to 'myClusters'
sp.run(sp.read.from_callback('publish')
    | sp.ml.sequential_k_means(['x', 'x1', 'x2'], 'myClusters', buffer_size=10)
    | sp.write.to_console(timestamp='none'))

data = pd.DataFrame({
    'x': np.random.randn(10),
    'x1': np.random.randn(10),
    'x2': np.random.randn(10)
})
kx.q('publish', data)
```
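
The difference between forgetful and normal sequential K-Means lies in how far a centroid moves toward each new point. The sketch below follows the textbook sequential update and is an assumption about the operator's behaviour, not its source; `update_centroids` and `counts` are hypothetical names.

```python
def e2dist(a, b):
    """Squared Euclidean distance."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

def update_centroids(centroids, counts, point, alpha=0.1, forgetful=True):
    """Move the nearest centroid toward `point`; return its index."""
    nearest = min(range(len(centroids)), key=lambda i: e2dist(centroids[i], point))
    counts[nearest] += 1
    # Forgetful: fixed rate alpha. Normal: rate shrinks as 1/n (running mean).
    rate = alpha if forgetful else 1.0 / counts[nearest]
    centroids[nearest] = [c + rate * (x - c) for c, x in zip(centroids[nearest], point)]
    return nearest

forget = [[0.0, 0.0], [10.0, 10.0]]
idx = update_centroids(forget, [1, 1], [1.0, 1.0], alpha=0.1)

normal = [[0.0, 0.0], [10.0, 10.0]]
update_centroids(normal, [1, 1], [1.0, 1.0], forgetful=False)
```

With a fixed rate, older points fade geometrically, which lets centroids track drifting data; the 1/n rate weights all points seen so far equally.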

standardize

@MLOperator
def standardize(columns: Optional[Union[str, List[str]]] = None,
                *,
                buffer_size: int = 0) -> MLOperator

Apply standard scaling to columns in a supplied dataset.

Arguments:

  • columns - Either column(s) to be standard scaled in string format, or None to standard scale all columns.
  • buffer_size - The number of data points which must amass before standard scaling is applied.

Returns:

A standardize operator, which can be joined to other operators or pipelines.

Examples:

Apply standard scaling to a batch of data:

```python
from kxi import sp
import pykx as kx
import numpy as np
import pandas as pd

# With no arguments, all columns are standard scaled
sp.run(sp.read.from_callback('publish')
    | sp.ml.standardize()
    | sp.write.to_console(timestamp='none'))

data = pd.DataFrame({
    'x': 10*np.random.randn(10),
    'x1': 100*np.random.randn(10),
    'x2': 20*np.random.randn(10)
})
kx.q('publish', data)
```
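
Standard scaling subtracts the mean and divides by the standard deviation, so each column ends up with mean 0 and unit spread. A plain-Python sketch follows; the helper is hypothetical, and the use of the population standard deviation (rather than the sample one) is an assumption.

```python
import statistics

def standardize(values):
    """Hypothetical helper: z-score each value in a column."""
    mean = statistics.mean(values)
    std = statistics.pstdev(values)  # population standard deviation (assumed)
    # A constant column has zero spread; map it to zeros to avoid dividing by 0
    return [(v - mean) / std if std else 0.0 for v in values]

scaled = standardize([2.0, 4.0, 6.0])
```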