I am using Alpakka kafka connector to consume packets from kafka. I am using Consumer as a CommittableSource. I would like to create multiple consumer threads on a single machine and use them as a single source. How can I achieve that?
Currently, I have created multiple sources using Consumer.CommittableSource and merge all the sources into a single source using "merge" function. But I am not sure whether this is a right approach as I am not creating the threads.
Please find below the Source code that I am currently using :
public Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> source() {
Source finalSource = Source.empty();
for (int index = 0; index < consumerConfig.getNoOfConsumers(); index++) {
finalSource = finalSource.merge(Consumer.committableSource(consumerSettings, subscription));
}
return finalSource;
}