
I am using a RabbitMQ producer to send long-running tasks (30 mins+) to a consumer. The problem is that the consumer is still working on a task when the connection to the server is closed, so the unacknowledged task is requeued.

From researching I understand that either a heartbeat or an increased connection timeout can be used to solve this. Both of these solutions raise errors when I attempt them. In reading answers to similar posts I've also learned that many changes have been made to RabbitMQ since those answers were posted (e.g. the default heartbeat timeout changed from 580 seconds to 60 in RabbitMQ 3.5.5).

When specifying a heartbeat and blocked connection timeout:

credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()

The following error is displayed:

TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'

When specifying heartbeat_interval=1000 in the connection parameters, a similar error is shown: TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'

And similarly for socket_timeout=1000 the following error is displayed: TypeError: __init__() got an unexpected keyword argument 'socket_timeout'

I am running RabbitMQ 3.6.1, pika 0.10.0 and python 2.7 on Ubuntu 14.04.

  1. Why are the above approaches producing errors?
  2. Can a heartbeat approach be used where there is a long-running continuous task? For example, can heartbeats be used when performing large database joins which take 30+ mins? I favour the heartbeat approach, as it is often difficult to judge how long a task such as a database join will take.

I've read through answers to similar questions.

Update: running code from the pika documentation produces the same error.

Greg
  • Is any kind of load balancer sitting in front of the RabbitMQ server? What your environment looks like may be relevant to answering this question. – michael.schuett Mar 21 '16 at 06:51
  • The producer and consumer machines are all on the same private network. – Greg Mar 21 '16 at 08:40
  • The problem is that you need to process data events while waiting, even if you are not consuming messages: connection.process_data_events(). Otherwise pika won't respond to heartbeats. – eandersson Apr 01 '16 at 12:06
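Following eandersson's comment: pika's BlockingConnection can only answer broker heartbeats when your code gives it control. One way to keep that happening during a long task is to run the work in a background thread and pump events from the main one. This is a sketch, not from the post itself; the helper name and one-second interval are illustrative, and it assumes pika's process_data_events(time_limit=...) API:

```python
import threading

def pump_events_while(connection, work):
    # Run the long task in a background thread while the main thread
    # keeps calling process_data_events() so pika can service I/O,
    # including answering broker heartbeats.
    worker = threading.Thread(target=work)
    worker.start()
    while worker.is_alive():
        connection.process_data_events(time_limit=1)
    worker.join()
```

Note that pika's BlockingConnection is not thread-safe, so only the main thread touches the connection here; the worker thread does the pure computation.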

2 Answers


I've run into the same problem you are seeing with my systems: dropped connections during very long tasks.

It's possible the heartbeat might help keep your connection alive, if your network setup is such that idle TCP/IP connections are forcefully dropped. If that's not the case, though, changing the heartbeat won't help.

Changing the connection timeout won't help at all. This setting is only used when initially creating the connection.

I am using a RabbitMQ producer to send long running tasks (30 mins+) to a consumer. The problem is that the consumer is still working on a task when the connection to the server is closed and the unacknowledged task is requeued.

There are two reasons for this, both of which you have run into already:

  1. Connections drop randomly, even under the best of circumstances
  2. Re-starting a process because of a re-queued message can cause problems

Having deployed RabbitMQ code with tasks ranging from under a second to several hours, I've found that for very long tasks like this, acknowledging the message immediately and updating the system with status messages works best.

You will need to have a system of record (probably with a database) that keeps track of the status of a given job.

When the consumer picks up a message and starts the process, it should acknowledge the message right away and send a "started" status message to the system of record.

As the process completes, send another message to say it's done.
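The ack-first pattern described above might be sketched like this. The sqlite3 schema, the set_status helper, and do_long_task are illustrative, not from the answer:

```python
import sqlite3

# A minimal "system of record": one row per job, updated as work progresses.
db = sqlite3.connect('jobs.db')
db.execute("CREATE TABLE IF NOT EXISTS jobs (id TEXT PRIMARY KEY, status TEXT)")

def set_status(job_id, status):
    db.execute("INSERT OR REPLACE INTO jobs (id, status) VALUES (?, ?)",
               (job_id, status))
    db.commit()

def on_message(channel, method, properties, body):
    # A pika-style consumer callback: ack right away so a dropped
    # connection cannot requeue the task, then record progress.
    job_id = body.decode()
    channel.basic_ack(delivery_tag=method.delivery_tag)
    set_status(job_id, 'started')
    do_long_task(job_id)       # the 30-minute-plus work (not defined here)
    set_status(job_id, 'done')
```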

This won't solve the dropped-connection problem, but nothing will solve that 100% anyway. Instead, it prevents the message-requeueing problem from happening when a connection is dropped.

This solution does introduce another problem, though: when the long running process crashes, how do you resume the work?

The basic answer is to use the system of record (your database) status for the job to tell you that you need to pick up that work again. When the app starts, check the database to see if there is work that is unfinished. If there is, resume or restart that work in whatever manner is appropriate.
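That startup check might look like this; the sqlite3 schema and the unfinished_jobs helper are illustrative:

```python
import sqlite3

db = sqlite3.connect('jobs.db')
db.execute("CREATE TABLE IF NOT EXISTS jobs (id TEXT PRIMARY KEY, status TEXT)")

def unfinished_jobs():
    # Anything recorded as 'started' but never marked 'done' was
    # interrupted by a crash and needs to be picked up again.
    rows = db.execute("SELECT id FROM jobs WHERE status = 'started'").fetchall()
    return [r[0] for r in rows]

# On startup: resume or republish each interrupted job.
for job_id in unfinished_jobs():
    print('resuming', job_id)
```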

Derick Bailey
    What about using another thread to send heartbeat signals asynchronously? – nn0p Jan 26 '21 at 07:10
  • If an ack is sent immediately, doesn't that mean the broker will just send another message, and eventually overload the consumer? – Pit Digger Jul 31 '21 at 23:00
  • @nn0p I'm having a problem with that approach. My tasks are very CPU heavy and even though another thread is sending heartbeats, sometimes it can't find any opportunity to send one during CPU load, which causes connection to terminate. – uylmz Nov 17 '21 at 13:43
  • We have a long-running operation performed on the consumer which can take hours to complete. We are getting a timeout exception as we don't acknowledge the message till it is processed completely. If we ack the message, the next message will start getting processed which we don't want. Is there a way to overcome this issue? – SuriyaPrashath Apr 26 '22 at 17:06

I've seen this issue before. It happens when you declare one queue but your listener references a different queue that was never declared or bound to the exchange.

For example, the bean declares "test_queue" but the listener is on "test_queue_1":

 @Bean(name = "test_queue")
 public Queue testQueue() {
     return new Queue("test_queue");
 }

 @RabbitListener(queues = "test_queue_1")
 public void listenCreateEvent() {
 }

If you listen on a queue that was never declared and bound to the exchange, this will happen.

JetQin