I have built a Flask API and deployed it on Railway. I have tried to turn some of its tasks into background tasks using Celery. When I print the task I can see its backend and task id, but task.state is always PENDING, and only a few tasks appear to have been recorded. I think only one task is created for each deployment.
Here is how I have tried to use Celery.
"""
path: tasks.py
Celery tasks.
"""
import pandas as pd
from celery import shared_task
from celery.contrib.abortable import AbortableTask
from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.adobjects.adaccount import AdAccount
@shared_task(bind=True, base=AbortableTask)
def fetch_insights_task(self, account_id):
"""
Get insights from Facebook Ads API.

Declared with ``shared_task(bind=True, base=AbortableTask)``, so the task
instance is passed in as ``self``. The middle of the body is elided in
this excerpt; only the request parameters and the final return are shown.

Args:
    account_id: identifier of the ad account to query -- presumably a
        Facebook ad account id; verify against the caller.

Returns:
    ``insights`` as built by the elided part of the body.
    NOTE(review): the value has to be serializable by whatever result
    serializer the Celery app is configured with -- confirm ``insights``
    is plain data and not SDK objects or a DataFrame.
"""
# Request parameters: campaign-level rows for the 'yesterday' preset,
# with a time increment of 1, broken down by gender and age.
params = {
'level': 'campaign',
'date_preset': 'yesterday',
'time_increment': 1,
'breakdowns': ['gender', 'age'],
}
# (body elided in the original excerpt)
.
.
.
return insights
# path: views.py
@main.route('/insights')
def get_insights():
"""
Insights page

Redirects to ``/select_account`` when the session holds no
``account_id``. Otherwise reads ``insights_task_id`` from the session and,
if one is present, builds an ``AsyncResult`` handle for that task id.
The rest of the view is elided in this excerpt.
"""
# No account chosen yet: send the user to the account selection page.
if 'account_id' not in session:
return redirect('/select_account')
# Id of a previously started insights task, if the session has one.
task_id = session.get('insights_task_id')
# NOTE(review): per the Celery docs, a task id the result backend has no
# record of also reports PENDING -- confirm this handle is bound to the
# same configured app/backend that the worker uses.
task = fetch_insights_task.AsyncResult(task_id) if task_id else None
# (remainder of the view elided in the original excerpt)
.
.
"""
Utils
"""
from celery import Celery
def make_celery(app):
    """
    Build a Celery application configured from a Flask application.

    The Celery app is named after ``app.import_name`` and configured from
    ``app.config["CELERY_CONFIG"]``. JSON serialization and the UTC timezone
    are applied afterwards, so they override matching keys in that mapping.
    Every task body runs inside ``app.app_context()``.

    The instance is also registered as Celery's default app, so tasks
    declared with ``@shared_task`` and the ``AsyncResult`` handles built
    from them use this configuration.

    Args:
        app: Flask application; must provide ``import_name``, ``config``
            (containing a ``CELERY_CONFIG`` mapping) and ``app_context()``.

    Returns:
        The configured ``Celery`` instance.

    Raises:
        KeyError: if ``app.config`` has no ``CELERY_CONFIG`` entry.
    """
    celery = Celery(app.import_name)
    celery.conf.update(app.config["CELERY_CONFIG"])
    celery.conf.update(
        task_serializer='json',
        result_serializer='json',
        accept_content=['json'],
        timezone='UTC',
    )

    class ContextTask(celery.Task):
        """
        Task base class that runs the task body in the Flask app context.
        """

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask

    # Creating the app only marks it "current" for the creating thread.
    # Elsewhere (e.g. web server request threads) shared_task proxies fall
    # back to Celery's default app, which has none of this broker/backend
    # configuration, so tasks would be sent to and looked up in the wrong
    # place and stay PENDING. Registering this app as the default fixes that.
    celery.set_default()
    return celery
I start the worker with this command:
celery -A run:celery worker --loglevel=debug -P solo
I was expecting the tasks to complete, but they always stay in the PENDING state.