
Is it possible to find out whether a task with a certain task id exists? When I try to get the status, I always get PENDING:

>>> AsyncResult('...').status
'PENDING'

I want to know whether a given id belongs to a real Celery task rather than being a random string, and I want different results depending on whether there is a valid task for a certain id.

There may have been a valid task in the past with the same id, but the results may have been deleted from the backend.

dominik

9 Answers


Celery does not write a state when the task is sent; this is partly an optimization (see the documentation).

If you really need it, it's simple to add:

from celery import current_app
# `after_task_publish` is available in Celery 3.1+;
# for older versions use the deprecated `task_sent` signal
from celery.signals import after_task_publish

# on Celery versions older than 4.0, the signal passes `body`
# instead of `headers`

@after_task_publish.connect
def update_sent_state(sender=None, headers=None, **kwargs):
    # the task may not exist if sent using `send_task`, which
    # sends tasks by name, so fall back to the default result
    # backend in that case
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend

    backend.store_result(headers['id'], None, "SENT")

Then you can test for the PENDING state to detect whether a task has (seemingly) not been sent:

>>> result.state != "PENDING"
asksol
  • It's worth mentioning that purging the queue does not remove the task meta (at least when using Redis as a backend). Therefore this method cannot be used reliably to determine if the task still exists. – SleepyCal Jan 14 '14 at 16:15
  • Do I just have to add this code snippet to my existing tasks? I have them in the "tasks.py" module. Further, will "result.state" only work with "AsyncResult('...').status", not when I reference the state of the sent task? @sleepycal: Would you then recommend using RabbitMQ rather than Redis? – Max Jun 09 '18 at 10:27
  • Currently, the above solution works with a bug. To fix it change **body** to **headers**. This may be of some help: http://docs.celeryproject.org/en/latest/internals/protocol.html#message-protocol-task-v2 – vladkha Oct 21 '18 at 16:57
  • Two notes: 1) if the task is not picked up by a worker, `update_sent_state` will block the task. You can try it locally by not starting any celery worker. 2) You can put this snippet directly where you configure your celery app; it will affect all tasks. – nbeuchat Mar 15 '19 at 19:03
  • @nbeuchat what do you mean by "block the task"? If I start no celery worker, I exit update_sent_state normally. Then when I start the worker, the task is done normally. I don't see what's different from when not using this snippet. – Jérôme Jul 18 '19 at 09:55
  • @asksol What are the shortcomings of this? The answer says not doing it is an optimization, but it looks like the link meant to explain this has changed (it was latest a while ago...) because I don't think the content is directly related. – Jérôme Jul 18 '19 at 10:03
  • There can be a race condition when also using [`task_track_started`](http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-track-started). From my tests it looks like if the task queue is empty, the task is started (and its status set to STARTED) right away, before this callback is executed, or at least before `store_result` actually sets the status to "SENT". Then the status is set to "SENT" and the "STARTED" information is lost. – Jérôme Jul 18 '19 at 13:15
  • @Jérôme, you're a lifesaver; we copy-pasted this and didn't understand why some of our tasks stayed stuck on 'SENT' forever. In practice they were getting updated to 'SUCCESS' and then back to 'SENT'. – Nitzan Apr 14 '20 at 16:32

AsyncResult.state returns PENDING in case of unknown task ids.

PENDING

Task is waiting for execution or unknown. Any task id that is not known is implied to be in the pending state.

http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending

You can provide custom task ids if you need to distinguish unknown ids from existing ones:

>>> from tasks import add
>>> from celery.utils import uuid
>>> r = add.apply_async(args=[1, 2], task_id="celery-task-id-" + uuid())
>>> id = r.task_id
>>> id
'celery-task-id-b774c3f9-5280-4ebe-a770-14a6977090cd'
>>> if not "blubb".startswith("celery-task-id-"): print("Unknown task id")
...
Unknown task id
>>> if not id.startswith("celery-task-id-"): print("Unknown task id")
...
mher
  • The problem is that I only have an id. Every id was once a valid id but some are not any more because the results were deleted from the backend. So I'll always have an id that starts with `celery-task-id-` but a task could still be invalid. – dominik Apr 09 '12 at 20:57
  • In that case you should track id history externally. Celery backends don't guarantee to keep all results forever. For example, the amqp backend can be queried only once. – mher Apr 10 '12 at 12:42
  • @0x00mh: the problem is that, having a task id, how can I tell if the task is really PENDING or has been deleted from the backend (perhaps because I set celery to forget about it after some time)? – Paulo Scardine Aug 26 '13 at 06:52

Right now I'm using the following scheme:

  1. Get the task id.
  2. Set a memcache key like 'task_%s' % task.id to the message 'Started'.
  3. Pass the task id to the client.
  4. From the client I can now monitor the task status (the task writes status messages to memcache).
  5. When the task is ready, it sets the memcache key to 'Ready'.
  6. When the client sees the task is ready, it starts a special task that deletes the key from memcache and does the necessary cleanup actions.
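The scheme above can be sketched as follows. This is only an illustration of the key layout and state transitions: a plain dict stands in for the real memcache client, and the helper names (`mark_started`, `mark_ready`, etc.) are invented for the sketch:

```python
# minimal sketch of the memcache-based tracking scheme;
# a plain dict stands in for the real memcache client
cache = {}

def mark_started(task_id):
    # step 2: record that the task was started
    cache['task_%s' % task_id] = 'Started'

def mark_ready(task_id):
    # step 5: the task flips the key to 'Ready' when done
    cache['task_%s' % task_id] = 'Ready'

def client_status(task_id):
    # step 4: the client polls the key; a missing key means the id
    # was never registered (or was already cleaned up)
    return cache.get('task_%s' % task_id, 'Unknown')

def cleanup(task_id):
    # step 6: remove the key once the client has seen 'Ready'
    cache.pop('task_%s' % task_id, None)

mark_started('abc-123')
print(client_status('abc-123'))    # Started
mark_ready('abc-123')
print(client_status('abc-123'))    # Ready
cleanup('abc-123')
print(client_status('abc-123'))    # Unknown
print(client_status('not-an-id'))  # Unknown
```

The point of the scheme is that an unknown id and a cleaned-up id both read back as missing, which is exactly the distinction the question asks for.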
Nikolay Fominyh

You need to call .get() on the AsyncResult object you create to actually fetch the result from the backend.

See the Celery FAQ.


To further clarify my answer:

Any string is technically a valid id; there is no way to validate the task id on its own. The only way to find out whether a task exists is to ask the backend whether it knows about it, and to do that you must use .get().

This introduces the problem that .get() blocks when the backend doesn't have any information about the task id you supplied; this is by design, to allow you to start a task and then wait for its completion.

In the case of the original question I'm going to assume that the OP wants to get the state of a previously completed task. To do that you can pass a very small timeout and catch the timeout error:

from celery.exceptions import TimeoutError
from celery.result import AsyncResult

async_result = AsyncResult('blubb')
try:
    # fetch the result from the backend; your backend must be fast
    # enough to return results within 100 ms (0.1 seconds).
    # note that .get() returns the task's return value, so we read
    # the state from the AsyncResult itself afterwards
    async_result.get(timeout=0.1)
    print("Result exists; state=%s" % async_result.state)
except TimeoutError:
    print("Result does not exist")

It should go without saying that this only works if your backend is storing results; if it is not, there is no way to know if a task id is valid or not because nothing is keeping a record of them.


Even more clarification.

What you want to do cannot be accomplished using the AMQP backend because it does not store results; it forwards them.

My suggestion would be to switch to a database backend so that the results are in a database that you can query outside of the existing celery modules. If no tasks exist in the result database you can assume the ID is invalid.
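As a sketch of that idea, here is the shape of such a lookup against a result table. The table name and columns are illustrative (they roughly mirror the one-row-per-task-id layout a database result backend stores), with an in-memory SQLite database standing in for the real result database:

```python
import sqlite3

# illustrative schema: one row per task id, as a database
# result backend roughly stores it
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_result (task_id TEXT PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO task_result VALUES (?, ?)", ("abc-123", "SUCCESS"))

def task_id_known(conn, task_id):
    # a task id is considered valid only if the result table
    # has a row for it; anything else is treated as unknown
    row = conn.execute(
        "SELECT 1 FROM task_result WHERE task_id = ?", (task_id,)
    ).fetchone()
    return row is not None

print(task_id_known(conn, "abc-123"))  # True
print(task_id_known(conn, "blubb"))    # False
```

Because the lookup is a plain query against the result store, it can run in any process, outside the Celery modules entirely.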

Evan Borgstrom
  • `.get()` will block until the system receives a result. In the case of a non-existent id this will just lock the application. You can pass a `timeout` argument but you are still unable to determine if the task id is wrong. – Igor Apr 06 '12 at 13:06
  • Right, you need to pass a timeout value and catch the timeout error. That's the only way to determine if a task id is "valid" according to your backend. Any id is technically "valid" but only ID's your backend knows about will actually return any data. – Evan Borgstrom Apr 08 '12 at 00:41
  • My tasks normally last about 30 seconds. So that's no option, right? – dominik Apr 09 '12 at 20:55
  • You want to get info about the task before it has finished, but from another process than the one that created the task? Basically so you can check if something is running? Is that correct? – Evan Borgstrom Apr 10 '12 at 19:57
  • This is a useful answer given its clarification that ``.get()`` will sometimes never return, without a ``timeout`` parameter. The other answers about storing task state outside Celery are more correct since brokers don't store data forever. However, switching to a database as broker is not such a good idea (such backends were intended for testing only and don't support some Celery features). – RichVel Oct 03 '15 at 10:20
  • The term 'backend' in much of this answer is better replaced by 'broker' or 'message broker' - confusingly, Celery uses the term [broker](http://celery.readthedocs.org/en/latest/getting-started/brokers/) to mean the message transport (e.g. RabbitMQ), which is separate from the [task results backend](http://celery.readthedocs.org/en/latest/configuration.html#task-result-backend-settings) used to store results of tasks. Databases are fine as task results backends, but a bad choice as the broker (experimental in Celery, and missing some features). – RichVel Oct 04 '15 at 16:45
  • Using .get() is clearly not the answer: it will block (or timeout) if the task does exist but is taking a long time to run. What is asked (and what I'm looking for as well), is a way to know whether the task is 'somewhere in the pipe', and not lost. Because we are currently facing this issue of tasks being lost by Celery but it still returns 'PENDING' (after 2 months). This is very annoying. – Arnaud P Jan 15 '18 at 11:17

So I have this idea:

import project.celery_tasks as tasks

def task_exists(task_id):
    # `tasks` is my imported task module from celery; it is located
    # under /project/project, where the settings.py file is located
    i = tasks.app.control.inspect()
    # each inspect call returns {worker_name: [task_info, ...]},
    # or None if no workers responded; scheduled() wraps the task
    # info under a 'request' key, the others do not
    for inspection in (i.scheduled(), i.active(), i.reserved()):
        for worker_tasks in (inspection or {}).values():
            for task in worker_tasks:
                info = task.get('request', task)
                if info.get('id') == task_id:
                    return True
    # if checking the status returns PENDING yet we found the id in
    # one of the queues, the task exists; if it returns PENDING and
    # the id is in none of the queues, it doesn't exist
    return False

According to https://docs.celeryproject.org/en/stable/userguide/monitoring.html, the different types of queue inspections are active, scheduled, reserved, revoked, registered, stats, and query_task, so pick and choose as you please.

And there might be a better way to go about checking the queues for their tasks, but this should work for me, for now.
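The membership test can be factored into one helper that handles all three inspection payloads. This is a sketch against hand-built dicts shaped like `inspect()` output (worker name mapped to a list of task dicts; note that `scheduled()` nests the task info under a `'request'` key), so the worker and task names are invented:

```python
def contains_task(inspection, task_id):
    # inspection: {worker_name: [task_info, ...]}, or None when no
    # worker replied; scheduled() nests the info under 'request',
    # while active()/reserved() put the id at the top level
    for worker_tasks in (inspection or {}).values():
        for task in worker_tasks:
            info = task.get('request', task)
            if info.get('id') == task_id:
                return True
    return False

# hand-built samples shaped like inspect() output
active = {'worker1@host': [{'id': 'abc-123', 'name': 'proj.tasks.add'}]}
scheduled = {'worker1@host': [{'eta': '2020-01-01', 'request': {'id': 'def-456'}}]}

print(contains_task(active, 'abc-123'))     # True
print(contains_task(scheduled, 'def-456'))  # True
print(contains_task(None, 'abc-123'))       # False
```

Handling the `None` case matters in practice: `inspect()` calls return `None` rather than an empty dict when no worker answers.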

Shmack

Querying Redis directly may be a good solution:

import json

import redis

pool = redis.ConnectionPool(host=config.REDIS_HOST,
                            port=config.REDIS_PORT,
                            db=config.REDIS_DB,
                            password=config.REDIS_PASSWORD)
redis_client = redis.Redis(connection_pool=pool)

def check_task_exist(task_id):
    # scan the messages still waiting in the default 'celery' queue;
    # a task already picked up by a worker is no longer in this list
    for one in redis_client.lrange('celery', 0, -1):
        task_info = json.loads(one.decode())
        if task_info['headers']['id'] == task_id:
            return True
    return False
Zheng Xiaodong

I found a way to check and it's working for me:

def check_task_exists(task_id):
    inspector = app.control.inspect()

    # Check active tasks
    active_tasks = inspector.active()
    if active_tasks:
        for worker, tasks in active_tasks.items():
            for task in tasks:
                if task['id'] == task_id:
                    return True

    # Check scheduled tasks (the task info is nested under 'request')
    scheduled_tasks = inspector.scheduled()
    if scheduled_tasks:
        for worker, tasks in scheduled_tasks.items():
            for task in tasks:
                if task['request']['id'] == task_id:
                    return True

    # Check reserved tasks
    reserved_tasks = inspector.reserved()
    if reserved_tasks:
        for worker, tasks in reserved_tasks.items():
            for task in tasks:
                if task['id'] == task_id:
                    return True

    # Task not found
    return False
Elyasomer

Please correct me if I'm wrong.

if built_in_status_check(task_id) == 'PENDING':
    if registry_exists(task_id):
        print('Pending')
    else:
        print('Task does not exist')
pravin
  • What are `built_in_status_check` and `registry_exists`? How would you implement this? – dominik Apr 09 '12 at 19:00
  • Well, I came to know that there are 6 task states (PENDING, STARTED, SUCCESS, FAILURE, RETRY and REVOKED). So I thought we could have code to check whether the task is PENDING or not, and if it is in the PENDING state then we could check that particular task against the registry entries for existence. – pravin Apr 10 '12 at 05:38
  • No, I know that the state is pending but I don't know the reason why it is pending. I am looking for a smart `registry_exists`. – dominik Apr 10 '12 at 15:34

Try

AsyncResult('blubb').state

That may work; it should return something different.

Har
  • I want to get different results depending on whether the task id is or has been a real task id. The problem is that I'll always get PENDING even if I use a fake id like blubb. – dominik Mar 22 '12 at 14:25
  • `.status` is a deprecated alias of the `state` attribute. – Igor Apr 06 '12 at 13:09