2

I run an Elixir program with a producer and a consumer. I set a priority on my messages, but the priority is not being respected.

My program:

Publisher:

Basic.publish(channel, "my_exchange", "my_queue", "1", persistent: true, priority: 2)
Basic.publish(channel, "my_exchange", "my_queue", "2", persistent: true, priority: 3)
Basic.publish(channel, "my_exchange", "my_queue", "3", persistent: true, priority: 1)
Basic.publish(channel, "my_exchange", "my_queue", "4", persistent: true, priority: 0)
Basic.publish(channel, "my_exchange", "my_queue", "5", persistent: true, priority: 4)

Consumer:

def init(_) do
  {:ok, conn} = Connection.open(Application.get_env(:app, :rabbitmq))
  {:ok, channel} = Channel.open(conn)
  :ok = Basic.qos(channel, prefetch_count: 1, global: true)
  {:ok, _consumer_tag} = Basic.consume(channel, "my_queue", nil, arguments: [{"x-priority", :signedint, 100}])
  {:ok, %{channel: channel}}
end

def handle_info({:basic_deliver, payload, %{delivery_tag: tag, priority: priority}}, %{channel: channel} = state) do
  Logger.info("Received message with payload: #{payload} and priority: #{priority}")
  Basic.ack(channel, tag)
  {:noreply, state}
end

After publishing, I run the consumer.

Expected output:

Received message with payload: 5 and priority: 4
Received message with payload: 2 and priority: 3
Received message with payload: 1 and priority: 2
Received message with payload: 3 and priority: 1
Received message with payload: 4 and priority: 0

Actual output:

Received message with payload: 1 and priority: 2
Received message with payload: 2 and priority: 3 
Received message with payload: 3 and priority: 1
Received message with payload: 4 and priority: 0
Received message with payload: 5 and priority: 4

Did I do anything wrong? Is message priority not working?

Onorio Catenacci
  • Possible duplicate of [RabbitMQ and message priority](https://stackoverflow.com/questions/10745084/rabbitmq-and-message-priority) – theMayer Aug 14 '18 at 13:37

3 Answers

3

One should not expect messages to be sorted inside a queue, because of what “queue” means: roughly, any queue is either LIFO or FIFO, and RabbitMQ’s is the latter.

Re-sorting all messages in the queue by priority on every single insert would drastically degrade performance. What RabbitMQ actually allows (since 3.5.0) is the following:

NB Despite the above, starting with 3.5.0 RabbitMQ allows sorting of messages that are waiting in the queue, assuming consumers take messages more slowly than they arrive. In that case the unconsumed messages get sorted.

It still does not amount to guaranteed sorting, though. Priorities only let some queues/consumers “raise their hands” to receive messages first unless they are blocked. Otherwise, the next one in the priority chain receives the message.

If you need to sort incoming messages (it’s hard to imagine why you would pick a message queue in such a case, but still), you have to sort them either in the publisher before sending them to the queue, or in the consumer after collecting all the expected messages from the queue.
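The consumer-side option can be sketched as follows. This is a minimal illustration, assuming the consumer has already collected every expected message as a `{payload, priority}` tuple; the `PrioritySort` module and its `by_priority/1` function are hypothetical names, not part of the amqp library, and the `:desc` argument needs Elixir 1.10 or later.

```elixir
defmodule PrioritySort do
  # Hypothetical helper for illustration only.
  # Takes a list of {payload, priority} tuples and returns them
  # ordered from highest to lowest priority.
  def by_priority(messages) do
    Enum.sort_by(messages, fn {_payload, priority} -> priority end, :desc)
  end
end

# The five messages from the question, in arrival order.
received = [{"1", 2}, {"2", 3}, {"3", 1}, {"4", 0}, {"5", 4}]

received
|> PrioritySort.by_priority()
|> Enum.each(fn {payload, priority} ->
  IO.puts("payload: #{payload} and priority: #{priority}")
end)
```

With the question's messages this prints payloads in the order 5, 2, 1, 3, 4. The cost is that nothing can be processed until the whole batch has arrived.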

Aleksei Matiushkin
  • It turns out that rabbit allows you to send messages with priority, but does not take it into account? – Dmitry Krakosevich Aug 13 '18 at 10:17
  • Options are just [a keyword list](https://github.com/pma/amqp/blob/master/lib/amqp/basic.ex#L51), not restricted by keyword name; you could send `foo: :bar` there and it would be filtered out by the client. But `priority` is indeed allowed to be sent. Based on it, rabbit will decide which queue to put the message into. But since you have a single queue, this setting does not have any effect. – Aleksei Matiushkin Aug 13 '18 at 11:41
  • It should be added that message priority is a planned feature, but as you mentioned, the implementation is a bit jumbly. – theMayer Aug 14 '18 at 03:59
  • @theMayer do you have any reference to prove “message priority is a planned feature”? Because the implementation is not “a bit jumbly” it’s plain impossible without nasty hacks all ruining the idea of fast and reliable message processing. – Aleksei Matiushkin Aug 14 '18 at 04:14
  • I guess I was wrong. It's actually been implemented - see [this page](https://www.rabbitmq.com/priority.html). – theMayer Aug 14 '18 at 13:36
  • @theMayer this is the same link as the first one in my answer. It’s a specific queue that prioritizes messages while they stay in the queue unconsumed, providing _no guarantee at all_ that any prioritization happens. That said, it works under very special circumstances and honestly, I do not see how it could be used save for hacking backpressure by blindly acking messages having higher priority and processing only those having less priority when all the high-prioritized are trashed out. I might be wrong, though. – Aleksei Matiushkin Aug 14 '18 at 13:51
  • Yes, but the bottom line is that this answer is incorrect due to the first sentence. RabbitMQ sorts messages in the queue. What appears to be happening in this case is that messages aren't being sorted because they get delivered first. – theMayer Aug 14 '18 at 13:55
  • @theMayer updated an answer, according to your comments, thank you. – Aleksei Matiushkin Aug 14 '18 at 14:00
2

As pointed out here, the correct queue parameter is x-max-priority. You are using x-priority, so your queue is not being declared as a priority queue.

Docs: https://www.rabbitmq.com/priority.html
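A minimal sketch of declaring the queue as a priority queue with the amqp package. It assumes a broker reachable with the default connection settings, and the maximum of 10 is an arbitrary value chosen for illustration. Queue arguments cannot be changed on an existing queue, so a `my_queue` that was declared earlier without this argument would need to be deleted first.

```elixir
# Assumes the amqp hex package is available and a broker is running locally.
alias AMQP.{Connection, Channel, Queue}

{:ok, conn} = Connection.open()
{:ok, channel} = Channel.open(conn)

# x-max-priority is set on the queue when it is declared.
# 10 is an illustrative ceiling; priorities 0..4 from the question fit within it.
{:ok, _info} =
  Queue.declare(channel, "my_queue",
    durable: true,
    arguments: [{"x-max-priority", :long, 10}]
  )
```

The `x-priority` argument passed to `Basic.consume` in the question is a different setting: it sets the priority of the consumer, not of the messages.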

Luke Bakken
0

Just to eliminate any doubt, it appears as though your consumer and producer are both connected and processing at the same time. When this happens, messages are routed directly to the consumer (essentially) without being queued.

So, RabbitMQ is working correctly, but your expectations need to be adjusted a bit. Message priority will only operate on messages in the queue, and even then, there are cases (e.g. parallel consumers) where priority may not be respected.

Bottom line - don't write a unit test that requires messages to be delivered in any particular order, as that is not a guarantee provided by message queuing systems.

theMayer
  • I ran a test bed and published messages without consuming them. When I manually checked the messages in the web UI, they were sorted in the correct priority order. As theMayer suggests, perhaps your consumer is consuming the messages before the new messages get published. – kaifong Mar 25 '19 at 10:54