I have created a Celery chain that consists of a group of tasks and regular tasks, shown below:

@app.task
def task_initial(id):
   # do something
   print(id)
@app.task
def task_to_group(id):
   # do something
   # raise exception
   try:   
      print(id)
   except Exception:
      raise
@app.task
def task_final(id):
   # do something
   # update the status (end process)
   if id == 1:
      id = 2
   print(id)
@app.task
def task_error(id):
   # do something
   # handle the failure. update status in db to failed.
   if id == 1:
      id = 2
   print(id)

I created the canvas as follows:

id = 1  # this is some primary key id from the db
chunk = [
          task_to_group.si(id).on_error(task_error.s(id))
          for id in range(0,5)
        ]
c = chain(
      task_initial.si(id),
      group(chunk),
      task_final.si(id)
    ).on_error(task_error.s(id))
c.apply_async()

The problem is this: if a task in the group (task_to_group) fails or raises an exception, the on_error callback on that task is called. After that, I would expect the chain to run either task_final or the chain's own on_error callback, task_error. But neither is called. I have also verified that neither of these tasks is waiting in the queue.

What I want is this: if any task in the group fails, the task_error for that task should be called and the remaining tasks in the group should keep running. Once the group is done, the next task, either task_final or task_error, should be called.

Reema Parakh

1 Answer

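In your current implementation, the group followed by task_final is automatically upgraded to a chord, with the group as the header and task_final as the body. When a task in the header raises an exception, the chord body is not executed, which would explain why task_final never runs. The per-task on_error callbacks only cover the individual group tasks, so a failure there is not automatically handled at the chord level. One thing to try is to build the chord explicitly and attach the error callback to it directly.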
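To make the failure semantics concrete, here is a minimal plain-Python model of how a group followed by a callback behaves. It is a simplified sketch and not Celery's implementation: the names run_chord and make_task are hypothetical, and the single on_error handler is an assumption made for illustration.

```python
# Plain-Python model of "group, then callback" semantics; not Celery's code.
def run_chord(header, body, on_error):
    """Run every header callable; call body only if all of them succeed."""
    results, errors = [], []
    for task in header:
        try:
            results.append(task())
        except Exception as exc:
            # A failing task does not stop the remaining header tasks.
            errors.append(exc)
    if errors:
        # The body is skipped and the error handler runs instead.
        return on_error(errors)
    return body(results)


def make_task(i, fail_on=None):
    """Hypothetical stand-in for task_to_group.si(i)."""
    def task():
        if i == fail_on:
            raise ValueError(f"task {i} failed")
        return i
    return task


# All header tasks succeed, so the body (sum) receives their results.
ok = run_chord([make_task(i) for i in range(5)], body=sum, on_error=len)

# One header task fails, so the body is skipped and on_error is called.
failed = run_chord(
    [make_task(i, fail_on=3) for i in range(5)], body=sum, on_error=len
)
```

In the second call the other four tasks still run, but their results never reach the body. That matches the symptom in the question, where task_final is not executed after a group task fails.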

Try this code:

from celery import chain, group, chord

@app.task
def task_initial(id):
   # do something
   print(id)

@app.task
def task_to_group(id):
   # do something
   # raise exception
   try:
      print(id)
   except Exception:
      raise

@app.task
def task_final(id):
   # do something
   # update the status (end process)
   if id == 1:
      id = 2
   print(id)

@app.task
def task_error(id):
   # do something
   # handle the failure. update status in db to failed.
   if id == 1:
      id = 2
   print(id)

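def create_group():
   # Plain helper, not a task: call it directly (only tasks have .si()).
   # The errbacks use .si() so they receive only the id passed here.
   chunk = [
      task_to_group.si(i).on_error(task_error.si(i))
      for i in range(0, 5)
   ]
   return group(chunk)

id = 1  # some primary key id from the database

c = chain(
   task_initial.si(id),
   # task_final.si(id) is immutable, so the group results are not passed to it.
   chord(create_group(), body=task_final.si(id)).on_error(task_error.si(id))
)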
c.apply_async()
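
If the final step must run even when some group tasks fail, another option is to catch the exception inside each group task and return a status instead of raising. The sketch below shows the idea in plain Python. The names capture_errors, process_item and finalize are hypothetical. In Celery, the decorator would presumably sit under @app.task, and the final step would be a chord body created with .s() so that it receives the list of results.

```python
import functools


def capture_errors(func):
    """Return a result dict instead of raising, so later steps always run."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return {"ok": True, "value": func(*args, **kwargs)}
        except Exception as exc:
            return {"ok": False, "error": repr(exc)}
    return wrapper


@capture_errors
def process_item(item_id):
    # Hypothetical work; fails for one id to show the error path.
    if item_id == 3:
        raise ValueError("boom")
    return item_id * 2


def finalize(results):
    """Hypothetical final step: derive the overall status from all results."""
    failed = [r for r in results if not r["ok"]]
    return "failed" if failed else "done"


results = [process_item(i) for i in range(5)]
status = finalize(results)
```

The trade-off is that the tasks no longer show up as failed in the result backend, so the final step has to record the failure itself, for example by updating the status in the database.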