It is an antipattern to share a channel between multiple threads and it's quite unlikely you will manage to share it between processes.
The rule of thumb is 1 connection
per process and 1 channel
per thread.
You can read more in regard of this matter at the following links:
- 13 common RabbitMQ mistakes
- RabbitMQ best practices
- This SO thread gives an in depth analysis in regards of RabbitMQ and concurrent consumption
If you want to pair message consumption together with multiprocessing, the usual pattern is to let the main process receive the messages, deliver their payload to a pool of worker processes and acknowledge them once they are done.
Simple example using pika.BlockingChannel
and concurrent.futures.ProcessPoolExecutor
:
def ack_message(channel, delivery_tag, _future):
"""Called once the message has been processed.
Acknowledge the message to RabbitMQ.
"""
channel.basic_ack(delivery_tag=delivery_tag)
for message in channel.consume(queue='example'):
method, properties, body = message
future = pool.submit(process_message, body)
# use partial to pass channel and ack_tag to callback function
ack_message_callback = functools.partial(ack_message, channel, method.delivery_tag)
future.add_done_callback(ack_message_callback)
The above loop will endlessly consume messages from the example
queue and submit them to the pool of processes. You can control how many messages to process concurrently via RabbitMQ consumer prefetch parameter. Check pika.basic_qos
to see how to do it in Python.