
Lifecycle

Stream Processor lifecycle hooks and triggers.

All of the functions in this module are available at the top level of the Stream Processor interface. For example, sp.lifecycle.on_checkpoint is available as sp.on_checkpoint.

Provided handler functions are converted to q functions

Many of the functions in this module accept "handlers", i.e. callback functions. These functions are converted to q functions, so a few caveats apply to them. Handlers should not rely on default arguments, because q expects every argument, including those with defaults, to be provided. Also, *args and **kwargs are each treated as a single argument, which should be a list or a dict respectively. Best practice is to wrap any function that uses features which don't translate well to q in a simple function with a fixed number of positional parameters. For example, instead of passing print as a handler that will be given three arguments, use lambda x, y, z: print(x, y, z).
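A minimal illustration of the wrapping advice above, in plain Python. The helper is_q_friendly is hypothetical and only approximates the restriction (plain positional parameters, none with defaults); it is not part of kxi.sp, and log_error is an illustrative function, not a library one.

```python
import inspect


def is_q_friendly(func):
    """Rough check that func has only plain positional parameters without defaults.

    Hypothetical helper for illustration; it is not part of kxi.sp.
    """
    params = inspect.signature(func).parameters.values()
    return all(
        p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) and p.default is p.empty
        for p in params
    )


def log_error(message, operator, batch, prefix="SP error"):
    # Not suitable as a handler: q would expect `prefix` to be supplied as well.
    print(f"{prefix}: {message}")


def log_error_handler(message, operator, batch):
    # Thin wrapper with exactly the three arguments the error hook provides.
    log_error(message, operator, batch)


# print accepts *args, so wrap it in a lambda of fixed arity instead.
print_handler = lambda x, y, z: print(x, y, z)
```

Registering the wrapper would then look like sp.on_error(log_error_handler).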

kxi.sp.lifecycle.__all__ = ['finish_task', 'on_checkpoint', 'on_error', 'on_operator_checkpoint', 'on_operator_post_checkpoint', 'on_operator_recover', 'on_post_checkpoint', 'on_recover', 'on_setup', 'on_start', 'on_teardown', 'register_task'] module-attribute

kxi.sp.lifecycle.__dir__

kxi.sp.lifecycle.on_setup

Set the global handler that will be called before a pipeline has finished initializing.

Parameters:

handler (Callable[[], None], required): A function that will be called before a pipeline has finished initializing, after all operators in the pipeline are set up. It will not be provided any arguments, and its return value is ignored.

kxi.sp.lifecycle.on_start

Set the global handler that will be called when all operators in a pipeline have started.

Parameters:

handler (Callable[[], None], required): A function that will be called when all operators in a pipeline have started. It will not be provided any arguments, and its return value is ignored.

kxi.sp.lifecycle.on_teardown

Set the global handler that will be called before a pipeline is torn down.

A teardown can be triggered by calling sp.teardown.

Parameters:

handler (Callable[[], None], required): A function that will be called before a pipeline is torn down. It will not be provided any arguments, and its return value is ignored.
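The three zero-argument hooks (on_setup, on_start, on_teardown) can be combined as in the sketch below. It assumes the API is imported with from kxi import sp; the events list is a hypothetical stand-in for real setup and cleanup work, and registration is skipped when kxi is not installed so the handlers can be exercised on their own.

```python
try:
    from kxi import sp  # assumed import path for the Stream Processor Python API
except ImportError:
    sp = None  # lets the handlers below be exercised without kxi installed

events = []  # hypothetical record of lifecycle phases, for illustration only


def setup_handler():
    # Called after all operators are set up, before initialization finishes.
    events.append("setup")


def start_handler():
    # Called once every operator in the pipeline has started.
    events.append("start")


def teardown_handler():
    # Called just before the pipeline is torn down (for example via sp.teardown).
    events.append("teardown")


if sp is not None:
    # Each hook takes a handler with no arguments; return values are ignored.
    sp.on_setup(setup_handler)
    sp.on_start(start_handler)
    sp.on_teardown(teardown_handler)
```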

kxi.sp.lifecycle.on_error

Set the global handler that will be called when an error occurs.

Parameters:

handler (Callable[[kx.CharVector, kx.Dictionary, kx.K], None], required): A function that will be called when an error occurs in a pipeline. It will be provided three arguments: the error message (as a character vector), the operator (as a q dictionary that can be passed to other kxi.sp functions that take an operator), and the batch of data that was being processed when the error occurred. Its return value is ignored.
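A sketch of an error handler with the three positional parameters described above. The import path from kxi import sp is assumed, and error_log is a hypothetical in-memory store; the handler keeps the q objects as-is rather than assuming how they convert to Python.

```python
try:
    from kxi import sp  # assumed import path
except ImportError:
    sp = None  # lets the handler be exercised without kxi installed

error_log = []  # hypothetical in-memory record of pipeline errors


def error_handler(message, operator, batch):
    # message:  the error message, as a character vector
    # operator: a q dictionary for the failing operator; it can be passed to
    #           other kxi.sp functions that take an operator
    # batch:    the data being processed when the error occurred
    error_log.append({"message": message, "operator": operator, "batch": batch})
    # Any return value would be ignored.


if sp is not None:
    sp.on_error(error_handler)
```

In a long-running pipeline you would more likely log or forward these details than keep every failed batch in memory.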

kxi.sp.lifecycle.on_checkpoint

Set the global handler that will be called when a checkpoint is being created.

Parameters:

handler (Callable[[], Any], required): A function that will be called when a checkpoint is being created. It will not be provided any arguments; whatever it returns is converted to a q type and saved in the checkpoint. This data may later be passed to the on_recover handler.

kxi.sp.lifecycle.on_post_checkpoint

Set the global handler that will be called after a checkpoint has been created.

The handler will be called after the checkpoint has been written to disk and acknowledged by the controller.

Parameters:

handler (Callable[[kx.K], None], required): A function that will be called after a checkpoint has been created. It will be provided one argument: the return value of the on_checkpoint handler. Its return value is ignored.

kxi.sp.lifecycle.on_recover

Set the global handler that will be called when a pipeline recovers from a checkpoint.

Parameters:

handler (Callable[[kx.K], None], required): A function that will be called when a pipeline recovers from a checkpoint. It will be provided one argument: the value the on_checkpoint handler returned when the most recent checkpoint was created. Its return value is ignored.
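The global checkpoint hooks form a round trip: on_checkpoint produces the data, on_post_checkpoint receives it once the checkpoint is written and acknowledged, and on_recover receives it after a restart. The sketch below assumes the import path from kxi import sp and that the recovered value is a pykx object exposing .py(); the progress counter is hypothetical.

```python
try:
    from kxi import sp  # assumed import path
except ImportError:
    sp = None  # lets the handlers be exercised without kxi installed

# Hypothetical global state that should survive a restart.
progress = {"batches_seen": 0}
acknowledged = []  # checkpoint data confirmed by on_post_checkpoint


def checkpoint_handler():
    # The return value is converted to a q type and saved in the checkpoint.
    return {"batches_seen": progress["batches_seen"]}


def post_checkpoint_handler(data):
    # Receives what checkpoint_handler returned, after the controller acknowledges it.
    acknowledged.append(data)


def recover_handler(data):
    # Receives the saved checkpoint data. Assumes a q object exposes .py()
    # for conversion back to Python; plain Python values are used as-is.
    restored = data.py() if hasattr(data, "py") else data
    progress["batches_seen"] = restored["batches_seen"]


if sp is not None:
    sp.on_checkpoint(checkpoint_handler)
    sp.on_post_checkpoint(post_checkpoint_handler)
    sp.on_recover(recover_handler)
```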

kxi.sp.lifecycle.on_operator_checkpoint

Set a per-operator handler that will be called when a checkpoint is being created.

The handler will be called before the specified operator's state is saved. This provides an opportunity to make updates to the state before it is checkpointed.

Note: This cannot be used for reader operators.

Parameters:

operator (OperatorSpecifier, required): The operator for which to assign the handler.

handler (Callable[[kx.Dictionary], Any], required): A function that will be called when a checkpoint is being created. It will be provided the operator as its only argument; whatever it returns is converted to a q type and saved in the checkpoint. This data may later be passed to the operator's on_operator_recover handler.
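A sketch of a per-operator checkpoint handler. It assumes the import path from kxi import sp and that OperatorSpecifier accepts an operator name as a string; "enrich" and pending_rows are hypothetical names used only for illustration.

```python
try:
    from kxi import sp  # assumed import path
except ImportError:
    sp = None  # lets the handler be exercised without kxi installed

# Hypothetical rows buffered by the "enrich" operator outside of sp.state.
pending_rows = {"enrich": [1, 2, 3]}


def enrich_checkpoint(op):
    # Called before the operator's state is saved. op is the operator (a q
    # dictionary); the return value is converted to q and saved in the checkpoint.
    return {"pending": list(pending_rows["enrich"])}


if sp is not None:
    # "enrich" is a hypothetical operator name; passing a name assumes that
    # OperatorSpecifier accepts one. Reader operators are not supported.
    sp.on_operator_checkpoint("enrich", enrich_checkpoint)
```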

kxi.sp.lifecycle.on_operator_post_checkpoint

Set a per-operator handler that will be called after a checkpoint has been created.

The handler will be called after the checkpoint has been written to disk and acknowledged by the controller.

Note: This cannot be used for reader operators.

Parameters:

operator (OperatorSpecifier, required): The operator for which to assign the handler.

handler (Callable[[kx.Dictionary, kx.K, kx.K], None], required): A function that will be called after a checkpoint has been created. It will be provided three arguments: the operator, the checkpointed operator state (i.e. the state set by sp.state.set), and the checkpoint data provided by the operator's on_operator_checkpoint handler. Its return value is ignored.

kxi.sp.lifecycle.on_operator_recover

Set a per-operator handler that will be called when a pipeline recovers from a checkpoint.

Operator state has already been restored by the time the recovery handler is called. This provides an opportunity to edit the state before the pipeline starts.

See also: on_operator_checkpoint, which can be used to set an operator's state before it is checkpointed.

Note: This cannot be used for reader operators.

Parameters:

operator (OperatorSpecifier, required): The operator for which to assign the handler.

handler (Callable[[kx.Dictionary, kx.K], None], required): A function that will be called when a pipeline recovers from a checkpoint. It will be provided two arguments: the operator, and the checkpoint data provided by the operator's on_operator_checkpoint handler. Its return value is ignored.
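A sketch pairing on_operator_post_checkpoint and on_operator_recover for the same operator. It assumes the import path from kxi import sp, that OperatorSpecifier accepts an operator name, and that recovered q data exposes .py(); "enrich", pending_rows and post_checkpoint_log are hypothetical.

```python
try:
    from kxi import sp  # assumed import path
except ImportError:
    sp = None  # lets the handlers be exercised without kxi installed

pending_rows = {"enrich": []}  # hypothetical per-operator buffer
post_checkpoint_log = []       # hypothetical record of acknowledged checkpoints


def enrich_post_checkpoint(op, state, data):
    # state: the checkpointed operator state (as set via sp.state.set)
    # data:  the value returned by this operator's on_operator_checkpoint handler
    post_checkpoint_log.append((state, data))


def enrich_recover(op, data):
    # Operator state is already restored here; data is the value this operator's
    # on_operator_checkpoint handler returned, converted back via .py() if present.
    restored = data.py() if hasattr(data, "py") else data
    pending_rows["enrich"] = list(restored["pending"])


if sp is not None:
    # "enrich" is a hypothetical operator name (assumes names are accepted).
    sp.on_operator_post_checkpoint("enrich", enrich_post_checkpoint)
    sp.on_operator_recover("enrich", enrich_recover)
```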

kxi.sp.lifecycle.finish_task

Indicate that the specified task in the specified operator has finished processing.

Once all of the operator's tasks are finished and the operator has been requested to finish, the operator finishes.

Parameters:

operator (OperatorSpecifier, required): The operator in which the task was created.

task_id (int, required): The task identifier, as returned by sp.register_task.

kxi.sp.lifecycle.register_task

Register a task for the given operator.

A task represents an asynchronous request made by an operator. Once that request completes, the task should be marked as finished using sp.finish_task.

Parameters:

operator (OperatorSpecifier, required): The operator for which a task will be registered.

Returns:

int: A task ID, which is needed to call sp.finish_task.

kxi.sp.types.Checkpoint

Bases: TypedDict

kxi.sp.types.Checkpoint.app: Dict class-attribute

kxi.sp.types.Checkpoint.pipeline: Any class-attribute

kxi.sp.types.Checkpoint.operator: Dict class-attribute