133

How does one check whether a task is running in celery (specifically, I'm using celery-django)?

I've read the documentation, and I've googled, but I can't see a call like:

my_example_task.state() == RUNNING

My use-case is that I have an external (java) service for transcoding. When I send a document to be transcoded, I want to check if the task that runs that service is running, and if not, to (re)start it.

I'm using the current stable versions - 2.4, I believe.

Marcin
  • 48,559
  • 18
  • 128
  • 201
  • In my case , [this part](https://stackoverflow.com/questions/5544629/retrieve-list-of-tasks-in-a-queue-in-celery) helped. – Bambier Dec 16 '21 at 21:28

13 Answers

126

Save the task_id (which .delay() returns) and ask the Celery instance about its state afterwards:

x = method.delay(1, 2)
print(x.task_id)

When you later want to check on it, build a new AsyncResult from this task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
Gregor
  • 4,306
  • 1
  • 22
  • 37
  • 12
    Thanks, but what if I don't have access to `x`? – Marcin Jan 27 '12 at 16:05
  • 6
    Where do you enqueue your jobs into celery? There you have to return the task_id to track the job in the future. – Gregor Jan 27 '12 at 16:08
  • Unlike @Marcin's, this answer does not use the static method Task.AsyncResult() as the factory of the AsyncResult, which helpfully reuses the backend configuration; otherwise an error is raised when trying to get the result. – ArnauOrriols Nov 16 '15 at 01:18
  • 1
    @ArnauOrriols I don't understand, sorry. `AsyncResult(id)` is different to `task.AsyncResult(id)`? Is `x` above the task? Do you mean `res.state` would raise an error? What backend config? I call `async_result = run_instance.s(4).apply_async()` then later `res = async_result.get() except..` - am I losing config? Sorry, feeling pretty lost and wondering if this might clear up some confusions ([eg](http://stackoverflow.com/questions/35114144/how-can-you-catch-a-custom-exception-from-celery-worker-or-stop-it-being-prefix) and this `get()` call that is v. rarely disappearing on exception). Thanks – Chris Feb 14 '16 at 16:17
  • 2
    @Chris The controversy with @gregor code is in the instantiation of `async_result`. In your use case you already have the instance, so you are good to go. But what happens if you only have the task id, and need to instantiate an `async_result` instance to be able to call `async_result.get()`? This is an instance of the `AsyncResult` class, but you cannot use the raw class `celery.result.AsyncResult`; you need to get the class from the function wrapped by `app.task()`. In your case you would do `async_result = run_instance.AsyncResult('task-id')` – ArnauOrriols Feb 14 '16 at 20:03
  • 1
    `but you cannot use the raw class celery.result.AsyncResult, you need to get the class from the function wrapped by app.task(). ` - I think this is how it was actually supposed to be used. Have a read of the code: https://github.com/celery/celery/blob/c26e30bad8e141e80f2f62900474121ac52476ac/celery/result.py#L92 – nevelis Apr 23 '18 at 17:50
  • how about periodic tasks run by @periodic_task? – Hojat Modaresi Oct 01 '18 at 10:16
  • @Gregor, do you have any idea how to get the state when `ignore_results` is set to true? Calling `res.ready()` throws `DisabledBackend object has no attribute '_get_task_meta_for'` – unlockme Apr 18 '19 at 12:05
106

Creating an AsyncResult object from the task id is the way recommended in the FAQ to obtain the task status when the only thing you have is the task id.

However, as of Celery 3.x, there are significant caveats that could bite people if they do not pay attention to them. It really depends on the specific use-case scenario.

By default, Celery does not record a "running" state.

In order for Celery to record that a task is running, you must set task_track_started to True. Here is a simple task that tests this:

@app.task(bind=True)
def test(self):
    print(self.AsyncResult(self.request.id).state)

When task_track_started is False, which is the default, the state shown is PENDING even though the task has started. If you set task_track_started to True, then the state will be STARTED.
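
In Celery 4.x the setting can be enabled on the app configuration. A minimal sketch (the app name and broker/backend URLs below are placeholders for your own):

```python
from celery import Celery

# App name and broker/backend URLs are placeholders for your own setup.
app = Celery("proj",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/0")

# Record the STARTED state when a worker picks a task up.
# (Celery 3.x spelling: CELERY_TRACK_STARTED = True in the config module.)
app.conf.task_track_started = True
```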

The state PENDING means "I don't know."

An AsyncResult with the state PENDING does not mean anything more than that Celery does not know the status of the task. This could be because of any number of reasons.

For one thing, AsyncResult can be constructed with invalid task ids. Such "tasks" will be deemed pending by Celery:

>>> task.AsyncResult("invalid").status
'PENDING'

Ok, so nobody is going to feed obviously invalid ids to AsyncResult. Fair enough, but it also means that AsyncResult will consider a task that ran successfully, but that Celery has since forgotten, as PENDING. Again, in some use-case scenarios this can be a problem. Part of the issue hinges on how Celery is configured to keep the results of tasks, because it depends on the availability of the "tombstones" in the results backend. ("Tombstones" is the term used in the Celery documentation for the data chunks that record how the task ended.) Using AsyncResult won't work at all if task_ignore_result is True.

A more vexing problem is that Celery expires the tombstones by default: the result_expires setting defaults to 24 hours. So if you launch a task, record the id in long-term storage, and more than 24 hours later create an AsyncResult with it, the status will be PENDING.

All "real tasks" start in the PENDING state. So getting PENDING on a task could mean that the task was requested but never progressed further than this (for whatever reason). Or it could mean the task ran but Celery forgot its state.

Ouch! AsyncResult won't work for me. What else can I do?

I prefer to keep track of goals rather than of the tasks themselves. I do keep some task information, but it is really secondary to tracking the goals. The goals are stored in storage independent from Celery. When a request needs to perform a computation that depends on some goal having been achieved, it checks whether the goal has already been achieved. If yes, it uses this cached goal; otherwise it starts the task that will effect the goal, and sends the client that made the HTTP request a response indicating it should wait for a result.
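
A minimal, Celery-free sketch of that pattern (the dict stands in for real Celery-independent storage, and start_task is a stub for whatever actually enqueues the Celery task):

```python
# Sketch of tracking goals instead of Celery task state.
# goal_store stands in for a real cache/database independent of Celery.

goal_store = {}  # goal_key -> finished result

def start_task(goal_key):
    """Stub: in a real app this would call my_task.delay(goal_key)."""
    pass

def get_or_start(goal_key):
    """Return the cached goal if already achieved, else kick off the task."""
    if goal_key in goal_store:
        return {"status": "done", "result": goal_store[goal_key]}
    start_task(goal_key)
    return {"status": "wait"}

def on_task_complete(goal_key, result):
    """The worker records the goal itself, rather than relying on
    Celery's result backend (and its expiring tombstones)."""
    goal_store[goal_key] = result
```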


The variable names and hyperlinks above are for Celery 4.x. In 3.x the corresponding variables and hyperlinks are: CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES.

Louis
  • 146,715
  • 28
  • 274
  • 320
  • 1
    So if I want to check result later (maybe even within another process), I'm better off with my own implementation? Storing the result into database manually? – Franklin Yu Dec 06 '17 at 22:05
  • Yes, I'd separate keeping track of "goal" from keeping track of "tasks". I wrote "perform a computation that depends on some goal". Usually, the "goal" is also a computation. For instance if I want to show article X to a user, I must convert it from XML to HTML, but before that, I must have resolved all bibliographical references. (X is like a journal article.) I check whether the goal "article X with all bibliographical references resolved" exists and use that rather than try to check the task status of a Celery task that would have computed the goal I want. – Louis Dec 07 '17 at 15:01
  • And the information "article X with all bibliographical references resolved" is stored in a memory cache and stored in an eXist-db database. – Louis Dec 07 '17 at 15:11
75

Every Task object has a .request property, which contains its request context (including the running task's id). Accordingly, the following line gives the state of a Task task:

task.AsyncResult(task.request.id).state
Marcin
  • 48,559
  • 18
  • 128
  • 201
  • 3
    Is there a way to store the percentage of progress of a task? – patrick Apr 25 '12 at 21:33
  • 6
    When I do this, I get a permanently PENDING AsyncResult, even if I wait long enough for the task to finish. Is there a way of making this see state changes? I believe my backend is configured, and I tried setting CELERY_TRACK_STARTED=True to no avail. – dstromberg Aug 09 '16 at 21:39
  • 1
    @dstromberg Unfortunately it's been 4 years since this was an issue for me, so I can't help. You almost certainly need to configure celery to track status. – Marcin Aug 10 '16 at 02:02
  • Adding further to @dstromberg's observation, just for confirmation's sake, I picked up a celery task that I knew for sure had succeeded and checked its `state` property; it still returned `PENDING`. This doesn't seem to be a reliable way to track the state of celery tasks from the terminal. Additionally, I have Celery Flower (Celery Monitoring Tool) running; for some reason it didn't show the tasks that I was looking for in the list of tasks that it had executed. I may have to look into the Flower settings to see if there's anything that says show only up to certain hours in the past. – Deep Sep 22 '20 at 08:33
18

You can also create custom states and update their values during task execution. This example is from the docs:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states
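
On the consuming side, the custom state and its meta can be read back from the AsyncResult. The helper below is a sketch: res would typically be upload_files.AsyncResult(task_id), but the logic only needs .state and .info:

```python
def progress(res):
    """Return (current, total) if the task reported PROGRESS, else None.

    `res` is an AsyncResult-like object, e.g. upload_files.AsyncResult(task_id);
    for custom states, AsyncResult.info holds the meta dict that was passed
    to update_state().
    """
    if res.state == 'PROGRESS':
        return res.info['current'], res.info['total']
    return None
```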

msangel
  • 9,895
  • 3
  • 50
  • 69
18

Old question but I recently ran into this problem.

If you're trying to get the task_id you can do it like this:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Now you know exactly what the task_id is and can now use it to get the AsyncResult:

# grab the AsyncResult
result = celery.result.AsyncResult(task_id)

# print the task id
print(result.task_id)
# 09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print(result.status)
# SUCCESS

# print the result returned
print(result.result)
# 4
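
If the point of supplying your own id is to make it reproducible from the task's inputs (so it can be looked up later without storing it), one option is to derive it with the standard library's uuid5. This is a sketch, not part of Celery itself; the namespace and key format are arbitrary choices:

```python
import uuid

def task_id_for(task_name, *args):
    """Derive a stable task id from the task name and its arguments,
    so the same call always maps to the same id."""
    key = task_name + ":" + ":".join(repr(a) for a in args)
    return str(uuid.uuid5(uuid.NAMESPACE_URL, key))

# Same inputs -> same id, so it can be passed as task_id=... to
# apply_async and later rebuilt without storing it anywhere.
tid = task_id_for("add", 2, 2)
```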
Cesar Rios
  • 201
  • 2
  • 5
  • 7
    There is absolutely no need to create your own task ID and pass it to `apply_async`. The object returned by `apply_async` is an `AsyncResult` object, which does have the id of the task that Celery generated. – Louis Jul 11 '16 at 11:04
  • 1
    Correct me if I'm wrong, but isn't it sometimes useful to generate a UUID based on some inputs, so that all calls getting the same inputs get the same UUID? IOW, maybe sometimes it's useful to specify your task_id. – dstromberg Aug 09 '16 at 21:29
  • 1
    @dstromberg The question asked by the OP is "how do I check task status" and the answer here says "If you're trying to get the task_id...". Neither checking the task status nor getting `task_id` requires that you *generate* a task id yourself. In your comment, you've imagined a reason that goes *above and beyond* "how do I check task status" and "If you're trying to get the task_id...". Great if you have that need, but it is not the case here. (Besides, using `uuid()` to generate a task id does absolutely *nothing* beyond what Celery does by default.) – Louis Jan 09 '17 at 12:02
  • I agree that the OP didn't specifically ask how to get predictable task IDs, but the answer to the OP's question is currently "track the task ID and do x". It seems to me that tracking the task ID is impractical in a wide variety of situations, so that answer may not actually be satisfactory. This answer helps me solve my use case (if I can overcome other noted limitations) for the same reason @dstromberg points out – whether or not it was motivated for that reason. – claytond Aug 19 '20 at 22:06
13

Just use this API from the Celery FAQ:

result = app.AsyncResult(task_id)

This works fine.

David Ding
  • 1,473
  • 1
  • 15
  • 13
  • Thanks, this is a lifesaver for me! Our Celery + Jobtastic tasks were working before with Celery 3.x (`result = AsyncResult(task_id)`), but not anymore with Celery 4.x (`result = DownloadFileTask.AsyncResult(task_id)`). Apparently, need to reference now the task class so that the `CELERY_RESULT_BACKEND` will be bootstrapped properly. Otherwise, the `DisabledBackend` will be used for some reason. – Ranel Padon Jan 01 '22 at 04:14
  • UPDATE: using `celery_app.set_default()` works also, and much simpler (i.e. no need to adjust the existing task calls) since it auto-binds the fully configured/bootstrapped app as default, including in the isolated `AsyncResult()` calls. – Ranel Padon Jan 02 '22 at 19:51
5

Answer of 2020:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,))  # mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"
0

Try:

task.AsyncResult(task.request.id).state

This will give you the Celery task status. If the task is already in the FAILURE state, it will throw an exception:

raised unexpected: KeyError('exc_type',)

gogasca
  • 9,283
  • 6
  • 80
  • 125
0

I found helpful information in the Celery Project Workers Guide, under inspecting-workers.

For my case, I am checking to see if Celery is running.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

You can play with inspect to suit your needs.
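
For the original question (is a given task currently executing?), the output of inspect().active() can also be scanned. The flattening helper below is a sketch; with a live app you would feed it task.app.control.inspect().active():

```python
def running_task_ids(active):
    """Flatten inspect().active() output ({worker: [task_info, ...]},
    or None when no workers reply) into the set of ids being executed."""
    return {t["id"] for tasks in (active or {}).values() for t in tasks}

# With a live app (assumption: broker reachable and workers running):
#   ids = running_task_ids(task.app.control.inspect().active())
#   is_running = my_task_id in ids
```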

zerocog
  • 1,703
  • 21
  • 32
0
  • First, in your Celery app:

vi my_celery_apps/app1.py

app = Celery(worker_name)
  • Next, change to the task file and import app from your Celery app module:

vi tasks/task1.py

from my_celery_apps.app1 import app

task = app.AsyncResult(taskid)

try:
    if task.state.lower() != "success":
        return
except:
    """ do something """

-1
res = method.delay()

print(f"id={res.id}, state={res.state}, status={res.status}")

print(res.get())
jizhihaoSAMA
  • 12,336
  • 9
  • 27
  • 49
Saurabh I
  • 17
  • 3
  • 8
    Please don't post only code as answer, but also provide an explanation what your code does and how it solves the problem of the question. Answers with an explanation are usually more helpful and of better quality, and are more likely to attract upvotes. – Mark Rotteveel Jul 26 '20 at 09:06
-2

For simple tasks, we can use Flower (http://flower.readthedocs.io/en/latest/screenshots.html) or Jobtastic (http://policystat.github.io/jobtastic/) for monitoring.

For complicated tasks, say a task which deals with a lot of other modules, we recommend manually recording the progress and messages on the specific task unit.

taotao.li
  • 1,050
  • 2
  • 11
  • 23
-3

Apart from the programmatic approaches above, task status can easily be seen using Flower.

Flower provides real-time monitoring using Celery events. It is a web-based tool for monitoring and administering Celery clusters.

  1. Task progress and history
  2. Ability to show task details (arguments, start time, runtime, and more)
  3. Graphs and statistics

Official Document: Flower - Celery monitoring tool

Installation:

$ pip install flower

Usage (the `-A proj` argument names your own app module; then open the web UI):

$ celery -A proj flower

http://localhost:5555

Update: this has a versioning issue: flower (version 0.9.7) works only with celery (version 4.4.7). Moreover, installing flower downgrades a higher version of celery to 4.4.7, and the registered tasks then no longer work.

Roshan Bagdiya
  • 2,048
  • 21
  • 41