I am trying to figure out a solution to the problem of watermarks progress when the number of Kafka partitions is larger than the Flink parallelism employed.
Consider for example that I have Flink app with parallelism of 3 and that it needs to read data from 5 Kafka partitions. My issue is that when starting the Flink app, it has to consume historical data from these partitions. As I understand it each Flink task starts consuming events from a corresponding partition (probably buffers a significant amount of events) and progress event time (therefore watermarks) before the same task transitions to another partition that now will have stale data according to watermarks already issued.
I tried considering a watermark strategy using watermark alignment of a few seconds but that does not solve the problem since historical data are consumed immediately from one partition and therefore event time/watermark has progressed.Below is a snippet of code that showcases watermark strategy implemented.
WatermarkStrategy.forGenerator(ws)
.withTimestampAssigner(
(event, timestamp) -> (long) event.get("event_time))
.withIdleness(IDLENESS_PERIOD)
.withWatermarkAlignment(
GROUP,
Duration.ofMillis(DEFAULT_MAX_WATERMARK_DRIFT_BETWEEN_PARTITIONS),
Duration.ofMillis(DEFAULT_UPDATE_FOR_WATERMARK_DRIFT_BETWEEN_PARTITIONS));
I also tried using a downstream operator to sort events as described here Sorting union of streams to identify user sessions in Apache Flink but then again also this cannot effectively tackle my issue since event record times can deviate significantly.
How can I tackle this issue ? Do I need to have the same number of Flink tasks as the number of Kafka partitions or I am missing something regarding the way data are read from Kafka partitions