I am using pika with a hosted CloudAMQP instance.
Messages are published to the queue in short bursts: ~10 messages/second, then nothing for a few minutes. The consumer can sometimes take up to ~30 seconds to process a single message. My simple consumer code was:
import pika, os, time
url = os.environ.get('CLOUDAMQP_URL')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()
def callback(ch, method, properties, body):
    print("Received " + str(body), method, properties)
    # ... long task that is equivalent to:
    time.sleep(30)
queue_name = 'test-queue'
channel.queue_declare(queue=queue_name, durable=True, exclusive=False, auto_delete=False)
channel.basic_consume(queue_name, callback, auto_ack=True)
channel.start_consuming()
connection.close()
From time to time, I would see the following behavior:
- ~20-30 messages were published to the queue in a few rapid bursts
- the consumer fetched all of the queued messages, auto-ack'ing them in one fell swoop, i.e. they all "disappeared" from the queue
- after processing the auto-ack'ed messages, pika would throw the following Exception:
pika.exceptions.StreamLostError: Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')
(full Traceback below)
I solved my problem by removing auto_ack=True and ack'ing messages by hand (see below).
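A minimal sketch of what manual acknowledgement can look like with pika's BlockingConnection. It is an illustration, not necessarily the exact code used: the `make_callback` helper, the `task` parameter, and the `prefetch_count=1` setting are assumptions introduced here, and the sketch only starts consuming when `CLOUDAMQP_URL` is set.

```python
import os
import time


def make_callback(task):
    """Build a consumer callback that acks a message only after `task` returns."""
    def on_message(ch, method, properties, body):
        task(body)  # long-running work happens before the ack
        ch.basic_ack(delivery_tag=method.delivery_tag)
    return on_message


def main(task, queue_name="test-queue"):
    import pika  # imported here so make_callback can be exercised without a broker

    params = pika.URLParameters(os.environ["CLOUDAMQP_URL"])
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True,
                          exclusive=False, auto_delete=False)
    # Assumption: allow only one unacknowledged message at a time, so the
    # broker keeps the rest of a burst in the queue instead of pushing it all.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue=queue_name,
                          on_message_callback=make_callback(task),
                          auto_ack=False)
    try:
        channel.start_consuming()
    finally:
        if connection.is_open:
            connection.close()


if __name__ == "__main__" and os.environ.get("CLOUDAMQP_URL"):
    # Stand-in for the real ~30 second task from the consumer above.
    main(lambda body: time.sleep(30))
```

With this shape, a message that is being processed when the connection drops stays unacknowledged, so the broker can redeliver it rather than losing it.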
Is there an alternative approach to fixing this? Is the EOF Exception happening because CloudAMQP/the RabbitMQ server did not get a heartbeat in time and closed the connection? Or was it a timeout internal to pika? Thanks!
Traceback:
Traceback (most recent call last):
  File "/app/app.py", line 146, in <module>
    pika_obj['pika_channel'].start_consuming()
  File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1866, in start_consuming
    self._process_data_events(time_limit=None)
  File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2027, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 825, in process_data_events
    self._flush_output(common_terminator)
  File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')