
Note to duplicate markers: I DID check out the other question, but it does not answer my specific question below.

So imagine I have a Kafka topic on a single server with only one partition, which makes it very similar to a queue.

Now let's assume I want 100 listeners waiting to accept values from the queue. By design, if all 100 consumers are in a single group, the contents of the log (the queue here) will be distributed among the consumers, so the operation will finish in 1/100th of the time.

The problem is that the Spring Kafka listener is only configured with the topic name.

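import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Consumer {

    @KafkaListener(topics = "${app.topic}")
    public void receive(@Payload String message,
                        @Headers MessageHeaders headers) {
        // Print the payload, then every header attached to the record
        System.out.println("Received message=" + message);
        headers.keySet().forEach(key -> System.out.println(key + "->" + headers.get(key)));
    }
}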

I can't seem to get Kafka to spawn 100 consumers for processing messages from the "queue" (log). How can it be done?

Sam Fisher
    See the answer. Your question is not about Apache Kafka. There is no queue-like behavior in Kafka at all. You can't evenly distribute messages from the same partition in Kafka. There is an `offset` to deal with, and messages must be processed in the proper order. So, if you want a queue, don't go to Kafka! – Artem Bilan Nov 02 '18 at 18:56
    Stop mixing up "queue" and "topic"; they are in no way similar. Kafka uses "topic". – DwB Nov 02 '18 at 19:35

1 Answer


Check out this answer for an understanding of Kafka consumers: "In Apache Kafka why can't there be more consumer instances than partitions?"
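As a rough illustration of why extra consumers don't help, here is a simplified model of how the partitions of one topic are divided among the members of a single consumer group. It is a range-style split written only for illustration (the class and method names are hypothetical), not Kafka's own assignor code, but it follows the key rule: a partition goes to at most one member of a group, so with one partition only one of 100 consumers ever receives messages.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of spreading partitions over the consumers of ONE group.
// It mimics a range-style assignment; it is not Kafka's actual assignor.
public class PartitionAssignmentSketch {

    /** Assigns partitions 0..numPartitions-1 to consumer-0..consumer-(numConsumers-1). */
    static Map<String, List<Integer>> assign(int numPartitions, int numConsumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int perConsumer = numPartitions / numConsumers; // base share per consumer
        int extra = numPartitions % numConsumers;       // first 'extra' consumers get one more
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            assignment.put("consumer-" + c, owned);
        }
        return assignment;
    }

    /** Consumers that own at least one partition, i.e. the ones that receive messages. */
    static long activeConsumers(Map<String, List<Integer>> assignment) {
        return assignment.values().stream().filter(p -> !p.isEmpty()).count();
    }

    public static void main(String[] args) {
        // The question's setup: 1 partition, 100 consumers in one group
        System.out.println("active: " + activeConsumers(assign(1, 100)));   // active: 1
        // Same group, 100 partitions: every consumer owns exactly one
        System.out.println("active: " + activeConsumers(assign(100, 100))); // active: 100
        // Uneven case: 10 partitions over 4 consumers
        System.out.println(assign(10, 4));
        // {consumer-0=[0, 1, 2], consumer-1=[3, 4, 5], consumer-2=[6, 7], consumer-3=[8, 9]}
    }
}
```

The point to take away is the first line of output: adding consumers beyond the partition count only adds idle group members.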

To properly distribute messages across a single consumer group, you must have more than one partition: within a group, each partition is assigned to at most one consumer, so any consumers beyond the partition count sit idle. Once you find the right number of partitions for your load, I would use Spring Cloud Stream to better manage your concurrency and consumer group assignment.

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

Sample sink:

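import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

// Annotation-based binding model; @EnableBinding and @StreamListener are
// deprecated since Spring Cloud Stream 3.1 in favor of the functional model.
@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

    // Invoked for each message arriving on the "input" binding
    @StreamListener(Sink.INPUT)
    public void handle(Person person) {
        System.out.println("Received: " + person);
    }

    // Payload type the incoming message is converted into
    public static class Person {
        private String name;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return this.name;
        }
    }
}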

Concurrency settings

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: <topic-name>
          group: <consumer-group>
          consumer:
            headerMode: raw
            partitioned: true
            concurrency: 20
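A quick sanity check for the `concurrency` value: the sketch below assumes, as with the Spring Kafka concurrent listener container that the Kafka binder builds on, that each concurrency thread runs its own consumer in the same group, so only as many threads as there are partitions actually receive messages. `ConcurrencyCheck`, `plan` and the instance count parameter are hypothetical helpers for the arithmetic, not part of Spring Cloud Stream.

```java
// Back-of-the-envelope check for a "concurrency" setting.
// Assumption: each concurrency thread runs its own Kafka consumer in the same group,
// and each partition is consumed by at most one member of that group at a time.
public class ConcurrencyCheck {

    /** Result of the check: how many consumers exist and how many actually get work. */
    static final class Plan {
        final int total;
        final int active;
        final int idle;

        Plan(int total, int active, int idle) {
            this.total = total;
            this.active = active;
            this.idle = idle;
        }

        @Override
        public String toString() {
            return "total=" + total + ", active=" + active + ", idle=" + idle;
        }
    }

    /** appInstances: running copies of the app; concurrency: consumer threads per copy. */
    static Plan plan(int appInstances, int concurrency, int partitions) {
        int total = appInstances * concurrency;
        int active = Math.min(total, partitions); // members beyond the partition count get nothing
        return new Plan(total, active, total - active);
    }

    public static void main(String[] args) {
        // concurrency: 20 against the single-partition topic from the question
        System.out.println(plan(1, 20, 1));   // total=20, active=1, idle=19
        // concurrency: 20 on 5 app instances against a 100-partition topic
        System.out.println(plan(5, 20, 100)); // total=100, active=100, idle=0
    }
}
```

In other words, `concurrency: 20` only pays off once the topic has at least 20 partitions per running instance.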

More information is available at https://cloud.spring.io/spring-cloud-stream/

danzdoran