
I have a task queue in RabbitMQ with multiple producers (12) and one consumer for heavy tasks in a web app. When I run the consumer, it starts dequeuing some of the messages before crashing with this error:

Traceback (most recent call last):
File "jobs.py", line 42, in <module> jobs[job](config)
File "/home/ec2-user/project/queue.py", line 100, in init_queue
channel.start_consuming()
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming
self.connection.process_data_events(time_limit=None)
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events
self._flush_output(common_terminator)
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output
result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "error(104, 'Connection reset by peer')")

The producers' code is:

import json

import pika

message = {'image_url': image_url, 'image_name': image_name, 'notes': notes}

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='tasks_queue')
channel.basic_publish(exchange='',
                      routing_key='tasks_queue',
                      body=json.dumps(message))

connection.close()

And the code of the only consumer (the one that is crashing):

def callback(self, ch, method, properties, body):
    """Callback invoked when a message is received."""
    message = json.loads(body)
    try:
        image = _get_image(message['image_url'])
    except Exception:
        sys.stderr.write('Error getting image in note %s' % note['id'])
        return

    # Crop image with PIL. Not so expensive
    box_path = _crop(image, message['image_name'], box)

    # API call. Long-running function
    result = long_api_call(box_path)

    if result is None:
        sys.stderr.write('Error in note %s' % note['id'])
        return

    # update the db
    db.update_record(result)


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks_queue')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback_obj.callback, queue='tasks_queue', no_ack=True)
channel.start_consuming()

As you can see, there are three expensive operations per message: a crop task, an API call, and a database update. Without the API call, the consumer runs smoothly.

Thanks in advance

user3753792
  • Please provide information about your environment - what versions of software you're using, are you using Docker, are you using a load balancer, is there anything logged by RabbitMQ. `Connection reset by peer` means that something interrupted your TCP connection unexpectedly. I expect to see a similar message logged by RabbitMQ. – Luke Bakken Oct 24 '18 at 22:21
  • Hello. I have RabbitMQ 3.7.0 running on an Amazon Linux EC2 instance. No Docker or load balancer. Also, this code, result = long_api_call(box_path), is behind a try/except block, so it is supposed to be fault tolerant. This *long_api_call* points to an external service with a currently unstable internet connection, so it is not rare for some of the callback calls to simply fail. But that shouldn't drop the consumer with this weird error. My RabbitMQ log file: – user3753792 Oct 25 '18 at 13:16
  • 2018-10-25 06:04:54.854 [info] <0.7436.0> closing AMQP connection <0.7436.0> (127.0.0.1:42882 -> 127.0.0.1:5672, vhost: '/', user: 'guest') 2018-10-25 06:05:14.740 [warning] <0.5202.0> closing AMQP connection <0.5202.0> (127.0.0.1:32816 -> 127.0.0.1:5672): missed heartbeats from client, timeout: 60s 2018-10-25 06:06:59.367 [info] <0.7460.0> accepting AMQP connection <0.7460.0> (127.0.0.1:43332 -> 127.0.0.1:5672) 2018-10-25 06:06:59.370 [info] <0.7460.0> connection <0.7460.0> (127.0.0.1:43332 -> 127.0.0.1:5672): user 'guest' authenticated and granted – user3753792 Oct 25 '18 at 13:44

2 Answers


Your RabbitMQ log shows a message that I thought we might see:

missed heartbeats from client, timeout: 60s

What's happening is that your long_api_call blocks Pika's I/O loop. Pika is a very lightweight library and does not start threads in the background for you so you must code in such a way as to not block Pika's I/O loop longer than the heartbeat interval. RabbitMQ thinks your client has died or is unresponsive and forcibly closes the connection.

Please see my answer here, which links to this example code showing how to properly execute a long-running task in a separate thread. You can still use no_ack=True; you will just skip the ack_message call.
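For reference, here is a minimal sketch of that threaded-consumer pattern, adapted to the queue name from the question. It assumes pika 0.12 or later (where BlockingConnection.add_callback_threadsafe is available), keeps the pre-1.0 basic_consume argument order used in the question, and treats long_api_call as the question's stand-in for the slow external call:

import functools
import threading

import pika


def do_work(connection, channel, delivery_tag, body):
    # Runs on a worker thread, so the main thread stays inside
    # start_consuming() and keeps answering heartbeats.
    result = long_api_call(body)  # stand-in for the slow work in the question
    # ... crop / db.update_record(result) would go here ...

    # Acks must be issued from the connection's own thread, hence
    # add_callback_threadsafe(). With no_ack=True you would skip this step.
    cb = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
    connection.add_callback_threadsafe(cb)


def on_message(channel, method, properties, body, args):
    connection, threads = args
    worker = threading.Thread(
        target=do_work,
        args=(connection, channel, method.delivery_tag, body))
    worker.start()
    threads.append(worker)


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks_queue')
channel.basic_qos(prefetch_count=1)

threads = []
on_message_cb = functools.partial(on_message, args=(connection, threads))
# pre-1.0 pika argument order, as in the question's consumer
channel.basic_consume(on_message_cb, queue='tasks_queue')
channel.start_consuming()

for worker in threads:
    worker.join()

The worker thread never talks to the broker directly; it only schedules the ack back onto the connection's thread, so start_consuming() stays free to answer heartbeats while the API call runs.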


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

Luke Bakken

Starting with RabbitMQ 3.5.5, the broker’s default heartbeat timeout decreased from 580 seconds to 60 seconds.

See pika: Ensuring well-behaved connection with heartbeat and blocked-connection timeouts.

The simplest fix is to increase the heartbeat timeout:

# Set a longer heartbeat timeout (in seconds) via the connection URL:
rabbit_url = host + "?heartbeat=360"
conn = pika.BlockingConnection(pika.URLParameters(rabbit_url))

# or via ConnectionParameters:
params = pika.ConnectionParameters(host, heartbeat=360)
conn = pika.BlockingConnection(params)
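Whichever form you use, pick a heartbeat value comfortably larger than the longest time your long_api_call can block the I/O loop; otherwise the broker will still close the connection for missed heartbeats, exactly as the log above shows.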
vladimir