I'm producing messages using the code below:

Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
KeyedMessage<String, String> keyedMsg = new KeyedMessage<String, String>(topic, "device-420",  "{message:'hello world'}");          
producer.send(keyedMsg);

And I'm consuming them using the following code block:

//Key = topic name, Value = No. of threads for topic
Map<String, Integer> topicCount = new HashMap<String, Integer>();       
topicCount.put(topic, 1);

//ConsumerConnector creates the message stream for each topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConnector.createMessageStreams(topicCount);         

// Get Kafka stream for topic
List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic);

// Iterate stream using ConsumerIterator
for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
    ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();         
    while (consumerIte.hasNext()) {             
        MessageAndMetadata<byte[], byte[]> msg = consumerIte.next();            
        System.out.println(topic.toUpperCase() + ">"
                + " Partition:" + msg.partition()
                + " | Key:"+ new String(msg.key())
                + " | Offset:" + msg.offset()
                + " | Message:"+ new String(msg.message()));
    }
}

Everything works fine as long as I read data topic-wise. Is there any way to consume data by message key instead, i.e. only the messages with key device-420 in this example?

User

1 Answer

Short answer: no.

The smallest unit a Kafka consumer can read from is a partition. You can write a client that reads from only a single partition, but a partition can contain messages with many different keys, so you still have to consume every message in that partition and discard the ones whose key you don't want.
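A minimal sketch of that client-side filtering, assuming the messages of one partition have already been fetched. To keep it runnable without a broker it uses no Kafka classes: `KeyFilter`, `Record`, and `filterByKey` are hypothetical names made up for illustration. In the consumer loop from the question, the same comparison would be applied to `msg.key()` before printing.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KeyFilter {

    /** Hypothetical stand-in for a consumed message: raw key and payload bytes. */
    public static final class Record {
        public final byte[] key;
        public final byte[] value;

        public Record(String key, String value) {
            // A message produced without a key carries a null key.
            this.key = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
            this.value = value.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Returns the payloads of all records whose key equals wantedKey, in order. */
    public static List<String> filterByKey(List<Record> records, String wantedKey) {
        byte[] wanted = wantedKey.getBytes(StandardCharsets.UTF_8);
        List<String> matches = new ArrayList<String>();
        for (Record r : records) {
            // Every record is still read; non-matching keys are simply dropped.
            if (r.key != null && Arrays.equals(r.key, wanted)) {
                matches.add(new String(r.value, StandardCharsets.UTF_8));
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Simulated contents of one partition holding several keys.
        List<Record> partition = Arrays.asList(
                new Record("device-420", "{message:'hello world'}"),
                new Record("device-7", "{message:'other device'}"),
                new Record(null, "{message:'no key'}"),
                new Record("device-420", "{message:'second'}"));
        for (String payload : filterByKey(partition, "device-420")) {
            System.out.println(payload);
        }
    }
}
```

The filter saves no network traffic or broker work, since all messages of the partition are still transferred. If per-device consumption is a hard requirement, one option is a dedicated topic or partition per device, at the cost of more partitions to manage.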

Tvaroh
Matthias J. Sax
  • Thanks! You saved me time. But how can we access data partition-wise? Could you please post code? – User May 26 '16 at 04:36
  • It depends on the Kafka version you are using. Have a look here for 0.8: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example – Matthias J. Sax May 26 '16 at 08:38