
I am using Celery to run a long-running task. The task creates a subprocess with subprocess.Popen. To make the task abortable, I wrote the code below:

import subprocess

from celery.contrib import abortable

@task(bind=True, base=abortable.AbortableTask)
def my_task(self, *args):
    p = subprocess.Popen([...])
    while True:
        try:
            # Wait up to 1 second so the abort flag can be checked
            # periodically while the subprocess runs.
            p.wait(1)
        except subprocess.TimeoutExpired:
            if self.is_aborted():
                p.terminate()
                return
        else:
            break
    # Other code...

I tried it in my console and it worked well. But when I decided to close the worker by pressing Ctrl+C, the program printed 'worker: Warm shutdown (MainProcess)' and blocked for a long time, which is not what I expected. It seems that task abortion doesn't happen when a worker is about to shut down.

From the documentation I know that if I want to abort a task, I should manually instantiate an AbortableAsyncResult with a task id and call its .abort() method. But I can find nowhere to place this code, because it requires the ids of all running tasks, which I have no way to access.
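
For reference, aborting a single task with a known id looks like this (a sketch of the documented API; task_id stands for an id you would somehow have to obtain):

from celery.contrib.abortable import AbortableAsyncResult

# Mark the task as aborted; the task itself must poll is_aborted().
AbortableAsyncResult(task_id).abort()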

So, how do I invoke .abort() for all running tasks when a worker is about to shut down? Or is there an alternative?

I am using Celery 4.1.0 with Python 3.6.2.

hsfzxjy

2 Answers


You can use worker signals for this purpose. Just get all the running tasks and call .abort() on them.
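
A minimal sketch of that idea, assuming `app` is your Celery application and the inspect API is still reachable (as the comments below show, it may not be while the worker shuts down):

from celery.contrib.abortable import AbortableAsyncResult
from celery.signals import worker_shutdown

@worker_shutdown.connect
def abort_running_tasks(**kwargs):
    # active() maps each worker name to a list of task-info dicts.
    active = app.control.inspect().active() or {}
    for task_infos in active.values():
        for info in task_infos:
            AbortableAsyncResult(info['id']).abort()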

Ishaan
  • Thanks for answering. But I am confused about how to get the list of running tasks. The documentation says I should use `app.control.inspect().active()`, but it keeps returning `None` no matter which signal handler it is placed in. – hsfzxjy Aug 12 '17 at 14:02
  • You could try https://stackoverflow.com/questions/5544629/retrieve-list-of-tasks-in-a-queue-in-celery. If that also doesn't work, maybe you could use task signals to maintain a list of running tasks (see the sketch after these comments). – Ishaan Aug 12 '17 at 14:04
  • It doesn't work either... I've just read its source code and found that the inspect service is not available during shutdown. Do you have any advice on doing this kind of task? – hsfzxjy Aug 12 '17 at 14:15
  • The worker never terminates a task under a normal (warm) shutdown. If you don't need to perform any cleanup, then pressing Ctrl+C twice will trigger a cold shutdown and the worker will terminate all running tasks. – Ishaan Aug 12 '17 at 14:36
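
The task-signals idea from the comments could look like the sketch below (an illustration, not from the thread; RUNNING is a hypothetical registry, and note that with the prefork pool each child process keeps its own copy):

from celery.signals import task_prerun, task_postrun

RUNNING = set()  # ids of tasks currently executing in this process

@task_prerun.connect
def remember_task(task_id=None, **kwargs):
    RUNNING.add(task_id)

@task_postrun.connect
def forget_task(task_id=None, **kwargs):
    RUNNING.discard(task_id)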

Inspired by @Ishaan's answer, I solved it myself, using the code below:

import subprocess

@task
def my_task(*args):
    p = None
    from celery.platforms import signals

    def int_handler(signum, frame):
        # On SIGINT, kill the subprocess (if one has been started)
        # and reap it so no zombie process is left behind.
        if p is not None:
            p.kill()
            p.wait()

    signals['INT'] = int_handler

    p = subprocess.Popen([...])
    p.wait()
    # Other code...

The solution is based on the following considerations:

  • The body of the task is the only place I can find that is executed in every worker process.
  • Inside the task I can easily access the created subprocess.
  • The SIGINT signal is not handled by the Celery worker, so registering a handler for it does not override Celery's default behavior.
  • One worker process runs one task at a time, so this kind of registration is safe.
hsfzxjy