I am using simple Kafka client API. As far as I know there are two ways to consumer messages, subscribe to a topic and assign partition to consumer.
However the first method does not work. Consumer poll()
would hang forever. It only works with assign
.
// common config for consumer
Map<String, Object> config = new HashMap<>();
config.put("bootstrap.servers", bootstrap);
config.put("group.id", KafkaTestConstants.KAFKA_GROUP);
config.put("enable.auto.commit", "true");
config.put("auto.offset.reset", "earliest");
config.put("key.deserializer", StringDeserializer.class.getName());
config.put("value.deserializer", StringDeserializer.class.getName());
StringDeserializer deserializer = new StringDeserializer();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, deserializer, deserializer);
// subscribe does not work, poll() hangs
consumer.subscribe(Arrays.asList(KafkaTestConstants.KAFKA_TOPIC));
Here is the code that works by assigning the partition.
// assign works
TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
List<TopicPartition> tps = Arrays.asList(tp);
consumer.assign(tps);
Since I'd like to utilize the auto commit feature which is supposed to only work with consumer group management according to this post. Why does not subscribe()
work?