1

How can someone invoke a celery task from tornado, and get the result via a callback?

This post claims that someone must simply put a message via RabbitMQ and then the task shall be executed. This makes sense, but can someone give an example in python (even better in tornado, with a callback)? Personally, I use mongodb as my message broker, but I can switch to Redis or RabbitMQ as well..

EDIT: To clarify things, I want an example with a callback. For example, this tornado code

TestTask.delay(callback = self._on_celery_response) 
...
def _on_celery_response(self, result):
    print "hello from _on_celery_repsonse" , result

does not work. My TestTask is:

class TestTask(Task):
    name = "tornadoServer.Test"
    def run(self, callback=None,  **kwargs):
        result = {'result': "hello from celery task invoked by tornado"}
        if callback is not None:
            subtask(callback).delay(result)
        return result

and the traceback:

    File "/home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/tornado/stack_context.py", line 183, in wrapped
    callback(*args, **kwargs)
  File "/home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/asyncmongo/connection.py", line 183, in _parse_response
    callback(response)
  File "/home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/asyncmongo/cursor.py", line 399, in _handle_response
    orig_callback(result['data'], error=None)
  File "/home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/basic_auth_handlers.py", line 66, in _on_response
    celery_tasks.TestTask.delay(self._on_celery_response)
  File "/usr/local/lib/python2.6/dist-packages/celery-2.2.7-py2.6.egg/celery/task/base.py", line 338, in delay
    return self.apply_async(args, kwargs)
  File "/usr/local/lib/python2.6/dist-packages/celery-2.2.7-py2.6.egg/celery/task/base.py", line 460, in apply_async
    **options)
  File "/usr/local/lib/python2.6/dist-packages/celery-2.2.7-py2.6.egg/celery/app/amqp.py", line 230, in delay_task
    send(body, exchange=exchange, **extract_msg_options(kwargs))
  File "/usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/compat.py", line 101, in send
    return self.publish(*args, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/messaging.py", line 124, in publish
    compression, headers)
  File "/usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/messaging.py", line 147, in _prepare
    body) = encode(body, serializer=serializer)
  File "/usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/serialization.py", line 119, in encode
    payload = encoder(data)
  File "/usr/lib/python2.6/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects

The Task works ok without the callback.. Any suggestions?

hymloth
  • 6,869
  • 5
  • 36
  • 47

2 Answers2

0

The callback object should be a celery Task too or your code doesn't work.

You can use signals inside the task body if your callback function don't have to be a celery task.

http://docs.python.org/library/signal.html

Mauro Rocco
  • 4,980
  • 1
  • 26
  • 40
  • I am unfamiliar with signals. Can you provide an example? How can you capture the result with a SignalHandler function? – hymloth Nov 03 '11 at 17:13
  • You can't pass argument to a an handler because this is suppose to be cross platform. The solution is to set the the result object of the celery task in a global variable and than access it from the handler. global CELERY_RESULT CELERY_RESULT = TestTask.delay() Than you set the handler for the signal and you launch a timer of five seconds or more. After that in the handler you have only to access the global var and call on it get_Result. – Mauro Rocco Nov 04 '11 at 10:37
  • Another solution is to change approach. Why you don't pass you handler function as normal argument? In this way you can override the method of celery task class after_return and here check if the task is completed and with which status, in this way you can call here you handler function. http://ask.github.com/celery/userguide/tasks.html?highlight=after_return#after_return – Mauro Rocco Nov 04 '11 at 10:49
-1

celery has callback function, you could go to http://ask.github.com/celery/userguide/tasksets.html

from celery.task import task
from celery.task.sets import subtask

@task
def add(x, y, callback=None):
    result = x + y

    if callback is not None:
        subtask(callback).delay(result)

    return result
Tim McNamara
  • 18,019
  • 4
  • 52
  • 83
Zhenyu Li
  • 655
  • 1
  • 6
  • 11