4

I am trying to create a Kafka Consumer for a topic without using the @KafkaListener annotation. I want to do this because I am trying to dynamically create listeners based on the application.properties without the use of spring boot.

I figured the best route for this would be to manually create a KafkaListenerContainerFactory Could someone please provide an example of how to do this in it's own class.

Clev_James23
  • 171
  • 1
  • 3
  • 15

3 Answers3

15
  • with spring
@Bean
public KafkaMessageListenerContainer<String, String> messageListenerContainer(String topic) {

    ContainerProperties containerProperties = new ContainerProperties(topic);
    containerProperties.setMessageListener(new MyMessageListener());

    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
    KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    listenerContainer.setAutoStartup(false);
    // bean name is the prefix of kafka consumer thread name
    listenerContainer.setBeanName("kafka-message-listener");
    return listenerContainer;
}

private Map<String, Object> consumerProperties(){
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    return props;
}

static class MyMessageListener implements MessageListener<String, String> 
    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        // do something
    }
}
     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "localhost:9092");
     props.setProperty("group.id", "test");
     props.setProperty("enable.auto.commit", "true");
     props.setProperty("auto.commit.interval.ms", "1000");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }
ho yoje
  • 355
  • 2
  • 9
2

I had the same need. I don't want to use low level consumer and call poll by myself. I want to use the same logic @KafkaListener does and just configure it dynamically, especially create multiple Kafka listeners based on configuration.

Below solution is what I was looking for: http://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/

v1t3x
  • 21
  • 1
0

This will start a consumer thread in the background. Also it will send a message after every 10 seconds to test-topic.

  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.apache.kafka.common.serialization.StringSerializer;
  import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  import org.springframework.kafka.core.KafkaTemplate;
  import org.springframework.kafka.core.ProducerFactory;
  import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  import org.springframework.kafka.listener.config.ContainerProperties;

  import java.util.Date;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.TimeUnit;

  public class PeriodicProducerConsumer implements Runnable {

    KafkaTemplate<String, Object> kafkaTemplate;
    ScheduledExecutorService service;

    PeriodicProducerConsumer() {
      // Producer Declaration
      Map<String, Object> configs = new HashMap<>();
      configs.put("bootstrap.servers", "localhost:9092");
      configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

      ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(configs);
      this.kafkaTemplate = new KafkaTemplate<>(producerFactory);


      // Consumer Declaration
      Map<String, Object> consumerConfig = new HashMap<>();
      consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
      consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "airbus-service-ka-consumer-group");

      DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
          new DefaultKafkaConsumerFactory<>(
              consumerConfig,
              new StringDeserializer(),
              new StringDeserializer());

      String topic = "test-topic";
      ContainerProperties containerProperties = new ContainerProperties(topic);
      containerProperties.setMessageListener(new AirbusServiceKaMessageListener());

      ConcurrentMessageListenerContainer<String, String> container =
          new ConcurrentMessageListenerContainer<>(
              kafkaConsumerFactory,
              containerProperties);
      container.start();
    }

    public void start() {
      service = Executors.newSingleThreadScheduledExecutor();
      service.scheduleWithFixedDelay(this, 5, 10, TimeUnit.SECONDS);
    }

    public void stop() {
      service.shutdown();
    }

    @Override
    public void run() {
      String data = String.format("New Airbus Hello at %s", new Date());
      kafkaTemplate.send("test-topic", data);
    }
  }

Here is Consumed message processing function:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

public class AirbusServiceKaMessageListener implements MessageListener<String, Object> {
    private static final Logger LOG = LoggerFactory.getLogger(AirbusServiceKaMessageListener.class);

    @Override
    public void onMessage(ConsumerRecord<String, Object> data) {
        LOG.info("########################## New Consuming Message From Message Listener ##########################");
        LOG.info("Message # {}", data.value());
        LOG.info("#################################################################################################");
    }
}

Bean.xml

    <bean name="purekafkaProduceConsume" class="com.myntra.airbus.purekafka.PeriodicProducerConsumer" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>
amarVashishth
  • 847
  • 3
  • 12
  • 26