I have a Celery task that is scheduled as a cron job. If it fails, I run a backup script that runs the tasks given in its command-line options as a subprocess. This requires the subprocess to:
1) Activate the Python virtual environment in the right directory
2) Use Django's manage.py to access the Python shell
3) Import and run the task
I have written the code as follows:
process = subprocess.Popen(["source", "/application/bin/activate", "&&", "python3","/application/src/manage.py", "shell"],stdin=subprocess.PIPE,stdout=subprocess.PIPE)
output1, error1 = process.communicate(b"from app.tasks import prediction")
output2, error2 = process.communicate(b"prediction.run_task()")
This fails with a Django "import not found" error, which is caused by the Python venv not being activated.
Each of these commands works fine when run in a terminal.
Is there a better way to do this, or how can I resolve this?
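For reference, one possible direction is sketched below. It rests on a few points: `source` and `&&` are shell syntax, so they are not interpreted when Popen is given an argument list without a shell; `communicate()` waits for the process to exit, so it cannot be used to send a second batch of input; and calling the interpreter inside the venv's bin directory uses the venv's packages without any activation step. The sketch assumes a POSIX venv rooted at /application (so the interpreter is /application/bin/python3) and uses the `-c` option of `manage.py shell`. The names `build_command` and `run_prediction` are hypothetical helpers, not part of the original script.

```python
import subprocess
from pathlib import Path

# Paths taken from the question; the venv layout is an assumption.
VENV_DIR = Path("/application")
MANAGE_PY = Path("/application/src/manage.py")


def build_command(venv_dir, manage_py, code):
    """Build an argv list that runs `code` in the Django shell.

    Uses the venv's own interpreter, so no `source .../activate` is needed.
    """
    python = Path(venv_dir) / "bin" / "python3"
    # `manage.py shell -c "<code>"` runs the given code and then exits.
    return [str(python), str(manage_py), "shell", "-c", code]


def run_prediction():
    """Run the task from the question in a single subprocess call."""
    code = "from app.tasks import prediction; prediction.run_task()"
    return subprocess.run(
        build_command(VENV_DIR, MANAGE_PY, code),
        cwd=MANAGE_PY.parent,   # run from the project directory
        capture_output=True,    # collect stdout and stderr
        text=True,              # decode output as str
    )
```

Calling `run_prediction()` returns a `CompletedProcess`, so its `returncode`, `stdout`, and `stderr` can be checked to see whether the task ran. Both statements are passed in one command, which avoids the second `communicate()` call entirely.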