I've been trying to connect to my app's database from celery but having issues. The documents are not returning anything from mongodb.
tasks/celery_app.py
celery_app = Celery(
'tasks',
broker=os.environ.get("CLOUDAMQP_URL", "amqp://guest:guest@localhost:5672//"),
backend=REDIS_URL,
include=["tasks"]
)
routes/api/test.py
@router.post("/task", status_code = 200)
async def post_task(
# payload: dict = Body(...),
):
# values = jsonable_encoder(payload)
task = test_task.delay()
task_id = task.id
return {"task_id": str(task_id), "status": "Processing"}
@router.get( "/task/{task_id}", status_code=200 )
async def get_task(task_id):
task = AsyncResult(id=task_id, app=celery_app)
if not task.ready():
return {"task_id": str(task_id), "status": "Processing"}
result = task.result # returns []
return {"task_id": task_id, "status": "Success", "result": result}
tasks/test_workers.py
# Im not exactly sure if this is the right way:
# reading from https://stackoverflow.com/a/26853948/3232194
db_conn: MongoClient= None
@worker_process_init.connect
def init_worker(**kwargs):
print('Initializing database connection for worker.')
global db_conn
db_conn = MongoClient(MONGODB_CLOUD)
print (MONGODB_CLOUD)
@celery_app.task( ignore_result=False, track_started=True)
def test_task():
user = db_conn.db['user']
usr = user.collection.find({})
return list(usr) # returns []
as sanity check, I checked with my normal api route to get users, the user
collection is not empty. I've also checked for MONGODB_CLOUD
route connection. The connection does include sharding - from MongoClientdocument_class=dict, tz_aware=False, connect=True, retrywrites=True, authsource='admin', replicaset='xxxxx-shard-0', tls=True
, I don't know if that matters?