
I was reviewing the documentation to understand how Google Dataflow handles watermarks, and it offers only this very vague statement:

The data source determines the watermark

It seems you can add more flexibility through withAllowedLateness, but what happens if we do not configure it?

Thoughts so far

I found something indicating that if your source is Google PubSub, it already provides a watermark, which is then used. But what if the source is something else, for example a Kafka topic? I believe Kafka does not inherently have a watermark, so I don't see how the same approach would apply.

Is the watermark lag always 10 seconds, or just 0? Does it look at the last few minutes to determine the maximum lag, and if so, how many minutes? Surely it does not look back to the very beginning, as that would be distorted by the initial start of processing, which might see giant lag. I could not find anything on the topic.

I also searched the Apache Beam documentation, outside the context of Google Dataflow, but did not find anything explaining this either.

Dennis Jaheruddin
  • Hi @Dennis Jaheruddin, does this answer your question: [stackoverflow.com/questions/42169004](https://stackoverflow.com/questions/42169004/what-is-the-watermark-heuristic-for-pubsubio-running-on-gcd/42234263#42234263)? You can also refer to this [document](https://cdn.oreillystatic.com/en/assets/1/event/155/Watermarks_%20Time%20and%20progress%20in%20streaming%20dataflow%20and%20beyond%20Presentation.pdf). Let me know whether it is helpful. – Prajna Rai T Jun 10 '22 at 13:38
  • I read both, but they are not as clear as I would wish. As mentioned, the doc that I linked says "The data source determines the watermark", and these pages mostly focus on explaining that defining a watermark is hard. Perhaps PubSub doesn't actually work with a watermark as per the typical definition but instead does something similar but slightly different (focusing on deviations compared to messages received). – Dennis Jaheruddin Jun 27 '22 at 10:59
  • Hi @DennisJaheruddin, if you find my answer helpful, please consider accepting and upvoting it as per [Stack Overflow guidelines](https://stackoverflow.com/help/someone-answers), which helps other Stack contributors with their research. If not, let me know so that I can improve the answer. – Prajna Rai T Jun 28 '22 at 09:44

1 Answer

When using Apache Kafka as a data source, each Kafka partition may have a simple event time pattern (ascending timestamps or bounded out-of-orderness). However, when consuming streams from Kafka, multiple partitions often get consumed in parallel, interleaving the events from the partitions and destroying the per-partition patterns (this is inherent in how Kafka’s consumer clients work).

In that case, you can use Flink’s Kafka-partition-aware watermark generation. Using that feature, watermarks are generated inside the Kafka consumer, per Kafka partition, and the per-partition watermarks are merged in the same way as watermarks are merged on stream shuffles.

For example, if event timestamps are strictly ascending per Kafka partition, generating per-partition watermarks with the ascending timestamps watermark generator will result in perfect overall watermarks. Note that if no TimestampAssigner is provided, the timestamps of the Kafka records themselves are used instead.
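The merging rule described above can be modelled in a few lines. This is a minimal Python sketch, not Flink's or Beam's API: the class `PartitionWatermark` and the function `merged_watermark` are hypothetical names, and the watermark is simplified to "highest timestamp seen minus an out-of-orderness bound" per partition, with the overall watermark taken as the minimum across partitions.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class PartitionWatermark:
    """Hypothetical per-partition tracker (simplified model, not a real API)."""
    max_out_of_orderness: int = 0        # 0 models strictly ascending timestamps
    max_timestamp: Optional[int] = None  # highest event timestamp seen so far

    def observe(self, timestamp: int) -> None:
        # Only ever move forward: a watermark must not go backwards.
        if self.max_timestamp is None or timestamp > self.max_timestamp:
            self.max_timestamp = timestamp

    def current(self) -> float:
        # A partition without data holds the overall watermark back.
        if self.max_timestamp is None:
            return float("-inf")
        return self.max_timestamp - self.max_out_of_orderness


def merged_watermark(partitions: Dict[int, PartitionWatermark]) -> float:
    """Overall watermark is the minimum of the per-partition watermarks."""
    return min(p.current() for p in partitions.values())


# Two partitions, each ascending on its own, but consumed interleaved.
partitions = {0: PartitionWatermark(), 1: PartitionWatermark()}
for partition_id, timestamp in [(0, 1), (1, 10), (0, 2), (1, 20), (0, 3)]:
    partitions[partition_id].observe(timestamp)

overall = merged_watermark(partitions)
print(overall)
```

Taking the minimum means the slowest partition determines overall progress, which is why interleaving records from a fast and a slow partition does not produce late data in this model.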

In any data processing system, there is a certain amount of lag between the time a data event occurs (the “event time”, determined by the timestamp on the data element itself) and the time the actual data element gets processed at any stage in your pipeline (the “processing time”, determined by the clock on the system processing the element). In addition, there are no guarantees that data events will appear in your pipeline in the same order that they were generated.

For example, let’s say we have a PCollection that’s using fixed-time windowing, with windows that are five minutes long. For each window, Beam must collect all the data with an event time timestamp in the given window range (between 0:00 and 4:59 in the first window, for instance). Data with timestamps outside that range (data from 5:00 or later) belong to a different window.
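The window assignment in this example can be sketched as simple arithmetic. The snippet below is an illustrative Python sketch, not Beam code: `fixed_window` and `mmss` are hypothetical helper names, timestamps are plain seconds, and the window is treated as the half-open range from its start up to, but not including, its end.

```python
from typing import Tuple


def mmss(minutes: int, seconds: int) -> int:
    """Convert a minutes:seconds event time into seconds."""
    return minutes * 60 + seconds


def fixed_window(event_time_s: int, size_s: int = 300) -> Tuple[int, int]:
    """Return (start, end) of the fixed window containing event_time_s.

    The end is exclusive, so a five-minute window starting at 0:00
    covers 0:00 through 4:59 and a timestamp of 5:00 falls in the next one.
    """
    start = event_time_s - (event_time_s % size_s)
    return start, start + size_s


first = fixed_window(mmss(3, 38))   # a record stamped 3:38
second = fixed_window(mmss(5, 0))   # a record stamped 5:00
print(first, second)
```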

However, data isn’t always guaranteed to arrive in a pipeline in time order, or to always arrive at predictable intervals. Beam tracks a watermark, which is the system’s notion of when all data in a certain window can be expected to have arrived in the pipeline. Once the watermark progresses past the end of a window, any further element that arrives with a timestamp in that window is considered late data.

Continuing our example, suppose we have a simple watermark that assumes approximately 30s of lag time between the data timestamps (the event time) and the time the data appears in the pipeline (the processing time). Beam would then close the first window at 5:30. If a data record arrives at 5:34, but with a timestamp that would put it in the 0:00-4:59 window (say, 3:38), then that record is late data.
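To make the arithmetic concrete, here is a small Python model of this scenario. It is only a sketch under stated assumptions, not how Beam or Dataflow actually computes watermarks: the watermark is modelled as "processing time minus a fixed 30s lag", all function names are hypothetical, and the exact boundary convention (`>=` versus `>`) is an assumption. The `allowed_lateness_s` parameter models the idea behind withAllowedLateness, and its default of 0 reflects my understanding that Beam applies zero allowed lateness when none is configured.

```python
WINDOW_SIZE_S = 300   # five-minute fixed windows
ASSUMED_LAG_S = 30    # heuristic lag from the example (an assumption)


def mmss(minutes: int, seconds: int) -> int:
    """Convert minutes:seconds into seconds."""
    return minutes * 60 + seconds


def watermark(processing_time_s: int, lag_s: int = ASSUMED_LAG_S) -> int:
    """Simplified watermark: processing time minus a fixed assumed lag."""
    return processing_time_s - lag_s


def window_end(event_time_s: int, size_s: int = WINDOW_SIZE_S) -> int:
    """Exclusive end of the fixed window containing event_time_s."""
    return event_time_s - (event_time_s % size_s) + size_s


def is_late(event_time_s: int, arrival_time_s: int) -> bool:
    """Late: the watermark has already passed the end of the record's window."""
    return watermark(arrival_time_s) >= window_end(event_time_s)


def is_dropped(event_time_s: int, arrival_time_s: int,
               allowed_lateness_s: int = 0) -> bool:
    """Dropped: the watermark is also past the end plus the allowed lateness."""
    deadline = window_end(event_time_s) + allowed_lateness_s
    return watermark(arrival_time_s) >= deadline


event, arrival = mmss(3, 38), mmss(5, 34)
print(is_late(event, arrival))
print(is_dropped(event, arrival))
print(is_dropped(event, arrival, allowed_lateness_s=60))
```

In this model the 3:38 record arriving at 5:34 is late, and with no allowed lateness it is discarded; with 60 seconds of allowed lateness it would still be accepted into the first window.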

Prajna Rai T