
EDIT

In case anyone else is in this particular situation, I got something akin to what I was looking for after tweaking the consumer configurations. I created a producer that sent the priority messages to three separate topics (for high/med/low priorities), and then I created 3 separate consumers to consume from each. Then I polled the higher priority topics frequently, and didn't poll the lower priorities unless the high was empty:

    // Create the consumers once, outside the loop - creating a new
    // KafkaConsumer on every iteration would leak connections.
    final KafkaConsumer<String, String> highPriConsumer = createConsumer(TOPIC1);
    final KafkaConsumer<String, String> medPriConsumer = createConsumer(TOPIC2);

    while (true) {
        final ConsumerRecords<String, String> consumerRecordsHigh = highPriConsumer.poll(100);
        if (!consumerRecordsHigh.isEmpty()) {
            //process high pri records
        } else {
            final ConsumerRecords<String, String> consumerRecordsMed = medPriConsumer.poll(100);
            if (!consumerRecordsMed.isEmpty()) {
                //process med pri records
            }
        }
    }
The poll timeout (the argument to the .poll() method) determines how long the call blocks when there are no records to return. I set this to a very short time for each topic, but you could set it even lower for the lower priorities so the loop doesn't waste valuable cycles waiting on them while high-pri messages are available.

The max.poll.records config determines the maximum number of records returned in a single poll. This could be set higher for the higher priorities as well.

The max.poll.interval.ms config sets the maximum delay allowed between polls - i.e., how long the consumer may take to process a batch of max.poll.records messages before it is considered failed and its partitions are rebalanced away. Clarification here.
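For reference, the three configs above can be combined into a small helper along the lines of the createConsumer method used in the loop. This is just a sketch - the broker address, group id, and tuning values are placeholders, not settings from our system:

```java
import java.util.Properties;

public class PriorityConsumerConfig {
    // Builds the config for one priority tier; maxPollRecords can be set
    // larger for higher-priority topics so each poll drains more records.
    public static Properties consumerConfig(String groupId, int maxPollRecords) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", String.valueOf(maxPollRecords));
        props.put("max.poll.interval.ms", "300000"); // Kafka's default, 5 minutes
        return props;
    }
}
```

Each priority tier then gets its own group id and its own max.poll.records, e.g. `consumerConfig("high-pri-group", 500)` vs. `consumerConfig("low-pri-group", 50)`.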

Also, I believe pausing/resuming an entire consumer/topic can be implemented like this:

    kafkaConsumer.pause(kafkaConsumer.assignment());
    if (kafkaConsumer.paused().containsAll(kafkaConsumer.assignment())) {
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }

I'm not sure if this is the best way, but I couldn't find a good example elsewhere.
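To illustrate how pause/resume might implement the same priority scheme with a single consumer subscribed to both topics, here is a sketch I have not load-tested - the topic names and the 100 ms timeout are placeholders:

```java
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseResumeSketch {
    // Partitions of one topic, taken from the consumer's current assignment.
    static Set<TopicPartition> partitionsOf(KafkaConsumer<String, String> c, String topic) {
        return c.assignment().stream()
                .filter(tp -> tp.topic().equals(topic))
                .collect(Collectors.toSet());
    }

    // One consumer subscribed to both topics; low-pri partitions are
    // paused whenever a poll returns any high-pri records.
    static void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            boolean highBusy = records.records("high-pri-topic").iterator().hasNext();
            Set<TopicPartition> lowParts = partitionsOf(consumer, "low-pri-topic");
            if (highBusy) {
                consumer.pause(lowParts);  // starve low-pri while high-pri has a backlog
            } else {
                consumer.resume(lowParts); // nothing urgent, let low-pri flow again
            }
            // process the returned records here...
        }
    }
}
```

Resuming partitions that were never paused is a no-op, so the else branch doesn't need to track state.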

I agree with senseiwu below that this is not really the correct use of Kafka. This is single-threaded processing with a dedicated consumer per topic, but I will work on improving this process from here.


Background

We are trying to improve our application and hoping to use Apache Kafka for messaging between decoupled components. Our system is frequently low-bandwidth (although there are cases where bandwidth can be high for a time), and we have small, high-priority messages that must be processed while larger files wait, or are processed slowly to consume less bandwidth. We would like to have topics with different priorities.

I am new to Kafka, but have tried looking into both the Processor API and Kafka Streams with no success, although certain posts on forums seem to be saying this is doable.

Processor API

When I tried the Processor API, I attempted to determine whether the high-priority KafkaConsumer was currently processing anything by checking if its poll() returned empty, and then hoped to poll() with the medium-priority consumer - but the second topic's poll also returned empty. There also didn't seem to be an easy way to get all of a topic's TopicPartitions in order to call kafkaConsumer.pause(partitions).
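(Noted after the fact: the consumer's partitionsFor() metadata call looks like it could build that set - untested sketch, topic name is whatever you pause:)

```java
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AllPartitions {
    // Builds the full TopicPartition set for a topic from broker metadata,
    // which can then be passed to kafkaConsumer.pause(...).
    static Set<TopicPartition> allPartitions(KafkaConsumer<String, String> consumer,
                                             String topic) {
        return consumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toSet());
    }
}
```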

Kafka Streams

When I tried KafkaStreams, I set up a stream to consume from each of my "priority" topics, but there was no way to check if the KStream or KafkaStreams instance connected to the higher-priority topic was currently idle or processing.

I based my code on this file

Other

I also tried the code here: priority-kafka-client, but it didn't work as expected: running the downloaded test file produced messages with mixed priorities.

I found this thread, where one of the developers says (addressing adding priorities for topics): "...a user could implement this behavior with pause and resume". But I was unable to find out how he meant this could work.

I found this StackOverflow article, but they seem to be using a very old version, and I was unclear on how their mapping function was supposed to work.

Conclusion

I would be very grateful if someone would tell me if they think this is something worth pursuing. If this isn't how Apache Kafka is supposed to work, because it disrupts the benefit gained from the automatic topic/partition handling, that's fine, and I will look elsewhere. However, there were so many instances where people seemed to have success with it, that I wanted to try. Thank you.

kiwikski
  • Instead of updating your question, you should post an answer to your own question and accept it :) -- Btw: Kafka Streams is not a good fit, because messages are prioritized based on their timestamps, which makes sense for data stream processing. – Matthias J. Sax Apr 15 '19 at 02:26

1 Answer


This sounds like a design issue in your application - Kafka was originally designed as a commit log, where each message is written to the broker at an offset and consumers read them in the order in which they were committed, with very low latency and high throughput. Given that partitions, not topics, are the fundamental unit of work distribution in Kafka, topic-level priorities would be difficult to achieve natively.

I'd recommend adapting your design to use other architectural components alongside Kafka instead of trying to cut your feet to fit into the shoes. One thing you could do already is have your producer upload the file to proper file storage and send the link via Kafka, along with metadata. Then, depending on the bandwidth status, your consumer could decide from the large file's metadata whether it is sensible to download it or not. This way you are more likely to end up with a robust design rather than using Kafka the wrong way.
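A sketch of that idea - the file itself lives in external storage and Kafka carries only a small reference. The topic name, JSON fields, and storage URL below are made up for illustration:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FileLinkProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Small message: just a link plus metadata; the consumer inspects
        // sizeBytes and current bandwidth before deciding to download.
        String metadata = "{\"url\":\"https://files.example/abc\",\"sizeBytes\":52428800}";
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("file-events", "file-abc", metadata));
        }
    }
}
```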

If you do want to stick with Kafka alone, one solution would be to send the large files to a fixed number of hardcoded partitions, and have consumers consume from those partitions only when bandwidth is good.
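For example (topic name and partition number are arbitrary here), the producer can pin large payloads to a dedicated partition via the explicit-partition ProducerRecord constructor, and a separate consumer can assign() just that partition and be run only when bandwidth allows:

```java
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class FixedPartitionFiles {
    static final int LARGE_FILE_PARTITION = 0; // hardcoded "slow lane"

    // Producer side: pin large payloads to the dedicated partition.
    static void sendLarge(KafkaProducer<String, String> producer, String payload) {
        producer.send(new ProducerRecord<>("files", LARGE_FILE_PARTITION, null, payload));
    }

    // Consumer side: assign() rather than subscribe(), so only the slow
    // lane is read - and only when bandwidth is good.
    static void drainSlowLane(KafkaConsumer<String, String> consumer) {
        consumer.assign(Collections.singletonList(
                new TopicPartition("files", LARGE_FILE_PARTITION)));
        consumer.poll(100); // process large-file records here
    }
}
```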

senseiwu