I am using the plain Kafka client API. As far as I know, there are two ways to consume messages: subscribing to a topic, or assigning partitions to the consumer.

However, the first method does not work: the consumer's poll() hangs forever. It only works with assign().

    // common config for consumer
    Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", bootstrap);

    config.put("group.id", KafkaTestConstants.KAFKA_GROUP);
    config.put("enable.auto.commit", "true");
    config.put("auto.offset.reset", "earliest");
    config.put("key.deserializer", StringDeserializer.class.getName());
    config.put("value.deserializer", StringDeserializer.class.getName());
    StringDeserializer deserializer = new StringDeserializer();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, deserializer, deserializer);

    // subscribe does not work, poll() hangs
    consumer.subscribe(Arrays.asList(KafkaTestConstants.KAFKA_TOPIC));

Here is the code that works by assigning the partition.

    // assign works
    TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    consumer.assign(tps);

I'd like to use the auto-commit feature, which according to this post is supposed to work only with consumer group management. Why doesn't subscribe() work?

ddd
  • It is not possible to use both manual partition assignment with assign(Collection) and group assignment with subscribe. – amethystic Mar 10 '17 at 00:54
  • @amethystic I don't mean to make BOTH to work. Just trying to get `subscribe` working is all. – ddd Mar 10 '17 at 01:29
  • Could you remove `consumer.assign(tps)` then and retry ? – amethystic Mar 10 '17 at 01:32
  • @amethystic Sorry about the confusion. The last two blocks of code don't exist at the same time. Just trying to say the `subscribe` method does not work while the last three lines work – ddd Mar 10 '17 at 01:35
  • What version of Kafka do you use on server side and client side? And you could also try a new group.id to rerun the program, checking whether there are any exceptions thrown on the server/client side? – amethystic Mar 10 '17 at 01:42
  • @amethystic the kafka client version is 0.10.2.0. The kafka version on the server is 0.1.0. I had UUID in group id originally, it seemed to work by getting all the messages every time. – ddd Mar 10 '17 at 01:47
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/137718/discussion-between-amethystic-and-ddd). – amethystic Mar 10 '17 at 02:06
  • Hi guys, I'm facing the same issue and I tried following the resolution of this in the chat, but regretfully I can't read Chinese. Could any of you please post which was the solution found? Thanks in advance. – JPS Jan 25 '19 at 20:25
  • @JPS Essentially, use an older version of consumer instead. It's been a while and I don't remember how exactly this was resolved. Good luck – ddd Jan 27 '19 at 00:50
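
The suggestion in the comments to retry with a new group.id can be sketched roughly as below. This is only an illustration, not the resolution reached in the chat: the class name `ConsumerConfigSketch`, the method `build`, and the `groupPrefix` parameter are made up for this example, and the deserializers are given as class-name strings so the snippet compiles without the Kafka jar. A group that has never committed offsets falls back to `auto.offset.reset`, which is consistent with the observation that a UUID-based group id re-read all messages on every run.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class ConsumerConfigSketch {

    // Builds consumer settings like those in the question, but with a unique
    // group.id on every call. groupPrefix is a hypothetical parameter.
    static Map<String, Object> build(String bootstrap, String groupPrefix) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrap);
        // A fresh group has no committed offsets, so auto.offset.reset applies.
        config.put("group.id", groupPrefix + "-" + UUID.randomUUID());
        config.put("enable.auto.commit", "true");
        config.put("auto.offset.reset", "earliest");
        // Class names as strings keep this sketch free of the Kafka dependency.
        config.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return config;
    }

    public static void main(String[] args) {
        // Placeholder broker address; replace with the real bootstrap servers.
        Map<String, Object> config = build("localhost:9092", "subscribe-debug");
        System.out.println(config.get("group.id"));
    }
}
```

The resulting map can be passed to `new KafkaConsumer<>(config)` in place of the config from the question. If subscribe() still hangs with a brand-new group, stale group state is unlikely to be the cause, and a client/broker version mismatch becomes the more plausible suspect.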

1 Answer

I faced the same issue. I was using the kafka_2.12 jar; when I downgraded to kafka_2.11, it worked.

Gunjan