4

I have Celery 3.1.18 running with Django 1.6.11 and RabbitMQ 3.5.4, and trying to test my async task in a failure state (CELERY_ALWAYS_EAGER=True). However, I cannot get the proper "result" in the error callback. The example in the Celery docs shows:

@app.task(bind=True)
def error_handler(self, uuid):
    result = self.app.AsyncResult(uuid)
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          uuid, result.result, result.traceback))

When I do this, my result is still "PENDING", result.result = '', and result.traceback=''. But the actual result returned by my .apply_async call has the right "FAILURE" state and traceback.

My code (basically a Django Rest Framework RESTful endpoint that parses a .tar.gz file, and then sends a notification back to the user, when the file is done parsing):

views.py:

from producer_main.celery import app as celery_app

@celery_app.task()
def _upload_error_simple(uuid):
    print uuid
    result = celery_app.AsyncResult(uuid)
    print result.backend
    print result.state
    print result.result
    print result.traceback
    msg = 'Task {0} raised exception: {1!r}\n{2!r}'.format(uuid,
                                                           result.result,
                                                           result.traceback)


class UploadNewFile(APIView):
    def post(self, request, repository_id, format=None):
        try:    
            uploaded_file = self.data['files'][self.data['files'].keys()[0]]
            self.path = default_storage.save('{0}/{1}'.format(settings.MEDIA_ROOT,
                                                              uploaded_file.name),
                                             uploaded_file)
            print type(import_file)
            self.async_result = import_file.apply_async((self.path,  request.user),
                                                        link_error=_upload_error_simple.s())


            print 'results from self.async_result:'
            print self.async_result.id
            print self.async_result.backend
            print self.async_result.state
            print self.async_result.result
            print self.async_result.traceback
            return Response()
        except (PermissionDenied, InvalidArgument, NotFound, KeyError) as ex:
            gutils.handle_exceptions(ex)

tasks.py:

from producer_main.celery import app
from utilities.general import upload_class


@app.task
def import_file(path, user):
    """Asynchronously import a course."""
    upload_class(path, user)

celery.py:

"""
As described in
http://celery.readthedocs.org/en/latest/django/first-steps-with-django.html
"""
from __future__ import absolute_import

import os
import logging

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'producer_main.settings')

from django.conf import settings

log = logging.getLogger(__name__)

app = Celery('producer')  # pylint: disable=invalid-name

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # pragma: no cover

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

My backend is configured as such:

CELERY_ALWAYS_EAGER = True
CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'redis://localhost'
CELERY_RESULT_PERSISTENT = True
CELERY_IGNORE_RESULT = False

When I run my unittest for the link_error state, I get:

Creating test database for alias 'default'...
<class 'celery.local.PromiseProxy'>
130ccf13-c2a0-4bde-8d49-e17eeb1b0115
<celery.backends.redis.RedisBackend object at 0x10aa2e110>
PENDING
None
None
results from self.async_result:
130ccf13-c2a0-4bde-8d49-e17eeb1b0115
None
FAILURE
Non .zip / .tar.gz file passed in.
Traceback (most recent call last):

So the task results are not available in my _upload_error_simple() method, but they are available from the self.async_result returned variable...

user
  • 4,651
  • 5
  • 32
  • 60
  • the updated version of the `_upload_error()` function has the wrong signature - according to the [docs](http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks) it should be `def _upload_error(uuid)` – scytale Aug 14 '15 at 09:16
  • unless you're passing a partially evaluated signature of `_upload_error()` as the `link_error` paramater - if so please update your question to make everything more clear – scytale Aug 14 '15 at 09:18
  • please don't just keep dumping code onto the end - edit the question to make it coherent – scytale Aug 14 '15 at 15:40
  • and for `import_file` you use `@app.task` but for `_upload_error_simple` you use `@celery_app.task` – scytale Aug 14 '15 at 15:41
  • They are in different files, where the app is imported as different names...okay, I will clean up the question then. – user Aug 14 '15 at 15:43
  • hm... that may be confusing things - try moving your tasks into one file and having only 1 celery app object – scytale Aug 14 '15 at 15:54

3 Answers3

5

I could not get the link and link_error callbacks to work, so I finally had to use the on_failure and on_success task methods described in the docs and this SO question. My tasks.py then looks like:

class ErrorHandlingTask(Task):
    abstract = True

    def on_failure(self, exc, task_id, targs, tkwargs, einfo):
        msg = 'Import of {0} raised exception: {1!r}'.format(targs[0].split('/')[-1],
                                                             str(exc))

    def on_success(self, retval, task_id, targs, tkwargs):
        msg = "Upload successful. You may now view your course."    

@app.task(base=ErrorHandlingTask)
def import_file(path, user):
    """Asynchronously import a course."""
    upload_class(path, user)
Community
  • 1
  • 1
user
  • 4,651
  • 5
  • 32
  • 60
0

You appear to have _upload_error() as a bound method of your class - this is probably not what you want. try making it a stand-along task:

@celery_app.task(bind=True)
def _upload_error(self, uuid):
    result = celery_app.AsyncResult(uuid)
    msg = 'Task {0} raised exception: {1!r}\n{2!r}'.format(uuid,
                                                       result.result,
                                                       result.traceback)

class Whatever(object):
    ....
    self.async_result = import_file.apply_async((self.path, request.user),
                                                link=self._upload_success.s(
                                                    "Upload finished."),
                                                link_error=_upload_error.s())

in fact there's no need for the self paramater since it's not used so you could just do this:

@celery_app.task()
def _upload_error(uuid):
    result = celery_app.AsyncResult(uuid)
    msg = 'Task {0} raised exception: {1!r}\n{2!r}'.format(uuid,
                                                       result.result,
                                                       result.traceback)

note the absence of bind=True and self

scytale
  • 12,346
  • 3
  • 32
  • 46
  • That did not work, it still shows `result.state = 'PENDING'` and `result.result = None` in _upload_error... – user Aug 13 '15 at 18:22
  • print out the uuid, see if it is valid. is your results backend configured correctly? – scytale Aug 14 '15 at 09:17
  • It looks like a valid uuid, and I assume the backend is configured correctly because I can get the results from my self.async_result...? First time using Celery and RabbitMQ, will include more info in an update. – user Aug 14 '15 at 14:42
  • what are you using for the results backend? – scytale Aug 14 '15 at 15:28
  • For results backend, I've tried RPC, AMPQ, and now trying Redis...when I print result.backend I get something like or depending on which one I have configured. I guess a follow-on is how do I check those are configured properly... – user Aug 14 '15 at 15:29
  • in this case you can't use rabbitmq as a results backend - see [here](http://docs.celeryproject.org/en/stable/userguide/tasks.html#rabbitmq-result-backend). Also check the uuid received by `_upload_error` against the uuid of `self.async_result` to make sure they are the same. Finally, are you sure `import_file` is an Task and not a canvas object? – scytale Aug 14 '15 at 15:32
  • They are the same ID (see updated question above). I'm also pretty sure `import_file` is a task... – user Aug 14 '15 at 15:36
  • pretty sure? if it's a canvas object then `AsyncResult(uuid)` won't work. – scytale Aug 14 '15 at 15:38
0

Be careful with UUID instance!

If you will try to get status of a task with id not string type but UUID type, you will only get PENDING status.

from uuid import UUID
from celery.result import AsyncResult

task_id = UUID('d4337c01-4402-48e9-9e9c-6e9919d5e282')

print(AsyncResult(task_id).state)
# PENDING

print(AsyncResult(str(task_id)).state)
# SUCCESS
rappongy
  • 957
  • 8
  • 10