kxi.sp.lifecycle
Stream Processor lifecycle hooks and triggers.
All of the functions in this module are available at the top level of the Stream Processor
interface. For example, sp.lifecycle.on_checkpoint is available as sp.on_checkpoint.
Warnings:
Provided handler functions are converted to q functions
Many of the functions in this module accept "handlers", i.e. callback functions. These
functions are converted to q functions, so a few caveats apply to them. They should not rely on
default arguments, as q expects every argument to be provided, including those with defaults.
Also, *args and **kwargs are each treated as a single argument, which should be a list or a dict
respectively.
Best practice is to wrap any function that relies on these features in a basic fixed-arity
function that q can handle. For example, instead of passing print as a handler that would be
given three arguments, use lambda x, y, z: print(x, y, z).
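The wrapping advice above can be sketched in plain Python. The helper name is_q_friendly and the functions log_event and log_event_handler are hypothetical and not part of kxi.sp; the helper uses only the standard inspect module to flag the features called out in the warning (default arguments, *args, **kwargs), and makes no claim about how the conversion to q is actually performed.

```python
import inspect


def is_q_friendly(func):
    """Return True if func takes only plain parameters without defaults.

    Hypothetical helper for illustration; not part of kxi.sp.
    """
    for param in inspect.signature(func).parameters.values():
        if param.kind in (
            inspect.Parameter.VAR_POSITIONAL,  # *args
            inspect.Parameter.VAR_KEYWORD,     # **kwargs
            inspect.Parameter.KEYWORD_ONLY,
        ):
            return False
        if param.default is not inspect.Parameter.empty:
            return False
    return True


def log_event(message, prefix="[sp]", *extra):
    # Relies on a default argument and *args, which the warning advises against.
    return " ".join([prefix, message, *map(str, extra)])


def log_event_handler(message):
    # Fixed-arity wrapper: every argument is explicit, the default is supplied here.
    return log_event(message, "[sp]")
```

Under these assumptions, log_event_handler is the function that would be passed as a handler, while log_event stays an ordinary Python helper.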
on_setup
def on_setup(handler: Callable[[], None]) -> None
Set the global handler that will be called before a pipeline has finished initializing.
Arguments:
handler
- A function that will be called before a pipeline has finished initializing, after all operators in the pipeline have been set up. It will not be provided any arguments, and its return value is ignored.
>>> from kxi import sp
>>> def on_setup():
...     global setup
...     setup = True
>>> setup = False
>>> assert not setup
>>> sp.lifecycle.on_setup(on_setup)
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_console())
>>> assert setup
on_start
def on_start(handler: Callable[[], None]) -> None
Set the global handler that will be called when all operators in a pipeline have started.
Arguments:
handler
- A function that will be called when all operators in a pipeline have started. It will not be provided any arguments, and its return value is ignored.
>>> from kxi import sp
>>> def on_start():
...     global started
...     started = True
>>> started = False
>>> sp.lifecycle.on_start(on_start)
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_console())
>>> assert started
on_teardown
def on_teardown(handler: Callable[[], None]) -> None
Set the global handler that will be called before a pipeline is torn down.
A teardown can be triggered by calling sp.teardown.
Arguments:
handler
- A function that will be called before a pipeline is torn down. It will not be provided any arguments, and its return value is ignored.
>>> from kxi import sp
>>> torndown = False
>>> def on_teardown():
...     global torndown
...     torndown = True
>>> sp.lifecycle.on_teardown(on_teardown)
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_console())
>>> sp.teardown()
>>> assert torndown
on_error
def on_error(
handler: Callable[[kx.CharVector, kx.Dictionary, kx.K], None]) -> None
Set the global handler that will be called when an error occurs.
Arguments:
handler
- A function that will be called when an error occurs in a pipeline. It will be provided three arguments: the error message (as a character vector), the operator (as a q dictionary which can be passed into other kxi.sp functions that take an operator), and the batch of data that was being processed when the error occurred. Its return value is ignored.
>>> from kxi import sp
>>> import pykx as kx
>>> errored = False
>>> def on_error(errMsg, op, data):
...     global errored
...     errored = True
>>> sp.lifecycle.on_error(on_error)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.map(lambda x: 1 / 0)
...     | sp.write.to_console())
>>> assert not errored
>>> kx.q('publish', range(10))
>>> assert errored
on_checkpoint
def on_checkpoint(handler: Callable[[], Any]) -> None
Set the global handler that will be called when a checkpoint is being created.
Arguments:
handler
- A function that will be called when a checkpoint is being created. It will not be provided any arguments, and whatever it returns is converted to a q type, then saved in the checkpoint. Later this data may be passed into the on_post_checkpoint and on_recover handlers.
>>> from kxi import sp
>>> import pykx as kx
>>> import datetime
>>> checkpointed = False
>>> def on_checkpoint():
...     global checkpointed
...     checkpointed = True
...     return datetime.datetime.now()
>>> sp.lifecycle.on_checkpoint(on_checkpoint)
>>> recovered = False
>>> def on_recover(auxiliary_data):
...     print(f'on_recover called with {auxiliary_data}')
...     global recovered
...     recovered = True
>>> sp.lifecycle.on_recover(on_recover)
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_console())
>>> # Simulate recovery from a checkpoint
>>> assert not checkpointed
>>> kx.q('.spcp.writeCheckpoint[] `id`checkpoint!(0j; .sp.createCheckpoint[])')
>>> assert checkpointed and not recovered
>>> # These are internal API calls, and are liable to change without notice.
>>> # They should not be used in production.
>>> kx.q('.sp.i.recoverFromCheckpoint .spwork.i.createCheckpoint[]')
>>> assert recovered
on_post_checkpoint
def on_post_checkpoint(handler: Callable[[kx.K], None]) -> None
Set the global handler that will be called after a checkpoint has been created.
The handler will be called after the checkpoint has been written to disk and acknowledged by the controller.
Arguments:
handler
- A function that will be called after a checkpoint has been created. It will be provided one argument, which is the return value of the on_checkpoint handler. Its return value is ignored.
>>> from kxi import sp
>>> import pykx as kx
>>> import datetime
>>> def on_checkpoint():
...     return datetime.datetime.now()
>>> sp.lifecycle.on_checkpoint(on_checkpoint)
>>> postCheckpointed = False
>>> def on_post_checkpoint(auxiliary_data):
...     global postCheckpointed
...     print(f'on_post_checkpoint called with {auxiliary_data}')
...     postCheckpointed = True
>>> sp.lifecycle.on_post_checkpoint(on_post_checkpoint)
>>> sp.run(sp.read.from_callback('publish')
...     | sp.write.to_console())
>>> assert not postCheckpointed
>>> kx.q('.spcp.WrittenCheckpoint: (!) . flip ((`checkpoint;.sp.createCheckpoint[]);(`id;0j))')
>>> kx.q('.spwork.acknowledgeCheckpoint[""]')
>>> assert postCheckpointed
on_recover
def on_recover(handler: Callable[[kx.K], None]) -> None
Set the global handler that will be called when a pipeline recovers from a checkpoint.
Arguments:
handler
- A function that will be called when a pipeline recovers from a checkpoint. It will be provided one argument, which is whatever was returned by the last run checkpoint handler (see on_checkpoint). Its return value is ignored.
>>> from kxi import sp
>>> import pykx as kx
>>> recovered = False
>>> def on_recover(auxiliary_data):
...     global recovered
...     recovered = True
>>> sp.lifecycle.on_recover(on_recover)
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_console())
>>> assert not recovered
>>> # These are internal API calls used to simulate checkpoint recovery.
>>> # They are liable to change without notice, and should not be used in production.
>>> kx.q('.sp.i.recoverFromCheckpoint .spwork.i.createCheckpoint[]')
>>> recovered
True
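The relationship between the three global checkpoint handlers can be sketched as follows. This is a minimal illustration, assuming the Stream Processor interface is passed in as the parameter sp (for example the kxi.sp module) and that the application keeps some state outside of operator state in a plain dict; the names install_checkpoint_handlers and counter are hypothetical. With the real library the saved value is converted to a q type, so the handlers may receive a PyKX object rather than the original Python value.

```python
def install_checkpoint_handlers(sp, counter):
    """Register global checkpoint handlers on `sp`.

    `sp` is assumed to expose on_checkpoint, on_post_checkpoint and on_recover.
    `counter` is a hypothetical dict holding application state.
    """

    def on_checkpoint():
        # Whatever is returned here is saved in the checkpoint.
        return counter["events"]

    def on_post_checkpoint(saved):
        # Receives the return value of on_checkpoint once the checkpoint
        # has been written and acknowledged.
        counter["last_saved"] = saved

    def on_recover(saved):
        # Receives whatever the last run checkpoint handler returned.
        counter["events"] = saved

    sp.on_checkpoint(on_checkpoint)
    sp.on_post_checkpoint(on_post_checkpoint)
    sp.on_recover(on_recover)
```

Each handler takes exactly the number of arguments described above (zero, one, and one), with no defaults.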
on_operator_checkpoint
def on_operator_checkpoint(operator: OperatorSpecifier,
handler: Callable[[kx.Dictionary], Any]) -> None
Set a per-operator handler that will be called when a checkpoint is being created.
The handler will be called before the specified operator's state is saved. This provides an opportunity to make updates to the state before it is checkpointed.
Note: This cannot be used for reader operators.
Arguments:
operator
- The operator for which to assign the handler.
handler
- A function that will be called when a checkpoint is being created. It will be provided the operator as an argument, and whatever it returns is converted to a q type, then saved in the checkpoint. Later this data may be passed into the operator's on_operator_post_checkpoint and on_operator_recover handlers.
>>> from kxi import sp
>>> import pykx as kx
>>> operatorCheckpointed = False
>>> def on_operator_checkpoint(op):
...     global operatorCheckpointed
...     operatorCheckpointed = True
>>> sp.lifecycle.on_operator_checkpoint('callback', on_operator_checkpoint)
>>> sp.run(sp.read.from_callback('publish', name='callback')
...     | sp.write.to_console())
>>> assert not operatorCheckpointed
>>> kx.q('.spcp.writeCheckpoint[] (!) . flip ((`checkpoint; .sp.createCheckpoint[]);(`id;0j));')
>>> assert operatorCheckpointed
on_operator_post_checkpoint
def on_operator_post_checkpoint(
operator: OperatorSpecifier,
handler: Callable[[kx.Dictionary, kx.K, kx.K], None]) -> None
Set a per-operator handler that will be called after a checkpoint has been created.
The handler will be called after the checkpoint has been written to disk and acknowledged by the controller.
Note: This cannot be used for reader operators.
Arguments:
operator
- The operator for which to assign the handler.
handler
- A function that will be called after a checkpoint has been created. It will be provided three arguments: the operator, the checkpointed operator state (i.e. the state set by sp.state.set), and the checkpoint data provided by the operator's on_operator_checkpoint handler. Its return value is ignored.
>>> from kxi import sp
>>> import pykx as kx
>>> operatorPostCheckpointed = False
>>> def on_operator_post_checkpoint(op, state, data):
...     global operatorPostCheckpointed
...     operatorPostCheckpointed = True
>>> sp.lifecycle.on_operator_post_checkpoint('callback', on_operator_post_checkpoint)
>>> sp.run(sp.read.from_callback('publish', name='callback')
...     | sp.write.to_console())
>>> assert not operatorPostCheckpointed
>>> kx.q('.spcp.WrittenCheckpoint: (!) . flip ((`checkpoint;.sp.createCheckpoint[]);(`id;0j));')
>>> kx.q('.spwork.acknowledgeCheckpoint[""]')
>>> assert operatorPostCheckpointed
on_operator_recover
def on_operator_recover(
operator: OperatorSpecifier, handler: Callable[[kx.Dictionary, kx.K],
None]) -> None
Set a per-operator handler that will be called when a pipeline recovers from a checkpoint.
Operator state has already been restored by the time the recovery handler is called. This provides an opportunity to edit the state before the pipeline starts.
See also: on_operator_checkpoint, which can be used to set an operator's state before it is checkpointed.
Note: This cannot be used for reader operators.
Arguments:
operator
- The operator for which to assign the handler.
handler
- A function that will be called when a pipeline recovers from a checkpoint. It will be provided two arguments: the operator, and the checkpoint data provided by the operator's on_operator_checkpoint handler. Its return value is ignored.
>>> from kxi import sp
>>> import pykx as kx
>>> operatorRecovered = False
>>> def on_operator_recover(op, data):
...     global operatorRecovered
...     operatorRecovered = True
>>> sp.lifecycle.on_operator_recover('callback', on_operator_recover)
>>> sp.run(sp.read.from_callback('publish', name='callback')
...     | sp.write.to_console())
>>> assert not operatorRecovered
>>> # These are internal API calls used to simulate checkpoint recovery.
>>> # They are liable to change without notice, and should not be used in production.
>>> kx.q('.sp.i.recoverFromCheckpoint .spwork.i.createCheckpoint[]')
>>> assert operatorRecovered
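The three per-operator handlers differ in how many arguments they receive (one, three, and two respectively), which matters because handlers should not rely on default arguments. The sketch below registers all three for a single operator. It assumes the Stream Processor interface is passed in as the parameter sp; the function name install_operator_handlers and the log list are hypothetical and used only for illustration.

```python
def install_operator_handlers(sp, operator, log):
    """Register all three per-operator checkpoint handlers for `operator`.

    `sp` is assumed to expose on_operator_checkpoint,
    on_operator_post_checkpoint and on_operator_recover.
    `log` is a hypothetical list that records which handlers ran.
    """

    def on_operator_checkpoint(op):
        # One argument: the operator. The return value is saved in the checkpoint.
        log.append("checkpoint")
        return len(log)

    def on_operator_post_checkpoint(op, state, data):
        # Three arguments: operator, checkpointed state, on_operator_checkpoint data.
        log.append(("post_checkpoint", data))

    def on_operator_recover(op, data):
        # Two arguments: operator, on_operator_checkpoint data.
        log.append(("recover", data))

    sp.on_operator_checkpoint(operator, on_operator_checkpoint)
    sp.on_operator_post_checkpoint(operator, on_operator_post_checkpoint)
    sp.on_operator_recover(operator, on_operator_recover)
```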
finish_task
def finish_task(operator: OperatorSpecifier, task_id: int) -> None
Indicate that the specified task in the specified operator has finished processing.
If all tasks for the operator are finished, and the operator has been requested to finish, then the operator is finished.
Arguments:
operator
- The operator in which the task was created.
task_id
- The task identifier.
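The completion rule described above (an operator is finished only once it has been requested to finish and has no outstanding tasks) can be illustrated with a toy model in plain Python. This is not the Stream Processor's implementation; the class OperatorTaskModel and its request_finish method are hypothetical names used only to make the rule concrete.

```python
class OperatorTaskModel:
    """Toy model of one operator's task bookkeeping (illustration only)."""

    def __init__(self):
        self._next_id = 0
        self._open_tasks = set()
        self.finish_requested = False

    def register_task(self):
        # Hand out a fresh task ID and remember it as outstanding.
        task_id = self._next_id
        self._next_id += 1
        self._open_tasks.add(task_id)
        return task_id

    def finish_task(self, task_id):
        # Mark a previously registered task as done.
        self._open_tasks.discard(task_id)

    def request_finish(self):
        self.finish_requested = True

    @property
    def finished(self):
        # Finished only if a finish was requested AND no tasks remain open.
        return self.finish_requested and not self._open_tasks
```

In this model, a finish request made while tasks are still open takes effect only when the last of them is marked finished.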
register_task
def register_task(operator: OperatorSpecifier) -> int
Register a task for the given operator.
A task represents an asynchronous request that an operator made, and once that request is
finished, the task should be marked as finished using sp.finish_task.
Arguments:
operator
- The operator for which a task will be registered.
Returns:
A task ID, which will be needed to call sp.finish_task.
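A minimal sketch of how register_task and finish_task might be paired around an asynchronous request is shown below. It assumes the Stream Processor interface is passed in as the parameter sp, and that send_request is a hypothetical caller-supplied function which starts the request and later invokes the callback it is given; neither start_tracked_request nor send_request is part of kxi.sp.

```python
def start_tracked_request(sp, operator, send_request):
    """Start an asynchronous request and track it as a task of `operator`.

    `sp` is assumed to expose register_task and finish_task.
    `send_request` is a hypothetical function taking a single callback.
    """
    # Register before starting the request, so the operator is not
    # considered finished while the request is still in flight.
    task_id = sp.register_task(operator)

    def on_response(response):
        # The request is complete: release the task using the ID
        # returned by register_task.
        sp.finish_task(operator, task_id)
        return response

    send_request(on_response)
    return task_id
```

The task ID is captured in the callback's closure, so the callback itself only needs the single response argument.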