I was trying to track when a task starts, but task_track_started wasn't working for me. I tried the answer from this post here, but I ran into an error.
I have the following code to update the state of a task once it's started. It lives inside a Django module that is installed as an app in another Django project, where it is run.
from celery import current_app
from celery.signals import after_task_publish

@after_task_publish.connect
def update_sent_state(sender=None, headers=None, **kwargs):
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend
    backend.store_result(headers['id'], None, "SENT")
The signal fires, but the handler fails on the last line with the following error:
Traceback (most recent call last):
  File "/home/dd_env/lib/python3.8/site-packages/celery/backends/rpc.py", line 175, in destination_for
    request = request or current_task.request
  File "/home/dd_env/lib/python3.8/site-packages/celery/local.py", line 143, in __getattr__
    return getattr(self._get_current_object(), name)
AttributeError: 'NoneType' object has no attribute 'request'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/dd_env/lib/python3.8/site-packages/celery/utils/dispatch/signal.py", line 276, in send
    response = receiver(signal=self, sender=sender, **named)
  File "/home/config_module/spinner/signals.py", line 319, in update_sent_state
    backend.store_result(headers['id'], None, "SENT")
  File "/home/dd_env/lib/python3.8/site-packages/celery/backends/rpc.py", line 198, in store_result
    routing_key, correlation_id = self.destination_for(task_id, request)
  File "/home/dd_env/lib/python3.8/site-packages/celery/backends/rpc.py", line 177, in destination_for
    raise RuntimeError(
RuntimeError: RPC backend missing task request for '4cb351b0-3643-4f53-a238-bad84c18042d'
Inside the signal handler, calls such as backend.get_state(headers['id']) or backend.get_result(headers['id']) return the expected output. The task executes successfully and its results are returned, but I'm unable to set its status. backend.mark_as_started(headers['id']) raises the same error.
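To make the failure in the traceback easier to reason about, here is a minimal standalone sketch of the lookup that destination_for appears to perform. It is a simplified stand-in, not Celery's actual code: the module-level current_task variable, the SimpleNamespace request, and the "reply-queue" value are assumptions made up for illustration.

```python
from types import SimpleNamespace

# Stand-in for celery.current_task: None whenever no task is executing in
# this process, which is the situation in the process that publishes a task.
current_task = None


def destination_for(task_id, request=None):
    """Simplified model of the lookup seen in the traceback (not Celery's code)."""
    try:
        # Fall back to the currently executing task's request, if there is one.
        request = request or current_task.request
    except AttributeError:
        raise RuntimeError(f"RPC backend missing task request for {task_id!r}")
    return request.reply_to, request.correlation_id or task_id


# With an explicit request object the destination resolves.
# The reply_to and correlation_id values below are hypothetical.
worker_request = SimpleNamespace(reply_to="reply-queue", correlation_id="abc")
print(destination_for("abc", worker_request))

# With neither a request nor a current task, the same RuntimeError is raised.
try:
    destination_for("4cb351b0-3643-4f53-a238-bad84c18042d")
except RuntimeError as exc:
    print(exc)
```

If this model is right, the read calls succeed because they don't go through this lookup, while any call that stores a state from the signal handler hits the fallback to current_task and fails.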
Here's what my task definition looks like:
from celery import shared_task

@shared_task
def update_keywords_task(pk: int):
    <Random CRUD operations>
Here are my Celery settings:
from celery import Celery

app = Celery('<app_name>', backend='rpc://', broker='pyamqp://')
# Read CELERY_-prefixed settings from the Django settings module.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
app.conf.broker_transport_options = {
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
}
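For context, this is roughly how task_track_started would be spelled with the configuration above. It is only a sketch and assumes the option is set in the Django project's settings.py. Because config_from_object is called with namespace='CELERY', the setting takes the CELERY_ prefix and is written in upper case.

```python
# settings.py of the Django project (assumed location).
# With namespace='CELERY', this maps to Celery's task_track_started option.
CELERY_TASK_TRACK_STARTED = True
```

The same option could alternatively be set on the app object with app.conf.task_track_started = True.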
Why is my task request not found?