
My goal is to read all messages from a Kafka topic using the Flink KafkaSource. I have tried executing in both batch and streaming modes. The problem is the following: I have to use a Sink that has a bug when env.setParallelism is set higher than 2, so I set, for example: streamExecutionEnvironment.setParallelism(1);

The Kafka topic that I want to consume has 3 partitions. Here is the code snippet I have:

KafkaSourceBuilder<Request> builder = KafkaSource.builder();
builder.setBootstrapServers(kafkaBrokers);
builder.setProperty("partition.discovery.interval.ms", "10000");
builder.setTopics(topic);
builder.setGroupId(groupId);
builder.setBounded(OffsetsInitializer.latest());
builder.setStartingOffsets(OffsetsInitializer.earliest());
builder.setDeserializer(KafkaRecordDeserializationSchema.of(deserializer));
KafkaSource<Request> source = builder.build(); // the source used below

DataStreamSource<Request> streamSource = streamExecutionEnvironment.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
streamSource.map(new MyMapper())
        .addSink(new Sink(props)).setParallelism(3) // with this setting I expected exactly 3 consumers, one per partition/split, but it doesn't work; it also doesn't work when I don't set anything
        .name("Flink " + context.getJobDetail().getKey());

This code is supposed to run within a Spring Boot application that will be dockerized. I configured a Quartz job that will be executed periodically, and streamExecutionEnvironment is a local environment: StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
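
For reference, a minimal sketch of that wiring, assuming a plain Quartz Job (the class name RequestIngestJob is a placeholder, not the actual project code):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

// Placeholder Quartz job illustrating the setup described above.
public class RequestIngestJob implements Job {

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1); // the sink misbehaves above parallelism 2

        // ... build the KafkaSource and the pipeline exactly as in the snippets above ...

        try {
            env.execute("Flink " + context.getJobDetail().getKey());
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }
}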

At this point there are already more than 10M messages in the topic. When the job is executed I can see in the log:

INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-2
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-0
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-1

Then they consume about 1M messages in total and stop; for each of the 3 splits I can see:

[ -> Map (1/1)#0] o.a.f.c.b.s.reader.fetcher.SplitFetcher  : Finished reading from splits [request-1]

Thus, they do not fully consume the topic, only part of it. When the Quartz job is re-triggered, it again starts reading from OffsetsInitializer.earliest(); the consumers read duplicate messages but also new ones: not only messages newly added to the topic, but also some that were not consumed during the previous execution.

I have also tried renaming the consumer group, to rule out offsets committed during a previous consumption.

My question is: how can I configure the data stream so that it fully reads the topic? Is my problem related to the setParallelism(1) setting or parallelism in general, to the consumer group configuration, or to something else? Please give me any suggestions for troubleshooting the problem.

Sergey Gazaryan

1 Answer


The problem is related to

builder.setBounded(OffsetsInitializer.latest());

This line tells the Flink Kafka source to run in bounded mode: it reads each partition only up to the last offset seen at the start of the job, then marks the split as finished and stops consuming. Messages produced after the job has started are left for the next run.
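
If you want a recurring bounded job that picks up where the previous run stopped, one option (a sketch, not verified against your setup) is to start from the group's committed offsets instead of EARLIEST, and let the Kafka client auto-commit offsets, since checkpointing appears to be disabled in your local environment:

KafkaSource<Request> source = KafkaSource.<Request>builder()
        .setBootstrapServers(kafkaBrokers)
        .setTopics(topic)
        .setGroupId(groupId)
        // start from the committed group offsets; fall back to EARLIEST on
        // the very first run, when nothing has been committed yet
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        // still stop at the latest offset observed when the job starts
        .setBounded(OffsetsInitializer.latest())
        // with checkpointing disabled, offset committing falls back to the
        // Kafka client's periodic auto-commit
        .setProperty("enable.auto.commit", "true")
        .setProperty("auto.commit.interval.ms", "1000")
        .setDeserializer(KafkaRecordDeserializationSchema.of(deserializer))
        .build();

OffsetResetStrategy here is org.apache.kafka.clients.consumer.OffsetResetStrategy. Note that a bounded job can finish before the auto-commit interval fires, so it is worth verifying that offsets really get committed before the job shuts down.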

bottaio
  • thanks for the answer, the thing is that I want to enable batch processing: consume messages up to the last offset. It will be a recurring job, so the next time it will load the newest ones. – Sergey Gazaryan May 27 '22 at 16:40
  • the problem is that the Kafka consumers do not consume all existing messages from the beginning. I tried to set stream.rebalance and changed the watermarking of messages with different options; nothing helps. Let's say on the first execution from the beginning of the messages it loaded 1M messages for 2022-05-22 and 1M messages for 2022-05-23. On the second execution: another 0.5M messages for 2022-05-22 and another 0.5M for 2022-05-23. I would be OK with that, but I also want to have checkpoints, in order not to always run from the beginning. – Sergey Gazaryan May 27 '22 at 16:46