
Setup: 1 topic (test-topic) with 4 partitions

  • Vert.x: 4.0.3
  • RxJava2: 2.2.12
  • Kafka client (used internally by Vert.x): 2.6.0
  • Kafka broker: 2.8.1

Scenario 1: When 1 consumer group with 1 consumer is used, the system takes 60 seconds to process 100 messages.

Scenario 2: When 1 consumer group with 4 consumers is used, the system takes around 15 seconds to process the same 100 messages.

Scenario 3: When 4 consumer groups having same group id with 1 consumer per group are used, the system takes 60 seconds to process the same 100 messages.

My assumption was that in Scenario 3 the time taken to process all the messages would be around 15 seconds, but that is not the case. I verified from the logs that the partitions are distributed and that each partition receives 25 messages. Can anyone please help me understand what I might be doing wrong, such that adding consumer groups does not scale? Is this behavior normal? (I do not think it is.)

Note:

  • The consumer and producer settings are default, and the message delivery is "exactly-once".
  • The code was tested on a local setup and on AWS.
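For reference, transactional ("exactly-once") processing cannot run on pure defaults: in kafka-clients 2.6 the producer has no transactional.id by default, and the consumer defaults to enable.auto.commit=true and isolation.level=read_uncommitted. A minimal sketch of the non-default values involved, assuming a placeholder broker address and String keys/values; the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: configs that transactional (exactly-once) processing needs beyond the defaults.
// "localhost:9092" and the ids passed in are placeholders, not values from the question.
final class ExactlyOnceConfig {

    static Map<String, String> producerConfig(String transactionalId) {
        Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("enable.idempotence", "true");          // required for transactions
        config.put("acks", "all");                         // required by idempotence
        config.put("transactional.id", transactionalId);   // must differ between running producers
        return config;
    }

    static Map<String, String> consumerConfig(String groupId) {
        Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("group.id", groupId);
        config.put("enable.auto.commit", "false");         // offsets go through the producer's transaction
        config.put("isolation.level", "read_committed");   // skip records from aborted transactions
        return config;
    }

    public static void main(String[] args) {
        System.out.println(producerConfig("tx-instance-0"));
        System.out.println(consumerConfig("grp1"));
    }
}
```

With Vert.x these maps would be passed to the client factory methods; since two producers sharing a transactional.id fence each other, each replica needs its own value.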

Pseudocode:

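    import io.reactivex.Completable;
    import io.reactivex.Observable;
    import io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecord;
    import io.vertx.reactivex.kafka.client.producer.KafkaProducerRecord;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import java.util.Collections;
    import java.util.Map;

    // Assumed: consumer/producer are the io.vertx.reactivex Kafka wrappers, the producer has a
    // transactional.id and initTransactions() has completed; "output-topic" is a placeholder.
    Observable<KafkaConsumerRecord<String, String>> observable = consumer.toObservable();
    consumer.subscribe(topics);
    observable
            .flatMapCompletable(record -> producer.rxBeginTransaction()
                    .andThen(producer.rxSend(KafkaProducerRecord.create("output-topic", record.key(), record.value())))
                    .flatMapCompletable(recordMetadata -> Completable.fromAction(() -> {
                        // Commit the offset of the NEXT record to read, hence offset() + 1
                        Map<TopicPartition, OffsetAndMetadata> consumerOffsetsMap = Collections.singletonMap(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                        producer.getDelegate().unwrap().sendOffsetsToTransaction(consumerOffsetsMap, "grp1");
                    }))
                    .andThen(producer.rxCommitTransaction())
                    .andThen(consumer.rxCommit()) // normally unnecessary: the transaction commits the offsets
            ).subscribe(
                    () -> System.out.println("Processed successfully"),
                    Throwable::printStackTrace);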
Vinay Limbare
  • What does "4 consumer groups having same group id" mean? The same ID means it's one group... How are you starting all consumers at the same time? – OneCricketeer Jan 22 '22 at 05:32
  • It's a Java app running on Kubernetes. Just change the replicas to increase the number of instances. – Vinay Limbare Jan 23 '22 at 18:36
  • Sure, but how are you creating 4 groups with the same ID? – OneCricketeer Jan 23 '22 at 22:25
  • The consumer config "group.id" is hardcoded to "grp1". So all these (consumer) instances get rebalanced when started. – Vinay Limbare Jan 24 '22 at 13:39
  • I believe that's for "scenario 2"? Which still doesn't answer my question. If you're telling me that's the setup for "scenario 3", that's **still only one group**. Since you're asking specifically about Scenario 3, then you need to specify how you're getting multiple groups – OneCricketeer Jan 24 '22 at 14:47

1 Answer


Each independent consumer group takes (approximately) the same processing time as a single group, because every group reads all the partitions on its own.

With a perfectly balanced topic and immediate consumer group rebalances, one group with distributed consumers takes a fraction of the time, as you've found:

    time ≈ (time for one consumer to read all partitions) / min(number of partitions, number of group members)

In your case, 60 s / min(4, 4) = 15 s.
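The estimate can be written as a small helper. This is a sketch under the same idealized assumptions (perfectly balanced partitions, instant rebalances); the class and method names are hypothetical:

```java
// Sketch of: time ≈ singleConsumerTime / min(partitions, groupMembers).
// Assumes balanced partitions and instant rebalances; real runs will deviate.
final class ConsumerScaling {

    static double estimatedSeconds(double singleConsumerSeconds, int partitions, int groupMembers) {
        if (partitions < 1 || groupMembers < 1) {
            throw new IllegalArgumentException("partitions and groupMembers must be >= 1");
        }
        // Members beyond the partition count get no partitions, so parallelism is capped there.
        int activeConsumers = Math.min(partitions, groupMembers);
        return singleConsumerSeconds / activeConsumers;
    }

    public static void main(String[] args) {
        System.out.println(estimatedSeconds(60, 4, 1)); // Scenario 1: prints 60.0
        System.out.println(estimatedSeconds(60, 4, 4)); // Scenario 2: prints 15.0
        System.out.println(estimatedSeconds(60, 4, 8)); // 8 members, 4 idle: prints 15.0
    }
}
```

The last call shows why adding members past the partition count does not help: the divisor stays at 4.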


Scenario 3: When 4 consumer groups having same group id

That's literally not possible: the same group id means one group. I assume you meant different group ids, which would be the only reason why Scenario 3 takes the same time as Scenario 1: you're repeating Scenario 1 four times. The fact that it takes the same amount of time to run 4 groups means it does scale (horizontally). A reduced time would imply vertical scaling within the group, which you saw in Scenario 2.
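A toy model makes the difference between the scenarios concrete. It assumes partitions are dealt out round-robin within a group (for a single 4-partition topic this gives the same split as the default range assignor in kafka-clients 2.6) and that every group with a distinct group.id reads every partition independently. The class is illustrative only, not Kafka code:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (assumption): round-robin partition ownership inside a group,
// and each independent group reads the whole topic.
final class GroupAssignmentModel {

    /** Messages handled by each member of ONE group. */
    static List<Integer> messagesPerMember(int partitions, int messagesPerPartition, int members) {
        if (members < 1) {
            throw new IllegalArgumentException("a group needs at least one member");
        }
        List<Integer> load = new ArrayList<>();
        for (int m = 0; m < members; m++) {
            load.add(0);
        }
        for (int p = 0; p < partitions; p++) {
            int owner = p % members; // which member of this group owns partition p
            load.set(owner, load.get(owner) + messagesPerPartition);
        }
        return load;
    }

    /** Total messages processed by several independent groups, each reading the whole topic. */
    static int totalMessagesProcessed(int groups, int partitions, int messagesPerPartition) {
        return groups * partitions * messagesPerPartition;
    }

    public static void main(String[] args) {
        System.out.println(messagesPerMember(4, 25, 4));      // Scenario 2: [25, 25, 25, 25]
        System.out.println(messagesPerMember(4, 25, 1));      // each of 4 distinct groups: [100]
        System.out.println(totalMessagesProcessed(4, 4, 25)); // 4 distinct groups in total: 400
    }
}
```

In the distinct-group case each single consumer still handles all 100 messages, which is why the wall-clock time matches Scenario 1 while the total work done is four times larger.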

app running on Kubernetes. Just change the replicas to increase the number of instances

This alone will not create unique consumer groups. You need to inject a different group.id value into each app to create new consumer groups.
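One way to inject a different group.id per app is to read it from the environment. A minimal sketch, assuming a hypothetical GROUP_ID variable set in each Deployment's pod spec, with a fallback to HOSTNAME, which in Kubernetes is typically the pod name, so that every replica becomes its own group:

```java
import java.util.Map;

// Sketch: choose group.id from the environment instead of hard-coding "grp1".
// GROUP_ID is a hypothetical variable name; HOSTNAME is usually the pod name in Kubernetes.
final class GroupIdResolver {

    static String resolveGroupId(Map<String, String> env) {
        String explicit = env.get("GROUP_ID");
        if (explicit != null && !explicit.isEmpty()) {
            return explicit; // one group per Deployment that sets its own GROUP_ID
        }
        String hostname = env.get("HOSTNAME");
        if (hostname != null && !hostname.isEmpty()) {
            return "grp1-" + hostname; // one group per pod
        }
        return "grp1"; // the original hard-coded id: every instance joins ONE group
    }

    public static void main(String[] args) {
        System.out.println(resolveGroupId(System.getenv()));
    }
}
```

Pod names in a Deployment change when pods are recreated, so the HOSTNAME fallback starts a brand-new group (positioned by auto.offset.reset) after every restart; a fixed GROUP_ID per Deployment, or a StatefulSet with stable pod names, avoids that.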

OneCricketeer
  • For Scenario 3, the "group.id" value is the same for all 4 instances. This can be confirmed by the rebalancing logs, which show that one consumer group (with a single consumer) is polling only 1 topic at a time. – Vinay Limbare Jan 24 '22 at 14:57
  • Fourth and final time I'll say it... That's not four groups then – OneCricketeer Jan 24 '22 at 14:58
  • Yes, technically it is 1 group. But that's not the issue I'm running into. It is the time taken by this group to process messages. – Vinay Limbare Jan 24 '22 at 15:29
  • @VinayLimbare Not technically. Literally. Your times are fine. What exactly is unclear from my answer? – OneCricketeer Jan 24 '22 at 16:14