
I am using a Kafka consumer to read from several topics, and I need one of them to have higher priority. Processing takes a lot of time and there are always many messages in the (low-priority) topics, but I need the messages from the other one to be processed as soon as possible.

It's a similar question to "Does Kafka support priority for topic or message?", but that one uses the old API.

In the new API (0.10.1.1), there are the methods

KafkaConsumer::pause(Collection)
KafkaConsumer::resume(Collection)

But it's not clear to me how to effectively detect that there are new messages in the high-priority topic and that consumption from the other topics needs to be paused.

Any ideas/examples?

miran
  • You could check whether the endOffsets for the partitions you are monitoring are bigger than the last committed offsets for those partitions. Exactly how this works will be implementation-specific, but it will tell you whether there are more messages to consume before you poll – dawsaw Jul 17 '17 at 12:36
  • Please take a look at this one; it may be what you are looking for: https://stackoverflow.com/a/66013251/4602706 – Marco Vargas Feb 04 '22 at 20:28

2 Answers


I finally solved it as dawsaw advised: in the processing loop, I store the following for all topics/partitions I read from:

  • beginningOffsets
  • endOffsets
  • committed (I can't use position(), since I subscribe to topics, not to partitions)

Whenever (endOffset - committed) > 0 for any priority topic, I call consumer.pause() for the non-priority topics, and I resume them once (endOffset - committed) == 0 for all priority topics.
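A minimal Java sketch of this pause/resume decision follows. It assumes the offsets have already been fetched with KafkaConsumer's beginningOffsets(), endOffsets() and committed() and copied into plain maps; the partition labels (e.g. "urgent-0") and the class name PriorityGate are hypothetical stand-ins for TopicPartition so the example runs without the Kafka client. Using the beginning offset as a fallback when a partition has nothing committed yet is an assumption about why beginningOffsets is kept; the answer does not say.

```java
import java.util.Map;

// Hypothetical helper: decides whether normal-priority partitions should be paused
// or resumed before the next poll(). Partition labels stand in for TopicPartition.
final class PriorityGate {
    enum Action { PAUSE_NORMAL, RESUME_NORMAL, NONE }

    // Remembers whether we already paused, so pause()/resume() is issued only on a change.
    private boolean normalPaused = false;

    // Lag = endOffset - committed; ASSUMPTION: with no committed offset, use the beginning offset.
    static long lag(String partition, Map<String, Long> beginning,
                    Map<String, Long> end, Map<String, Long> committed) {
        Long base = committed.get(partition);
        if (base == null) {
            base = beginning.getOrDefault(partition, 0L);
        }
        return Math.max(0L, end.getOrDefault(partition, 0L) - base);
    }

    Action decide(Iterable<String> priorityPartitions, Map<String, Long> beginning,
                  Map<String, Long> end, Map<String, Long> committed) {
        boolean backlog = false;
        for (String p : priorityPartitions) {
            if (lag(p, beginning, end, committed) > 0) { // any priority lag > 0
                backlog = true;
                break;
            }
        }
        if (backlog && !normalPaused) {
            normalPaused = true;
            return Action.PAUSE_NORMAL;  // caller would run consumer.pause(normalPartitions)
        }
        if (!backlog && normalPaused) {
            normalPaused = false;
            return Action.RESUME_NORMAL; // caller would run consumer.resume(normalPartitions)
        }
        return Action.NONE;
    }
}
```

In a real loop, PAUSE_NORMAL and RESUME_NORMAL would map to consumer.pause(...) and consumer.resume(...) with the Collection<TopicPartition> of the normal-priority partitions; the committed value would come from OffsetAndMetadata.offset().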

miran
  • Can you please share your strategy for solving the issue? Suppose we have a lot of low-priority messages (10 GB overall) and a few high-priority messages, with multiple consumers and multiple producers. Even if we pause the consumers, we would also need to pause the producers of all the other topics to make your idea work, right? Do you have any experience with this? It seems nearly impossible in an ecosystem of 100 services and tens of topics. And yes, I have read your other related question on the matter. Thanks – JSBach Sep 04 '17 at 19:59
  • Nope, there is no need to pause any producer. The idea is that you have a single consumer subscribed to several topics (some of them high-priority, the others normal-priority). Before polling for new messages, you check the lag(s) of the high-priority topics. If any of those lags is non-zero, you pause the normal-priority topics so they don't "steal" your consumer's time. After you have processed all the messages from the high-priority topics, you resume the normal-priority ones. – miran Sep 08 '17 at 14:49
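The single-consumer loop from this comment can be illustrated with an in-memory simulation that does not use Kafka at all. Two deques stand in for a high-priority and a normal-priority topic, the class and method names are hypothetical, and "lag" is simply the number of unconsumed high-priority messages. Before every simulated poll, the normal topic is paused while the high-priority lag is non-zero and resumed afterwards; producers are never paused.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory simulation of one consumer reading a high- and a normal-priority topic.
final class PriorityLoopSimulation {
    // highArrivals: iteration number -> message a producer writes to the high-priority topic then.
    static List<String> run(List<String> normalBacklog, Map<Integer, String> highArrivals, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Deque<String> normal = new ArrayDeque<>(normalBacklog);
        Deque<String> high = new ArrayDeque<>();
        int lastArrival = highArrivals.keySet().stream().mapToInt(Integer::intValue).max().orElse(-1);
        List<String> processed = new ArrayList<>();
        boolean normalPaused = false;

        for (int iteration = 0; !normal.isEmpty() || !high.isEmpty() || iteration <= lastArrival; iteration++) {
            String arrived = highArrivals.get(iteration);
            if (arrived != null) {
                high.addLast(arrived); // the producer keeps writing; nothing is paused on its side
            }
            boolean highLag = !high.isEmpty(); // stands in for (endOffset - committed) > 0
            if (highLag != normalPaused) {
                normalPaused = highLag; // would be consumer.pause(...) or consumer.resume(...)
            }
            // Simulated poll(): paused partitions return nothing, so only one source is read.
            Deque<String> source = normalPaused ? high : normal;
            for (int i = 0; i < batchSize && !source.isEmpty(); i++) {
                processed.add(source.pollFirst());
            }
        }
        return processed;
    }
}
```

For example, with a normal backlog n1..n6, high-priority messages h1 and h2 arriving at iterations 1 and 2, and a batch size of 2, run returns [n1, n2, h1, h2, n3, n4, n5, n6]: the urgent messages jump ahead of the remaining backlog without touching any producer.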
  • Thanks. I can’t really argue with that, but it smells bad for larger systems. Once the floodgates are open for a huge amount of data, I will have to keep checking whether I’m wasting resources on this low-priority queue. Why should I have to? Anyway, thanks again – JSBach Sep 09 '17 at 15:44
  • @miran Do you know if there is a similar implementation with the Python client for Kafka? – activelearner Dec 08 '17 at 23:13
  • I believe the Python client provides the same API as the Java one, so you can definitely implement it. – miran Dec 18 '17 at 16:39
  • @miran I'm curious whether you're still using this approach. Any idea how it scales? I would like to implement something similar, but rather than setting the threshold to 0, I would like it to be a configurable value, likely set to a higher number like 500 or 1000. I would also like to consider using 3 topics: high, medium, and low priority. I'm just concerned that the pausing/resuming will not be performant. Any thoughts? – grt3kl Feb 14 '18 at 21:09
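One possible reading of this request, sketched in Java: tiers are listed highest priority first, and every tier below a tier whose lag exceeds a configurable threshold is paused. A threshold of 0 reproduces the "(endOffset - committed) > 0" rule from the answer. The class name, the tier names and the choice of "strictly greater than the threshold" are assumptions for illustration, and the lags are assumed to be computed per tier beforehand.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for N priority tiers with a configurable lag threshold.
final class TieredPause {
    // lagsByTier must iterate in priority order (highest first), hence LinkedHashMap.
    static List<String> tiersToPause(LinkedHashMap<String, Long> lagsByTier, long threshold) {
        List<String> paused = new ArrayList<>();
        boolean higherTierBusy = false;
        for (Map.Entry<String, Long> tier : lagsByTier.entrySet()) {
            if (higherTierBusy) {
                paused.add(tier.getKey()); // some more urgent tier is over its threshold
            }
            if (tier.getValue() > threshold) {
                higherTierBusy = true;     // everything below this tier must wait
            }
        }
        return paused;
    }
}
```

With lags high=0, medium=600, low=10000 and a threshold of 500, only low is paused; once high exceeds 500, both medium and low are paused. Regarding performance, the KafkaConsumer javadoc states that pause() does not cause a group rebalance; it only stops fetching from the given partitions.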

I guess you could use a mix of the position() and committed() methods. The position() method gets the offset of the next record that will be fetched, and the committed() method gets the last committed offset for the given partition (as described in the documentation). Before polling on the lower-priority topics, you could check position() and committed() for the higher-priority one. If position() is higher than committed(), you could pause() the lower-priority topics and poll() on the higher-priority one, then resume the lower-priority topics.

ppatierno