Checkpoints and Recovery
During execution of a pipeline, both Controllers and Workers periodically write checkpoints to track processing progress. Checkpointing is done off the main processing threads executing the pipeline, so as to reduce the impact on pipeline performance.
The advantage of writing checkpoints periodically is that recovery time is much reduced, since only events since the last checkpoint need to be reprocessed. See recovery for more information about the recovery process.
If $KXI_SP_CHECKPOINT_DIR is not set, and checkpoints are enabled ($KXI_SP_CHECKPOINT_FREQ is not 0), Workers write checkpoints over IPC to their Controller instead of writing them to disk. This happens on the main thread.
The captured checkpoints include managed state for stateful operators, as well as engine-specific execution information including stream offsets for data sources that support replay.
The configured checkpoint directory should be persistent. That is, if the container restarts, the configured checkpoint directory should remain intact so that new checkpoints can be read during recovery. This could be a Docker volume mount or a Persistent Volume within Kubernetes.
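As an illustrative sketch of the Docker case (the /checkpoints mount path and volume name here are assumptions, not product defaults), a named Docker volume could back the checkpoint directory so it survives container restarts:

```yaml
worker:
  image: registry.dl.kx.com/kxi-sp-worker:0.8.2
  environment:
    - KXI_SP_CHECKPOINT_DIR=/checkpoints
  volumes:
    - checkpoints:/checkpoints   # named volume persists across container restarts

volumes:
  checkpoints:
```

In Kubernetes, the equivalent would be a PersistentVolumeClaim mounted at the configured checkpoint directory.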
Once written, checkpoint progress is asynchronously committed to the Worker's Controller, which keeps track of progress information for the pipeline as a whole. The Controller also checkpoints the progress information it receives, on its own configurable checkpoint interval.
The frequency of the checkpoint operation for both Controllers and Workers can be individually configured through environment variables in the relevant Docker Compose file or other equivalent means. See the configuration documentation for more information about the relevant environment variables.
If checkpoints take longer to write than the configured checkpoint frequency, overlapping checkpoints are dropped: the next checkpoint is only persisted once the previous checkpoint has finished. This avoids an ever-growing queue of checkpoints, keeping performance and memory usage consistent.
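The drop-overlapping behaviour can be sketched as a non-blocking in-flight flag (an illustrative Python sketch, not the actual implementation): if a checkpoint write is still running when the timer fires, the new checkpoint is dropped rather than queued.

```python
import threading

class Checkpointer:
    """Sketch: drop a checkpoint if the previous write is still in flight."""

    def __init__(self, write_fn):
        self.write_fn = write_fn           # persists a checkpoint (may be slow)
        self.in_flight = threading.Lock()  # held while a write is running
        self.dropped = 0                   # count of dropped checkpoints

    def on_timer(self, state):
        # Non-blocking acquire: if a write is already running, drop this one
        # instead of queueing it, so memory usage stays bounded.
        if not self.in_flight.acquire(blocking=False):
            self.dropped += 1
            return False
        try:
            self.write_fn(state)
            return True
        finally:
            self.in_flight.release()
```

Dropping rather than queueing trades checkpoint granularity for bounded memory: the worst case is simply a slightly older checkpoint at recovery time.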
To disable the checkpoint mechanism entirely, configure the frequency to 0:

```bash
$ docker run \
    -e KXI_SP_CHECKPOINT_FREQ=0 \
    ...
```
This can be desirable if there are external means of recovery, such as replaying a tickerplant's logs from the beginning.
When running within Docker, containers can be configured to recover upon failure:
```yaml
controller:
  image: registry.dl.kx.com/kxi-sp-controller:0.8.2
  deploy:
    restart_policy:
      condition: on-failure
  # ...

worker:
  image: registry.dl.kx.com/kxi-sp-worker:0.8.2
  deploy:
    restart_policy:
      condition: on-failure
  # ...
```
The restart policy can be adjusted as desired.
When a Worker restarts after a failure, the Controller initializes the Worker with the most recent checkpoint taken by that Worker before the failure. By recovering from the checkpoint, any managed state is restored onto the operators within the pipeline in the Worker, and streams are replayed from the offset corresponding to that state, if the readers in use support replay.
Impact on state
Since each checkpoint commit contains both progress information and managed operator state, during recovery state is rolled back to the point of the last checkpoint, and events are replayed on top of that state from the last checkpointed offset, if replay is available on the data source. This ensures that no record is applied to state more than once, and, with replay enabled, that any messages lost between the checkpoint and the point of failure are still processed.
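The rollback-and-replay behaviour can be illustrated with a toy source that supports replay from an offset (a Python sketch with illustrative names, not the Stream Processor's API):

```python
def recover(checkpoint, stream, apply_event):
    """Restore state from a checkpoint, then replay events after its offset.

    checkpoint:  {'state': ..., 'offset': n} - last committed state and offset
    stream:      full list of events, indexed by offset (replayable source)
    apply_event: function (state, event) -> state
    """
    state = checkpoint["state"]                       # roll back to checkpointed state
    for event in stream[checkpoint["offset"] + 1:]:   # replay only missed events
        state = apply_event(state, event)
    return state

# Toy pipeline: a running sum over a replayable stream.
stream = [1, 2, 3, 4, 5]
# Checkpoint taken after processing offset 2 (state = 1 + 2 + 3 = 6).
checkpoint = {"state": 6, "offset": 2}
recovered = recover(checkpoint, stream, lambda s, e: s + e)
```

Because state and offset are committed together, the replayed events (offsets 3 and 4) are applied exactly once, and the recovered state equals what uninterrupted processing would have produced.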
Impact on windows
Though state is captured consistently, any analytics written using processing-time semantics will inherently have different behaviour during a recovery scenario than during regular operation.
For example, a pipeline with a .qsp.window.timer window uses processing-time (local clock) timers to bucket events into batches: all events that arrive within a given interval are grouped into a batch. This can improve throughput where event-time semantics are not required, by processing larger batches of data. The implication is that during recovery with a data source capable of replaying events, replayed events may arrive much faster than they were originally observed in the live stream, so a window during recovery may be much larger than the equivalent window observed without a failure.
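The effect on processing-time windows can be illustrated by grouping the same events by arrival time: replay delivers them quickly, producing fewer, larger batches (an illustrative Python sketch, not the actual windowing implementation):

```python
from itertools import groupby

def timer_windows(events, interval):
    """Group (arrival_time, payload) pairs into processing-time buckets."""
    return [
        [payload for _, payload in grp]
        for _, grp in groupby(events, key=lambda e: e[0] // interval)
    ]

# Live: one event per second -> one event per 1-second window.
live = [(0.0, "a"), (1.0, "b"), (2.0, "c"), (3.0, "d")]

# Recovery: the same events replayed within 100 ms -> one large window.
replayed = [(0.0, "a"), (0.03, "b"), (0.06, "c"), (0.09, "d")]
```

The payloads are identical in both runs; only the batch boundaries differ, which is why processing-time analytics can behave differently across a recovery.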
Event-time windows can also be affected: when quiet periods in the stream cause windows to be triggered by internal timer timeouts, those timeouts are essentially a processing-time effect. During recovery, if replayed events cover the quiet period so that the timer trigger is not required, the partial windows would not be emitted.
To mitigate these effects for .qsp.window.timer, the batchSize setting could be used to limit the maximum size of batches during either the live stream or recovery. However, any user-defined functions that are sensitive to processing time (for example, any uses of .z.P, etc.) should be carefully considered.
Impact on output events
Though the state of a pipeline is committed along with the progress information, to maintain high performance the current release of the Stream Processor does not commit output events. The implication is that during recovery, any events emitted by data sinks within the pipeline between the time of the last committed checkpoint and the failure will be re-emitted.
Make sure downstream data sinks are idempotent or can handle duplicate messages, so that recovery scenarios do not duplicate output events in downstream data stores.
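One common way to make a downstream sink idempotent is to key writes by stream offset, so a re-emitted event overwrites rather than appends (an illustrative Python sketch; the sink and key scheme are assumptions, not part of the Stream Processor):

```python
class IdempotentSink:
    """Sketch: upsert keyed by offset, so replayed duplicates are harmless."""

    def __init__(self):
        self.rows = {}   # offset -> event

    def write(self, offset, event):
        # A duplicate offset overwrites the existing row in place,
        # so re-emitted events after recovery do not create duplicates.
        self.rows[offset] = event

sink = IdempotentSink()
for offset, event in [(0, "a"), (1, "b")]:
    sink.write(offset, event)

# After a failure, events from the last checkpoint onward are re-emitted:
for offset, event in [(1, "b"), (2, "c")]:
    sink.write(offset, event)
```

The same idea applies to real stores: an upsert keyed on a stable event identifier turns at-least-once delivery into effectively-once storage.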
Implications for scaled pipelines
Each Worker commits its own checkpoints asynchronously. If a single Worker fails and restarts, the recovery event is localized to that Worker; all other Workers in the pipeline continue processing and making progress during the recovery.
Implications of Controller failure
If a Controller fails during execution of a live pipeline, all Workers executing the pipeline are torn down. If the Controller and Workers are configured with a restart policy as discussed above, the Controller, upon restart, reinitializes each Worker with the last checkpoint that Worker persisted. In most container orchestrators this recovery completes in a few seconds, though the amount of data that must be replayed to catch up to the live stream can add further delay.