
I have tried to follow the guidance given here: Handling long running tasks in pika / RabbitMQ and here: https://github.com/pika/pika/issues/753#issuecomment-318124510 on running long tasks in a separate thread so that they do not interrupt the connection heartbeat. I'm new to threading and still struggling to understand this solution.

For my final use case, I need to make function calls that take several minutes each, represented in the example code below by long_function(). In what follows, thread #1 is the main thread that owns the connection and thread #2 is the worker thread started in on_message(). I've found that if the sleep call in long_function() exceeds the heartbeat timeout, I lose the connection (presumably because this function is blocking thread #2 from receiving/acknowledging the heartbeat messages from thread #1) and I get this message in the logs:

ERROR: Unexpected connection close detected: StreamLostError: ("Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')",)

A sleep call of the same length placed directly in the target function of thread #2 does not lead to a StreamLostError.

What's the proper way to avoid the StreamLostError here? Do I launch every subsequent function call in its own thread to avoid blocking thread #2? Or do I increase the heartbeat timeout so that it is longer than long_function()? If that is the solution, what was the point of running my long task in a separate thread? Why not just make the heartbeat timeout in the main thread long enough to cover the processing of a whole message? Thanks!

import functools
import logging
import pika
import threading
import time
import os
import ssl
import json
import traceback


logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s',
                    level=logging.INFO,
                    datefmt='%Y-%m-%d %H:%M:%S')


def send_message_to_queue(channel, queue_name, body):
    channel.basic_publish(exchange='',
                        routing_key=queue_name,
                        body=json.dumps(body),
                        properties=pika.BasicProperties(delivery_mode=2)
                        )
    logging.info("RabbitMQ publish to queue {} confirmed".format(queue_name))


def initialize_rabbitmq_channel(timeout=5*60):
    credentials = pika.PlainCredentials(os.environ.get("RABBITMQ_USER"), os.environ.get("RABBITMQ_PASSWORD"))
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    params = pika.ConnectionParameters(port=5671, host=os.environ.get("RABBITMQ_HOST"), credentials=credentials,
                                       ssl_options=pika.SSLOptions(context), virtual_host="/", heartbeat=timeout)
    connection = pika.BlockingConnection(params)
    return connection.channel(), connection


def long_function():
    logging.info("Long function starting...")
    time.sleep(5)
    logging.info("Long function finished.")


def ack_message(channel, delivery_tag):
    """
    Note that `channel` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(delivery_tag)
        logging.info("Message {} acknowledged".format(delivery_tag))
    else:
        logging.error("Channel is closed and message acknowledgement will fail")


def do_work(connection, channel, delivery_tag, body):

    thread_id = threading.get_ident()
    fmt1 = 'Thread id: {} Delivery tag: {} Message body: {}'
    logging.info(fmt1.format(thread_id, delivery_tag, body))

    # Simulating work including a call to another function that exceeds heartbeat timeout
    time.sleep(5)
    long_function()

    send_message_to_queue(channel, "test_inactive", json.loads(body))

    cb = functools.partial(ack_message, channel, delivery_tag)
    connection.add_callback_threadsafe(cb)


def on_message(connection, channel, method, property, body):
    t = threading.Thread(target=do_work, args=(connection, channel, method.delivery_tag, body))
    t.start()
    t.join()


if __name__ == "__main__":
    channel, connection = initialize_rabbitmq_channel(timeout=3)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue="test_queue",
                        auto_ack=False,
                        on_message_callback=lambda channel, method, property, body: on_message(connection, channel, method, property, body)
                        )

    channel.start_consuming()
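
One thing that may be worth checking in the code above: on_message() calls t.join(), which keeps thread #1 (the one running start_consuming()) blocked until the worker finishes, so it cannot service heartbeats in the meantime. The worker also publishes on the channel directly, whereas add_callback_threadsafe() is the only Pika call meant to be used from another thread. Below is a minimal sketch of the non-blocking variant, not a confirmed fix. The helper finish_on_connection_thread(), the threads list, and the shortened sleep are assumptions introduced for illustration, and message properties are omitted for brevity.

```python
import functools
import json
import threading
import time


def long_function(seconds=0.5):
    # Stand-in for the multi-minute call; kept short so the sketch runs quickly.
    time.sleep(seconds)


def finish_on_connection_thread(channel, delivery_tag, body):
    # Hypothetical helper: runs on the connection's own thread because it is
    # scheduled through add_callback_threadsafe, so it may touch the channel.
    if channel.is_open:
        channel.basic_publish(exchange='',
                              routing_key='test_inactive',
                              body=json.dumps(body))
        channel.basic_ack(delivery_tag)


def do_work(connection, channel, delivery_tag, body):
    # Worker thread: do the slow part here, but never use the channel directly.
    long_function()
    cb = functools.partial(finish_on_connection_thread,
                           channel, delivery_tag, json.loads(body))
    connection.add_callback_threadsafe(cb)


def on_message(channel, method, properties, body, connection, threads):
    # Start the worker and return immediately. There is no join() here, so the
    # connection thread goes back to processing I/O, including heartbeats.
    t = threading.Thread(target=do_work,
                         args=(connection, channel, method.delivery_tag, body))
    t.start()
    threads.append(t)
```

To wire this in, on_message would be bound with functools.partial(on_message, connection=connection, threads=threads) and passed as on_message_callback, and the collected threads would be joined only after start_consuming() returns.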
This looks like reasonable code at first glance. Please open an issue in the Pika GitHub repository. Provide a repo I can clone and run your code from. PS I maintain Pika. Thanks. – Luke Bakken Mar 10 '22 at 21:30
