22

I am using Kombu in Python to consume a durable RabbitMQ queue.

There is only one consumer for the queue, running on Windows. This consumer produces the error below:

Traceback (most recent call last):
  File ".\consumer_windows.py", line 66, in <module>
    message.ack()
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\kombu\message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\method_framing.py", line 221, in write_method
    write_frame(1, channel, payload)
  File "C:\Users\Administrator\Anaconda2\lib\site-packages\amqp\transport.py", line 182, in write_frame
    frame_type, channel, size, payload, 0xce,
  File "C:\Users\Administrator\Anaconda2\lib\socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 10054] An existing connection was forcibly closed by the remote host

There are at most 500 messages in the queue at any one time. Each message is small, but it represents a task that can take up to 10 minutes to complete (although it usually takes less than 5 minutes per message).

I have tried restarting the consumer, restarting the RabbitMQ server, and deleting the queue, but the error still persists.

I've seen this question; however, the answer is from 2010 and my rabbitmq.log has different entries:

=ERROR REPORT==== 24-Apr-2016::08:26:20 ===
closing AMQP connection <0.6716.384> (192.168.X.X:59602 -> 192.168.Y.X:5672):
{writer,send_failed,{error,timeout}}

There were no recent events in the rabbitmq-sasl.log.

Why is this error happening and how can I prevent it from occurring?

Greg

3 Answers

1

I'm still looking for an answer. In the meantime, I restart the connection to my RabbitMQ server whenever it is closed:

import pika

while True:
    try:
        connection = pika.BlockingConnection(params)
        channel = connection.channel()  # start a channel
        channel.queue_declare(queue=amqp_q, durable=True)  # declare a queue
        ...
    except pika.exceptions.ConnectionClosed:
        print('connection closed... and restarted')
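
The loop above retries immediately and indefinitely. A minimal sketch of the same idea with a bounded number of attempts and exponential backoff could look like the following. The helper name `run_with_reconnect` and its parameters are hypothetical, and the code uses only the standard library so that the exception type and connection factory can be supplied by the caller:

```python
import time


def _close_quietly(connection):
    """Close a connection, ignoring errors from an already-dead socket."""
    try:
        connection.close()
    except Exception:
        pass


def run_with_reconnect(connect, work, retriable, max_attempts=5,
                       base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Run work(connect()), reconnecting with backoff on `retriable` errors.

    connect   -- zero-argument callable returning an object with .close()
    work      -- callable taking the connection; its return value is returned
    retriable -- exception class (or tuple of classes) that triggers a retry
    """
    attempt = 0
    while True:
        try:
            connection = connect()
            try:
                return work(connection)
            finally:
                _close_quietly(connection)
        except retriable:
            attempt += 1
            if attempt >= max_attempts:
                raise  # give up and surface the last error
            # Delays grow as 1s, 2s, 4s, ... capped at max_delay.
            sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
```

With pika, this might be called as `run_with_reconnect(lambda: pika.BlockingConnection(params), consume, pika.exceptions.ConnectionClosed)`, where `consume` is a hypothetical function holding the body of the loop. Note that this only works around the dropped connection; it does not explain why the broker closed it.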
hestellezg
0

I had the same issue with a hosted MySQL server. I came to understand that it happens when a connection is kept open for a long time, or sits idle for a long time. If your program opens the DB (or any other connection) once and keeps it open for as long as the program runs, restructure it so that it opens the connection, writes everything, closes it, and then repeats.

I don't know exactly what RabbitMQ is, but I think the error in your title may have the same cause.
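
The open, use, close, repeat pattern described above can be sketched with the standard library's `contextlib.closing`. The function name `process_in_batches` and its parameters are hypothetical; `connect` stands for whatever creates the connection (a database client, a pika connection, and so on):

```python
from contextlib import closing


def process_in_batches(connect, batches, handle):
    """Open a fresh connection for each batch and always close it afterwards.

    connect -- zero-argument callable returning an object with .close()
    batches -- iterable of work items
    handle  -- callable taking (connection, batch)
    """
    for batch in batches:
        # closing() guarantees connection.close() runs even if handle() raises.
        with closing(connect()) as connection:
            handle(connection, batch)
```

One caveat for RabbitMQ specifically: a message has to be acknowledged on the same channel it was delivered on, so this pattern does not directly fit a consumer that holds an unacknowledged message while a long task runs.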

0

I had the same error (using the plain pika library) while trying to connect to a RabbitMQ broker through Amazon MQ.

The problem was resolved once I set up the SSL configuration correctly.

See the full AWS guide here: https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-rabbitmq-pika.html

Core snippets that I used:

Define Pika Client:

import ssl
import pika

class BasicPikaClient:

    def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region):

        # SSL Context for TLS configuration of Amazon MQ for RabbitMQ
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA')

        url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671"
        parameters = pika.URLParameters(url)
        parameters.ssl_options = pika.SSLOptions(context=ssl_context)

        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
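
Two details of the client above may be worth adjusting. `ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)` is deprecated in recent Python versions and does not verify the server certificate by default, and a password containing characters such as `@` or `/` would break the URL unless it is percent-encoded. A minimal sketch of both pieces, using only the standard library, is shown below. The function names `make_tls_context` and `build_amqps_url` are hypothetical, and the sketch assumes that `pika.URLParameters` percent-decodes the credentials, which is worth checking against the pika version in use:

```python
import ssl
from urllib.parse import quote


def make_tls_context(ciphers=None):
    """Build a client TLS context that verifies certificates and requires TLS 1.2+."""
    # create_default_context() enables certificate and hostname verification.
    context = ssl.create_default_context()
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    if ciphers:
        # Optional cipher restriction, e.g. 'ECDHE+AESGCM:!ECDSA' as in the answer.
        context.set_ciphers(ciphers)
    return context


def build_amqps_url(user, password, broker_id, region, port=5671):
    """Build an amqps:// URL with percent-encoded credentials."""
    host = f"{broker_id}.mq.{region}.amazonaws.com"
    # safe='' also encodes '/', which would otherwise be read as a path separator.
    return f"amqps://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}"
```

The results would then be passed on in the same way as in the class above: `pika.URLParameters(url)` for the URL and `pika.SSLOptions(context=...)` for the context.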

Producer:

from basicClient import BasicPikaClient

class BasicMessageSender(BasicPikaClient):

    def declare_queue(self, queue_name, durable):
        print(f"Trying to declare queue({queue_name})...")
        self.channel.queue_declare(queue=queue_name, durable=durable)

    def send_message(self, exchange, routing_key, body):
        # Reuse the channel opened in __init__ instead of opening a new one per message.
        self.channel.basic_publish(exchange=exchange,
                                   routing_key=routing_key,
                                   body=body)
        print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}")

    def close(self):
        self.channel.close()
        self.connection.close()

Calling Producer:

# Initialize Basic Message Sender which creates a connection
# and channel for sending messages.
basic_message_sender = BasicMessageSender(
    credentials["broker_id"],
    credentials["username"],
    credentials['password'],
    credentials['region']
)

# Declare a queue
basic_message_sender.declare_queue("q_name", durable=True)

# Send a message to the queue.
basic_message_sender.send_message(exchange="", routing_key="q_name", body=b'Hello World 2!')

# Close connections.
basic_message_sender.close()

Define Consumer:

from basicClient import BasicPikaClient

class BasicMessageReceiver(BasicPikaClient):

    def get_message(self, queue):
        method_frame, header_frame, body = self.channel.basic_get(queue)
        if method_frame:
            print(method_frame, header_frame, body)
            self.channel.basic_ack(method_frame.delivery_tag)
            return method_frame, header_frame, body
        else:
            print('No message returned')

    def close(self):
        self.channel.close()
        self.connection.close()
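
`get_message` above fetches at most one message per call. If the goal is to empty the queue, a polling loop around `basic_get` is one option. The sketch below is a hypothetical helper (`drain_queue` is not part of pika or the AWS guide); it takes any channel object exposing `basic_get`, `basic_ack` and `basic_nack` with the signatures pika's `BlockingChannel` provides, and acknowledges each message only after the handler has finished:

```python
def drain_queue(channel, queue, handler, max_messages=100):
    """Poll `queue` with basic_get until it is empty or max_messages is reached.

    handler -- callable taking the message body; an exception causes the
               message to be requeued and is then re-raised
    Returns the number of messages handled and acknowledged.
    """
    handled = 0
    while handled < max_messages:
        method_frame, header_frame, body = channel.basic_get(queue)
        if method_frame is None:
            break  # queue is empty
        try:
            handler(body)
        except Exception:
            # Hand the message back to the broker so it is not lost.
            channel.basic_nack(method_frame.delivery_tag, requeue=True)
            raise
        channel.basic_ack(method_frame.delivery_tag)
        handled += 1
    return handled
```

With the receiver class above, this could be called as `drain_queue(basic_message_receiver.channel, "q_name", print)`.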

Calling Consumer:

# Create Basic Message Receiver which creates a connection
# and channel for consuming messages.
basic_message_receiver = BasicMessageReceiver(
    credentials["broker_id"],
    credentials["username"],
    credentials['password'],
    credentials['region']
)

# Consume the message that was sent.
basic_message_receiver.get_message("q_name")

# Close connections.
basic_message_receiver.close()

I hope the above helps. Thanks

cndv