
I am very new to Kafka. I am creating two topics and publishing to them from two producers. I have one consumer that consumes the messages from both topics, because I want to process them according to priority.

I get a stream for each topic, but as soon as I start iterating over the ConsumerIterator of either stream, it blocks there. As the documentation says, it blocks until it gets a new message.

Does anyone know how to read from two topics (two streams) with a single Kafka consumer?

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import org.apache.commons.lang3.SerializationUtils; // assumed Commons Lang 3; 2.x has the same method

    // consumerConnector is a kafka.javaapi.consumer.ConsumerConnector created elsewhere
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(KafkaConstants.HIGH_TEST_TOPIC, 1); // one stream per topic
    topicCountMap.put(KafkaConstants.LOW_TEST_TOPIC, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumerConnector.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> highPriorityStream = consumerMap.get(KafkaConstants.HIGH_TEST_TOPIC).get(0);
    ConsumerIterator<byte[], byte[]> highPriorityIterator = highPriorityStream.iterator();

    // hasNext() blocks until the next message arrives (consumer.timeout.ms defaults to -1)
    while (highPriorityIterator.hasNext())
    {
        byte[] bytes = highPriorityIterator.next().message();
        try
        {
            Object obj = SerializationUtils.deserialize(bytes);
            if (obj instanceof CLoudDataObject)
            {
                CLoudDataObject thunderDataObject = (CLoudDataObject) obj;
                System.out.println(thunderDataObject);
                // TODO Got the Thunder object here, now write code to send it to Thunder service.
            }
        }
        catch (Exception e)
        {
            e.printStackTrace(); // don't silently swallow deserialization errors
        }
    }
dasmaximum
aviundefined

1 Answer


If you don't want to process low priority messages before high priority ones, how about setting the consumer.timeout.ms property and catching ConsumerTimeoutException to detect that the high priority flow has reached the last available message? By default it is set to -1, which blocks until a new message arrives. (http://kafka.apache.org/07/configuration.html)
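A minimal sketch of that idea that runs without a broker: two java.util.concurrent.BlockingQueue instances stand in for the high and low priority streams, and poll(timeout) returning null stands in for the ConsumerTimeoutException you would catch around hasNext() once consumer.timeout.ms is set. The class and method names and the 100 ms timeout are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch only: each BlockingQueue stands in for one topic's KafkaStream, and
// poll(timeout) returning null stands in for ConsumerTimeoutException thrown
// when consumer.timeout.ms expires.
class DrainHighFirst {

    // Read messages until none arrives within timeoutMs, then return them.
    static List<String> drainUntilTimeout(BlockingQueue<String> stream, long timeoutMs) {
        List<String> batch = new ArrayList<>();
        try {
            String msg;
            while ((msg = stream.poll(timeoutMs, TimeUnit.MILLISECONDS)) != null) {
                batch.add(msg);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
        return batch;
    }

    // One round: everything currently on the high stream, then the low stream.
    static List<String> processRound(BlockingQueue<String> high, BlockingQueue<String> low,
                                     long timeoutMs) {
        List<String> processed = drainUntilTimeout(high, timeoutMs);
        processed.addAll(drainUntilTimeout(low, timeoutMs));
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> high = new LinkedBlockingQueue<>();
        BlockingQueue<String> low = new LinkedBlockingQueue<>();
        low.add("low-1");
        high.add("high-1");
        low.add("low-2");
        high.add("high-2");
        System.out.println(processRound(high, low, 100)); // prints [high-1, high-2, low-1, low-2]
    }
}
```

Because the low priority drain also runs until its own timeout, a busy low priority stream keeps the consumer away from the high priority one for as long as low messages keep arriving; that is the limitation raised in the comments.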

The following describes a way to process multiple flows concurrently with different priorities.

With the high-level consumer, each stream's iterator blocks, so reading several streams from one consumer needs multi-threaded code. In your case, each topic's stream should be processed by its own thread. Because each thread runs independently, one blocked flow (thread) won't affect the others.

Java's thread pools (ExecutorService, created through Executors) make it easier to build a multi-threaded application. You can find an example implementation here:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
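A minimal sketch of the thread-per-stream layout with a fixed thread pool, simplified from the idea in the linked example: BlockingQueue.take() stands in for the blocking ConsumerIterator, and the END marker is a hypothetical way to let the demo finish. The high priority stream never receives a message, yet the low priority thread completes its work, because a blocked take() only holds up its own thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: BlockingQueue.take() stands in for the blocking
// ConsumerIterator.hasNext()/next() of one topic's KafkaStream.
class StreamPerThread {

    // Hypothetical end-of-stream marker so the demo can finish.
    static final String END = "__END__";

    // The loop each pool thread runs for its own stream.
    static Runnable consumer(String topic, BlockingQueue<String> stream, List<String> processed) {
        return () -> {
            try {
                while (true) {
                    String msg = stream.take(); // blocks only this thread
                    if (END.equals(msg)) {
                        return;
                    }
                    processed.add(topic + ":" + msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdownNow() ends the loop here
            }
        };
    }

    // The high stream stays empty, yet the low stream is fully processed.
    static List<String> demo(List<String> lowMessages) {
        BlockingQueue<String> high = new LinkedBlockingQueue<>();
        BlockingQueue<String> low = new LinkedBlockingQueue<>(lowMessages);
        low.add(END);
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(2); // one thread per stream
        try {
            pool.submit(consumer("high", high, processed));
            Future<?> lowDone = pool.submit(consumer("low", low, processed));
            lowDone.get(5, TimeUnit.SECONDS); // returns although the high thread is still blocked
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // interrupts the thread still waiting on the high stream
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(demo(Arrays.asList("a", "b"))); // prints [low:a, low:b]
    }
}
```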

Regarding priority of execution, you can call Thread.currentThread().setPriority(...) to give each thread a priority based on the Kafka topic it serves.
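One way to apply this, sketched below, is to set the priority once when each per-topic thread is created, through a ThreadFactory passed to Executors.newSingleThreadExecutor, rather than calling setPriority from inside a task; that avoids a pooled thread keeping a priority an earlier task gave it. The names are illustrative. Java thread priority is only a hint: the JVM and operating system are free to ignore it, so it does not guarantee that high priority messages are handled first.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class PriorityThreads {

    // Every thread this factory creates gets the given Java priority.
    static ThreadFactory withPriority(String threadName, int priority) {
        return runnable -> {
            Thread t = new Thread(runnable, threadName);
            t.setPriority(priority); // a scheduling hint only; the JVM/OS may ignore it
            return t;
        };
    }

    // Runs a probe task and reports the priority of the thread that executed it.
    static int priorityOf(ExecutorService executor) {
        try {
            return executor.submit(() -> Thread.currentThread().getPriority()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // One single-thread executor per topic, e.g. to run that topic's stream loop.
        ExecutorService highTopic =
                Executors.newSingleThreadExecutor(withPriority("high-topic", Thread.MAX_PRIORITY));
        ExecutorService lowTopic =
                Executors.newSingleThreadExecutor(withPriority("low-topic", Thread.MIN_PRIORITY));
        System.out.println(priorityOf(highTopic) + " " + priorityOf(lowTopic)); // prints 10 1
        highTopic.shutdown();
        lowTopic.shutdown();
    }
}
```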

suztomo
  • Thanks for your answer, but just setting the thread priority will not help me. My use case is to consume the High priority topic first and, only if it is empty, consume from the Low priority topic. Please check the answer in this post; it seems 'sky' has achieved the same functionality: http://stackoverflow.com/questions/30655361/does-apache-kafka-supports-priority-for-topic-or-message – aviundefined Jun 10 '15 at 00:37
  • Now I understand your requirement. How about setting the consumer.timeout.ms property and catching ConsumerTimeoutException to detect that the consumer has reached the last available message? By default it is set to -1, so it never times out. http://kafka.apache.org/07/configuration.html – suztomo Jun 10 '15 at 00:56
  • That will not solve the problem either. It can't handle the following scenario: 1) Let's say we have two topics, "High" and "Low", and the "Low" topic has a very large message flow. Once the consumer starts reading messages from "Low", it will not hit its timeout until the "Low" stream has been empty for the configured timeout (even a very low one such as 100 ms). Please correct me if I am wrong. – aviundefined Jun 10 '15 at 18:45
  • Your point seems right, although I thought your requirement was to consume the high priority topic first until it's empty. Can you describe, in the question, the scenarios you have in mind? – suztomo Jun 10 '15 at 19:00
  • Let's say we have 1 producer publishing on the "High" priority topic and 100 producers publishing on the "Low" priority topic. In my use case I am expecting large traffic on the "Low" priority topic. After consuming a message, the consumer needs to send it to a third-party cloud service that doesn't allow multiple connections. So I always want to consume High before Low. – aviundefined Jun 10 '15 at 20:56
  • @aviundefined If you always need to process the high priority topic first, use the timeout method to read the high priority flow before processing the low priority one. If low priority messages may be processed while the high priority flow has no messages, use the multi-threading method with thread priorities. – suztomo Jun 11 '15 at 05:04
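For the scenario described in the comments (one downstream connection, heavy Low traffic, High must always win), one option beyond the two discussed is to re-check the high priority stream before every low priority message instead of draining Low until it times out. This is a minimal single-threaded sketch of that swapped-in technique, again with BlockingQueues standing in for the Kafka streams and a hypothetical send callback standing in for the single connection; the idle-stop condition exists only so the demo terminates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch only: the queues stand in for the "High" and "Low" topic streams,
// and `send` stands in for the single connection to the third-party service.
class StrictPriorityLoop {

    // Sends one message at a time, checking the high stream before every
    // low priority message. Returns the number of messages sent once both
    // streams are idle (a demo-only stop condition; a real consumer keeps looping).
    static int run(BlockingQueue<String> high, BlockingQueue<String> low,
                   long lowWaitMs, Consumer<String> send) {
        int sent = 0;
        try {
            while (true) {
                String msg = high.poll(); // non-blocking check of the high stream
                if (msg == null) {
                    // Nothing high priority waiting: wait briefly for a low priority message.
                    msg = low.poll(lowWaitMs, TimeUnit.MILLISECONDS);
                }
                if (msg == null) {
                    return sent; // both streams idle
                }
                send.accept(msg);
                sent++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return sent;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> high = new LinkedBlockingQueue<>(Arrays.asList("high-1"));
        BlockingQueue<String> low = new LinkedBlockingQueue<>(Arrays.asList("low-1", "low-2"));
        List<String> sent = new ArrayList<>();
        run(high, low, 100, msg -> {
            sent.add(msg);
            if (msg.equals("low-1")) {
                high.add("high-late"); // simulate a high priority message arriving mid-run
            }
        });
        System.out.println(sent); // prints [high-1, low-1, high-late, low-2]
    }
}
```

A high priority message that arrives while the loop is waiting on the low stream is picked up after at most about lowWaitMs plus one low priority send, regardless of how much traffic the low stream carries.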