
How do I retrieve the result of a task if I don't know in advance which task was performed? Here's the setup, given the following source ('tasks.py'):

from celery import Celery

app = Celery('tasks', backend="db+mysql://u:p@localhost/db", broker='amqp://guest:guest@localhost:5672//')

@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y

with RabbitMQ 3.3.2 running locally:

marcs-mbp:sbin marcstreeter$ ./rabbitmq-server

              RabbitMQ 3.3.2. Copyright (C) 2007-2014 GoPivotal, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker... completed with 10 plugins.

with Celery 3.1.12 running locally:

 -------------- celery@Marcs-MacBook-Pro.local v3.1.12 (Cipater)
---- **** -----
--- * ***  * -- Darwin-13.2.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x105dea3d0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

I can then import the tasks and retrieve a result by its 'task_id':

from tasks import add, mul
from celery.result import AsyncResult

result = add.delay(2,2)
task_id = result.task_id
result.get() # 4

result = AsyncResult(id=task_id)
result.get() # 4

result = add.AsyncResult(id=task_id)
result.get() # 4

# and the same for the 'mul' task. Just imagine I put it here

In the next example I split up these steps between processes. In one process I retrieve the 'task_id' like so:

from tasks import add

result = add.delay(5,5)
task_id = result.task_id

And in another process if I use the same 'task_id' (copied and pasted to another REPL, or in a different HTTP request) like so:

from celery.result import AsyncResult

result = AsyncResult(id="copied_task_id", backend="db+mysql://u:p@localhost/db")
result.get() # AttributeError: 'str' object has no attribute 'get_task_meta'
result.state # AttributeError: 'str' object has no attribute 'get_task_meta'
result.status # AttributeError: 'str' object has no attribute 'get_task_meta'

And in another process if I do:

from tasks import add # in this instance I know that an add task was performed

result = add.AsyncResult(id="copied_task_id")
result.status # "SUCCESS"
result.state # "SUCCESS"
result.get() # 10

I'd like to be able to get the result without knowing beforehand which task is generating it. In my real environment I plan on returning this task_id to the client and letting them query the status of their job via an HTTP request.

Marc

2 Answers

OK, so I've been looking for a solution for a long time, and now that I've finally posted this formally and looked over the documentation, I found this gem:

class celery.result.AsyncResult(id, backend=None, task_name=None, app=None, parent=None)

    Query task state.

    Parameters:
        id – see id.
        backend – see backend.

    exception TimeoutError
        Error raised for timeouts.

    AsyncResult.app = None

So instead of the backend parameter, I provided the "app" argument, like so:

from celery.result import AsyncResult
from tasks import app

# Assuming add.delay(10,10) was called in another process
# and that I'm using a 'task_id' I retrieved from that process

result = AsyncResult(id='copied_task_id', app=app)
result.state # 'SUCCESS'
result.get() # 20

This is probably obvious to many. It wasn't to me. For now all I can say is that this solution "just works", but I'd feel more comfortable if I knew it was the sanctioned way to do it. If you know of a section in the documentation that makes this more clear please post it in the comments or as an answer and I'll select it as the answer if I can.

Marc
  • Exactly what I was looking for; I share your view that this is very unclear from the documentation so your post helped me a lot. – markjan Oct 20 '14 at 08:05
  • You may want to set a small timeout on this call, because some Celery 'get' calls may not return for a very long time in case of an invalid task ID, or a task that is no longer known to the broker. See http://stackoverflow.com/a/10074280/992887 – RichVel Oct 04 '15 at 16:37
  • Thank you for this fix. It made me pull my hair for 2 weeks. – Mahadeva Feb 09 '17 at 22:57
  • Many years later, the behavior seems to be that if you import a properly configured Celery instance as you do with `from task import app`, this is automatically set on the `AsyncResult`. So without explicitly setting the arguments I can retrieve `result.backend` and `result.app` and thus the state is also correct. – Midnighter Jan 21 '19 at 12:46

In case it helps anyone: it turns out that the backend parameter doesn't expect a string but a Backend object; see "How do I override the backend for celery tasks".

What worked for me was:

from celery.backends.rpc import RPCBackend
from myapp.workers.main import app as worker

# The task's backend option takes a Backend instance, not a URL string.
@worker.task(backend=RPCBackend(app=worker))
def status_check():
    return "OK"

NublicPablo