Related: "How to chain a Celery task that returns a list into a group?" and "Chain a celery task's results into a distributed group". However, those questions lack a minimal working example (MWE), do not, as far as I understand, fully cover my needs, and may be outdated.
I need to dynamically pass the output of a task to a parallel group of tasks, then pass the results of this group to a final task. I would like to make this a single "meta task".
Here's my attempt so far:
# chaintasks.py
import random
from typing import List, Iterable, Callable

from celery import Celery, signature, group, chord

app = Celery(
    "chaintasks",
    backend="redis://localhost:6379",
    broker="pyamqp://guest@localhost//",
)
app.conf.update(task_track_started=True, result_persistent=True)

@app.task
def select_items(items: List[str]):
    print("In select_items, received", items)
    selection = tuple(set(random.choices(items, k=random.randint(1, len(items)))))
    print("In select_items, sending", selection)
    return selection

@app.task
def process_item(item: str):
    print("In process_item, received", item)
    return item + "_processed"

@app.task
def group_items(items: List[str]):
    print("In group_items, received", items)
    return ":".join(items)

@app.task
def dynamic_map_group(iterable: Iterable, callback: Callable):
    # Map a callback over an iterator and return as a group
    # Credit to https://stackoverflow.com/a/13569873/5902284
    callback = signature(callback)
    return group(callback.clone((arg,)) for arg in iterable).delay()

@app.task
def dynamic_map_chord(iterable: Iterable, callback: Callable):
    callback = signature(callback)
    return chord(callback.clone((arg,)) for arg in iterable).delay()
Now, to try this out, let's spin up a Celery worker and use Docker to run RabbitMQ and Redis:
$ docker run -p 5672:5672 rabbitmq
$ docker run -p 6379:6379 redis
$ celery -A chaintasks:app worker -E
Now, let's try to illustrate what I need:
items = ["item1", "item2", "item3", "item4"]
print(select_items.s(items).apply_async().get())
This outputs ['item3'], great.
meta_task1 = select_items.s(items) | dynamic_map_group.s(process_item.s())
meta_task1_result = meta_task1.apply_async().get()
print(meta_task1_result)
It gets trickier here: the output is [['b916f5d3-4367-46c5-896f-f2d2e2e59a00', None], [[['3f78d31b-e374-484e-b05b-40c7a2038081', None], None], [['bce50d49-466a-43e9-b6ad-1b78b973b50f', None], None]]].
I am not sure what the None values mean, but I guess the UUIDs represent the subtasks of my meta task. But how do I get their results from here?
I tried:
subtasks_ids = [i[0][0] for i in meta_task1_result[1]]
processed_items = [app.AsyncResult(i).get() for i in subtasks_ids]
but this just hangs. What am I doing wrong here? Given that process_item appends "_processed", I'm expecting an output that looks like ['item1_processed', 'item3_processed'].
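For what it's worth, the nested list looks to me like the tuple form that Celery result objects serialize to, i.e. `((id, parent), children)`, where a plain task result has `None` for both parent and children. That reading is an assumption on my part. Under it, a small stdlib-only helper (the name `split_result_tuple` is made up for this sketch) splits the structure into its parts. I believe `celery.result.result_from_tuple` is the library's own way to rebuild result objects from this shape, but I have not verified that against this output.

```python
from typing import Any, List, Optional, Tuple


def split_result_tuple(serialized: Any) -> Tuple[str, Optional[Any], List[str]]:
    """Split a nested ((id, parent), children) structure into its parts.

    Hypothetical helper: it assumes the shape described above and does
    not talk to Celery at all.
    """
    (result_id, parent), children = serialized
    # Each child has the same shape, so its id sits at child[0][0].
    child_ids = [child[0][0] for child in (children or [])]
    return result_id, parent, child_ids


# The value printed for meta_task1_result above.
meta_task1_result = [
    ["b916f5d3-4367-46c5-896f-f2d2e2e59a00", None],
    [
        [["3f78d31b-e374-484e-b05b-40c7a2038081", None], None],
        [["bce50d49-466a-43e9-b6ad-1b78b973b50f", None], None],
    ],
]

group_id, parent, child_ids = split_result_tuple(meta_task1_result)
print(group_id)
print(child_ids)
```

This only explains the shape of the output; it does not explain why calling `.get()` on the child ids hangs.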
What about trying to chord the output of select_items to group_items?
meta_task2 = select_items.s(items) | dynamic_map_chord.s(group_items.s())
print(meta_task2.apply_async().get())
Ouch, this raises an AttributeError that I don't really understand.
Traceback (most recent call last):
  File "chaintasks.py", line 70, in <module>
    print(meta_task2.apply_async().get())
  File "/site-packages/celery/result.py", line 223, in get
    return self.backend.wait_for_pending(
  File "/site-packages/celery/backends/asynchronous.py", line 201, in wait_for_pending
    return result.maybe_throw(callback=callback, propagate=propagate)
  File "/site-packages/celery/result.py", line 335, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/site-packages/celery/result.py", line 328, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/site-packages/vine/promises.py", line 234, in throw
    reraise(type(exc), exc, tb)
  File "/site-packages/vine/utils.py", line 30, in reraise
    raise value
AttributeError: 'NoneType' object has no attribute 'clone'

Process finished with exit code 1
And finally, what I really am trying to do is something like:
ultimate_meta_task = (
    select_items.s(items)
    | dynamic_map_group.s(process_item.s())
    | dynamic_map_chord.s(group_items.s())
)
print(ultimate_meta_task.apply_async().get())
but this leads to:
Traceback (most recent call last):
  File "chaintasks.py", line 77, in <module>
    print(ultimate_meta_task.apply_async().get())
  File "/site-packages/celery/result.py", line 223, in get
    return self.backend.wait_for_pending(
  File "/site-packages/celery/backends/asynchronous.py", line 199, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/site-packages/celery/backends/asynchronous.py", line 265, in _wait_for_pending
    for _ in self.drain_events_until(
  File "/site-packages/celery/backends/asynchronous.py", line 58, in drain_events_until
    on_interval()
  File "/site-packages/vine/promises.py", line 160, in __call__
    return self.throw()
  File "/site-packages/vine/promises.py", line 157, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/site-packages/celery/result.py", line 236, in _maybe_reraise_parent_error
    node.maybe_throw()
  File "/site-packages/celery/result.py", line 335, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/site-packages/celery/result.py", line 328, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/site-packages/vine/promises.py", line 234, in throw
    reraise(type(exc), exc, tb)
  File "/site-packages/vine/utils.py", line 30, in reraise
    raise value
kombu.exceptions.EncodeError: TypeError('Object of type GroupResult is not JSON serializable')

Process finished with exit code 1
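My guess, and it is only a guess, is that the object returned by dynamic_map_group has to be JSON-encoded as an argument for the next task in the chain, and that a result object cannot be encoded that way. The stdlib-only sketch below reproduces that kind of failure with a made-up stand-in class, `FakeGroupResult`, which is not Celery's real `GroupResult`. It also shows that the nested tuple form of the same data encodes without trouble.

```python
import json


class FakeGroupResult:
    """Hypothetical stand-in for a result object; not Celery's GroupResult."""

    def __init__(self, group_id, child_ids):
        self.id = group_id
        self.child_ids = child_ids

    def as_tuple(self):
        # Assumed shape: ((id, parent), children), built from plain values only.
        return (self.id, None), [((cid, None), None) for cid in self.child_ids]


result = FakeGroupResult("group-1", ["child-1", "child-2"])

# Encoding the object itself fails: json has no rule for arbitrary classes.
try:
    json.dumps(result)
    error_message = None
except TypeError as exc:
    error_message = str(exc)
print(error_message)

# Encoding the plain nested structure works.
encoded = json.dumps(result.as_tuple())
print(encoded)
```

If this reading is right, a task in the middle of a chain should return plain data (lists, strings, ids) and not a result object.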
What I am trying to achieve here is an output that looks like 'item1_processed:item3_processed'. Is this even doable using Celery? Any help is appreciated.
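To make the target concrete, here is a plain-Python sketch of the data flow I am after, with no Celery involved. The functions are local stand-ins that reuse the task names from chaintasks.py. The `meta_task` wrapper is a name made up for this sketch, `select_items` is made deterministic (every other item) purely for illustration, and the thread pool only mimics the parallel group.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def select_items(items: List[str]) -> List[str]:
    # Deterministic stand-in for the random selection in the real task.
    return items[::2]


def process_item(item: str) -> str:
    return item + "_processed"


def group_items(items: List[str]) -> str:
    return ":".join(items)


def meta_task(items: List[str]) -> str:
    # Step 1: a single call whose output length is only known at run time.
    selection = select_items(items)
    # Step 2: fan out, one process_item call per selected element.
    with ThreadPoolExecutor() as pool:
        processed = list(pool.map(process_item, selection))
    # Step 3: fan in, a single call that receives all processed results.
    return group_items(processed)


result = meta_task(["item1", "item2", "item3", "item4"])
print(result)  # item1_processed:item3_processed
```

A Celery version should return the same string for the same selection, with step 2 running on the workers.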