Determinism
A pipeline will emit duplicate messages if it restores from a checkpoint or if replicas are used. To get exactly-once messaging from the stream processor, two conditions must be met:
- The writers used must be capable of filtering out duplicate messages
- The pipeline must be deterministic, meaning:
- After restoring from a checkpoint, any batches that get re-emitted must have the same contents as when they were first emitted
- The n-th batch emitted by a given replica exactly matches the n-th batch emitted by the other replicas
The only deterministic writers currently available are the database writer and the stream writer. Each message emitted by these writers is given a sequence number, an integer counting the number of batches previously emitted. The consumer performs deduplication by discarding any batches whose sequence numbers have already been seen.
Deduplication is based strictly on sequence numbers; the content of the batches is not considered. As such, any difference in output between runs or replicas, including re-ordered records, extra records, or missing records, can cause data loss or duplicated data.
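As an illustration of this contract, the following sketch shows consumer-side deduplication keyed only on sequence numbers. This is not the stream processor's implementation: the consume function and the (sequence number; batch) shape are hypothetical, purely to show why any divergence in batch contents or ordering goes undetected by the consumer.

// Hypothetical consumer-side deduplication, keyed only on sequence numbers
seen: -1;                                      // highest sequence number processed so far
delivered: ([] sym: `$(); price: `float$());  // where accepted batches end up

// consume takes a (sequence number; batch) pair, as a deterministic writer might emit
consume: {[seqNum; batch]
  if[seqNum <= seen; :()];             // sequence number already seen: discard the duplicate
  seen:: seqNum;                       // record progress
  delivered:: delivered upsert batch;  // deliver the batch exactly once
  };

consume[0; ([] sym: enlist `AAPL; price: enlist 168.54)];
consume[0; ([] sym: enlist `AAPL; price: enlist 168.54)];  // re-emitted after a restore: dropped
count delivered  // 1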
Sources of nondeterminism
An incomplete list of sources of nondeterminism includes:
- Merging data from multiple readers, or sharing data between streams with global variables. Except for certain use cases with the merge node, the lack of global ordering of messages across multiple readers will introduce nondeterminism.
- Using a nondeterministic reader
- Any sources of randomness, or references to the system time (with some exceptions for windows, see time-based windowing).
- Asynchronous calls introduce nondeterminism, as there is no guarantee on the order in which they will come back.
- Inherently nondeterministic operators, like union
- Untracked global variables
Readers
To guarantee determinism when restoring from a checkpoint, the previously read messages must either still be available from the source, or the reader must journal messages as they come in. Some readers, like the Kafka reader or stream reader, keep previously read messages in a write-ahead log (WAL), and so provide a deterministic replay sequence after a restart. Kafka readers are deterministic within a topic-partition within a stream: they remain deterministic if you scale the infrastructure while maintaining your topic-partitions, but not if the number of topic-partitions is increased. Callback readers use event journaling to record incoming messages and are deterministic, whereas HTTP readers have no event journaling and are nondeterministic.
Callback readers
A callback reader can be made deterministic when only a single pipeline is running, but not when replicas are used. This is done by setting the replay option to true, which enables event journaling. Event journaling ensures that messages received between writing a checkpoint and the failure are replayed on recovery.
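For example, the replay option described above can be passed through the reader's configuration. The .qsp.use spelling below mirrors the chunking example later on this page and is a sketch only; check the callback reader documentation for the exact option form.

// Sketch: enable event journaling on a callback reader via the replay option
// (option spelling assumed; see the callback reader documentation)
.qsp.run
  .qsp.read.fromCallback[`publish; .qsp.use ``replay!01b]
  .qsp.write.toConsole[];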
Note that if a pipeline is down and recovering, incoming messages will be buffered in the TCP inbound buffer while the pipeline is blocked. When the TCP buffer is full, any new messages will be dropped. As such, if replicas are used, the agent sending messages has the responsibility of ensuring determinism.
File readers
The file readers, both local and cloud storage (S3, Azure, and GCP), are nondeterministic if the files they have read are modified, or if they are reading from a glob pattern and the set of files matched by that glob has changed by the time of recovery.
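As a rough sketch of the distinction, the fragments below contrast a fixed file path with a glob pattern. The paths are placeholders, and the glob form is assumed for illustration; consult the file reader documentation for the exact path and pattern syntax.

// Deterministic on recovery, provided the file is never modified after being read
.qsp.run
  .qsp.read.fromFile[`sales_2024.01.01.csv]
  .qsp.write.toConsole[];

// Nondeterministic if the set of files matching the pattern changes before a recovery
// (glob support and syntax assumed here for illustration)
.qsp.run
  .qsp.read.fromFile["sales_*.csv"]
  .qsp.write.toConsole[];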
Time-based windowing
Timer windows, sliding windows, and tumbling windows are only deterministic when used with stream readers. When the timer fires, it emits a control event back into the stream. It is this control event that triggers the new window, ensuring that the new window will be created at the same point in the stream across all replicas, and in the event of a recovery. The control events include sequence numbers to allow for deduplication when replicas are used.
Although tumbling and sliding windows rely primarily on the timestamps within the data, they also emit their buffer after a delay if the stream goes idle, which relies on the same control-event mechanism that timer windows use.
When these window operators are used with any reader other than the stream reader, they will fire based on the system timer alone, with no control events, and are thus nondeterministic.
As an example, even though Kafka readers are deterministic because they can replay events, the pipeline can't inject control events into the stream of Kafka messages, so the combination of Kafka readers and time-based windows is nondeterministic.
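A minimal sketch of the deterministic combination described above, a stream reader feeding a tumbling window, is shown below. The table name, window period, and time column are placeholders; check the stream reader and window documentation for the exact parameters.

// Sketch only: stream reader + tumbling window, the combination that supports
// control events and therefore deterministic window boundaries.
// `trades, 00:00:05 and `time are illustrative placeholders.
.qsp.run
  .qsp.read.fromStream[`trades]
  .qsp.window.tumbling[00:00:05; `time]
  .qsp.write.toConsole[];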
Asynchronous operators
Asynchronous operators like apply can introduce nondeterminism. The following pipeline is nondeterministic, since there is no guarantee that the responses will come back in the same order in which the requests were sent.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[{[op; md; data]
    // Register a task which represents the unfinished async kurl GET request
    tid: .qsp.registerTask[op];
    .kurl.async (data `url; "GET";
      enlist[`callback]!enlist {[op;md;data;taskID;r]
        .qsp.finishTask[op;taskID]; // Mark the task as finished
        data[`response]: r 1;       // Add GET request response to data
        .qsp.push[op;md;data];      // Push enriched data to the next operator
        }[op;md;data;tid]
      );
    }]
  .qsp.write.toConsole[];

publish each ([] url: ("https://www.google.ca"; "https://www.example.com"))
Nondeterministic operators
Some operators, such as union, are inherently nondeterministic, or include nondeterministic modes. Consider the following pipeline, which unions two CSVs. The ordering of events across multiple readers is nondeterministic, so running the same pipeline multiple times could give different results. If these are large files and chunking is used, the chunks will be interleaved nondeterministically.
.qsp.run
  .qsp.read.fromAmazonS3[`:s3://myBucket/sales_domestic.csv]
  .qsp.union[.qsp.read.fromAzureStorage[`:ms://myBucket/sales_international.csv; `myAccount]]
If you include one of these operators without setting KXI_ALLOW_NONDETERMINISM to "true", you'll get the following error:
Set the KXI_ALLOW_NONDETERMINISM environment variable to "true" to enable operator: union
Note that KXI_ALLOW_NONDETERMINISM only covers a small subset of possible sources of nondeterminism, and setting it to false does not mean that all sources of nondeterminism will trigger warnings.
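The variable is normally set in the worker's deployment configuration. Setting it from q with setenv, as sketched below, assumes the flag is consulted after this point in your deployment, which may not hold.

// Sketch: opt in to nondeterministic operators for this process
// (normally set in the container/deployment environment instead)
setenv[`KXI_ALLOW_NONDETERMINISM; "true"]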
Merge
When using a merge node, it is your responsibility to ensure determinism. As an example, the following pipeline takes a stream of incoming trades from a callback, and loads a table of company-specific data from a file to join to each trade. When reading that file, chunking must be false, or the merge could run when only the first chunk of the table has been read. If the right stream (the reference table) could be updated, the pipeline wouldn't be deterministic, as it is uncertain where in the left stream (incoming trades) those updates would happen on recovery.
// read.fromCallback (incoming trades) ----------------------\
//                                                             merge -> write.toVariable
// read.fromFile (reference table) -> apply -> decode.csv ----/
// Generate a CSV holding a reference table
`companyData.csv 0: csv 0: ([] sym: `AAPL`F`GOOGL; name: ("Apple";"Ford";"Google"));

// This pipeline fragment loads the reference table.
// It's stored in a variable so it can be referenced by the merge node.
companyData:
  .qsp.read.fromFile[`companyData.csv; .qsp.use ``chunking!00b]
  // Unkey the stream (implicitly keyed on file name),
  // or the two streams will end up in different buffers
  .qsp.apply[{[op; md; data] .qsp.push[op; enlist[`]!enlist[::]; data]}]
  .qsp.decode.csv[([sym: `$()] name: ())]

.qsp.teardown[]; .qsp.run
  .qsp.read.fromCallback[`trades]
  .qsp.merge[companyData; {[top; bottom] top lj bottom}]
  .qsp.write.toVariable[`output]

trades ([] sym: `GOOGL`GOOGL`AAPL; price: 107.5 107.49 168.54)
output
sym   price  name
---------------------
GOOGL 107.5  "Google"
GOOGL 107.49 "Google"
AAPL  168.54 "Apple"
State
If a worker pod dies, the pipeline can be resumed from a checkpoint, a snapshot of the pipeline's state. On restoring from a checkpoint, the internal state should be the same as it was at the corresponding point in the stream before the failure. To be included in the checkpoint, any state should be managed through the get, set, and track functions.
If replicas are used, each replica should, after consuming n batches, have the same state as any other replica at that point in the stream.
Get and Set
.qsp.get and .qsp.set can be used in q to get and set state specific to a given node and, in the case of a keyed stream, specific to that key. This state is stored in checkpoints, and can be recovered in case of failure. There is currently no equivalent in Python pipelines.
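The sketch below shows the idea with a stateful map that keeps a running record count. It assumes the state option in .qsp.use supplies the operator's initial state; check the .qsp.get and .qsp.set documentation for the exact option name and argument order.

// Sketch: a stateful operator that counts records seen so far.
// The ``state option (assumed here) provides the initial state; .qsp.get reads
// it back and .qsp.set overwrites it, and both are included in checkpoints.
.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[
    {[op; md; data]
      total: .qsp.get[op; md] + count data;  // previous total plus records in this batch
      .qsp.set[op; md; total];               // persist the new total to operator state
      total                                  // emit the running count downstream
      };
    .qsp.use ``state!(::; 0)]
  .qsp.write.toConsole[];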
Tracking global variables
To write a pipeline that maintains determinism while using global variables, .qsp.track can be used in q pipelines to record global variables to checkpoints, and recover them in case of failures.
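A sketch of this follows. The exact signature of .qsp.track isn't shown on this page, so the call below, which assumes it takes the global variable's name as a symbol, is illustrative only.

// Sketch: a global running total registered for checkpointing with .qsp.track
// (the assumed signature, a symbol naming the global, may differ from the actual API)
recordsSeen: 0;
.qsp.track[`recordsSeen];

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[{[data] recordsSeen:: recordsSeen + count data; data}]  // tracked global is checkpointed
  .qsp.write.toConsole[];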
However, even with .qsp.track, global variables can introduce nondeterminism if they are used to combine content from different readers, as the order in which batches arrive from multiple readers is nondeterministic.
Currently there is no equivalent in Python pipelines.