I am using the Alpakka Kafka connector to consume messages from Kafka, with Consumer.committableSource as the source. I would like to run multiple consumer threads on a single machine and use them as a single source. How can I achieve that?

Currently, I create multiple sources with Consumer.committableSource and merge them into a single source using the "merge" function. I am not sure this is the right approach, since I am not creating any threads myself.

Here is the code I am currently using:

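import akka.NotUsed;
import akka.kafka.ConsumerMessage;
import akka.kafka.javadsl.Consumer;
import akka.stream.javadsl.Source;

// Merges getNoOfConsumers() committable sources into one stream.
// merge() keeps the materialized value of the left-hand source, so the result
// materializes NotUsed (from Source.empty()), not a Consumer.Control.
public Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed> source() {
    Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed> finalSource = Source.empty();
    for (int index = 0; index < consumerConfig.getNoOfConsumers(); index++) {
        finalSource = finalSource.merge(Consumer.committableSource(consumerSettings, subscription));
    }
    return finalSource;
}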
  • (Without any experience) I thought concurrent processing on a single topic can only be achieved by specifying multiple partitions for that topic. Every partition can have its own consumer (see https://stackoverflow.com/questions/38024514/understanding-kafka-topics-and-partitions) – Conffusion May 06 '19 at 09:04
  • @Conffusion: I already know that every partition can have its own consumer. I was asking how to consume multiple messages in parallel specifically with the Alpakka connector. – mayank bansal May 06 '19 at 09:53
  • I was just sharing my little theoretical knowledge in the hope it could help. I will not be able to help you further. Good luck. – Conffusion May 06 '19 at 10:06

1 Answer

What makes you believe you need more threads? More often you would want to share a single Kafka consumer client instance across multiple streams.

You should not merge elements from multiple Consumer.committableSource instances into one stream, because that will not work with batched committing.

Would running the same stream setup multiple times solve your need?

Enno
  • If I do not need batched committing, would merging elements from multiple `committableSource` instances into one stream give the same performance as `Consumer.committablePartitionedSource`? Does `committablePartitionedSource` create a source per partition, and does it therefore support batched committing? – mayank bansal May 06 '19 at 09:46
  • No, `committablePartitionedSource` uses a single Kafka consumer internally. Committing works with it. – Enno May 06 '19 at 13:09
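
The first comment on the question notes that parallelism on a topic comes from its partitions. As a rough illustration of why adding more consumers does not help past that point, here is a minimal plain-Java sketch that needs neither Kafka nor Alpakka. It assumes all consumers subscribe to one topic with the same group id, so each partition is owned by exactly one consumer. `PartitionAssignmentSketch`, `assign`, and `idleConsumers` are hypothetical names, and the round-robin rule is a simplification, not Kafka's actual assignor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionAssignmentSketch {

    // Hypothetical helper: spreads partitions 0..partitionCount-1 over the
    // consumers of one group, round-robin. Simplified model for illustration,
    // not Kafka's real partition assignor.
    static Map<Integer, List<Integer>> assign(int partitionCount, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        Map<Integer, List<Integer>> assignment = new TreeMap<>();
        for (int consumer = 0; consumer < consumerCount; consumer++) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Each partition gets exactly one owner within the group.
        for (int partition = 0; partition < partitionCount; partition++) {
            assignment.get(partition % consumerCount).add(partition);
        }
        return assignment;
    }

    // Counts consumers that received no partition and would therefore sit idle.
    static long idleConsumers(Map<Integer, List<Integer>> assignment) {
        return assignment.values().stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // 3 partitions shared by 5 consumers in the same group.
        Map<Integer, List<Integer>> assignment = assign(3, 5);
        System.out.println(assignment);
        System.out.println("idle consumers: " + idleConsumers(assignment));
    }
}
```

In this model, five consumers on a three-partition topic leave two consumers with nothing to read, so the number of merged `committableSource` instances that do useful work would be bounded by the partition count.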