
I am building a web app that relies on a Celery-based worker pool. I would now like to set up a shared connection to my MongoDB that every worker process can reuse, so I tried to do it as described here.

When I look at the logs of the Celery worker pool, I can see that the database connection is established (so the @worker_process_init.connect signal fires as expected) with:

import logging

from celery.signals import worker_process_init, worker_process_shutdown
from pymongo.mongo_client import MongoClient

# one MongoClient per worker process, to be reused by all tasks in that process
celery_worker_db_con = None


@worker_process_init.connect
def init_worker(**kwargs):
    global celery_worker_db_con
    celery_worker_db_con = MongoClient(...)  # connection settings omitted
    logging.info(str(celery_worker_db_con))


@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
    global celery_worker_db_con
    if celery_worker_db_con:
        celery_worker_db_con.close()

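For completeness: the handlers above live in utils/workers.py, and my Celery app module imports that module so the handlers are registered when the worker starts. Roughly like this (myapp and CELERY_BROKER_URL are just placeholders for my local setup):

import os

from celery import Celery

import utils.workers  # noqa: F401  -- importing this registers the signal handlers above

app = Celery('myapp', broker=os.environ.get('CELERY_BROKER_URL'))
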
However, once I try to import it into another component, e.g. into a task like this:

import logging
import os

from celery import shared_task
from utils.workers import celery_worker_db_con


@shared_task(queue=os.environ.get('CELERY_QUEUE'))
def enter_test_item_into_db():
    logging.info("retrieving connection..." + str(celery_worker_db_con))
    db_con = celery_worker_db_con.get_database()  # fails: celery_worker_db_con is still None here

    db_con.test_collection.insert_one({'hello': 'world'})

the object celery_worker_db_con is None.
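
My current guess (unconfirmed) is that "from utils.workers import celery_worker_db_con" binds the name to the value it has at import time, i.e. None, so the task module never sees the MongoClient that the signal handler assigns later. If that is right, reading the attribute off the module at call time, as in the sketch below, should work instead, but I would like to confirm that this is the intended pattern:

import os

from celery import shared_task

import utils.workers  # import the module, not the name, so the lookup happens at call time


@shared_task(queue=os.environ.get('CELERY_QUEUE'))
def enter_test_item_into_db():
    # the attribute is looked up when the task runs, after worker_process_init has fired
    db_con = utils.workers.celery_worker_db_con.get_database()
    db_con.test_collection.insert_one({'hello': 'world'})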

What am I missing?

Thanks for your help!

Kon
