I am building a web app that relies on a Celery-based worker pool. I would now like to implement a shared connection to my MongoDB database. It should be shared across each worker, so I tried to do it as described here.
When I look at the logs from the Celery worker pool, I can see that the database connection is established (so the handler registered with @worker_process_init.connect works fine) with the following code:
import logging

from celery.signals import worker_process_init, worker_process_shutdown
from pymongo.mongo_client import MongoClient

celery_worker_db_con = None


@worker_process_init.connect
def init_worker(**kwargs):
    global celery_worker_db_con
    celery_worker_db_con = MongoClient(...)
    logging.info(str(celery_worker_db_con))


@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
    global celery_worker_db_con
    if celery_worker_db_con:
        celery_worker_db_con.close()
However, once I try to import it into another component, for example:
import logging
import os

from celery import shared_task

from utils.workers import celery_worker_db_con


@shared_task(queue=os.environ.get('CELERY_QUEUE'))
def enter_test_item_into_db():
    logging.info("retrieving connection..." + str(celery_worker_db_con))
    db_con = celery_worker_db_con.get_database()
    db_con.test_collection.insert_one({'hello': 'world'})
the object celery_worker_db_con is None.
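The same symptom seems to be reproducible without Celery or MongoDB, which suggests it may come from how a from-import binds names rather than from the signal itself. The following is only a minimal sketch under assumptions: workers_demo is a hypothetical in-memory stand-in for utils/workers.py, db_con stands in for celery_worker_db_con, and the string "connected" stands in for the MongoClient instance.

```python
import sys
import types

# Hypothetical stand-in for utils/workers.py, built in memory so the
# example is self-contained (no Celery or MongoDB required).
workers_demo = types.ModuleType("workers_demo")
source = '''
db_con = None

def init_worker():
    global db_con
    db_con = "connected"  # placeholder for MongoClient(...)
'''
exec(source, workers_demo.__dict__)
sys.modules["workers_demo"] = workers_demo

# Copies the value bound at import time (None) into this namespace.
from workers_demo import db_con
# Keeps a reference to the module object itself.
import workers_demo as workers

# Rebinds workers_demo.db_con, as the signal handler would do later.
workers.init_worker()

stale = db_con          # the name imported earlier is not rebound
fresh = workers.db_con  # attribute lookup on the module sees the new value
```

In this sketch, stale remains None while fresh holds the new value. If the same mechanism applies in the worker, looking the connection up through the module at call time (or through a small accessor function) instead of importing the name directly would presumably behave differently.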
What am I missing?
Thanks for your help!