
Pipeline

.ml.createPipeline   Generate a pipeline from a graph
.ml.execPipeline     Execute a valid pipeline

After generating a graph, convert it into an executable code structure, a pipeline. Use this library to validate the graph, then generate and execute a pipeline as follows.

Graph validation

For a graph to be valid, every input to a node must be connected to an output of either a configuration node or another functional node in the graph.

The inputs to a node and the outputs to which they connect must have the same type, defined when the graph was created.
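
The two validation rules above can be sketched in a few lines. This is an illustrative Python sketch only, not the library's q implementation: the `nodes` and `edges` layouts and the name `validate_graph` are assumptions made for the example. The type characters mirror those used in the `.ml.createPipeline` example on this page.

```python
# Illustrative sketch only: not the library's implementation.
# Hypothetical layout: each node lists its typed inputs and outputs, and
# each edge maps (destination node, input name) -> (source node, output name).

def validate_graph(nodes, edges):
    """Return a list of problems; an empty list means the graph is valid."""
    problems = []
    for node_id, node in nodes.items():
        for input_name, input_type in node["inputs"].items():
            source = edges.get((node_id, input_name))
            if source is None:
                # Rule 1: every input must be connected to an output.
                problems.append(f"{node_id}.{input_name}: not connected")
                continue
            src_node, src_output = source
            src_type = nodes[src_node]["outputs"].get(src_output)
            if src_type != input_type:
                # Rule 2: connected input and output must share a type.
                problems.append(
                    f"{node_id}.{input_name}: expects {input_type!r}, "
                    f"got {src_type!r} from {src_node}.{src_output}"
                )
    return problems

nodes = {
    "xData": {"inputs": {}, "outputs": {"output": "!"}},
    "yData": {"inputs": {}, "outputs": {"output": "!"}},
    "yRand": {"inputs": {"input": "!"}, "outputs": {"output": "F"}},
    "corr": {"inputs": {"xData": "!", "yData": "F"}, "outputs": {"output": "f"}},
}
edges = {
    ("corr", "xData"): ("xData", "output"),
    ("yRand", "input"): ("yData", "output"),
    ("corr", "yData"): ("yRand", "output"),
}

print(validate_graph(nodes, edges))
```

Returning a list of problems rather than raising on the first one is a design choice for the sketch; it lets a caller report every disconnected or mistyped input at once.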

Pipeline structure

Once the graph has been validated, generate the optimal execution path for the graph.

Algorithm for creating the path
  1. Generate all paths required by each node to be executed.
  2. Retrieve the longest path for each node in the graph.
  3. Find all dependencies for each of the longest paths.
  4. Reverse the ordering of the longest paths to ensure they are in the correct execution order.
  5. Retrieve the optimal execution order of nodes by razing (flattening) the longest paths together, longest first, and taking the distinct elements.
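
The steps above can be approximated as follows. This is a hedged Python sketch under stated assumptions, not the library's q code: `deps` is a hypothetical mapping from each node to the nodes it depends on, the graph is assumed to be acyclic, and all function names are illustrative. The sketch reverses each path before expanding its dependencies, which differs slightly from the listed order of steps 3 and 4 but yields an order in which every node follows its dependencies.

```python
# Illustrative sketch only: not the library's implementation.

def unique(seq):
    """Distinct elements of seq, keeping the first occurrence of each."""
    return list(dict.fromkeys(seq))

def all_paths(deps, node):
    """Step 1: every dependency chain from node back to a source node."""
    if not deps.get(node):
        return [[node]]
    return [[node] + rest for d in deps[node] for rest in all_paths(deps, d)]

def dependencies_first(deps, node):
    """All transitive dependencies of node, followed by node itself."""
    ordered = []
    for dep in deps.get(node, []):
        ordered += dependencies_first(deps, dep)
    return unique(ordered + [node])

def execution_order(deps):
    # Steps 1-2: the longest dependency chain for each node.
    longest = [max(all_paths(deps, n), key=len) for n in deps]
    paths = []
    for path in longest:
        # Step 4: reverse so that source nodes come first.
        in_order = reversed(path)
        # Step 3: pull in every dependency of each node on the path.
        expanded = [m for n in in_order for m in dependencies_first(deps, n)]
        paths.append(unique(expanded))
    # Step 5: flatten the paths, longest first, and keep distinct nodes.
    paths.sort(key=len, reverse=True)
    return unique(m for p in paths for m in p)

# Hypothetical dependency map matching the example graph on this page.
deps = {
    "xData": [],
    "yData": [],
    "yRand": ["yData"],
    "corr": ["xData", "yRand"],
}
print(execution_order(deps))  # ['yData', 'yRand', 'xData', 'corr']
```

For this dependency map the sketch produces the same node order as the pipeline shown in the `.ml.createPipeline` example, although agreement on other graphs is not guaranteed.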

Generate a schema, based on the graph structure, containing the following information.

  • A boolean indicating whether the node has been executed successfully.
  • Any error raised during execution of the node, along with its message.
  • The outputs of individual nodes at intermediate steps in execution and following complete execution of the pipeline.
  • The inputs required for the execution of a node in the order they are to be applied to the functionality contained within the node.
  • The expected input and output types of a node.
  • The function to be applied on the relevant datasets.
  • The mapping required to correctly populate the inputs to a node with required outputs from another node.
  • The expected ordering of inputs to the node to ensure that the variable ordering is correct on node execution.

Populate the pipeline schema with the node function, inputs, outputs and output mapping, with rows populated based on ordering retrieved from the generation of the optimal path.
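
As a rough illustration of the schema and its population, the Python sketch below models one pipeline row as a dataclass and builds the rows in execution order. Every field name, the `nodes` and `edges` layouts, and `build_pipeline` are assumptions made for this example; the actual column names of the keyed table returned by the library may differ.

```python
# Illustrative sketch only: field names are hypothetical, not the library's.
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class PipelineRow:
    function: Callable        # function applied by the node
    input_types: dict         # expected type of each named input
    output_types: dict        # expected type of each named output
    input_order: list         # order in which inputs are passed to function
    output_map: dict          # input name -> (source node, source output)
    complete: bool = False    # set once the node has executed successfully
    error: str = ""           # message of any error raised in execution
    inputs: dict = field(default_factory=dict)   # values received so far
    outputs: dict = field(default_factory=dict)  # values produced by the node

def build_pipeline(order, nodes, edges):
    """Create one row per node, in the order given by the execution path."""
    pipeline = {}
    for node_id in order:
        node = nodes[node_id]
        pipeline[node_id] = PipelineRow(
            function=node["function"],
            input_types=dict(node["inputs"]),
            output_types=dict(node["outputs"]),
            input_order=list(node["inputs"]),
            output_map={
                name: src for (dest, name), src in edges.items() if dest == node_id
            },
        )
    return pipeline

nodes = {
    "yData": {"inputs": {}, "outputs": {"output": "!"},
              "function": lambda: {"output": {"yData": [1.0, 2.0, 3.0]}}},
    "yRand": {"inputs": {"input": "!"}, "outputs": {"output": "F"},
              "function": lambda d: {"output": list(reversed(d["yData"]))}},
}
edges = {("yRand", "input"): ("yData", "output")}
order = ["yData", "yRand"]

pipeline = build_pipeline(order, nodes, edges)
print(list(pipeline))
```

Keeping the rows in a dictionary that preserves insertion order means the first incomplete row is always the next node that is safe to execute.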

Pipeline execution

  1. Retrieve the first incomplete node within the pipeline.
  2. Apply the required inputs to the node function. If this results in an error, stop execution and report the error to the user.
  3. Add the outputs from the current node execution to any rows that require this data for future executions.

Repeat steps 1 to 3 until either all rows in the pipeline have been successfully executed or an error is encountered.
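
The execution loop can be sketched as below. This is a minimal Python illustration under assumptions, not the behavior of `.ml.execPipeline` itself: the row layout, `make_row`, and `exec_pipeline` are hypothetical names, and each node function is assumed to return a dictionary of named outputs.

```python
# Illustrative sketch only: not the library's implementation.

def make_row(function, input_order=(), output_map=None):
    """Build one hypothetical pipeline row."""
    return {
        "function": function,
        "input_order": list(input_order),
        "output_map": output_map or {},  # input name -> (source node, output)
        "inputs": {},
        "outputs": {},
        "complete": False,
        "error": "",
    }

def exec_pipeline(pipeline):
    while True:
        # Step 1: retrieve the first incomplete node.
        pending = [n for n, row in pipeline.items() if not row["complete"]]
        if not pending:
            break
        node_id = pending[0]
        row = pipeline[node_id]
        print(f"Executing node: {node_id}")
        # Step 2: apply the inputs to the node function; stop on error.
        try:
            args = [row["inputs"][name] for name in row["input_order"]]
            row["outputs"] = row["function"](*args)
        except Exception as exc:
            row["error"] = str(exc)
            print(f"Error: {exc}")
            break
        row["complete"] = True
        # Step 3: pass the outputs to every row that requires them.
        for dest in pipeline.values():
            for name, (src_node, src_output) in dest["output_map"].items():
                if src_node == node_id:
                    dest["inputs"][name] = row["outputs"][src_output]
    return pipeline

pipeline = {
    "yData": make_row(lambda: {"output": [1.0, 2.0, 3.0]}),
    "double": make_row(lambda ys: {"output": [2 * y for y in ys]},
                       ["input"], {"input": ("yData", "output")}),
    "total": make_row(lambda ys: {"output": sum(ys)},
                      ["input"], {"input": ("double", "output")}),
}
result = exec_pipeline(pipeline)
print(result["total"]["outputs"]["output"])  # 12.0
```

Because a failing row is left with `complete` set to `False` and its `error` field filled in, the rows after it are never attempted, which matches the stopping rule described above.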

.ml.createPipeline

Generate an execution pipeline based on a valid graph

.ml.createPipeline[graph]

Where graph is a graph generated by .ml.createGraph with all relevant input edges validly connected, returns a keyed table containing an optimal execution pipeline, populated with all the information required to execute it successfully.

// Generate a simple valid graph
q)graph:.ml.createGraph[]

// Configuration containing x data
q)graph:.ml.addCfg[graph;`xData;enlist[`xData]!enlist desc 100?1f]

// Configuration containing y data
q)graph:.ml.addCfg[graph;`yData;enlist[`yData]!enlist asc 100?1f]

// Node to randomize y dataset
q)yRandInput:"!"
q)yRandOutput:"F"
q)yRandFunction:{yData:x`yData;neg[count yData]?yData}
q)yRandNode:`inputs`outputs`function!(yRandInput;yRandOutput;yRandFunction)
q)graph:.ml.addNode[graph;`yRand;yRandNode]

// Node to calculate correlation between two float vectors
q)corrInput:`xData`yData!"!F"
q)corrOutput:"f"
q)corrFunction:{x[`xData] cor y}
q)corrNode:`inputs`outputs`function!(corrInput;corrOutput;corrFunction)
q)graph:.ml.addNode[graph;`corr;corrNode]

// Connect edges together
q)graph:.ml.connectEdge[graph;`xData;`output;`corr;`xData]
q)graph:.ml.connectEdge[graph;`yData;`output;`yRand;`input]
q)graph:.ml.connectEdge[graph;`yRand;`output;`corr;`yData]

// Generate pipeline
q)show pipeline:.ml.createPipeline[graph]
nodeId| complete error function                                              ..
------| ---------------------------------------------------------------------..
yData | 0              @[;(,`yData)!,`s#0.00969842 0.01596794 0.02054163 0.02..
yRand | 0              ![,`output]@[enlist]{yData:x`yData;neg[count yData]?yD..
xData | 0              @[;(,`xData)!,0.9988041 0.9936284 0.9880844 0.9789487 ..
corr  | 0              ![,`output]@[enlist]{x[`xData] cor y}                 ..

.ml.execPipeline

Execute a generated pipeline

.ml.execPipeline[pipeline]

Where pipeline is a pipeline created by .ml.createPipeline, returns the pipeline with each node executed and appropriate outputs populated.

You can then retrieve the results of execution, such as node outputs and any error, from the returned pipeline.

This example uses the pipeline generated in the .ml.createPipeline example above:

// Valid pipeline execution
q)pipeline:.ml.execPipeline[pipeline]
Executing node: yData
Executing node: yRand
Executing node: xData
Executing node: corr

// Display the pipeline
q)show pipeline
nodeId| complete error function                                              ..
------| ---------------------------------------------------------------------..
yData | 1              @[;(,`yData)!,`s#0.00969842 0.01596794 0.02054163 0.02..
yRand | 1              ![,`output]@[enlist]{yData:x`yData;neg[count yData]?yD..
xData | 1              @[;(,`xData)!,0.9988041 0.9936284 0.9880844 0.9789487 ..
corr  | 1              ![,`output]@[enlist]{x[`xData] cor y}                 ..

// Retrieve the outputs of the pipeline
q)exec outputs from pipeline
(,`)!,::
(,`)!,::
(,`)!,::
``output!(::;0.225908)

// Invalid example: the corr node has been modified to produce improper execution
q)pipeline:.ml.execPipeline[invalidPipeline]
Executing node: yData
Executing node: yRand
Executing node: xData
Executing node: corr
Error: rank

// Display the pipeline
q)show pipeline
nodeId| complete error function                                              ..
------| ---------------------------------------------------------------------..
yData | 1              @[;(,`yData)!,`s#0.004194243 0.006855978 0.01139698 0...
yRand | 1              ![,`output]@[enlist]{yData:x`yData;neg[count yData]?yD..
xData | 1              @[;(,`xData)!,0.9847626 0.9823238 0.9796802 0.9788011 ..
corr  | 0        rank  ![,`output]@[enlist]{x[`xData] cor string y}          ..