Checkpoints and recovery

During execution of a pipeline, both Controllers and Workers periodically write checkpoints to track processing progress. Checkpointing runs off the main processing threads that execute the pipeline, so as to reduce the impact on pipeline performance.

Writing checkpoints periodically greatly reduces recovery time, because recovery resumes from the most recent checkpoint rather than from the start of the stream.

Checkpoints

The captured checkpoints include managed state for stateful operators, as well as engine-specific execution information including stream offsets for data sources that support replay.

Checkpoint directory persistence

Make the configured checkpoint directory persistent. That is, if the container restarts, the configured checkpoint directory must remain intact so that the most recent checkpoints can be read during recovery. This could be a Docker volume mount or a Persistent Volume in Kubernetes.

Once written, checkpoint progress is asynchronously committed to the Worker’s Controller, which keeps track of progress information for the pipeline as a whole. The Controller also checkpoints the progress information it receives, on its own configurable checkpoint interval, KXI_SP_CHECKPOINT_FREQ.

Frequency

The frequency of the checkpoint operation for both Controllers and Workers can be individually configured through environment variables in the relevant Docker Compose file or other equivalent means.

If a checkpoint takes longer to write than the configured checkpoint interval, overlapping checkpoints are dropped. The next checkpoint is persisted only once the previous checkpoint has finished. This avoids a growing queue of checkpoints, keeping performance and memory usage consistent.
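The drop-on-overlap behavior can be illustrated with a small simulation. This is only a sketch in Python, not the Stream Processor's implementation: the function name `schedule_checkpoints`, its parameters, and the abstract time units are all hypothetical. It shows that when a write outlasts the interval, the ticks that fall inside the write are skipped instead of queued.

```python
def schedule_checkpoints(interval, write_duration, total_time):
    """Toy model: return (started, dropped) checkpoint tick times.

    A checkpoint is attempted every `interval` time units. If the previous
    write is still in progress at a tick, that tick is dropped, not queued.
    All names and units here are illustrative assumptions.
    """
    started, dropped = [], []
    busy_until = 0  # time at which the in-flight write finishes
    tick = interval
    while tick <= total_time:
        if tick >= busy_until:
            # Previous write has finished, so this checkpoint is persisted.
            started.append(tick)
            busy_until = tick + write_duration
        else:
            # Previous write is still running, so this tick is skipped.
            dropped.append(tick)
        tick += interval
    return started, dropped


started, dropped = schedule_checkpoints(interval=5, write_duration=12, total_time=40)
print("started:", started)   # started: [5, 20, 35]
print("dropped:", dropped)   # dropped: [10, 15, 25, 30, 40]
```

Because dropped ticks are never queued, the number of pending writes stays at most one regardless of how slow the storage is.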

Disabling

To disable the checkpoint mechanism entirely, configure the frequency to 0:

docker run \
    -e KXI_SP_CHECKPOINT_FREQ=0 \
    ...

This could be desirable if there are external means of recovery, such as replaying a tickerplant’s logs from the beginning.

Recovery

When running within Docker, containers can be configured to recover upon failure:

  controller:
    image: registry.dl.kx.com/kxi-sp-controller:1.5.1
    deploy:
      restart_policy:
        condition: on-failure
    # ...

  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.5.1
    deploy:
      restart_policy:
        condition: on-failure
    # ...

The restart policy can be adjusted as required.

When a Worker restarts after a failure, the Controller will initialize the Worker with the most recent checkpoint from that Worker before the failure. By recovering from the checkpoint, any managed state is set back onto the operators within the pipeline in the Worker, and streams are replayed from the offset corresponding to the state, if the readers being used support replay.

Impact on state

Because each checkpoint commit contains both progress information and managed operator state, recovery rolls state back to the point of the last checkpoint and then replays events on top of that state, starting from the last checkpointed offset, if replay is available on the data source. This ensures that no record is applied to state more than once and, with replay enabled, that any messages lost between the checkpoint and the point of failure are still processed.
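The interaction between rolled-back state and replay can be sketched with a toy model. The classes `ReplayableSource` and `RunningSum` below are hypothetical stand-ins written in Python for illustration; they are not Stream Processor APIs. The point is that state and offset are captured together, so replay resumes exactly where the restored state left off.

```python
import copy


class ReplayableSource:
    """Hypothetical data source that can re-read events from any offset."""

    def __init__(self, events):
        self.events = events

    def read_from(self, offset):
        return self.events[offset:]


class RunningSum:
    """Hypothetical stateful operator whose checkpoint pairs state with offset."""

    def __init__(self):
        self.state = {"total": 0}
        self.offset = 0  # number of events already applied to state

    def process(self, value):
        self.state["total"] += value
        self.offset += 1

    def checkpoint(self):
        # State and offset are captured together so they stay consistent.
        return {"state": copy.deepcopy(self.state), "offset": self.offset}

    def restore(self, ckpt):
        self.state = copy.deepcopy(ckpt["state"])
        self.offset = ckpt["offset"]


source = ReplayableSource([1, 2, 3, 4, 5, 6])

op = RunningSum()
for value in source.read_from(0)[:3]:
    op.process(value)
ckpt = op.checkpoint()            # total=6, offset=3

for value in source.read_from(3)[:2]:
    op.process(value)             # progress made after the checkpoint is lost on failure

# Recovery: roll back to the checkpoint, then replay from its offset.
recovered = RunningSum()
recovered.restore(ckpt)
for value in source.read_from(recovered.offset):
    recovered.process(value)

print(recovered.state["total"])   # 21: each event is counted exactly once
```

If the source did not support replay, the restored state would still be consistent, but events 4 and 5 from the failed run would not be seen again.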

Impact on windows

Though state is captured consistently, any analytics written using processing-time semantics will inherently have different behavior during a recovery scenario than during regular operation.

For example, a pipeline with a .qsp.window.timer uses processing-time (local clock) timers to bucket events into batches: all events that arrive within a given interval are grouped into one batch. This can help improve throughput where event-time semantics are not required, by processing larger data batches than are written. The implication is that, during recovery with a data source capable of replaying events, replayed events may arrive much faster than they did in the original live stream, so a window produced during recovery may be much larger than the equivalent window observed without a failure.

In event-time windows, quiet periods in the stream cause windows to be triggered by internal timer timeouts, which is essentially a processing-time effect. Because of this, during recovery, if new events cover the quiet period so that the timer trigger is not required, the partial windows are not emitted.

To mitigate this effect for .qsp.window.timer, which already flushes its buffer automatically once a certain number of buffered records is reached, the countTrigger setting can be used to lower that threshold and produce smaller windows, both in the live stream and during recovery. However, any general user-defined functions that are sensitive to processing time (for example, any uses of .z.p, .z.P, etc.) should be carefully considered.
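The effect of replay speed on timer windows, and of a count threshold on batch size, can be seen in a toy model. The Python function `timer_window` below is a hypothetical simplification and does not reproduce the behavior of .qsp.window.timer in detail; the arrival times, period, and threshold are made-up values in milliseconds.

```python
def timer_window(arrival_times, period, count_trigger=None):
    """Toy processing-time window: return the size of each emitted batch.

    Events are grouped by arrival time into intervals of `period`. If
    `count_trigger` is set, the buffer is also flushed as soon as it holds
    that many records. This is an illustrative model only.
    """
    batches = []
    buffered = 0
    window_end = period
    for t in arrival_times:
        # Fire the timer for every period boundary passed before this arrival.
        while t >= window_end:
            if buffered:
                batches.append(buffered)
                buffered = 0
            window_end += period
        buffered += 1
        if count_trigger is not None and buffered >= count_trigger:
            batches.append(buffered)
            buffered = 0
    if buffered:
        batches.append(buffered)  # final flush at end of input
    return batches


live = [i * 1000 for i in range(20)]   # one event per second (ms timestamps)
replay = [i * 10 for i in range(20)]   # same 20 events replayed in a burst

print(timer_window(live, period=5000))                     # [5, 5, 5, 5]
print(timer_window(replay, period=5000))                   # [20]
print(timer_window(replay, period=5000, count_trigger=8))  # [8, 8, 4]
```

Without a count threshold, the replayed burst collapses into a single large batch; with one, batch size is bounded in both the live and the recovery case.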

Impact on output events

Though the state of a pipeline is committed along with the progress information, to maintain high performance the current release of the Stream Processor does not commit output events. The implication of this is that during recovery, any events emitted by data sinks within the pipeline between the time of the last committed checkpoint and the failure event will be re-emitted.

Downstream data sinks

Ensure downstream data sinks are idempotent or can otherwise handle duplicate messages. Otherwise, recovery scenarios might produce duplicate output events in downstream data stores.
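One common way to make a sink tolerate re-emitted events is to key each write on a stable event identifier, so that a repeated write overwrites rather than appends. The Python sketch below assumes every output event carries such an identifier; the classes `AppendOnlyStore` and `IdempotentStore` and the sample events are hypothetical and stand in for whatever downstream store is used.

```python
class AppendOnlyStore:
    """Hypothetical sink that blindly appends every event it receives."""

    def __init__(self):
        self.rows = []

    def write(self, event_id, payload):
        self.rows.append((event_id, payload))


class IdempotentStore:
    """Hypothetical sink keyed by event id: rewriting an event is a no-op."""

    def __init__(self):
        self.rows = {}

    def write(self, event_id, payload):
        self.rows[event_id] = payload  # same id overwrites, never duplicates


# Events 2 and 3 were emitted after the last checkpoint, so recovery re-emits them.
emitted_before_failure = [(1, "a"), (2, "b"), (3, "c")]
re_emitted_after_recovery = [(2, "b"), (3, "c"), (4, "d")]

append_only = AppendOnlyStore()
idempotent = IdempotentStore()
for event_id, payload in emitted_before_failure + re_emitted_after_recovery:
    append_only.write(event_id, payload)
    idempotent.write(event_id, payload)

print(len(append_only.rows))  # 6: events 2 and 3 are stored twice
print(len(idempotent.rows))   # 4: one row per distinct event
```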

Implications for scaled pipelines

Each Worker commits its own asynchronous checkpoints. If a single Worker fails and restarts, the recovery event is localized to the single Worker, and all other Workers in the pipeline continue processing and making progress during the recovery event.

Implications of Controller failure

If a Controller fails during execution of a live pipeline, all Workers executing the pipeline are torn down. If the Controller and Workers are configured with a restart policy as discussed above, the Controller, upon restart, reinitializes all Workers with the last checkpoint each has persisted. In most container orchestrators, this recovery typically completes within a few seconds, but additional delays can be caused by the amount of data that needs to be replayed to catch up to the live stream.