
Stream Processor API

function description
.qsp.ml.fit fit a model to a batch or buffer of data
.qsp.ml.predict predict a target variable
.qsp.ml.update train a model
.qsp.ml.score score the performance of a model

.qsp.ml.fit

Fit a model to a batch or buffer of data

.qsp.ml.fit[X;y;untrained;modelType;udf;opt]

Where

argument type content
X symbol[], function predictor variable’s column names, or a function to generate the predictors from the batch
y symbol, function target variable column name, or a function to generate the target variable from the batch
untrained function an untrained q or scikit-learn model
modelType string either "q" or "sklearn"
udf function, symbol function to score the quality of the model or join predictions into the batch; if symbol, append the predictions to the batch as a new column
opt dictionary, :: arguments from .qsp.use options where options is a dictionary with keys as below; or null

fits a model to a batch or buffer of data, optionally predicting the target variable for future batches after the model has been trained, and returns the current batch, modified in accordance with the udf argument.

Once fit, these models are published to the ML Registry according to the user-supplied configuration.

A udf function argument has the syntax

udf[data;y;predictions;modelInfo]

where

argument type content
data any batch passed to the operator (only the data, not the metadata)
y any target variable, as extracted by the y parameter
predictions predictions for each record in the batch
modelInfo :: currently unused and always set to ::
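As a rough sketch, a udf for .qsp.ml.fit is an ordinary four-argument q function. The hypothetical joinPreds below appends the predictions as a yhat column, much as passing the symbol `yhat does, and also adds the absolute error against the extracted target. It assumes y and predictions are numeric lists with one item per record. Because it is plain q, it can be tried by calling it directly on a small table, without a pipeline.

```q
// Hypothetical udf for .qsp.ml.fit (not part of the Stream Processor API):
// joins the predictions and their absolute error into the batch
// data: batch table; y: extracted target values; predictions: one per record;
// modelInfo: currently unused (always ::)
joinPreds:{[data;y;predictions;modelInfo]
  data ,' ([]yhat:predictions; err:abs y-predictions)
  }

// Call it directly on a small table to see the result
t:([]a:1 2 3f;y:2 4 6f)
joinPreds[t;t`y;2.5 3.5 6.5;::]
```

In a pipeline, joinPreds would be passed in place of the `yhat symbol as the udf argument.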

Valid options keys:

key type default description
registry string :: location of the registry to save a model to
experiment string :: name of the experiment a model is to be associated with
model string :: name to save the model with in the registry; if omitted, or ::, the model is not saved
predict boolean 0b if true, predict the target variable for all records; else error on any records received after the first batch
config dictionary ()!() the config parameter for .ml.registry.set.model
modelArgs list :: arguments to pass to the model after X and y
bufferSize long 0 number of records to buffer before training a model

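If bufferSize is 0, the model is fit on the first batch. Otherwise, records are buffered until at least bufferSize records have been received; if the batch that fills the buffer contains more records than needed, those additional records are also included when training.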
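The bufferSize rule can be pictured with the plain-q sketch below. It only illustrates the behaviour described above and is not the Stream Processor's implementation; bufferSize, bufs, trained and onBatch are hypothetical names.

```q
// Illustrative sketch of the documented bufferSize rule, in plain q.
// bufferSize, bufs, trained and onBatch are hypothetical, not Stream Processor APIs.
bufferSize:5
bufs:()
trained:0b

// Called once per batch; returns every buffered record once at least
// bufferSize records have arrived, otherwise an empty list
onBatch:{[batch]
  if[trained; :()];
  bufs::bufs,enlist batch;
  if[bufferSize>sum count each bufs; :()];
  trained::1b;
  raze bufs
  }

onBatch[([]x:1 2 3f)]
onBatch[([]x:4 5 6 7f)]
```

The first batch (3 records) returns an empty list because fewer than 5 records are buffered; the second brings the total to 7, so all 7 records, including the 2 beyond bufferSize, are returned for training.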

Tip

Use .qsp.ml.fit to fit models in the Stream Processor that cannot be trained incrementally, for example:

  • various clustering algorithms (CURE, Hierarchical Clustering, DBSCAN, etc.)
  • time-series models (AR, ARMA, ARIMA, etc.)

Fit a q model on a batch, saving it to a registry:

// Generate initial data to be used for fitting
a:500?1f
b:500?1f

data:([]a;b;y:a+b)

.qsp.onStart {publish data}

// Define execution pipeline
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.ml.fit[
    {delete y from x};
    {exec y from x};
    .ml.online.sgd.linearRegression;
    "q";
    `yhat;
    .qsp.use (!) . flip (
      (`registry ; "/tmp");
      (`model    ; "sgd");
      (`modelArgs; (1b; `maxIter`gTol`seed!(100;-0w;42)));
      (`predict  ; 1b)
    )
  ]
  .qsp.write.toConsole[]
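The examples build their options with .qsp.use (!) . flip (...). The idiom turns a list of (key; value) pairs into a dictionary: flip splits the pairs into a list of keys and a list of values, and (!) . applies ! to those two lists. This plain-q check, which does not call .qsp.use, shows it is equivalent to writing keys!values directly.

```q
// Options written as (key; value) pairs, as in the example above
pairs:((`registry;"/tmp");(`model;"sgd");(`predict;1b))

// flip gives (keys; values); (!) . applies ! to that pair
opts:(!) . flip pairs

// The same dictionary written directly as keys!values
opts2:`registry`model`predict!("/tmp";"sgd";1b)
opts~opts2
```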

Fit a scikit-learn model, saving it to a registry:

// Generate initial data to be used for fitting
data:([]x:asc 100?1f;x1:100?1f;y:desc 100?5)

// Create an untrained random forest classifier
rfc:.p.import[`sklearn.ensemble][`:RandomForestClassifier][`max_depth pykw 2]

.qsp.onStart {publish data}

// Define execution pipeline
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.ml.fit[
    {delete y from x};
    {exec y from x};
    rfc;
    "sklearn";
    `yhat;
    .qsp.use (!) . flip (
      (`registry; "/tmp");
      (`model   ; "rfc");
      (`predict ; 1b)
    )
  ]
  .qsp.write.toConsole[]

.qsp.ml.predict

Predict a target variable

.qsp.ml.predict[X;udf;opt]

Where

argument type content
X symbol[], function predictor variable’s column names, or a function to generate the predictors from the batch
udf function, symbol user-defined function for integrating the predictions into the batch, or a column name to join them to the table as a new column
opt dictionary, :: arguments from .qsp.use options where options is a dictionary with keys as below

predicts the target value for each record in the batch, using a model from the registry, and returns the current batch, modified in accordance with the udf parameter.

A user-defined function can join these predictions into the data, or do any arbitrary computation.

This lets you deploy a model saved in the ML Registry to a Stream Processor pipeline.

A udf function argument has the syntax

udf[data;predictions;modelInfo]

where

argument type content
data any batch passed to the operator (only the data, not the metadata)
predictions predictions for each record in the batch
modelInfo :: (currently unused)
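Because the predict udf can do any computation, it need not simply join the predictions. As a hedged example, the hypothetical keepClass below joins them and keeps only the records predicted as class 1; whether a filtered batch suits the downstream operators depends on your pipeline.

```q
// Hypothetical udf for .qsp.ml.predict (not part of the Stream Processor API)
// data: batch table; predictions: one per record; modelInfo: currently unused
keepClass:{[data;predictions;modelInfo]
  // join the predictions as yhat, then keep records predicted as class 1
  select from (data ,' ([]yhat:predictions)) where yhat=1
  }

t:([]x:0.1 0.5 0.9)
keepClass[t;0 1 1;::]
```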

Valid options keys:

key type default description
registry string :: location of the registry to load from
experiment string :: name of the experiment a model is associated with
model string :: name of the model to be retrieved from the registry; if omitted, or ::, the latest model
version long[] :: version of model to be retrieved, major and minor as a pair of longs; if null or omitted, the latest version

Predict using a scikit-learn model from the registry, adding predictions to the initial data:

N:1000
data:([]x:asc N?1f;x1:desc N?10;x2:N?1f;y:asc N?5)

features:flip value flip delete y from data

clf1:.p.import[`sklearn.tree]`:DecisionTreeClassifier
clf1:clf1[`max_depth pykw 3]
clf1[`:fit][features;data`y]

clf2:.p.import[`sklearn.tree]`:DecisionTreeClassifier
clf2:clf2[`max_depth pykw 4]
clf2[`:fit][features;data`y]

// Save both models to the registry under the same name, as separate versions
.ml.registry.set.model["/tmp";clf1;"dtc";"sklearn";::]
.ml.registry.set.model["/tmp";clf2;"dtc";"sklearn";::]

.test.cache:()

.qsp.onStart {publish data}

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.ml.predict[
    {delete y from x};
    `yhat;
    .qsp.use (!) . flip (
      (`registry; "/tmp");
      (`model   ; "dtc");
      (`version ; 1 0 )
    )
  ]
  .qsp.write.toConsole[]

Predict using a q model from the registry, adding predictions to the initial dataset:

// Define a dataset for fitting the model
N:1000
data:([]x:N?1f;x1:N?1f;x2:N?1f)

// Fit a model
kmeansModel:.ml.clust.kmeans.fit[data`x`x1`x2;`e2dist;6;enlist[`iter]!enlist 1000]

// Set the model to the registry
.ml.registry.set.model["/tmp";kmeansModel;"kmModel";"q";::]

.qsp.onStart {publish data}

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.ml.predict[
    {x`x`x1`x2};
    `yhat;
    .qsp.use (!) . flip (
      (`registry; "/tmp");
      (`model   ; "kmModel")
    )
  ]
  .qsp.write.toConsole[]

.qsp.ml.score

Score the performance of a model

.qsp.ml.score[y;predictions;metric]

Where

argument type content
y symbol, function column name of the target variable; or a function to generate the target variable from the batch
predictions symbol, function column name of the predictions; or a function to generate the predictions from the batch
metric symbol metric on which to evaluate model performance

scores the performance of a model over time, allowing changes in model performance to be evaluated, and returns the score from the metric.

The following metrics are currently supported:

accuracy
f1
mse
rmse
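For reference, the sketch below shows what these metrics conventionally compute, written in plain q under a hypothetical .demo namespace. It is not the Stream Processor's implementation; in particular, f1 here is the binary form with 1 as the positive class, and the operator may treat multiclass targets differently.

```q
// Illustrative plain-q versions of the listed metrics (hypothetical .demo namespace);
// the Stream Processor's own implementations may differ
.demo.accuracy:{[y;p] avg y=p}
.demo.mse:{[y;p] d:y-p; avg d*d}
.demo.rmse:{[y;p] sqrt .demo.mse[y;p]}

// Binary F1 score, taking 1 as the positive class
.demo.f1:{[y;p]
  tp:sum (y=1)&p=1;
  fp:sum (y=0)&p=1;
  fn:sum (y=1)&p=0;
  2*tp%(2*tp)+fp+fn
  }

.demo.accuracy[1 0 1 1;1 0 0 1]
.demo.f1[1 0 1 1;1 0 0 1]
```

With these inputs, accuracy is 0.75 (three of four predictions match) and F1 is 0.8 (2 true positives, no false positives, 1 false negative).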

Tip

This lets you evaluate the quality of the predictions a model is making. Each time it is evaluated, it returns the metric computed over all predictions made so far.
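Scoring against all predictions made so far, rather than only the latest batch, can be sketched as follows: the hypothetical .demo.scoreBatch keeps every target and prediction it has seen and recomputes the metric over the accumulated lists. This illustrates the documented behaviour only; the operator is not necessarily implemented this way.

```q
// Hypothetical cumulative scorer (not a Stream Processor API): keeps all
// targets and predictions seen so far and scores them together
.demo.seenY:();.demo.seenP:()
.demo.scoreBatch:{[metric;y;p]
  .demo.seenY:.demo.seenY,y;
  .demo.seenP:.demo.seenP,p;
  metric[.demo.seenY;.demo.seenP]
  }

// Accuracy as an example metric
.demo.acc:{[y;p] avg y=p}

// First batch: 1 of 2 correct; after the second batch: 3 of 4 correct overall
.demo.scoreBatch[.demo.acc;1 1;1 0]
.demo.scoreBatch[.demo.acc;0 0;0 0]
```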

Fit a scikit-learn model, save it to the registry, and score the model as new data arrives:

// Retrieve a dataset and format appropriately
dataset:.p.import[`sklearn.datasets;`:load_breast_cancer][]
X:dataset[`:data]`
y:dataset[`:target]`
data: ([] y: y) ,' flip (`$"x",/:string til count first X)!flip X

// Split data into training and testing set
temp: (floor .8 * count data) cut data
.test.training: temp 0
.test.testing : temp 1

features:flip value flip delete y from .test.training
targets :.test.training`y

// Train the model
clf:.p.import[`sklearn.tree]`:DecisionTreeClassifier
clf:clf[`max_depth pykw 3]
clf[`:fit][features;targets]

// Save the model
.ml.registry.set.model["/tmp";clf;"dtc"; "sklearn";()!()]

.qsp.onStart {publish .test.testing}

.qsp.run 
  .qsp.read.fromCallback[`publish]
  .qsp.ml.predict[
    {delete y from x};
    `pred;
    .qsp.use `registry`model!("/tmp";"dtc")
  ]
  .qsp.ml.score[`y; `pred; `f1]
  .qsp.write.toConsole[]
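The examples split the data with (floor .8 * count data) cut data. With an atom on the left, cut breaks the table into consecutive pieces of that many rows, so the result is exactly two tables only while the first piece is at least as long as the remainder. Passing explicit start indices, (0,n) cut t, always gives two pieces. A small plain-q demonstration:

```q
t:([]x:til 10)

// Atom left argument: pieces of 8 rows each, so 8 rows then the remaining 2
parts:(floor .8*count t) cut t
count each parts

// Index left argument: pieces start at rows 0 and 8, always two pieces
count each (0,floor .8*count t) cut t
```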

Fit a q model, save it to the registry, and score the model using accuracy as new data arrives:

// Retrieve a dataset and format appropriately
dataset:.p.import[`sklearn.datasets;`:load_breast_cancer][]
X:dataset[`:data]`
y:dataset[`:target]`
data: ([] y: y) ,' flip (`$"x",/:string til count first X)!flip X

// Split data into training and testing set
temp: (floor .8 * count data) cut data
.test.training: temp 0
.test.testing : temp 1

features:flip value flip delete y from .test.training
targets :.test.training`y

// Train the model
mdl:.ml.online.sgd.logClassifier.fit[features;targets;1b;::]

// Save the model
.ml.registry.set.model["/tmp";mdl;"qmodel";"q";::]

.qsp.onStart {publish .test.testing}

.qsp.run 
  .qsp.read.fromCallback[`publish]
  .qsp.ml.predict[
    {delete y from x};
    `pred;
    .qsp.use `registry`model!("/tmp";"qmodel")
  ]
  .qsp.ml.score[`y; `pred; `accuracy]
  .qsp.write.toConsole[]

.qsp.ml.update

Train a model

.qsp.ml.update[X;y;udf;opt]

Where

argument type content
X symbol[], function predictor variable’s column names; or a function to generate the predictors from the batch
y symbol, function target variable’s column name; or a function to generate it from the batch
udf function, symbol user-defined function for integrating the predictions into the batch; or a column name to join them to the table as a new column
opt dictionary, :: arguments from .qsp.use options where options is a dictionary with keys as below

trains a model incrementally, returning predictions for each record in a batch. The model is retrieved from the ML Registry unless an untrained model is supplied in the options.

A user-defined function can join these predictions into the data, or do any arbitrary computation.

A udf function argument has the syntax

udf[data;y;predictions;modelInfo]

where

argument type content
data any batch passed to the operator (only the data, not the metadata)
y any target variable, as extracted by the y parameter
predictions predictions for each record in the batch
modelInfo :: (currently unused)
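A udf for .qsp.ml.update also receives the extracted target, so it can compare it with the predictions as the model learns. The hypothetical flagMiss below joins the predictions and flags records whose prediction is more than 0.1 from the target; the tolerance is an arbitrary choice for illustration.

```q
// Hypothetical udf for .qsp.ml.update (not part of the Stream Processor API)
// data: batch table; y: extracted target values; predictions: one per record;
// modelInfo: currently unused
flagMiss:{[data;y;predictions;modelInfo]
  // miss is 1b where the prediction is further than 0.1 from the target
  data ,' ([]yhat:predictions; miss:0.1<abs y-predictions)
  }

t:([]a:0.2 0.4;b:0.3 0.3;y:0.5 0.7)
flagMiss[t;t`y;0.52 0.5;::]
```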

Valid options keys:

key type default description
registry string :: location of the registry to load from
experiment string :: name of the experiment a model is associated with
model string :: name of the model to retrieve from registry; if omitted, or ::, the latest model
untrained function, embedPy :: untrained ML model, e.g. .ml.online.sgd.linearRegression; if omitted, the model is retrieved from the registry
modelType string :: type of model to save to the registry, either q or sklearn (not required when loading from registry)
modelArgs list :: arguments to pass to the model after X and y

Tip

This lets you deploy a model saved in the ML Registry to a Stream Processor pipeline and update the model weights incrementally.

Fit an untrained, updatable q SGD model, adding predictions to the initial data:

// Generate the data used in this example
a:500?1f; b:500?1f
data:([]a;b;y:a+b)

.qsp.onStart {publish data}

.qsp.run 
  .qsp.read.fromCallback[`publish]
  .qsp.ml.update[
    {delete y from x};
    {exec y from x};
    `yhat;
    .qsp.use (!) . flip (
      (`registry  ; "/tmp");
      (`model     ; "sgd");
      (`untrained ;.ml.online.sgd.linearRegression);
      (`modelType ;"q");
      (`modelArgs ;(1b;()!()))
    )
  ]
  .qsp.write.toConsole[]