
I am using pika with a hosted CloudAMQP instance.

Messages are published to the queue in short bursts: ~10 messages/second and then nothing for a few minutes. The consumer can sometimes take up to ~30 seconds to process a single message. My simple consumer code was:

import pika, os, time

url = os.environ.get('CLOUDAMQP_URL')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

def callback(ch, method, properties, body):
  print("Received " + str(body), method, properties)
  # ... long task that is equivalent to:
  time.sleep(30)

queue_name = 'test-queue'

channel.queue_declare(queue=queue_name, durable=True, exclusive=False, auto_delete=False)

channel.basic_consume(queue_name, callback, auto_ack=True)

channel.start_consuming()
connection.close()

From time to time, I would see the following behavior:

  • ~20-30 messages were published to the queue in a few rapid bursts
  • the consumer fetched all of the queued messages, auto-ack'ing them in one swoop, i.e. they all "disappeared" from the queue
  • after processing the auto-ack'ed messages, pika would throw the following Exception:
pika.exceptions.StreamLostError: Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')

(full Traceback below)

I solved my problem by setting auto_ack=False and ack'ing each message by hand (see my answer below).

Was there an alternative approach to fixing this? Is the EOF exception happening because CloudAMQP/the RabbitMQ server did not receive a heartbeat in time and closed the connection? Or was it an internal timeout in pika? Thanks!


Traceback:

Traceback (most recent call last):
 File "/app/app.py", line 146, in <module>
   pika_obj['pika_channel'].start_consuming()
 File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1866, in start_consuming
   self._process_data_events(time_limit=None)
 File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2027, in _process_data_events
   self.connection.process_data_events(time_limit=time_limit)
 File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 825, in process_data_events
   self._flush_output(common_terminator)
 File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
   raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')
Greg Sadetsky

2 Answers


I was able to fix the code above by introducing a simple change: setting auto_ack=False and calling basic_ack by hand after processing each message:

import pika, os, time

url = os.environ.get('CLOUDAMQP_URL')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

def callback(ch, method, properties, body):
  print("Received " + str(body), method, properties)
  # ... long task that is equivalent to:
  time.sleep(30)
  # ack the message manually
  ch.basic_ack(delivery_tag=method.delivery_tag)

queue_name = 'test-queue'

channel.queue_declare(queue=queue_name, durable=True, exclusive=False, auto_delete=False)

channel.basic_consume(queue_name, callback, auto_ack=False)

channel.start_consuming()
connection.close()
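
If you keep manual acks, one complementary option (this is the standard pika/RabbitMQ basic_qos API; the prefetch value of 1 is just an illustrative choice) is to cap how many unacknowledged messages the broker delivers to the consumer at once:

# call this before basic_consume so the broker hands the consumer
# at most one unacknowledged message at a time, instead of a whole burst
channel.basic_qos(prefetch_count=1)

channel.basic_consume(queue_name, callback, auto_ack=False)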
Greg Sadetsky

This question is old, but one thing that stuck out to me is that you are holding up the queue while your consumer processes each message. That is a recipe for trouble: MQ is an asynchronous medium for a reason, and this design defeats that purpose.

I have a similar setup where messages arrive in a burst every minute. In my callback I simply put each message into a local queue (queue.Queue), which lets the MQ client (pika) ack the message on the broker immediately. On the local side, a thread continuously polls the internal queue and consumes the messages. I still save each message to disk in case anything fails locally, but I relieve MQ of the responsibility of holding my messages any longer than needed. Hope this helps someone else looking for a solution to this situation.
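
A minimal sketch of that pattern, assuming a single worker thread and a plain queue.Queue (the queue name, the 30-second sleep, and the other names are placeholders, and the save-to-disk step is omitted):

import os
import queue
import threading
import time

import pika

local_queue = queue.Queue()

def worker():
  # drain the internal queue; the long-running task happens here,
  # off the pika I/O loop
  while True:
    body = local_queue.get()
    time.sleep(30)  # placeholder for the real long-running task
    local_queue.task_done()

def callback(ch, method, properties, body):
  # hand the message off and ack immediately, so the broker
  # is not left waiting while the message is processed
  local_queue.put(body)
  ch.basic_ack(delivery_tag=method.delivery_tag)

threading.Thread(target=worker, daemon=True).start()

params = pika.URLParameters(os.environ.get('CLOUDAMQP_URL'))
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='test-queue', durable=True)
channel.basic_consume('test-queue', callback, auto_ack=False)
channel.start_consuming()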