
In a Django app I'm running async tasks and would like to show progress, errors etc. to the user. If there are errors, the user should be redirected to a page where additional input or some action is required to fix the problem. What is the best way to communicate from the Celery worker back to the front end?

Here's a basic structure in pseudo code:

# views.py
from django.shortcuts import render
from tasks import run_task

def view_task(request):
    run_task.delay()
    return render(request, 'template.html')

# tasks.py
from celery import shared_task
from compute_module import compute_fct

@shared_task
def run_task():
    result = compute_fct()

    # how to catch status update messages from compute_module while compute_fct is running??

    if result['status'] == 'error':
        handle_error()
    else:
        handle_success()

# compute_module.py
import pandas as pd

def compute_fct():
    # send message: status = loading file
    df = pd.read_csv('test.csv')
    # send message: status = computing
    val = df['col'].mean()

    if pd.isna(val):  # .mean() returns NaN, not None, when the column is empty
        return {'status':'error'}
    else:
        return {'status':'success','val':val}

What I would ideally want:

  • compute_module.py uses the native Python logger. By separation of duties I want to keep the logging as generic as possible and use the standard Python/Django loggers. But they don't seem to be designed to send messages to the front end.
  • celery task somehow handles the logs and instead of displaying them on stdout redirects them to pusher
  • front-end js shows and handles the messages
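For what it's worth, the "keep logging generic" wish can be met with a custom logging.Handler: compute_module keeps using the stdlib logger, and whoever runs it (the celery task) decides where records go. A rough sketch, where FrontendLogHandler and send_fn are made-up names and a plain list stands in for the pusher client:

```python
import logging

class FrontendLogHandler(logging.Handler):
    """Hypothetical handler: forwards log records to an arbitrary
    callable (e.g. a pusher client's trigger method) instead of stdout."""
    def __init__(self, send_fn, level=logging.INFO):
        super().__init__(level=level)
        self.send_fn = send_fn

    def emit(self, record):
        try:
            self.send_fn({'status': self.format(record)})
        except Exception:
            self.handleError(record)

# compute_module.py stays generic -- it only knows the stdlib logger:
logger = logging.getLogger('compute_module')

def compute_fct():
    logger.info('loading file')
    # ... load and compute ...
    logger.info('computing')
    return {'status': 'success'}

# the celery task attaches the handler before calling compute_fct;
# here a list captures what would otherwise go to pusher
sent = []
logger.addHandler(FrontendLogHandler(sent.append))
logger.setLevel(logging.INFO)
compute_fct()
```

This keeps compute_module.py free of any pusher.com dependency; only the task-side code knows where log records end up.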

There might be standard ways of communicating between the Celery worker and the front end that I'm not aware of. This scenario must happen often and I'm surprised it's so difficult to implement; in a way the RabbitMQ message queue or AWS SNS should be designed for this. Below are resources I looked at, but I don't feel either of them works very well, though maybe I'm just confused.

logging: this seems to be more about logging on the server side, not sending messages to the user

Celery cam seems to be about admins monitoring tasks, not sending messages to the user

pusher: I like it, but I don't want compute_module.py to deal with it. For example, I would prefer not to do any pusher.com integration inside compute_module.py. I guess I could pass in an already-instantiated pusher object so the module can just push messages, but I would still prefer to keep it generic

  • what would be a progress report location in your case? you run a task, it's done or it errors. if you ran a task decomposed as subtasks, could you use a webworker to push each sub's final output back to the client? i am also not really *feeling* python logging as a user feedback mechanism - I suspect getting *nice* output, esp for html, will be more hassle than it's worth. – JL Peyret Dec 17 '17 at 19:08

3 Answers


EDIT: Moved to django-channels now, works well but more complex than solution below.

Previous:

Ok so below is pseudo code for how I've solved it for now. Basically I use https://pusher.com/docs/javascript_quick_start and, server-side, pass the instantiated object into compute_module. One downside is that the pusher messages are ephemeral, so I'm going to have to do some extra work in LogPusher to store them in a db, something for another day...

Also, in my real implementation I trigger the task via a $.post() ajax call in $(document).ready(), because small tasks completed so fast that the user would never see the pusher messages, since the connection wasn't established yet (back to that historic-message problem).

Another alternative route which I hadn't mentioned above is https://channels.readthedocs.io/en/latest/

[Edit] Another solution is Server-sent events, which has Django implementations; I haven't tested it. But it looks good for uni-directional updates, eg from server to client (vs bidirectional websockets). You would need a messaging system like redis pubsub to get updates to the server-side SSE route.
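As a rough idea of what the SSE route could look like (sse_format and event_stream are illustrative names, and the channel name is made up): each SSE frame is just optional "event:" lines plus a "data:" line, terminated by a blank line, and a Django view can stream such frames from a redis pub/sub subscription.

```python
import json

def sse_format(data, event=None):
    """Render one server-sent-event frame: an optional 'event:' line,
    a 'data:' line with the JSON payload, then a blank line ending the frame."""
    lines = []
    if event:
        lines.append('event: %s' % event)
    lines.append('data: %s' % json.dumps(data))
    return '\n'.join(lines) + '\n\n'

def event_stream(channel):
    """Generator for a Django StreamingHttpResponse: subscribes to a
    redis pub/sub channel and yields each message as an SSE frame."""
    import redis  # only needed once the view is actually hit
    pubsub = redis.Redis().pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            yield sse_format({'status': message['data'].decode()})

# in a view:
# return StreamingHttpResponse(event_stream('task-updates'),
#                              content_type='text/event-stream')
```

The celery task would then publish its status updates to the same redis channel instead of (or in addition to) pusher.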

Front-end updates from django server via pusher:

# views.py
from django.shortcuts import render
from tasks import run_task

def view_task(request):
    run_task.delay('event')
    return render(request, 'template.html', {'pusher_event': 'event'})

    
# tasks.py
import json

import pusher
from celery import shared_task
from django.conf import settings
from compute_module import compute_fct

class LogPusher(object):
    def __init__(self, event):
        self.pusher_client = pusher.Pusher(app_id=settings.PUSHER_APP_ID,
                        key=settings.PUSHER_KEY,
                        secret=settings.PUSHER_SECRET,
                        cluster=settings.PUSHER_CLUSTER, ssl=True)
        self.event = event

    def send(self, data):
        self.pusher_client.trigger(settings.PUSHER_CHANNEL, self.event,
                                   json.dumps(data))

@shared_task
def run_task(pusher_event):

    log_pusher = LogPusher(pusher_event)
    result = compute_fct(log_pusher)

    if result['status'] == 'error':
        log_pusher.send({'status': 'error'})
    else:
        log_pusher.send({'status': 'success'})

            
# compute_module.py
import pandas as pd

def compute_fct(log_pusher):
    log_pusher.send({'status': 'loading file'})
    df = pd.read_csv('test.csv')
    log_pusher.send({'status': 'computing'})
    val = df['col'].mean()

    if pd.isna(val):  # .mean() returns NaN, not None, when the column is empty
        return {'status': 'error'}
    else:
        return {'status': 'success', 'val': val}
        

# context_processors.py
# see https://stackoverflow.com/questions/433162/can-i-access-constants-in-settings-py-from-templates-in-django
from django.conf import settings 

def pusher(request):
    return {'PUSHER_KEY': settings.PUSHER_KEY,
            'PUSHER_CLUSTER': settings.PUSHER_CLUSTER,
            'PUSHER_CHANNEL': settings.PUSHER_CHANNEL}

        
# template.html
<script>
    
var pusher = new Pusher("{{PUSHER_KEY}}", {
  cluster: "{{PUSHER_CLUSTER}}",
  encrypted: true    
});

var channel = pusher.subscribe("{{PUSHER_CHANNEL}}");
channel.bind("{{pusher_event}}", function(data) {
    // process data
});

</script>

The only way I've managed to get realtime statuses is to simply put some SQL writes/api calls into the task itself. Doing things with the return value of the task is far easier, since you can just write a custom task class.
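The "SQL writes from inside the task" idea can be sketched with plain sqlite3 (the table name and columns here are made up): the task writes a status row keyed by its task_id at each milestone, and the front end polls that row.

```python
import sqlite3

def init_db(conn):
    # hypothetical status table keyed by the celery task id
    conn.execute("""CREATE TABLE IF NOT EXISTS task_status
                    (task_id TEXT PRIMARY KEY, status TEXT)""")

def record_status(conn, task_id, status):
    # called from inside the task body at each milestone
    conn.execute("INSERT OR REPLACE INTO task_status (task_id, status) "
                 "VALUES (?, ?)", (task_id, status))
    conn.commit()

def get_status(conn, task_id):
    # what a polling view would read, given the task_id from .delay()
    row = conn.execute("SELECT status FROM task_status WHERE task_id = ?",
                       (task_id,)).fetchone()
    return row[0] if row else None
```

In a real Django app this would be a model plus an ORM query rather than raw SQL, but the shape is the same.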

I'm not entirely sure how this works using Django, but it should look something like this.

import time

import celery

class CustomTask(celery.Task):
    def __call__(self, *args, **kwargs):
        self.start_time = time.time()
        # without this, overriding __call__ would stop the task body from running
        return super().__call__(*args, **kwargs)

    def on_success(self, retval, task_id, args, kwargs):
        do_success_stuff()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        do_failure_stuff()

@shared_task(base=CustomTask)
def do_stuff():
    return create_widgets()

The full list can be found here: http://docs.celeryproject.org/en/latest/userguide/tasks.html#handlers
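Besides the handler hooks, Celery also has a built-in channel for intermediate progress: a bound task can call self.update_state(state='PROGRESS', meta=...), and the front end can poll it via AsyncResult(task_id). A sketch with the task body factored out so it can be shown without a broker (long_task_body and progress_meta are illustrative names):

```python
def progress_meta(current, total):
    """Meta dict stored alongside the PROGRESS state."""
    return {'current': current, 'total': total,
            'percent': round(100.0 * current / total, 1)}

def long_task_body(task, items, process):
    """Body of a @shared_task(bind=True) task; 'task' is the bound
    task instance (self), which exposes update_state."""
    for i, item in enumerate(items, 1):
        process(item)
        task.update_state(state='PROGRESS', meta=progress_meta(i, len(items)))
    return {'status': 'success'}

# tasks.py would wrap it:
# @shared_task(bind=True)
# def long_task(self, items):
#     return long_task_body(self, items, process_one)
#
# and a polling view would read AsyncResult(task_id).state / .info
```

This avoids any extra table or message broker for the simple "percent done" case, at the cost of polling instead of push.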

  • ok, how would these widgets be shown on the UI without a page refresh? – Avinash Raj Dec 18 '17 at 10:43
  • I'm sure there's an elegant solution somewhere, but I write my tasks to a table and then update a status column. Since you have the task_id as soon as you kick off the job, you could just do some jquery magic to get the new status. Maybe something like http://www.giantflyingsaucer.com/blog/?p=4310 – lpiner Dec 18 '17 at 10:55

There is a library called celery-progress that might be helpful: celery-progress library

He also made a blog post about doing it manually: blog about celery progress bars

  • While this link may answer the question, it is better to include the essential parts of the answer here and provide the link for reference. Link-only answers can become invalid if the linked page changes. – Flair Dec 10 '21 at 19:16