
I have code that simply enqueues a message to the broker's queue with pika.

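import logging
from json import dumps

from pika import BasicProperties, BlockingConnection, ConnectionParameters, PlainCredentials
from pika.exceptions import ConnectionClosed

# TaskEncoder is a custom JSON encoder defined elsewhere (not shown).

class Publisher: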

    def __init__(self, config):
        self._params = ConnectionParameters(
            host = config.RABBITMQ_HOST,
            credentials = PlainCredentials(config.RABBITMQ_USER, config.RABBITMQ_PASSWORD))
        self._conn = None
        self._channel = None
        self.exchange_name = config.RABBITMQ_AGENT_EXCHANGE


    def connect(self):
        if not self._conn or self._conn.is_closed:
            self._conn = BlockingConnection(self._params)
            self._channel = self._conn.channel()
            self._channel.exchange_declare(exchange=self.exchange_name,  exchange_type = 'topic')

    def _publish(self, task):
        properties = BasicProperties(expiration=task.expiration_ms)
        self._channel.basic_publish(exchange= self.exchange_name,
                                    routing_key = task.routing_key,
                                    properties = properties if task.has_expiration else None,
                                    body=dumps(task, cls = TaskEncoder).encode())
        logging.debug('message sent: %s', task)


    def publish(self, msg):
        """Publish msg, reconnecting if necessary."""

        try:
            self._publish(msg)
        except ConnectionClosed:
            logging.error('reconnecting to queue')
            self.connect()
            self._publish(msg)

On a long-running connection, pika stops enqueuing messages once the following log messages appear, and it no longer raises any error:

2021-03-14 12:25:09,981 MainThread-140100212655936 pika.heartbeat [INFO] - Connection is idle, 1 stale byte intervals
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.adapters.utils.io_services_utils [INFO] - Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.184.108', 41024), raddr=('10.100.176.158', 5672)>
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.adapters.utils.io_services_utils [INFO] - _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.184.108', 41024), raddr=('10.100.176.158', 5672)

Code usage

publisher = Publisher(config)
publisher.connect()
while True:
  publisher.publish(obj)
  time.sleep(1)

I have 2 questions:

How can I prevent this? Would disabling the heartbeat help in this case?

How can I reproduce or simulate this behavior with a firewall? I tried adding a rule that drops packets on the RMQ port, but with no luck.

Pika version: 1.0.1

RMQ version: 3.8.9

Python: 3.8.6

snakecharmerb
Andrei Kovrov

1 Answer


Quite a few posts recommend keeping callback execution very short, so that control returns to pika quickly. See more on it here.

If you have heavy computations to run, it is better to spawn them in another thread or process, or to accumulate them for later execution. This approach helped me with another real-time API.
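To illustrate the hand-off idea, here is a minimal stdlib-only sketch. The names `start_worker`, `handle`, and `jobs` are made up for this example and are not part of pika. The callback (or main loop) only puts work on a queue and returns right away, while a background thread does the slow part. Note that a pika `BlockingConnection` should not be shared between threads; if the worker needs to publish a result, one option is to schedule that on the connection's own thread with `add_callback_threadsafe`.

```python
import queue
import threading


def start_worker(handle, jobs):
    """Run handle(job) for each queued job on a background thread."""
    def run():
        while True:
            job = jobs.get()
            if job is None:  # sentinel value: stop the worker
                jobs.task_done()
                break
            try:
                handle(job)  # the slow part runs here, off the main thread
            finally:
                jobs.task_done()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


results = []
jobs = queue.Queue()
worker = start_worker(lambda n: results.append(n * n), jobs)

# The callback (or main loop) only enqueues and returns immediately.
for n in (1, 2, 3):
    jobs.put(n)

jobs.put(None)  # ask the worker to stop
worker.join()   # wait until all queued jobs are done
```

A single worker keeps the jobs in order; with several workers the ordering would no longer be guaranteed.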

What seems to happen is that RabbitMQ closes the channel (you can catch that in a callback, provided your code is not busy elsewhere), and it looks to me as if pika then stops the heartbeat.
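If the connection or channel does get closed, the publisher has to notice it and reconnect. Below is a rough sketch of a retry wrapper; `publish_with_reconnect` and `FakeLink` are hypothetical names used only for this illustration, and the fake link stands in for the real broker so the snippet runs without RabbitMQ. With pika 1.x, as far as I know, a dropped socket can surface as `pika.exceptions.StreamLostError` and a closed channel as `pika.exceptions.ChannelWrongStateError`, neither of which is a subclass of `ConnectionClosed`, so for `retry_on` you may want something like `(AMQPConnectionError, ChannelWrongStateError)`. Treat that as an assumption to verify against your pika version.

```python
import logging


def publish_with_reconnect(connect, publish, msg, retry_on, attempts=2):
    """Call publish(msg); on a retry_on exception, call connect() and retry."""
    for attempt in range(1, attempts + 1):
        try:
            return publish(msg)
        except retry_on as exc:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            logging.warning('publish failed (%r), reconnecting', exc)
            connect()


class FakeLink:
    """Stand-in for a broker link that is dead until connect() is called."""

    def __init__(self):
        self.alive = False
        self.sent = []

    def connect(self):
        self.alive = True

    def publish(self, msg):
        if not self.alive:
            raise ConnectionError('stream lost')
        self.sent.append(msg)


link = FakeLink()
# First attempt fails, the wrapper reconnects, second attempt succeeds.
publish_with_reconnect(link.connect, link.publish, 'hello', (ConnectionError,))
```

With the `Publisher` class from the question, `connect` and `publish` would map to `publisher.connect` and `publisher._publish`. Keep in mind that `connect()` there only reconnects when `self._conn.is_closed` is true.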

alex
    I think you are missing the point. He is not consuming anything here, just sending messages. That's as fast as rabbitmq and network allow. – Daniel Aug 10 '22 at 15:14