
Using Django 1.8, I'd like to trigger a delayed Celery task after a form is saved in a view:

def new_topic(request, forum_id):
    form = TopicForm()
    uid = request.user.id
    if request.method == 'POST':
        tform = TopicForm(request.POST)
        if tform.is_valid():
            topic = tform.save(commit=False)
            topic.title = clean_title(tform.cleaned_data['title'])
            topic.description = clean_desc(tform.cleaned_data['description'])
            topic.save()
            notify_new_topic.delay( uid, topic) #<--problem here
            #rest of the views

But I get

EncodeError at /add/topic/
<Topic: Topic object> is not JSON serializable

I don't get any error if I call the task directly, without delay.

The task is:

@shared_task
def notify_new_topic(flwd_id, topic):
    title = topic.title
    link = topic.slug

    flwd= cached_user(flwd_id) #User.objects.get(id = flwd_id)
    print 'flwd is', flwd.username
    flwr_ids = FollowUser.objects.filter(followed=flwd).values('follower_id')
    flwrs = User.objects.filter(id__in= flwr_ids).values('id', 'username','email') 

    for f in flwrs:
        print 'flwr username:',  f['username']
        if notify_flwdp_applies(int(f['id'])):
            print 'notify flwdp applies'
            make_alerts_new_topic(flwd_id, f['id'], topic)
            print 'back from make_alerts_new_topic'

How can I debug and fix this?

Babr

3 Answers


Task arguments must be serializable (e.g. strings, ints, or lists and dicts of those). To fix the error, pass the topic's id as the argument and fetch the Topic object inside the task:

notify_new_topic.delay(uid, topic.id)

@shared_task
def notify_new_topic(flwd_id, topic_id):
    topic = Topic.objects.get(pk=topic_id)
    title = topic.title
    link = topic.slug

    flwd= cached_user(flwd_id) #User.objects.get(id = flwd_id)
    print 'flwd is', flwd.username
    flwr_ids = FollowUser.objects.filter(followed=flwd).values('follower_id')
    flwrs = User.objects.filter(id__in= flwr_ids).values('id', 'username','email') 

    for f in flwrs:
        print 'flwr username:',  f['username']
        if notify_flwdp_applies(int(f['id'])):
            print 'notify flwdp applies'
            make_alerts_new_topic(flwd_id, f['id'], topic)
            print 'back from make_alerts_new_topic'
neverwalkaloner
  • Thanks for the tip. This removes the error, but now notification is not generated. Could it be due to the fact that I call a function `make_alerts_new_topic` inside the delayed function? – Babr Jun 21 '18 at 14:49
  • @Babr it's hard to say without access to your code, but I suppose it's more likely an error in your program logic. For example, check the value of the `flwrs` variable. If it's empty, the notification is never sent. The same goes for the `if notify_flwdp_applies(int(f['id']))` condition: check whether it's true or not. And at least make sure that `make_alerts_new_topic` works as expected outside of the delayed function. – neverwalkaloner Jun 21 '18 at 15:00
  • Well the problem is that the print statements inside delayed function are not printed out in the terminal. So how can I check them? The functions work just fine without `delay` though. – Babr Jun 21 '18 at 15:04
  • @Babr you can enable logging. See details here https://stackoverflow.com/questions/13366312/django-celery-logging-best-practice – neverwalkaloner Jun 21 '18 at 15:06
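
As a rough illustration of the logging suggestion above, here is a minimal sketch that uses only the standard library's logging module instead of print. The logger name "notifications", the function `notify_followers` and its `applies` callback are hypothetical stand-ins for the code in the question, not part of Celery or Django:

```python
import logging

# Hypothetical logger name; any name would do.
logger = logging.getLogger("notifications")


def notify_followers(followers, applies):
    """Log each decision instead of printing it.

    Returns the ids of the followers that would be notified.
    """
    notified = []
    logger.info("found %d followers", len(followers))
    for f in followers:
        if applies(f["id"]):
            logger.info("notifying follower %s", f["username"])
            notified.append(f["id"])
        else:
            logger.info("skipping follower %s", f["username"])
    return notified


if __name__ == "__main__":
    # Send INFO-level records to the console for this quick check.
    logging.basicConfig(level=logging.INFO)
    sample = [{"id": 1, "username": "ann"}, {"id": 2, "username": "bob"}]
    print(notify_followers(sample, lambda uid: uid == 1))
```

With records going through a logger, the output can be routed to a file or to the worker's log by configuration, which makes it easier to see whether `flwrs` is empty or the condition is false.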

Since the solution has already been provided, I will try to explain why we cannot pass non-serializable objects to Celery tasks.

Why do we need to pass serializable objects to celery tasks?

With Celery, we use a message broker (such as Redis or RabbitMQ). Suppose we use Redis. When a Celery task is called with delay, its arguments are serialized into a message and sent to Redis, where a worker later picks them up. For this to work, the arguments must be of types that the configured serializer can encode.
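
A minimal sketch of that constraint, using only the standard library's json module rather than Celery itself. `Topic` here is a hypothetical stand-in class, not the Django model from the question, and `can_serialize` is a helper made up for this illustration:

```python
import json


class Topic(object):
    """Hypothetical stand-in for a Django model instance."""

    def __init__(self, pk, title):
        self.pk = pk
        self.title = title


def can_serialize(args):
    """Return True if the argument list can be encoded as JSON."""
    try:
        json.dumps(args)
        return True
    except TypeError:
        return False


topic = Topic(pk=7, title="Hello")

# Passing the object itself fails, much like the EncodeError in the question.
object_ok = can_serialize([1, topic])

# Passing the primary key works, because ints are JSON-encodable.
pk_ok = can_serialize([1, topic.pk])
```

This is why passing `topic.id` and re-fetching the object inside the task avoids the error.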

Workaround

Suppose you want to pass a Python dictionary as a parameter to a Celery task. Add these values to the Celery configuration:

task_serializer = "json"  
result_serializer = "json"
accept_content = ["json"]

Or you might want to set them on the app instance (named celery here):

celery.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"]
)

For other cases, replace json above with another serializer that Celery supports, such as pickle, yaml or msgpack.

Typical text-based serialization formats are csv, json, xml, yaml, toml, etc. Binary formats include protobuf and avro. Python also has several packages, like pickle, numpy and pandas, that support serializing custom objects into bytes. You can also write your own custom serializer.

What do these configurations do?

  1. Instruct Celery to serialize the Python objects first and then pass them to the message broker.
  2. Deserialize the objects read from the message broker and then provide them to the Celery worker.
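
The two steps can be sketched with the standard library alone. This is a simplified illustration, not Celery's actual message format; `encode_call` and `decode_call` are hypothetical helpers invented for the example:

```python
import json


def encode_call(task_name, args):
    """Step 1: turn the call into text that a broker can store."""
    return json.dumps({"task": task_name, "args": args})


def decode_call(body):
    """Step 2: rebuild the task name and arguments on the worker side."""
    message = json.loads(body)
    return message["task"], message["args"]


# What the caller would hand to the broker.
body = encode_call("notify_new_topic", [3, 7])

# What the worker would recover before running the task.
task_name, args = decode_call(body)
```

Only values that survive this round trip, such as ids, strings, lists and dicts, can be used as task arguments with the json serializer.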

Dharman
Deepam Gupta

Change to pickle encoding:

app.conf.event_serializer = 'pickle' # this event_serializer is optional. 
app.conf.task_serializer = 'pickle'
app.conf.result_serializer = 'pickle'
app.conf.accept_content = ['application/json', 'application/x-python-serialize']
Sarath Ak