I am using Celery to run a long-running task. The task creates a subprocess using subprocess.Popen. To make the task abortable, I wrote the code below:
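import subprocess

from celery.contrib import abortable

# `task` is the Celery task decorator; its import is not shown here.
@task(bind=True, base=abortable.AbortableTask)
def my_task(self, *args):
    p = subprocess.Popen([...])
    while True:
        try:
            # Wait up to 1 second for the subprocess to exit.
            p.wait(timeout=1)
        except subprocess.TimeoutExpired:
            # Still running: stop it if an abort was requested.
            if self.is_aborted():
                p.terminate()
                return
        else:
            # The subprocess exited on its own.
            break
    # Other code...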
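To show the polling pattern in isolation, here is a minimal sketch that runs without Celery. It assumes a threading.Event as a stand-in for self.is_aborted(); the helper name run_abortable, the poll_interval parameter, and the sleeping child process are hypothetical and exist only for illustration. It is not how Celery itself decides when to abort.

```python
import subprocess
import sys
import threading


def run_abortable(cmd, abort_event, poll_interval=1.0):
    """Run cmd, checking abort_event every poll_interval seconds.

    Returns the exit code if the process finishes on its own,
    or None if it was terminated because abort_event was set.
    """
    p = subprocess.Popen(cmd)
    while True:
        try:
            # Wait up to poll_interval seconds for the process to exit.
            return p.wait(timeout=poll_interval)
        except subprocess.TimeoutExpired:
            # Still running: check the abort flag
            # (assumed stand-in for self.is_aborted()).
            if abort_event.is_set():
                p.terminate()
                p.wait()  # reap the child so it does not linger as a zombie
                return None


if __name__ == "__main__":
    abort = threading.Event()
    # Request an abort from another thread after 2 seconds.
    threading.Timer(2.0, abort.set).start()
    # Hypothetical long-running child: a Python process that sleeps for 60 s.
    child = [sys.executable, "-c", "import time; time.sleep(60)"]
    result = run_abortable(child, abort)
    print("aborted" if result is None else "exit code %d" % result)
```

The loop only notices an abort request once per polling interval, so the subprocess stops only after something actually sets the flag. That is the step that seems to be missing during a warm shutdown.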
I tried it in my console and it works well. But when I shut down the worker by pressing Ctrl+C, the program prints 'worker: Warm shutdown (MainProcess)' and then blocks for a long time, which is not what I expected. It seems that tasks are not aborted when a worker is about to shut down.
From the documentation I know that to abort a task, I should manually instantiate an AbortableAsyncResult with the task id and call its .abort() method. But I cannot find a place to put this code, because it requires the ids of all running tasks, which I have no way to access.
So, how can I invoke .abort() for all running tasks when a worker is about to shut down? Or is there an alternative?
I am using Celery 4.1.0 with Python 3.6.2.