Checkpoints and recovery
During execution of a pipeline, both Controllers and Workers periodically write checkpoints to track processing progress. Checkpointing is done off the main processing threads executing the pipeline, so as to reduce the impact on pipeline performance.
Writing checkpoints periodically greatly reduces recovery time, since only data received after the most recent checkpoint needs to be reprocessed.
Checkpoints
The captured checkpoints include managed state for stateful operators, as well as engine-specific execution information including stream offsets for data sources that support replay.
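To illustrate what managed state means here, the following q sketch uses a stateful map operator. It assumes the q API's .qsp.get/.qsp.set managed-state interface, where a stateful operator's function receives the operator and metadata before the data; the operator logic and names are illustrative. The running total supplied through .qsp.use is the state that each checkpoint captures, alongside the reader offsets.
// Minimal sketch: a stateful map that counts the records it has processed.
// The running total is managed state, so it is captured in checkpoints and
// restored on recovery.
.qsp.run
  .qsp.read.fromCallback[`pub]
  .qsp.map[{[op; md; data]
    total: .qsp.get[op; md] + count data;
    .qsp.set[op; md; total];
    data
    }; .qsp.use ``state!(::;0)]
  .qsp.write.toConsole[]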
Checkpoint directory persistence
Make the configured checkpoint directory persistent. That is, if the container restarts, the configured checkpoint directory should remain intact so that checkpoints written before the restart can be read during recovery. This could be a Docker volume mount or a Persistent Volume within Kubernetes.
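For example, with Docker a named volume could be mounted over the checkpoint directory so that it survives container restarts. The volume name and container path below are illustrative; mount whatever path the checkpoint directory is actually configured to use.
docker run \
    -v sp-checkpoints:/app/checkpoints \
    ...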
Once written to disk, a Worker's checkpoint progress is asynchronously committed to its Controller, which tracks progress information for the pipeline as a whole. The Controller also checkpoints the progress information it receives, on its own configurable checkpoint interval KXI_SP_CHECKPOINT_FREQ.
Frequency
The frequency of the checkpoint operation for both Controllers and Workers can be individually configured through environment variables in the relevant Docker Compose file or other equivalent means.
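For example, in a Docker Compose file the frequency could be set per service, giving the Controller and Workers different intervals. The values below are illustrative and assume the frequency is specified in milliseconds.
controller:
  image: portal.dl.kx.com/kxi-sp-controller:1.17.0
  environment:
    - KXI_SP_CHECKPOINT_FREQ=10000
  # ...

worker:
  image: portal.dl.kx.com/kxi-sp-worker:1.17.0
  environment:
    - KXI_SP_CHECKPOINT_FREQ=5000
  # ...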
If checkpoints take longer to write than the configured checkpoint frequency, overlapping checkpoints are dropped: the next checkpoint is persisted only when the previous one has finished. This avoids a growing queue of checkpoints, keeping performance and memory usage consistent.
Disabling
To disable the checkpoint mechanism entirely, configure the frequency to 0:
docker run \
    -e KXI_SP_CHECKPOINT_FREQ=0 \
    ...
This could be desirable if there are external means of recovery, such as replaying a tickerplant's logs from the beginning.
Recovery
When running within Docker, containers can be configured to recover upon failure:
controller:
  image: portal.dl.kx.com/kxi-sp-controller:1.17.0
  deploy:
    restart_policy:
      condition: on-failure
  # ...

worker:
  image: portal.dl.kx.com/kxi-sp-worker:1.17.0
  deploy:
    restart_policy:
      condition: on-failure
  # ...
The restart policy can be adjusted as appropriate for your deployment.
When a Worker restarts after a failure, the Controller initializes it with that Worker's most recent checkpoint from before the failure. Recovering from the checkpoint restores any managed state onto the operators within the Worker's pipeline, and streams are replayed from the offset corresponding to that state, provided the readers in use support replay.
Impact on state
Since each checkpoint commit contains both progress information and managed operator state, state is rolled back to the last checkpoint during recovery, and events are replayed on top of that state from the last checkpointed offset, if replay is available on the data source. This ensures that no record is applied to the state more than once and, with replay enabled, that any messages lost between the checkpoint and the point of failure are still processed.
Impact on windows
Though state is captured consistently, any analytics written using processing-time semantics will inherently have different behavior during a recovery scenario than during regular operation.
For example, a pipeline with a .qsp.window.timer uses processing-time (local clock) timers to bucket events into batches. This can improve throughput where event-time semantics are not required, by processing data in larger batches than those in which it arrives: all events that arrive within a given interval are grouped into a single batch. The implication is that, during recovery with a data source capable of replaying events, the replayed events may arrive much faster than they were originally observed during live processing, so a window produced during recovery may be much larger than the equivalent window produced had no failure occurred.
Event-time windows can be affected as well: when quiet periods in the stream cause windows to be triggered by internal timer timeouts, those timeouts are essentially a processing-time effect. During recovery, if replayed events fill the quiet period so that the timeout trigger is not needed, the partial windows emitted during live processing are not emitted again.
To mitigate this for .qsp.window.timer, which already flushes its buffer automatically once a certain number of buffered records is reached, the countTrigger setting can be used to lower that threshold and produce smaller windows, both in the live stream and during recovery, as in the sketch below. However, any user-defined functions that are processing-time sensitive (for example, any uses of .z.p, .z.P, and so on) should be considered carefully.
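As a sketch, a timer window with a lowered countTrigger might look like the following. The interval and threshold are illustrative, and countTrigger is assumed to be passed through the same .qsp.use options dictionary as other operator settings.
// 10-second processing-time windows that also flush once 5000 records are
// buffered, bounding window size during both live processing and recovery.
.qsp.run
  .qsp.read.fromCallback[`pub]
  .qsp.window.timer[0D00:00:10; .qsp.use ``countTrigger!(::;5000)]
  .qsp.write.toConsole[]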
Impact on output events
Though the state of a pipeline is committed along with the progress information, to maintain high performance the current release of the Stream Processor does not commit output events. The implication of this is that during recovery, any events emitted by data sinks within the pipeline between the time of the last committed checkpoint and the failure event will be re-emitted.
Downstream data sinks
Ensure downstream data sinks are idempotent or can otherwise handle duplicate messages; otherwise, a recovery may result in duplicate output events in downstream data stores.
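One common way to make a downstream q process tolerant of re-emitted events is to key its table on a unique event identifier and upsert into it, so a replayed row overwrites rather than duplicates. This is only a sketch of the downstream side; the table, column, and handler names are illustrative.
// Downstream receiver: a keyed table makes inserts idempotent per eventID.
events:([eventID:`guid$()] time:`timestamp$(); sym:`symbol$(); price:`float$())
upd:{[data] `events upsert data}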
Implications for scaled pipelines
Each Worker commits its own asynchronous checkpoints. If a single Worker fails and restarts, the recovery event is localized to the single Worker, and all other Workers in the pipeline continue processing and making progress during the recovery event.
Implications of Controller failure
If a Controller fails during execution of a live pipeline, all Workers executing that pipeline are torn down. If the Controller and Workers are configured with a restart policy as discussed above, the Controller, upon restart, reinitializes each Worker with the last checkpoint that Worker persisted. In most container orchestrators this recovery typically completes within a few seconds, though the amount of data that must be replayed to catch up to the live stream can add further delay.
Pipeline spec changes and restarts
During normal pipeline operation, as time passes and data progresses through the pipeline, checkpoints describing the current state of the pipeline are written to disk (as described in the previous sections on this page). These checkpoints can contain different node-specific information depending on the node and pipeline configuration: examples include managed state for stateful operators, or internal engine-specific information such as whether a node has finished.
In cases where the pipeline graph changes between restarts, and checkpoints are available to recover from, the Stream Processor will detect these changes. Any nodes present in both the checkpoint and new pipeline will have state restored as usual, where possible. New nodes that appear only in the new pipeline will start fresh, and any nodes in the checkpoint not present in the new pipeline are ignored.
Nodes are dynamically named based on their arguments and position in the pipeline. When restoring from a checkpoint, nodes are matched to their checkpointed state by name. Changing the graph may cause a node to change its name and lose its state, so it is recommended that nodes are explicitly named to prevent this.
Naming operators in code pipelines is done using the name parameter:
Using the q API, pass the name value as part of a .qsp.use dictionary.
.qsp.run
  .qsp.read.fromCallback[`pub; .qsp.use ``name!(::;`callbackRead)]
  .qsp.map[{x + 1}; .qsp.use ``name!(::;`addMap)]
  .qsp.write.toConsole[.qsp.use ``name!(::;`consoleWrite)]
Using the Python API, pass it as the name keyword argument.
from kxi import sp   # Stream Processor Python API

sp.run(sp.read.from_callback('pub', name='callbackRead')
    | sp.map(lambda x: x + 1, name='addMap')
    | sp.write.to_console(name='consoleWrite'))
When using the Insights Enterprise UI, auto-generated names are displayed under the node type. These can be changed by right-clicking a node and selecting the Rename Node option. Names are generally 'sticky' unless the node is removed and re-added, or the name is changed.

There is an important caveat: if the checkpointed state contains any finished nodes, these recovery changes do not apply and the pipeline will error. Restarting a pipeline with finished nodes would entail 'unfinishing' those nodes, forcing them back into their most recent active state and potentially resulting in undefined behaviour or duplicate data. This error can be ignored, and starting can be forced, by setting the KXI_SP_FORCE_GRAPH_CHANGE environment variable to true, as shown below.
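For example, following the same pattern as the checkpoint frequency example above:
docker run \
    -e KXI_SP_FORCE_GRAPH_CHANGE=true \
    ...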
For more information on developing and deploying pipelines, see writing or running.