I'm using python with pika, and have the following two similar use cases:

  1. Connect to RabbitMQ server A and server B (at different IP addrs with different credentials), listen on exchange A1 on server A; when a message arrives, process it and send to an exchange on server B
  2. Open an HTTP listener and connect to RabbitMQ server B; when a specific HTTP request arrives, process it and send to an exchange on server B

Alas, in both these cases using my usual techniques, by the time I get to sending to server B the connection throws ConnectionClosed or ChannelClosed.

I assume this is the cause: while waiting on the incoming messages, the connection to server B (its "driver") is starved of CPU cycles and never gets a chance to service its connection socket. It therefore can't respond to heartbeats from server B, so the server shuts down the connection.

But I can't noodle out the fix. My current work around is lame: I catch the ConnectionClosed, reopen a connection to server B, and retry sending my message.

But what is the "right" way to do this? I've considered these, but don't really feel I have all the parts to solve this:

  • Don't just sit forever in server A's basic_consume (my usual pattern), but rather use a timeout, and when I catch the timeout somehow "service" heartbeats on server B's driver before returning to a "consume with timeout"... but how do I do that? How do I "let server B's connection driver service its heartbeats"?
  • I know the socket library's select() call can wait for messages on several sockets at once, then service the socket that has packets waiting. So maybe this is what pika's SelectConnection is for? a) I'm not sure, this is just a hunch. b) Even if right, while I can find examples of how to create this connection, I can't find examples of how to use it to solve my multi-connection case.
  • Set up the two server connections in different processes... and use Python interprocess queues to get the processed message from one process to the next. The concept is "two different RabbitMQ connections in two different processes should then be able to independently service their heartbeats". Except... I think this has a fatal flaw: the process with "server B" is, instead, going to be "stuck" waiting on the interprocess queue, and the same "starvation" is going to happen.
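
For reference, the first idea above (consume with a timeout, then give server B's connection a turn) could look roughly like the sketch below. It assumes `channel_a`, `connection_b` and `channel_b` are already-open pika `BlockingConnection` objects and channels; the function name `relay` and the `process` and `idle_timeout` parameters are made up for illustration. It relies on `BlockingChannel.consume(..., inactivity_timeout=...)` yielding `(None, None, None)` when no message arrives in time.

```python
def relay(channel_a, connection_b, channel_b, queue, exchange, routing_key,
          process=lambda body: body, idle_timeout=1.0):
    """Forward messages from server A to server B on a single thread.

    All arguments are assumed to be live pika objects created elsewhere;
    `process` is a hypothetical hook for the per-message work.
    """
    for method, _properties, body in channel_a.consume(
        queue, inactivity_timeout=idle_timeout
    ):
        # Let connection B handle pending I/O (heartbeats included)
        # without blocking: time_limit=0 returns as soon as possible.
        connection_b.process_data_events(time_limit=0)

        if method is None:
            # No message from A within idle_timeout seconds; loop again.
            continue

        channel_b.basic_publish(
            exchange=exchange, routing_key=routing_key, body=process(body)
        )
        channel_a.basic_ack(delivery_tag=method.delivery_tag)
```

This only helps while `idle_timeout` and the time spent in `process` stay well below the negotiated heartbeat interval; a slow `process` call would still starve both connections.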

I've checked StackOverflow and Googled this for an hour last night: I can't for the life of me find a blog post or sample code for this.

Any input? Thanks a million!

Dan H
  • What a coincidence! I have the exact same issue. Looking for a clean way to resolve this as well. Will post an answer if/when I find anything. – Karl Sutt Jul 28 '22 at 12:14

1 Answer

I managed to work it out, basing my solution on the documentation and an answer in the pika-python Google group.

First of all, your assumption is correct: the client process that's connected to server B, responsible for publishing, cannot reply to heartbeats if it's already blocking on something else, like waiting for a message from server A or blocking on an internal communication queue.

The crux of the solution is that the publisher should run as a separate thread and use BlockingConnection.process_data_events to service heartbeats and such. It looks like that method is supposed to be called in a loop that checks if the publisher still needs to run:

def run(self):
    while self.is_running:
        # Block at most 1 second before returning and re-checking
        self.connection.process_data_events(time_limit=1)

Proof of concept

Since proving the full solution requires having two separate RabbitMQ instances running, I have put together a Git repo with an appropriate docker-compose.yml, the application code and comments to test this solution.

https://github.com/karls/rabbitmq-two-connections

Solution outline

Below is a sketch of the solution. Some notable things:

  1. The publisher runs as a separate thread.
  2. The only "work" the publisher does is servicing heartbeats and such, via BlockingConnection.process_data_events.
  3. Whenever the consumer wants to publish a message, the publisher registers a callback using BlockingConnection.add_callback_threadsafe, so the actual basic_publish runs on the publisher's own thread.
  4. The consumer takes the publisher as a constructor argument so it can publish the messages it receives, but it can work via any other mechanism as long as you have a reference to an instance of Publisher.
  5. The code is taken from the linked Git repo, which is why certain details are hardcoded, e.g. the queue name. The same approach works with any RabbitMQ setup (direct-to-queue, topic exchange, fanout, etc.).

# Imports were omitted from the original sketch; these are the ones
# the code below appears to need.
import logging
import sys
import threading
import traceback
from typing import Optional

from pika import BlockingConnection, ConnectionParameters
from pika.channel import Channel

logger = logging.getLogger(__name__)


class Publisher(threading.Thread):
    def __init__(
        self,
        connection_params: ConnectionParameters,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.daemon = True
        self.is_running = True
        self.name = "Publisher"
        self.queue = "downstream_queue"
        self.connection = BlockingConnection(connection_params)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue, auto_delete=True)
        self.channel.confirm_delivery()

    def run(self):
        while self.is_running:
            self.connection.process_data_events(time_limit=1)

    def _publish(self, message):
        logger.info("Calling '_publish'")
        self.channel.basic_publish("", self.queue, body=message.encode())

    def publish(self, message):
        logger.info("Calling 'publish'")
        self.connection.add_callback_threadsafe(lambda: self._publish(message))

    def stop(self):
        logger.info("Stopping...")
        self.is_running = False

        # Call .process_data_events one more time to block
        # and allow the while-loop in .run() to break.
        # Otherwise the connection might be closed too early.
        self.connection.process_data_events(time_limit=1)

        if self.connection.is_open:
            self.connection.close()
            logger.info("Connection closed")
        logger.info("Stopped")


class Consumer:
    def __init__(
        self,
        connection_params: ConnectionParameters,
        publisher: Optional["Publisher"] = None,
    ):
        self.publisher = publisher
        self.queue = "upstream_queue"
        self.connection = BlockingConnection(connection_params)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue, auto_delete=True)
        self.channel.basic_qos(prefetch_count=1)

    def start(self):
        self.channel.basic_consume(
            queue=self.queue, on_message_callback=self.on_message
        )
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            logger.info("Warm shutdown requested...")
        except Exception:
            traceback.print_exception(*sys.exc_info())
        finally:
            self.stop()

    def on_message(self, _channel: Channel, m, _properties, body):
        try:
            message = body.decode()
            logger.info(f"Got: {message!r}")
            if self.publisher:
                self.publisher.publish(message)
            else:
                logger.info(f"No publisher provided, printing message: {message!r}")
            self.channel.basic_ack(delivery_tag=m.delivery_tag)
        except Exception:
            traceback.print_exception(*sys.exc_info())
            self.channel.basic_nack(delivery_tag=m.delivery_tag, requeue=False)

    def stop(self):
        logger.info("Stopping consuming...")
        if self.connection.is_open:
            logger.info("Closing connection...")
            self.connection.close()

        if self.publisher:
            self.publisher.stop()

        logger.info("Stopped")
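
The second use case in the question (an HTTP listener that publishes to server B) can reuse the same publisher thread. The following is a minimal sketch using only the standard library; it assumes `publisher` is any object with the `publish(message)` method of the `Publisher` class above. The names `make_handler` and `serve`, the port, and the 202 response are illustrative choices, not part of the linked repo.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer


def make_handler(publisher):
    """Build a request handler class bound to a publisher-like object."""

    class PublishHandler(BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length", 0))
            message = self.rfile.read(length).decode()
            # This only schedules the publish; with the Publisher class
            # above, the network I/O happens on the publisher's thread.
            publisher.publish(message)
            self.send_response(202)
            self.end_headers()

        def log_message(self, format, *args):
            pass  # keep the sketch quiet

    return PublishHandler


def serve(publisher, host="127.0.0.1", port=8080):
    """Block forever, handing each POST body to the publisher."""
    server = HTTPServer((host, port), make_handler(publisher))
    server.serve_forever()
```

With the classes above, usage would be along the lines of creating `Publisher(connection_params)`, calling its `start()` method, and then calling `serve(publisher)` on the main thread. The HTTP server can block as long as it likes because the publisher thread keeps servicing heartbeats independently.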
Karl Sutt
  • Alternatively, if all you need is pumping messages from node A to node B, the best option might be to use RabbitMQ's shovel plugin: https://www.rabbitmq.com/shovel.html – Karl Sutt Aug 03 '22 at 08:43
  • My answer above has been incorporated into Pika as a documentation improvement and an example: https://github.com/pika/pika/pull/1384. – Karl Sutt Sep 12 '22 at 10:51