2

Is there any method to process messages with high priority first?

I tried creating three topics 'high', 'medium', and 'low', and subscribed to all three topics with one consumer and if there is an unprocessed message in the 'high' topic it will pause the other two. Is there any better way for implementing message priority?

I tried using the logic given below.

topics = ['high', 'medium', 'low']
consumer.subscribe(topics)
high_topic_partition = TopicPartition(priority['high'], 0)
medium_topic_partition = TopicPartition(priority['medium'], 0)
low_topic_partition = TopicPartition(priority['low'], 0)

while True:

    messages = consumer.poll() 
    high_priotity_unprocessed_msg = consumer.end_offsets([high_topic_partition])[high_topic_partition] - consumer.position(high_topic_partition)
    medium_priotity_unprocessed_msg = consumer.end_offsets([medium_topic_partition])[medium_topic_partition] - consumer.position(medium_topic_partition)
    low_priotity_unprocessed_msg = consumer.end_offsets([low_topic_partition])[low_topic_partition] - consumer.position(low_topic_partition)

    if high_priotity_unprocessed_msg >0:  
     consumer.pause(medium_topic_partition)
            consumer.pause(low_topic_partition)

        else:
            consumer.resume(medium_topic_partition)

            if medium_priotity_unprocessed_msg >0:
                consumer.pause(low_topic_partition)
            else:
                consumer.resume(low_topic_partition)
        if messages:
            process(messages)
Shaido
  • 27,497
  • 23
  • 70
  • 73
Sudheer
  • 23
  • 3
  • Does this answer your question? [Does Kafka support priority for topic or message?](https://stackoverflow.com/questions/30655361/does-kafka-support-priority-for-topic-or-message) – Michael Heil Feb 02 '21 at 09:00

1 Answers1

2

One option that you may evaluate is basically just having more parallelism on higher priority messages...

For example:

Topic1 (Priority Low):    1 partitions
Topic2 (Priority medium): 5 partitions
Topic3 (Priority High):  20 partitions

And then basically have:

  • 1 consumers to get the data from topic1
  • 5 consumers from topic2
  • 20 consumers from topic3

Now, I would suggest you the easiest way to do this is basically write the code once... but externalize the configuration of the "topic name"... and then just scale it up (of course using containers)... Please refer to this reading:

For example, the code could be as simple as:

SuperAwesomeAppBinaryCode:

topic = %MY_TOPIC_NAME_INJECTED_BY_ENV_VAR%
consumer.subscribe(topic)

while True:

    messages = consumer.poll() 
    if messages:
        process(messages)

Now, if we have that code deployed on, let's say K8s, you could have 3 different deployments, running the same code, but injecting the right topic for each case, for example:

Low Priority Messages

apiVersion: apps/v1
kind: Deployment
metadata:
  name: LowPriorityProcessor
  labels:
    app: LowPriorityProcessor
spec:
  replicas: 1
  selector:
    matchLabels:
      app: LowPriorityProcessor
  template:
    metadata:
      labels:
        app: LowPriorityProcessor
    spec:
      containers:
      - name: LowPriorityProcessor
        image: SuperAwesomeAppBinaryCode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic1
        ports:
        - containerPort: 80

Medium Priority Messages

apiVersion: apps/v1
kind: Deployment
metadata:
  name: MediumPriorityProcessor
  labels:
    app: MediumPriorityProcessor
spec:
  replicas: 5
  selector:
    matchLabels:
      app: MediumPriorityProcessor
  template:
    metadata:
      labels:
        app: MediumPriorityProcessor
    spec:
      containers:
      - name: MediumPriorityProcessor
        image: SuperAwesomeAppBinaryCode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic2
        ports:
        - containerPort: 80

High Priority Messages

apiVersion: apps/v1
kind: Deployment
metadata:
  name: HighPriorityProcessor
  labels:
    app: HighPriorityProcessor
spec:
  replicas: 20
  selector:
    matchLabels:
      app: HighPriorityProcessor
  template:
    metadata:
      labels:
        app: HighPriorityProcessor
    spec:
      containers:
      - name: HighPriorityProcessor
        image: SuperAwesomeAppBinaryCode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic3
        ports:
        - containerPort: 80

And then just let the parallelism do its magic If you check carefully the only thing that changes from one "k8s deployment" to another is the topic and the number of replicas.

Notes:

  • You can achieve this without K8s.... using Docker Swarm or even just docker-compose or running manually the instances ‍♂️, but why would you like to reinvent the wheel, but for sure in some edge cases, there is no much option...
  • A nice reading about this topic can be found here
Marco Vargas
  • 1,232
  • 13
  • 31