
I installed Kafka on a DC/OS (Mesos) cluster on AWS, enabled three brokers, and created a topic called "topic1":

dcos kafka topic create topic1 --partitions 3 --replication 3

Then I wrote a Producer class to send messages and a Consumer class to receive them.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class Producer {
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<>();
        System.out.println("Setting producer config.");
        producerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");

        // Keys and values are sent as raw bytes.
        ByteArraySerializer serializer = new ByteArraySerializer();
        System.out.println("Creating KafkaProducer");
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
        for (int i = 0; i < 100; i++) {
            String msgstr = msg + i;
            byte[] message = msgstr.getBytes();
            // No key is given, so records are spread across the topic's partitions.
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
            System.out.println("Sent: " + msgstr);
            kafkaProducer.send(record);
        }
        kafkaProducer.close(); // flushes any buffered records before exiting
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        sendMessage("Kafka test message 2/27 3:32");
    }
}

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class Consumer {
    public static void getMessage() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        consumerConfig.put("group.id", "dj-group");
        consumerConfig.put("enable.auto.commit", "true");
        // Only applies when the group has no committed offsets yet.
        consumerConfig.put("auto.offset.reset", "earliest");
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

        kafkaConsumer.subscribe(Arrays.asList("topic1"));
        while (true) {
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
            System.out.println(records.count() + " records received.");
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Values are byte[], so this prints arrays of numbers, not text.
                System.out.println(Arrays.toString(record.value()));
            }
        }
    }

    public static void main(String[] args) {
        getMessage();
    }
}

First I ran Producer on the cluster to send messages to topic1. However, when I ran Consumer, it didn't receive anything; it just hung.

The Producer is working, since I was able to get all the messages by running the console-consumer shell script that comes with the Kafka install:

./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning

But why can't I receive anything with Consumer? This post suggests that a group.id with an old offset might be a possible cause. I only set group.id in the consumer, not the producer. How do I configure the offset for this group?

ddd
  • To make sure the group.id is not an issue, use `kafkaConsumer.seekToBeginning()` – Matthias J. Sax Feb 27 '17 at 22:49
  • @MatthiasJ.Sax Should I delete `consumerConfig.put("auto.offset.reset", "earliest");` then? Where should I add this line, after subscribing? Still not getting anything after adding this. – ddd Feb 27 '17 at 22:52
  • Try a longer timeout when invoking poll. Are there any exceptions thrown in the logs on the server/client side? – amethystic Feb 27 '17 at 23:03
  • It depends on your overall pattern. "auto.offset.reset" is only applied if there are no committed offsets found, while `seekToBeginning()` ignores any committed offsets. Thus, on startup, you might get different behavior depending on whether there are committed offsets or not. For example, if you have a container environment, and a container fails and gets restarted, you might want to resume where you left off -- thus, `seekToBeginning()` might not be the behavior you want. – Matthias J. Sax Feb 27 '17 at 23:04
  • @amethystic Changed poll timeout to 1000 and still the same. No exception thrown. Nothing gets printed out. It just hangs. – ddd Feb 28 '17 at 03:41
  • How long did you wait for the consumer to read data? Do you call poll() over and over again (your code seems to call poll only once?). Maybe decreasing `metadata.max.age.ms` helps. – Matthias J. Sax Mar 09 '17 at 19:57
  • @MatthiasJ.Sax I am writing a test to check if Kafka works. All it does is send a number of messages and see if it receives that many messages. Therefore, I need to resume from the last committed offset and only poll once. – ddd Mar 09 '17 at 19:59
  • But how can you be sure that a single `poll()` returns all the messages you wrote? There is no guarantee about how many messages will be returned (as far as I know). – Matthias J. Sax Mar 10 '17 at 00:50
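
For what it's worth, a minimal sketch of the `seekToBeginning()` approach from the comments (a sketch under assumptions, not a confirmed fix: with `subscribe()` partitions are assigned lazily, so a first `poll()` has to happen before the seek can rewind them; the Collection-taking signature is the 0.10.x-era client API, and records from that first poll are simply discarded here):

    kafkaConsumer.subscribe(Arrays.asList("topic1"));
    kafkaConsumer.poll(100); // first poll() joins the group and triggers partition assignment
    // Rewind every assigned partition, ignoring any offsets committed by the group.
    kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());
    while (true) {
        ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
        // process records as in getMessage() above
    }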

2 Answers

8

As it turns out, `kafkaConsumer.subscribe(Arrays.asList("topic1"));` is causing `poll()` to hang. According to Kafka Consumer does not receive messages, there are two ways to connect to a topic: assign and subscribe. After I replaced subscribe with the lines below, it started working.

    // Manually assign partition 0 of topic1 instead of subscribing.
    TopicPartition tp = new TopicPartition("topic1", 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);

However, the output shows arrays of numbers, which is not expected (the Producer sent Strings). But I guess this is a separate issue.

ddd
  • The so-called "separate issue" is that you're receiving bytes (since Kafka processes bytes under the hood). You should use a deserializer, e.g. `key.deserializer=org.apache.kafka.common.serialization.StringDeserializer` for the key and a separate one for values. See http://kafka.apache.org/documentation/ (but I can't find the exact page for SerDe). – Jacek Laskowski Feb 28 '17 at 08:03
  • @JacekLaskowski Thanks for the explanation. Saved me another post. – ddd Feb 28 '17 at 14:49
  • It may still hang with `assign` if your topic has multiple partitions. Fix this by adding every partition to the list, not just `0`, after finding out how many there are with `./kafka-topics --describe`. – Victor Basso Sep 10 '18 at 12:37
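
Putting the deserializer comment and the multi-partition comment together, a hypothetical sketch of what that consumer could look like (this drops into `getMessage()` above in place of the deserializer and subscribe lines; `StringDeserializer` and the `partitionsFor()` lookup are swapped-in pieces, not code from the answer):

    // Also needs org.apache.kafka.common.serialization.StringDeserializer,
    // org.apache.kafka.common.PartitionInfo, and java.util.ArrayList imported.
    StringDeserializer deserializer = new StringDeserializer();
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

    // Assign every partition of the topic, not just partition 0.
    List<TopicPartition> tps = new ArrayList<>();
    for (PartitionInfo p : kafkaConsumer.partitionsFor("topic1")) {
        tps.add(new TopicPartition(p.topic(), p.partition()));
    }
    kafkaConsumer.assign(tps);
    // record.value() is then a String, so println prints readable text.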
4

Make sure you gracefully shut down your consumer:

consumer.close()

TLDR

When you have two consumers running with the same group id, Kafka won't assign the same partition of your topic to both.

If you repeatedly run an app that spins up a consumer with the same group id and you don't shut them down gracefully, Kafka will take a while to consider a consumer from an earlier run as dead and reassign its partitions to a new one.

If new messages come to that partition and it's never assigned to your new consumer, the consumer will never see the messages.

To debug:

  • How many partitions your topic has:
    ./kafka-topics --zookeeper <host-port> --describe --topic <topic>
  • How far your group has consumed from each partition:
    ./kafka-consumer-groups --bootstrap-server <host-port> --describe --group <group-id>

If your partitions are already stuck on stale consumers, either wipe your Kafka state or use a new group id.
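
To show what a graceful shutdown can look like in code, here is the shutdown-hook-plus-`wakeup()` pattern from the KafkaConsumer javadoc, adapted to the byte[] consumer above (a sketch, not part of the original answer):

    // Needs org.apache.kafka.common.errors.WakeupException imported.
    // Wake the consumer out of a blocking poll() when the JVM exits.
    Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
    try {
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            // process records here
        }
    } catch (WakeupException e) {
        // Expected on shutdown; fall through to close().
    } finally {
        // Closing cleanly leaves the group immediately, so partitions are
        // reassigned right away instead of after the session timeout.
        consumer.close();
    }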

Victor Basso