I've been trying to query my app's MongoDB database from a Celery task, but the query returns no documents.

tasks/celery_app.py

import os

from celery import Celery

# REDIS_URL is defined elsewhere in the app (not shown).
celery_app = Celery(
    'tasks',
    broker=os.environ.get("CLOUDAMQP_URL", "amqp://guest:guest@localhost:5672//"),
    backend=REDIS_URL,
    include=["tasks"]
)

routes/api/test.py

@router.post("/task", status_code=200)
async def post_task(
    # payload: dict = Body(...),
):
    # values = jsonable_encoder(payload)
    task = test_task.delay()
    task_id = task.id
    return {"task_id": str(task_id), "status": "Processing"}

@router.get("/task/{task_id}", status_code=200)
async def get_task(task_id):
    task = AsyncResult(id=task_id, app=celery_app)
    if not task.ready():
        return {"task_id": str(task_id), "status": "Processing"}

    result = task.result  # returns []
    return {"task_id": task_id, "status": "Success", "result": result}

tasks/test_workers.py

from typing import Optional

from celery.signals import worker_process_init
from pymongo import MongoClient

# I'm not sure this is the right way to do it;
# I followed https://stackoverflow.com/a/26853948/3232194
db_conn: Optional[MongoClient] = None

@worker_process_init.connect
def init_worker(**kwargs):
    # Open one MongoClient per worker process, after the fork.
    print('Initializing database connection for worker.')
    global db_conn
    db_conn = MongoClient(MONGODB_CLOUD)
    print(MONGODB_CLOUD)

@celery_app.task(ignore_result=False, track_started=True)
def test_task():
    user = db_conn.db['user']
    usr = user.collection.find({})
    return list(usr) # returns []

As a sanity check, I queried users through my normal API route, and the user collection is not empty. I've also verified the MONGODB_CLOUD connection string. The connection does include sharding; the MongoClient options are document_class=dict, tz_aware=False, connect=True, retrywrites=True, authsource='admin', replicaset='xxxxx-shard-0', tls=True. I don't know if that matters.
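
One thing that may be worth checking is which namespace the query in test_task actually targets. In PyMongo, attribute or item access on a client yields a database, on a database yields a collection, and on a collection yields a dotted sub-collection. The sketch below is a pure-Python stand-in that mimics only that naming rule, so it runs without a server. The `Namespace` class and the database name `myapp` are hypothetical and not part of PyMongo or of the app above.

```python
class Namespace:
    """Hypothetical stand-in mimicking PyMongo's client/database/collection naming."""

    def __init__(self, database=None, collection=None):
        self._database = database
        self._collection = collection

    def __getitem__(self, name):
        if self._database is None:
            # client[name] -> database called `name`
            return Namespace(database=name)
        if self._collection is None:
            # database[name] -> collection called `name`
            return Namespace(self._database, name)
        # collection[name] -> dotted sub-collection of the current collection
        return Namespace(self._database, f"{self._collection}.{name}")

    def __getattr__(self, name):
        # Only reached when normal lookup fails; public names fall
        # through to item access, as in client.db or collection.foo.
        if name.startswith("_"):
            raise AttributeError(name)
        return self[name]

    @property
    def full_name(self):
        # "<database>.<collection>", or just the database name.
        if self._collection is None:
            return self._database
        return f"{self._database}.{self._collection}"


client = Namespace()

# Same access pattern as in test_task: db_conn.db['user'], then .collection
as_written = client.db["user"].collection
print(as_written.full_name)  # db.user.collection

# Explicit database and collection names ("myapp" is an assumed name)
intended = client["myapp"]["user"]
print(intended.full_name)  # myapp.user
```

If the real client behaves this way here, the task would be reading a collection named user.collection in a database named db. A find({}) on a collection that does not exist returns an empty cursor rather than raising, which would be consistent with the [] result. Printing the full_name of the collection object used in the working API route and of the one used in the task should confirm or rule this out.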

hammies
