Lifecycle
.qsp.
  finishTask                 mark a task as finished
  onError                    set the onError event handler
  onCheckpoint               set the onCheckpoint handler
  onOperatorCheckpoint       set the onOperatorCheckpoint event handler for an operator
  onOperatorPostCheckpoint   set the onOperatorPostCheckpoint event handler for an operator
  onOperatorRecover          set the onOperatorRecover event handler for an operator
  onPostCheckpoint           set the onPostCheckpoint handler
  onRecover                  set the onRecover handler
  onSetup                    set the onSetup event handler
  onStart                    set the onStart event handler
  onFinish                   set the onFinish event handler
  onTeardown                 set the onTeardown event handler
  registerTask               register a task for an operator
  subscribe                  add a subscriber for an event
  unsubscribe                remove a subscriber or all subscribers
.qsp.finishTask
Mark a task as finished
.qsp.finishTask[op;taskID]
Parameters:
name | q type | description |
---|---|---|
op | .qsp.op | A pipeline operator configuration, or the name of the operator as a symbol. |
taskID | int | A task identifier. |
Marks the task with identifier taskID as finished.
If all tasks for the operator are finished, and the operator has been requested to finish, the operator is finished.
Register an asynchronous task for true async using .qsp.apply:
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; md; data]
      // Register a task that represents the unfinished async kurl GET request
      tid: .qsp.registerTask[op];
      .kurl.async (
        data`url;
        "GET";
        ``callback!(
          ::;
          {[op;md;data;tid;r] // GET request with a callback
            .qsp.finishTask[op;tid]; // Mark the task as finished
            data[`response]: r 1; // Add GET request response to data
            .qsp.push[op;md;data]; // Push enriched data to the next operator
            }[op;md;data;tid]
          )
        )
      }
    ]
  .qsp.write.toConsole[]
publish `url`response!("https://www.google.ca"; "");
2021.10.02D11:49:58.896623200 | url | "https://www.google.ca"
2021.10.02D11:49:58.896623200 | response| "<!doctype html><html ..
See .qsp.registerTask for more information.
.qsp.onError
Set the onError handler for the pipeline
.qsp.onError[handler]
Parameters:
name | q type | description |
---|---|---|
handler | function | A ternary function that is called with any message that causes an error in the pipeline. Its arguments are the error message, the operator that threw the error, and the batch of data that caused the error. |
The onError event handler is called when an error occurs in a pipeline. The handler is passed the error message, the operator that threw the error, and the batch that caused the error.
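As a minimal sketch, the handler below prints each error and keeps the offending batches in memory so they can be inspected later. The names `failedBatches` and `logError` are hypothetical, and the handler relies only on the three arguments described above.

```q
// Hypothetical in-memory store for batches that caused an error
failedBatches: ();

// Ternary handler: error message, operator that threw, batch that caused the error
logError: {[msg; op; batch]
  -1 "Pipeline error: ", .Q.s1 msg;
  // Assign the global with :: so the batch is kept after the handler returns
  failedBatches:: failedBatches, enlist batch;
  }

.qsp.onError logError;
```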
.qsp.onCheckpoint
Set the onCheckpoint handler
.qsp.onCheckpoint[handler]
Parameters:
name | q type | description |
---|---|---|
handler | function | A nullary function to execute during checkpointing. |
This sets the onCheckpoint handler for the pipeline.
The onCheckpoint handler for the pipeline is called whenever a checkpoint is created. Its return value is saved to the checkpoint. This value is passed to the onPostCheckpoint and onRecover handlers, and can be used for tracking files.
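Because the handler's return value is stored in the checkpoint, one pattern is to return whatever bookkeeping must survive a restart. The sketch below assumes a hypothetical global `processedFiles` that other parts of the pipeline append to; `saveProcessedFiles` is also a hypothetical name, and only `.qsp.onCheckpoint` comes from this API.

```q
// Hypothetical list of files the pipeline has already processed
processedFiles: `symbol$();

// Nullary handler: the value it returns is saved with the checkpoint
saveProcessedFiles: {[] processedFiles}

.qsp.onCheckpoint saveProcessedFiles;
```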
.qsp.onOperatorCheckpoint
Set the onOperatorCheckpoint event handler for a specific operator
.qsp.onOperatorCheckpoint[id;handler]
Parameters:
name | q type | description |
---|---|---|
id | symbol | An operator ID. |
handler | function | A unary function to execute before an operator performs a checkpoint. |
Sets the onOperatorCheckpoint event handler of the operator with the given id. Its argument is the operator.
The onOperatorCheckpoint event handler is called when a checkpoint is created, before operator state is saved. This gives an operator an opportunity to update its state before it is checkpointed. The return value of the handler is also added to the checkpoint, and is passed to the operator's onOperatorRecover handler on recovery.
Set an onOperatorCheckpoint handler:
.qsp.onOperatorCheckpoint[`map;{[op]
  log.info ("Operator state:\n%s"; .Q.s .qsp.get[op;::]);
  }]
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{[op;md;x]
    .qsp.set[op;md] .qsp.get[op;md] upsert x;
    x
    }; .qsp.use `state`name!(();`map)]
  .qsp.write.toConsole[];
.qsp.onOperatorPostCheckpoint
Set the onOperatorPostCheckpoint event handler for a specific operator
.qsp.onOperatorPostCheckpoint[id;handler]
Parameters:
name | q type | description |
---|---|---|
id | symbol | An operator ID. |
handler | function | A ternary function to execute after a checkpoint has been saved. |
Sets the onOperatorPostCheckpoint event handler of the operator with the given id.
The onOperatorPostCheckpoint event handler is called after a checkpoint has been either saved to disk or written to the Controller.
Its arguments are:
- the operator
- the checkpointed operator state (that is, state set via .qsp.set)
- the data returned by the operator's onOperatorCheckpoint handler
Reserved for readers
The operator post-checkpoint hook cannot be used on a reader operator. This includes any operators from the .qsp.read.* collection.
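A minimal sketch, assuming an operator named `map whose state is a table, as in the .qsp.onOperatorCheckpoint example. The handler records when the checkpoint was saved and how many rows of state it contained. `lastMapCheckpoint` and `afterMapCheckpoint` are hypothetical names.

```q
// Hypothetical record of the most recent saved checkpoint for `map
lastMapCheckpoint: ()!();

// Ternary handler: operator, checkpointed state, value returned at checkpoint time
afterMapCheckpoint: {[op; state; data]
  lastMapCheckpoint:: `time`stateRows!(.z.p; count state);
  }

.qsp.onOperatorPostCheckpoint[`map; afterMapCheckpoint];
```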
.qsp.onOperatorRecover
Set the onOperatorRecover event handler for a specific operator
.qsp.onOperatorRecover[id;handler]
Parameters:
name | q type | description |
---|---|---|
id | symbol | An operator ID. |
handler | function | A binary function to execute after an operator recovers. |
Sets the onOperatorRecover event handler of the operator with the given id.
Its arguments are:
- the operator
- the data returned by the operator's onOperatorCheckpoint handler
The onOperatorRecover handler is called when a pipeline recovers from a checkpoint. Operator state has already been restored by the time this handler is called. This gives operators an opportunity to update their state before the pipeline starts running.
Reserved for readers
The operator recover hook cannot be used on a reader operator. This includes any operators from the .qsp.read.* collection.
Checkpointing external state:
TABLE: ();
.qsp.onOperatorCheckpoint[`map;{[op] TABLE }];
.qsp.onOperatorRecover[`map; {[op;t] TABLE:: t}];
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{
    `TABLE upsert x;
    x
    }; .qsp.use ``name!(::;`map)]
  .qsp.write.toConsole[];
.qsp.onPostCheckpoint
Set the onPostCheckpoint handler
.qsp.onPostCheckpoint[handler]
Parameters:
name | q type | description |
---|---|---|
handler | function | A unary function to be called after a checkpoint has been written. |
This sets the onPostCheckpoint handler for the pipeline.
The onPostCheckpoint handler for the pipeline is called after a checkpoint has been written, either to disk or to the Controller, and has been acknowledged by the Controller. Its argument is the data returned by the pipeline's onCheckpoint handler.
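Because this handler runs only after the Controller has acknowledged the checkpoint, it is a reasonable place to act on the value returned by onCheckpoint, for example to mark tracked files as committed. The sketch assumes the onCheckpoint handler returns a list of file names; `committedFiles` and `commitFiles` are hypothetical names.

```q
// Hypothetical list of files covered by an acknowledged checkpoint
committedFiles: `symbol$();

// Unary handler: receives whatever the onCheckpoint handler returned
commitFiles: {[files]
  committedFiles:: distinct committedFiles, files;
  -1 "Checkpoint acknowledged; files committed: ", string count committedFiles;
  }

.qsp.onPostCheckpoint commitFiles;
```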
.qsp.onRecover
Set the onRecover handler
.qsp.onRecover[handler]
Parameters:
name | q type | description |
---|---|---|
handler | function | A unary function to execute after recovery. |
This sets the onRecover handler for the pipeline.
The onRecover handler for the pipeline is called when a pipeline recovers from a checkpoint. Its argument is the data returned by the pipeline's onCheckpoint handler.
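A minimal sketch of restoring pipeline-level state on recovery, assuming the onCheckpoint handler returned a list of processed file names. `processedFiles` and `restoreProcessedFiles` are hypothetical names.

```q
// Hypothetical list of files the pipeline has already processed
processedFiles: `symbol$();

// Unary handler: receives the value saved by the onCheckpoint handler
restoreProcessedFiles: {[files]
  processedFiles:: files;
  }

.qsp.onRecover restoreProcessedFiles;
```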
.qsp.onSetup
Set the onSetup event handler
.qsp.onSetup[handler]
Parameters:
name | q type | description |
---|---|---|
handler | function | A nullary function to execute after all operators in a pipeline are set up. |
This sets the onSetup event handler for the pipeline.
The onSetup event handler is called after all operators have been set up, before the pipeline has finished initializing.
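A minimal sketch of using onSetup to initialize global state once the operators exist. `batchCount` and `initCounters` are hypothetical names.

```q
// Nullary handler: reset a hypothetical global counter during setup
initCounters: {[]
  batchCount:: 0;
  -1 "Pipeline operators set up";
  }

.qsp.onSetup initCounters;
```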
.qsp.onStart
Set the onStart event handler
.qsp.onStart[handler]
Parameters:
name | q type | description |
---|---|---|
handler | function | A nullary function to execute after all operators in a pipeline are started. |
This sets the onStart event handler for the pipeline.
The onStart event handler is called after the pipeline has invoked start on its readers; it returns generic null.
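A minimal sketch that records when the pipeline's readers were started, for example to report uptime later. `startTime` and `recordStart` are hypothetical names.

```q
// Hypothetical global holding the time the readers were started
startTime: 0Np;

// Nullary handler: runs once the pipeline has started its readers
recordStart: {[]
  startTime:: .z.p;
  }

.qsp.onStart recordStart;
```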
.qsp.onFinish
Set the onFinish event handler
.qsp.onFinish[handler]
Parameters:
name | q type | description |
---|---|---|
handler | function | A unary function to execute when an operator finishes. Its argument is the configuration of the operator that finished. |
This sets the onFinish event handler for the pipeline.
The onFinish event handler is called when an operator is complete; it returns generic null.
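A minimal sketch that counts finished operators and prints each operator configuration it receives. The handler makes no assumption about the structure of the configuration beyond printing it; `finishedCount` and `recordFinish` are hypothetical names.

```q
// Hypothetical global counting operators that have finished
finishedCount: 0;

// Unary handler: its argument is the configuration of the operator that finished
recordFinish: {[op]
  finishedCount:: finishedCount + 1;
  -1 "Operator finished: ", .Q.s1 op;
  }

.qsp.onFinish recordFinish;
```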
.qsp.onTeardown
Set the onTeardown event handler
.qsp.onTeardown[handler]
Parameters:
name | q type | description |
---|---|---|
handler | function | A nullary function to execute before a pipeline is torn down. |
This sets the onTeardown event handler for the pipeline.
The onTeardown event handler is called when the pipeline is torn down; it returns generic null.
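A minimal sketch of releasing a resource at teardown. It assumes the script holds a hypothetical log file handle `logHandle` (opened here at `/tmp/pipeline.log`, an illustrative path) and writes a final line before closing it; `closeLog` is also a hypothetical name.

```q
// Hypothetical log file opened when this script is loaded
logHandle: hopen `:/tmp/pipeline.log;

// Nullary handler: write a final line and release the file handle at teardown
closeLog: {[]
  neg[logHandle] "pipeline torn down at ", string .z.p;
  hclose logHandle;
  }

.qsp.onTeardown closeLog;
```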
.qsp.registerTask
Register a task for an operator
.qsp.registerTask[op]
Parameters:
name | q type | description |
---|---|---|
op | .qsp.op | A pipeline operator. |
This registers a task for the operator and returns an identifier (int) for the task. A task represents an asynchronous request made by an operator; once that request is finished, the task should be marked as finished using .qsp.finishTask.
Register an asynchronous task for true async using .qsp.apply:
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[
    {[op; md; data]
      // Register a task which represents the unfinished async kurl GET request
      tid: .qsp.registerTask[op];
      .kurl.async (
        "https://www.google.ca";
        "GET";
        ``callback!(::;
          {[op;md;data;tid;r] // GET request with a callback
            .qsp.finishTask[op;tid]; // Mark the task as finished
            data[`response]: r 1; // Add GET request response to data
            .qsp.push[op;md;data]; // Push enriched data to the next operator
            }[op;md;data;tid]
          )
        );
      }
    ]
  .qsp.write.toConsole[]
.qsp.subscribe
Subscribe to a particular event type with the provided callback
.qsp.subscribe[eventType;handler]
Parameters:
name | q type | description |
---|---|---|
eventType | symbol | Type of an event. |
handler | function | A unary function to execute after the eventType occurs. Takes the event as argument. |
An event is a dictionary with keys:
key | q type | description |
---|---|---|
type | symbol | Type of the event. |
time | timestamp | Timestamp of the event. |
origin | symbol | Origin identifier. |
data | any | Payload of the event object. |
When an event is emitted matching the type of the event subscribed to, the provided callback will be invoked with the event object.
Returns a (symbol;long) pair of the event type and a subscription identifier, which can be passed to .qsp.unsubscribe to remove the subscription.
Subscribe to file events from .qsp.read.fromFile:
.file.events: ([] eventType: `symbol$(); eventTime: `timestamp$(); origin: `symbol$(); data:());
.qsp.subscribe[ ;{`.file.events upsert value x}] each `file.found`file.start`file.progress`file.end;
`:/tmp/numbers.txt 0: string til 5;
.qsp.run
  .qsp.read.fromFile[`:/tmp/numbers.txt]
  .qsp.write.toVariable[`test.output];
q).file.events
eventType eventTime origin data
-------------------------------------------------------------------------------------------------------------------------
file.found 2022.11.18D09:41:14.111578506 file_:/tmp/numbers_txt (,`paths)!,,":/tmp/numbers.txt"
file.start 2022.11.18D09:41:14.111668936 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)
file.progress 2022.11.18D09:41:14.111703746 file_:/tmp/numbers_txt `path`totalBytes`bytesRead!(":/tmp/numbers.txt";10;10)
file.end 2022.11.18D09:41:14.111710593 file_:/tmp/numbers_txt `path`size!(":/tmp/numbers.txt";10)
.qsp.unsubscribe
Unsubscribe a particular subscriber or all subscribers
.qsp.unsubscribe[id]
Parameters:
name | q type | description |
---|---|---|
id | symbol or (symbol;long) | If given only an event type, all subscribers for that event type are removed. To remove a single subscription, pass the value returned by .qsp.subscribe for that subscriber. |
Unsubscribes a particular subscriber or all subscribers for a given event type.
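A minimal sketch of both forms, using the `file.end` event type from the .qsp.subscribe example. The variable name `sub` is hypothetical; it holds the (eventType;id) pair that .qsp.subscribe returns.

```q
// Keep the (eventType;id) pair returned by .qsp.subscribe
sub: .qsp.subscribe[`file.end; {[e] -1 "Finished reading: ", .Q.s1 e`data}];

// Remove only this subscriber
.qsp.unsubscribe sub;

// Or remove every remaining subscriber for the event type
.qsp.unsubscribe `file.end;
```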