2

I was trying to track when a task starts and task_track_started wasn't working for me. I tried the answer from this post here but I ran into an error. I've the following code to update the state of a task once it's started. This is inside a django module which is then installed as an app in another django project where it's run.

from celery import current_app
from celery.signals import after_task_publish

@after_task_publish.connect
def update_sent_state(sender=None, headers=None, **kwargs):

    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend

    backend.store_result(headers['id'], None, "SENT")

The signal gets fired and but it errors out on the last line with the following error:

Traceback (most recent call last):
  File "/home/dd_env/lib/python3.8/site-packages/celery/backends/rpc.py", line 175, in destination_for
    request = request or current_task.request
  File "/home/dd_env/lib/python3.8/site-packages/celery/local.py", line 143, in __getattr__
    return getattr(self._get_current_object(), name)
AttributeError: 'NoneType' object has no attribute 'request'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/dd_env/lib/python3.8/site-packages/celery/utils/dispatch/signal.py", line 276, in send
    response = receiver(signal=self, sender=sender, **named)
  File "/home/config_module/spinner/signals.py", line 319, in update_sent_state
    backend.store_result(headers['id'], None, "SENT")
  File "/home/dd_env/lib/python3.8/site-packages/celery/backends/rpc.py", line 198, in store_result
    routing_key, correlation_id = self.destination_for(task_id, request)
  File "/home/dd_env/lib/python3.8/site-packages/celery/backends/rpc.py", line 177, in destination_for
    raise RuntimeError(
RuntimeError: RPC backend missing task request for '4cb351b0-3643-4f53-a238-bad84c18042d'

Inside the signal, invoking methods like backend.get_state(headers['id']) or backend.get_result(headers['id']) returns the expected output. The task is being executed successfully and the results returned but I'm unable to set it's status. backend.mark_as_started(headers['id']) also returns the same error.

Here's what my task definition looks like:

from celery import shared_task

@shared_task
def update_keywords_task(pk: int):
    <Random CRUD operations>

Here're my celery settings:

app = Celery('<app_name>', backend='rpc://', broker='pyamqp://')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
app.conf.broker_transport_options = {
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
}

Why is my task request not found?

Judy T Raj
  • 1,755
  • 3
  • 27
  • 41

1 Answers1

0

I had a similar issue...

I think it has to do with using RPC as the backend. see this question for more info:

Getting Celery task results using RPC backend