
I have a pipeline running on Dataflow that ingests files containing several thousand records each. The files arrive at a steady frequency and are processed by a stateful ParDo with timers that throttles the rate of ingest by batching and holding the files until a timer fires. The released files are then expanded into individual record elements by a file-processing ParDo and finally written to BigQuery destinations.
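
Roughly, the batching stage looks like the sketch below (the class name, key/value types, and flush interval are illustrative, not the actual production code):

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Illustrative batching stage: buffers file references in BagState and
// releases them when a processing-time timer fires, throttling ingest.
class BatchFilesFn extends DoFn<KV<String, String>, String> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("flush") Timer flushTimer) {
    // Hold the incoming file reference instead of emitting it immediately.
    buffer.add(element.getValue());
    // (Re)arm a processing-time timer; the one-minute interval is illustrative.
    flushTimer.offset(Duration.standardMinutes(1)).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(
      @StateId("buffer") BagState<String> buffer,
      OutputReceiver<String> out) {
    // Release the batched files to the file-processing ParDo and clear state.
    for (String file : buffer.read()) {
      out.output(file);
    }
    buffer.clear();
  }
}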

On occasion, after an intermittent event such as an OOM or an autoscaling event resolves, I have seen Dataflow attempt to re-emit the files held in the stateful ParDo, causing duplicate record elements downstream when the file-processing ParDo reprocesses those files. I understand that bundles are retried if there is a failure, but do those retries account for duplicates?

What exactly does exactly-once processing achieve in this context, particularly with regard to the State/Timer API, given that I am seeing duplicates at my destination?

Seng Cheong

1 Answer


Dataflow achieves exactly-once processing by ensuring that data produced by failing workers is not passed downstream (or, more precisely, if work is retried, only one successful result is consumed downstream). For example, if stage A of your pipeline is producing elements and stage B is counting them, and workers in stage A fail and are retried, the duplicate elements will not be counted by stage B (though of course stage B might itself have to be retried). This also applies to state and timers: a given bundle of work is either committed in its entirety (i.e. the set of inputs is marked as consumed, and the set of outputs is committed atomically with the consumption/setting of state and timers) or entirely discarded (state/timers are left unconsumed/untouched, and the retry will not be influenced by what happened before).

What is not exactly once is interactions with external systems (due to the possibility of retries). These are instead at least once, so to guarantee correctness all such interactions should be idempotent. Sinks often achieve this by assigning a unique id so that multiple writes can be deduplicated by the downstream system. For files, one can write to temporary files and then, after a barrier, rename the "winning" set of shards to the final destination. It's not clear from your question what files you're emitting (or ingesting), but hopefully this helps in understanding how the system works.
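
As a minimal sketch (not your pipeline, and not a built-in BigQueryIO mechanism), one way to make a sink interaction idempotent is to attach a content-derived id to each record before the write, so a retried write of the same record carries the same id and can be deduplicated on the destination side (e.g. via a MERGE or a keyed dedup step):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical sketch: derive a stable id from the record's content so that
// any retry of the same record produces the same id, letting the destination
// discard duplicates.
class AttachDeterministicId extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void process(@Element String record, OutputReceiver<KV<String, String>> out)
      throws Exception {
    byte[] digest =
        MessageDigest.getInstance("SHA-256").digest(record.getBytes(StandardCharsets.UTF_8));
    StringBuilder id = new StringBuilder();
    for (byte b : digest) {
      id.append(String.format("%02x", b & 0xff));
    }
    // Key the record by its deterministic id; the destination (or a dedup
    // step in front of it) can then drop repeated ids.
    out.output(KV.of(id.toString(), record));
  }
}

The important property is that the id is deterministic for a given record, so every retried write produces the same id rather than a fresh one.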

More specifically, say the initial state is {state: A, timers: [X, Y], inputs: [i, j, k]}. Suppose further that when processing the bundle (these timers and inputs) the state is updated to B, we emit elements m and n downstream, and we set a timer W.

If the bundle succeeds, the new state will be {state: B, timers: [W], inputs: []} and the elements [m, n] are guaranteed to be passed downstream. Furthermore, any competing retry of this bundle would always fail.

On the other hand, if the bundle fails (even if it "emitted" some of the elements or tried to update the state) the resulting state of the system will be {state: A, timers: [X, Y], inputs: [i, j, k]} for a fresh retry and nothing that was emitted from this failed bundle will be observed downstream.

Another way to look at it is that the set {inputs consumed, timers consumed, state modifications, timers set, outputs to produce downstream} is written to the backing "database" in a single transaction. Only a single successful attempt is ever committed; failed attempts are discarded.

More details can be found at https://beam.apache.org/documentation/runtime/model/

robertwb
  • Hi @robertwb, thanks for your reply. This does give some clarity as to how exactly-once processing is achieved for stateful and timer-based DoFns. I have updated my question to more precisely describe when these files are emitted – Seng Cheong Mar 23 '22 at 04:08
  • Could you also elaborate on the statement: "state/timers are left unconsumed/untouched, and the retry will not be influenced by what happened before"? Does this mean that any timers that were set will fire again in a retry? Suppose a timer callback outputs some elements – does this mean that in a retry these elements can be emitted again, and if the bundle had already been committed, should we expect these duplicates to be passed downstream? – Seng Cheong Mar 23 '22 at 07:43
  • No, duplicates will not be passed downstream. – robertwb Mar 23 '22 at 18:42
  • Expanded the answer a little bit more. Only (and all) timers that were set prior to the first attempt will be fired again in a retry. Anything the failed try attempted to do (including any timers it set, elements it emitted) will be ignored. – robertwb Mar 23 '22 at 18:57
  • It's still unclear to me what you mean by "emitting" files, as the question seems to be about files being consumed. – robertwb Mar 23 '22 at 18:59
  • Hi @robertwb, thanks for your reply. In this pipeline, I have a ParDo that uses a combination of the State and Timer APIs to hold files received from FileIO. A BagState holds a batch of files, and a timer callback is then invoked to release (emit) the files to the next ParDo. I hope that clarifies. – Seng Cheong Mar 23 '22 at 19:15
  • We have seen row duplicates in our destination BigQuery table, but your answer gives us a better understanding of exactly-once processing and helps us narrow down the cause of the duplicates. Based on our analysis, as well as your explanation of at-least-once interactions with sinks, we think the duplicates come from BigQueryIO. – Seng Cheong Mar 23 '22 at 19:19