
I am trying to figure out a solution to the problem of watermarks progress when the number of Kafka partitions is larger than the Flink parallelism employed.

Consider, for example, a Flink app with a parallelism of 3 that needs to read data from 5 Kafka partitions. My issue is that when the Flink app starts, it has to consume historical data from these partitions. As I understand it, each Flink task starts consuming events from one assigned partition (probably buffering a significant number of events) and advances event time (and therefore watermarks) before the same task moves on to another partition, whose data is now stale according to the watermarks already issued.

I tried a watermark strategy using watermark alignment of a few seconds, but that does not solve the problem, since historical data is consumed immediately from one partition and event time/watermarks have therefore already progressed. Below is a snippet of the watermark strategy I implemented.

WatermarkStrategy.forGenerator(ws)
        .withTimestampAssigner(
            (event, timestamp) -> (long) event.get("event_time"))
        .withIdleness(IDLENESS_PERIOD)
        .withWatermarkAlignment(
            GROUP,
            Duration.ofMillis(DEFAULT_MAX_WATERMARK_DRIFT_BETWEEN_PARTITIONS),
            Duration.ofMillis(DEFAULT_UPDATE_FOR_WATERMARK_DRIFT_BETWEEN_PARTITIONS));

I also tried using a downstream operator to sort events, as described in "Sorting union of streams to identify user sessions in Apache Flink", but this cannot effectively tackle my issue either, since event record times can deviate significantly.

How can I tackle this issue? Do I need to have the same number of Flink tasks as Kafka partitions, or am I missing something about the way data is read from Kafka partitions?

ypanag
    How are you using this watermark strategy? Are you passing it into `fromSource`, or applying it later using `assignTimestampsAndWatermarks`, or ... ? – David Anderson Feb 17 '23 at 20:42
  • I do apply it later indeed using assignTimestampsAndWatermarks, but immediately after reading from source. – ypanag Feb 19 '23 at 16:55
  • Ok David, I had missed that: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_. It is stated clearly there, in the relevant Note section, what my issue was – ypanag Feb 20 '23 at 14:19

1 Answer

The easiest solution to this problem is to pass the WatermarkStrategy to fromSource instead of assigning it later with assignTimestampsAndWatermarks.

When you use the WatermarkStrategy directly in fromSource with the Kafka connector, the watermarks are partition-aware, so the watermark generated by a given source operator is the minimum across all partitions assigned to that operator.

Assigning watermarks directly in the source solves the problem you are facing, but it has one main drawback: since the generated watermark is the minimum across all partitions processed by the given operator, if one of those partitions is idle, the watermark for this operator will not progress either.
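As a minimal sketch of what this looks like, the strategy can be passed straight into fromSource with the KafkaSource builder. The topic name, group id, event type (MyEvent), and deserializer here are placeholders, not from the question:

    // Sketch: attach the WatermarkStrategy to the Kafka source itself, so
    // watermarks are generated per partition (split) rather than per subtask.
    KafkaSource<MyEvent> source = KafkaSource.<MyEvent>builder()
            .setBootstrapServers("broker:9092")
            .setTopics("my-topic")
            .setGroupId("my-group")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new MyEventDeserializationSchema())
            .build();

    WatermarkStrategy<MyEvent> strategy =
            WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, ts) -> event.getEventTime())
                    // mitigates the idle-partition drawback described above
                    .withIdleness(Duration.ofMinutes(1));

    DataStream<MyEvent> stream = env.fromSource(
            source,
            strategy,          // applied in the source: per-partition watermarks
            "kafka-source");

Note that withIdleness mitigates the idle-partition drawback: an idle split is marked idle and stops holding back the operator's watermark.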

The docs describe kafka connector watermarking here.

Dominik Wosiński