Skip to content

ML Pipeline

ML Pipeline .ml.createPipeline Generate a pipeline from a graph .ml.execPipeline Execute a valid pipeline .ml.updDebug Update debugging mode

After generating a graph, convert it into an executable code structure, a pipeline. Use this library to validate the graph, then generate and execute an ML pipeline as follows.

Graph validation

For a graph to be valid all inputs to a node must be connected to an output sourced either from a configuration node or another functional node in the graph.

The inputs to a node and the outputs to which they connect must have the same type, defined when the graph was created.

Pipeline structure

Once the graph has been validated, generate the optimal execution path for the graph.

Algorithm for creating the path
  1. Generate all paths required by each node to be executed.
  2. Retrieve the longest path for each node in the graph.
  3. Find all dependencies for each of the longest paths.
  4. Reverse the ordering of the longest paths to ensure they are in the correct execution order.
  5. Retrieve the optimal execution order of nodes defined by 'razing' the longest paths (longest first) together and taking the distinct elements.

Generate a schema, based on the graph structure, containing the following information.

  • Boolean highlighting if the node has been executed successfully.
  • Any issues which arise in execution and what was the error.
  • The outputs of individual nodes at intermediate steps in execution and following complete execution of the pipeline.
  • The inputs required for the execution of a node in the order they are to be applied to the functionality contained within the node.
  • The expected input and output types of a node.
  • The function to be applied on the relevant datasets.
  • The mapping required to correctly populate the inputs to a node with required outputs from another node.
  • The expected ordering of inputs to the node to ensure that the variable ordering is correct on node execution.

Populate the pipeline schema with the node function, inputs, outputs and output mapping, with rows populated based on ordering retrieved from the generation of the optimal path.

Pipeline execution

  1. Retrieve the first incomplete node within the pipeline.
  2. Apply the required inputs to the node function, stop execution if this results in an error and highlight the error to the user.
  3. Add the outputs from the current node execution to any rows that require this data for future executions.

Repeat steps 1 to 3 until either all rows in the graph have been successfully executed or an error is encountered.


.ml.createPipeline

Generate a execution pipeline based on a valid graph

.ml.createPipeline graph

Where graph is a graph originally generated by .ml.createGraph, which has all relevant input edges connected validly, returns as a keyed table an optimal execution pipeline populated with all information required to allow its successful execution.

// Generate a simple valid graph
q)graph:.ml.createGraph[]

// Configuration containing x data
q)graph:.ml.addCfg[graph;`xData;enlist[`xData]!enlist desc 100?1f]

// Configuration containing y data
q)graph:.ml.addCfg[graph;`yData;enlist[`yData]!enlist asc 100?1f]

// Node to randomize y dataset
q)yRandInput:"!"
q)yRandOutput:"F"
q)yRandFunction:{yData:x`yData;neg[count yData]?yData}
q)yRandNode:`inputs`outputs`function!(yRandInput;yRandOutput;yRandFunct..
q)graph:.ml.addNode[graph;`yRand;yRandNode]

// Node to calculate correlation between two float vectors
q)corrInput:`xData`yData!"!F"
q)corrOutput:"f"
q)corrFunction:{x[`xData] cor y}
q)corrNode:`inputs`outputs`function!(corrInput;corrOutput;corrFunction)
q)graph:.ml.addNode[graph;`corr;corrNode]

// Connect edges together
q)graph:.ml.connectEdge[graph;`xData;`output;`corr;`xData]
q)graph:.ml.connectEdge[graph;`yData;`output;`yRand;`input]
q)graph:.ml.connectEdge[graph;`yRand;`output;`corr;`yData]

// Generate pipeline
q)show pipeline:.ml.createPipeline[graph]
nodeId| complete error function                                       ..
------| --------------------------------------------------------------..
yData | 0              @[;(,`yData)!,`s#0.00969842 0.01596794 0.020541..
yRand | 0              ![,`output]@[enlist]{yData:x`yData;neg[count yD..
xData | 0              @[;(,`xData)!,0.9988041 0.9936284 0.9880844 0.9..
corr  | 0              ![,`output]@[enlist]{x[`xData] cor y}          ..

.ml.execPipeline

Execute a generated pipeline

.ml.execPipeline pipeline

Where pipeline is a pipeline created by .ml.createPipeline, returns the pipeline with each node executed and appropriate outputs populated.

This allows you to retrieve relevant data from execution.

This example uses the pipeline generated in the .ml.createPipeline example above:

// Valid pipeline execution
q)pipeline:.ml.execPipeline[pipeline]
Executing node: yData
Executing node: yRand
Executing node: xData
Executing node: corr

// Display the pipeline
q)show pipeline
nodeId| complete error function                                       ..
------| --------------------------------------------------------------..
yData | 1              @[;(,`yData)!,`s#0.00969842 0.01596794 0.020541..
yRand | 1              ![,`output]@[enlist]{yData:x`yData;neg[count yD..
xData | 1              @[;(,`xData)!,0.9988041 0.9936284 0.9880844 0.9..
corr  | 1              ![,`output]@[enlist]{x[`xData] cor y}          ..

// Retrieve the outputs of the pipeline
q)exec outputs from pipeline
(,`)!,::
(,`)!,::
(,`)!,::
``output!(::;0.225908)

// Invalid example modifying the corr node the produce improper execution
q)pipeline:.ml.execPipeline[invalidPipeline]
Executing node: yData
Executing node: yRand
Executing node: xData
Executing node: corr
Error: rank

// Display the pipeline
q)show pipeline
nodeId| complete error function                                       ..
------| --------------------------------------------------------------..
yData | 1              @[;(,`yData)!,`s#0.004194243 0.006855978 0.0113..
yRand | 1              ![,`output]@[enlist]{yData:x`yData;neg[count yD..
xData | 1              @[;(,`xData)!,0.9847626 0.9823238 0.9796802 0.9..
corr  | 0        rank  ![,`output]@[enlist]{x[`xData] cor string y}   ..

.ml.updDebug

Update debugging mode

.ml.updDebug[]

Debugging is updated. The default value is 0b which disables errors occuring within the graph to be debugged

q).ml.graphDebug
0b
// Run pipeline when debugging is disabled
q).ml.execPipeline[pipeline]
Executing node: yData
Executing node: yRand
Executing node: xData
Executing node: corr
Error: type
nodeId| complete error function                                        ..
------| ---------------------------------------------------------------..
yData | 1              @[;(,`yData)!,`s#0.01318344 0.01563857 0.0394730..
yRand | 1              ![,`output]@[enlist]{yData:x`yData;neg[count yDa..
xData | 1              @[;(,`xData)!,0.9861346 0.9845271 0.9755778 0.96..
corr  | 0        type  ![,`output]@[enlist]{x[`xData] cor y+`e}        ..
// Update the graph debugging
q).ml.updDebug[]
q).ml.graphDebug
1b
// Run pipeline with debugging enabled
q).ml.execPipeline[pipeline]
Executing node: yData
Executing node: yRand
Executing node: xData
Executing node: corr
'type
  [3]  corrFunction:{x[`xData] cor y+`e}