My goal is to read all messages from a Kafka topic using Flink's KafkaSource. I tried executing in both batch and streaming modes. The problem is the following:
I have to use a Sink that contains a bug when env.setParallelism is set higher than 2. Thus, I set, for example:
streamExecutionEnvironment.setParallelism(1);
The Kafka topic I want to consume has 3 partitions. This is the code snippet I have:
KafkaSourceBuilder<Request> builder = KafkaSource.builder();
builder.setBootstrapServers(kafkaBrokers);
builder.setProperty("partition.discovery.interval.ms", "10000");
builder.setTopics(topic);
builder.setGroupId(groupId);
builder.setBounded(OffsetsInitializer.latest());
builder.setStartingOffsets(OffsetsInitializer.earliest());
builder.setDeserializer(KafkaRecordDeserializationSchema.of(deserializer));
KafkaSource<Request> source = builder.build();
DataStreamSource<Request> streamSource = streamExecutionEnvironment.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
streamSource.map(new MyMapper())
.addSink(new Sink(props)).setParallelism(3) //with this setting I expected exactly 3 consumers, one per partition/split, but it doesn't work even when I don't set it at all
.name("Flink " + context.getJobDetail().getKey());
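A note on the parallelism settings above, since the sink comment expects three consumers: an operator-level setParallelism overrides the environment default only for that one operator, so the Kafka source still runs with the environment's parallelism of 1 and a single source subtask owns all three splits. A minimal sketch of the resolution rule (my own illustration of how Flink resolves effective parallelism, not Flink API):

```java
import java.util.OptionalInt;

public class ParallelismDemo {
    // Effective parallelism of an operator: its own setting if present,
    // otherwise the environment default.
    static int effective(int envDefault, OptionalInt operatorSetting) {
        return operatorSetting.orElse(envDefault);
    }

    public static void main(String[] args) {
        int envDefault = 1; // streamExecutionEnvironment.setParallelism(1)
        // The source and map inherit the default; only the sink overrides it.
        System.out.println(effective(envDefault, OptionalInt.empty())); // 1 (source)
        System.out.println(effective(envDefault, OptionalInt.of(3)));   // 3 (sink)
    }
}
```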
This code runs inside a Spring Boot application that will be dockerized. I configured a Quartz job that executes periodically, and streamExecutionEnvironment is a local environment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
At this point there are already more than 10M messages in the topic. When the job is executed I can see in the log:
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-2
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-0
INFO 7748 --- [ -> Map (1/1)#0] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=request_consumer_test-0, groupId=request_consumer_test] Seeking to EARLIEST offset of partition request-1
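The logs above are consistent with a single source subtask ("(1/1)#0") owning all three splits: with source parallelism 1, every partition maps to reader 0. A small model of round-robin split assignment (the modulo formula is my assumption, loosely modeled on Flink's split enumerator, not its exact code):

```java
import java.util.*;

public class SplitAssignment {
    // Round-robin mapping of Kafka partition ids to source reader ids.
    static Map<Integer, List<Integer>> assign(int numPartitions, int parallelism) {
        Map<Integer, List<Integer>> owners = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            owners.computeIfAbsent(p % parallelism, k -> new ArrayList<>()).add(p);
        }
        return owners;
    }

    public static void main(String[] args) {
        // With parallelism 1, the single reader owns all three splits,
        // matching the one subtask seeking request-0..2 in the logs above.
        System.out.println(assign(3, 1)); // {0=[0, 1, 2]}
        System.out.println(assign(3, 3)); // {0=[0], 1=[1], 2=[2]}
    }
}
```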
Then they consume only about 1M messages in total and stop; for all 3 partitions I can see:
[ -> Map (1/1)#0] o.a.f.c.b.s.reader.fetcher.SplitFetcher : Finished reading from splits [request-1]
Thus, they do not fully consume the topic, only part of it. When the Quartz job is re-triggered, it again starts reading from OffsetsInitializer.earliest(); it consumes duplicate messages but also new ones, not only messages newly added to the topic but also some that weren't consumed during the previous execution.
I have also tried renaming the consumer group, to rule out an offset problem in case the consumer committed offsets after the previous run.
My question is: how can I configure the data stream so that it fully reads the topic? How is my problem related to the setParallelism(1) setting or parallelism in general, the consumer group configuration, or anything else? Any suggestions for troubleshooting the problem are appreciated.