Skip to content

Lifecycle

.qsp.finishTask

Mark a task as finished

.qsp.finishTask[op;taskID]

Parameters:

name q type description
op .qsp.op A pipeline operator configuration, or the name of the operator as a symbol.
taskID int A task identifier.

marks the task as finished.

If all tasks for the operator are finished, and the operator has been requested to finish, the operator is finished.

Register an asynchronous task for true async using .qsp.apply:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; md; data]
      // Register a task that represents the unfinished async kurl GET request
      tid: .qsp.registerTask[op];
      .kurl.async (
        data`url;
        "GET";
        ``callback!(
          ::;
          {[op;md;data;tid;r]        // GET request with a callback
            .qsp.finishTask[op;tid]; //   Mark the task as finished
            data[`response]: r 1;    //   Add GET request response to data
            .qsp.push[op;md;data];   //   Push enriched data to the next operator
          }[op;md;data;tid]
        )
      )
    }
  ]
  .qsp.write.toConsole[]

publish `url`response!("https://www.google.ca"; "");
2021.10.02D11:49:58.896623200 | url     | "https://www.google.ca"
2021.10.02D11:49:58.896623200 | response| "<!doctype html><html  ..

See registerTask for more information

.qsp.onError

Set the onError handler for the pipeline

.qsp.onError[handler]

Parameters:

name q type description
handler function A ternary function that is passed any messages that cause an error in the pipeline. The first argument is the error message, the operator that threw the error and the batch of data that cause the error.

The onError event handler is called when an error occurs in a pipeline. The handler will be passed an error message, the operator that threw the error, and the batch that caused the error.

.qsp.onCheckpoint

Set the onCheckpoint handler

.qsp.onCheckpoint[handler]

Parameters:

name q type description
handler function A nullary function to execute during checkpointing.

This sets the onCheckpoint handler for the pipeline.

The onCheckpoint handler for the pipeline is called whenever a checkpoint is created. Its result is saved to the checkpoint, and later passed into the pipeline onRecover handler.

.qsp.onOperatorCheckpoint

Set the onOperatorCheckpoint event handler for a specific operator

.qsp.onOperatorCheckpoint[id;handler]

Parameters:

name q type description
id symbol An operator ID.
handler function A unary function which executes before an operator performs a checkpoint

sets the onOperatorCheckpoint event handler of the operator with id.

Its argument is the operator.

The onOperatorCheckpoint event handler is called when a checkpoint is created, before operator state is saved. This provides an opportunity for an operator to make updates to its state before it is checkpointed. The return value of the handler is also added to the checkpoint, and will be passed to the operators onOperatorRecover function on recovery.

Set onOperatorCheckpoint handler:

.qsp.onOperatorCheckpoint[`map;{[op]
    log.info ("Operator state:\n%s"; .Q.s .qsp.get[op;::]);
    }]

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.map[{[op;md;x]
        .qsp.set[op;md] .qsp.get[op;md] upsert x;
        x
        }; .qsp.use `state`name!(();`map)]
    .qsp.write.toConsole[];

.qsp.onOperatorPostCheckpoint

Set the onOperatorPostCheckpoint event handler for a specific operator

.qsp.onOperatorPostCheckpoint[id;handler]

Parameters:

name q type description
id symbol An operator ID.
handler function A ternary function to execute after a checkpoint has been saved.

sets the onOperatorPostCheckpoint event handler of the operator with id.

The onOperatorPostCheckpoint event handler is called after a checkpoint has either been saved to disk or written to the Controller.

It has arguments:

  1. operator
  2. checkpointed operator state (i.e. state set via .qsp.set)
  3. data returned by the operator's onCheckpoint handler
Reserved for readers

The operator post checkpoint hook cannot be used on a reader operator. This includes any operators from the .qsp.read.* collection.

.qsp.onOperatorRecover

Set the onOperatorRecover event handler for a specific operator

.qsp.onOperatorRecover[id;handler]

Parameters:

name q type description
id symbol An operator ID.
handler function A binary function to execute after an operator recovers

sets the onOperatorRecover event handler of the operator with the id.

It has arguments:

  1. operator
  2. data returned by the operator's onCheckpoint handler

The onOperatorRecover handler is called when a pipeline recovers from a checkpoint. Operator state has already been restored once this handler has been called. This provides an opportunity for operators to make updates to their state before the pipeline starts running.

Reserved for readers

The operator recover hook cannot be used on a reader operator. This includes any operators from the .qsp.read.* collection.

Checkpointing external state:

TABLE: ();

.qsp.onOperatorCheckpoint[`map;{[op] TABLE }];

.qsp.onOperatorRecover[`map; {[op;t] TABLE:: t}];

.qsp.run
    .qsp.read.fromCallback[`publish]
    .qsp.map[{
        `TABLE upsert x;
        x
        }; .qsp.use ``name!(::;`map)]
    .qsp.write.toConsole[];

.qsp.onPostCheckpoint

Set the onPostCheckpoint handler

.qsp.onPostCheckpoint[handler]

Parameters:

name q type description
handler function A unary function to be called after a checkpoint has been written.

This sets the onPostCheckpoint handler for the pipeline.

The onPostCheckpoint handler for the pipeline is called after a checkpoint has been written either to disk or to the controller, and has been acknowledged by the controller.

Its argument is the data returned by the pipeline's onCheckpoint handler.

.qsp.onRecover

Set the onRecover handler

.qsp.onRecover[handler]

Parameters:

name q type description
handler function A unary function to execute after recovery.

This sets the onRecover handler for the pipeline.

The onRecover handler for the pipeline is called when a pipeline recovers from a checkpoint.

Its argument is the data returned by the pipeline's onCheckpoint handler.

.qsp.onSetup

Set the onSetup event handler

.qsp.onSetup[handler]

Parameters:

name q type description
handler function A nullary function to execute after all operators in a pipeline are setup.

This sets the onSetup event handler for the pipeline.

The onSetup event handler is called before the pipeline is finished initializing,.

.qsp.onStart

Set the onStart event handler

.qsp.onStart[handler]

Parameters:

name q type description
handler function A nullary function to execute after all operators in a pipeline are started.

This sets the onStart event handler for the pipeline.

The onStart event handler is called after the pipeline has invoked start on its readers; it returns generic null.

.qsp.onFinish

Set the onFinish event handler

.qsp.onFinish[handler]

Parameters:

name q type description
handler function A unary function that accepts an operator configuration to execute when a given operator is finished.

This sets the onFinish event handler for the pipeline.

The onFinish event handler is called when an operator is complete; it returns generic null.

.qsp.onTeardown

Set the onTeardown event handler

.qsp.onTeardown[handler]

Parameters:

name q type description
handler function A nullary function to execute before a pipeline is torn down.

This sets the onTeardown event handler for the pipeline.

The onTeardown event handler is called after the pipeline is torn down; it returns generic null.

.qsp.registerTask

Register a task for an operator

.qsp.registerTask[op]

Parameters:

name q type description
op .qsp.op A pipeline operator.

This registers a task for the operator and returns an identifier (int) for the task.

A task represents an asynchronous request that an operator made, and once that request is finished, the task should be marked as finished using .qsp.finishTask.

Register an asynchronous task for true async using .qsp.apply:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; md; data]
      // Register a task which represents the unfinished async kurl GET request
      tid: .qsp.registerTask[op];
      .kurl.async (
        "https://www.google.ca";
        "GET";
        ``callback!(::;
          {[op;md;data;tid;r]         // GET request with a callback
            .qsp.finishTask[op;tid];  //   Mark the task as finished
            data[`response]: r 1;     //   Add GET request response to data
            .qsp.push[op;md;data];    //   Push enriched data to the next operator
          }[op;md;data;tid]
        )
      );
    }
  ]
  .qsp.write.toConsole[]

.qsp.subscribe

Subscribes to a particular event with the provided callback

.qsp.subscribe[handler]

Parameters:

name q type description
eventType symbol Type of an event.
handler function A unary function to execute after the eventType occurs. Takes the event as argument.

An event is a dictionary with keys:

key q type description
type symbol Type of the event.
time timestamp Timestamp.
origin symbol Origin identifier.
data any Payload of the event object.

When an event is emitted matching the type of the event subscribed to, the provided callback will be invoked with the event object.

Returns the event type and a subscription identifier to use to unsubscribe {(symbol;long)}.

Subscribe to file events .qsp.readFromFile:

.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
.qsp.subscribe[ ;{`.file.events upsert value x}] each `file.found`file.start`file.progress`file.end;
`:/tmp/numbers.txt 0: string til 5;

.qsp.run
  .qsp.read.fromFile[`:/tmp/numbers.txt]
  .qsp.write.toVariable[`test.output];
q).file.events
eventType     eventTime                     origin                 data
-------------------------------------------------------------------------------------------------------------------------
file.found    2022.11.18D09:41:14.111578506 file_:/tmp/numbers_txt (,`paths)!,,":/tmp/numbers.txt"
file.start    2022.11.18D09:41:14.111668936 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)
file.progress 2022.11.18D09:41:14.111703746 file_:/tmp/numbers_txt `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end      2022.11.18D09:41:14.111710593 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)

.qsp.unsubscribe

Unsubscribe a particular subscriber or all subscribers

.qsp.unsubscribe[id]

Parameters:

name q type description
id symbol or (symbol;long) If given just an event type, all subscribers are cleared. To clear a single subscription, pass the return of subscribe for the particular subscriber

Unsubscribes a particular subscriber or all subscribers for a given event type.