
Related: "How to chain a Celery task that returns a list into a group?" and "Chain a celery task's results into a distributed group" — but they lack an MWE, do not fully cover my needs as far as I can tell, and may be outdated.

I need to dynamically pass the output of a task to a parallel group of tasks, then pass the results of this group to a final task. I would like to make this a single "meta task".

Here's my attempt so far:

# chaintasks.py
import random
from typing import List, Iterable, Callable

from celery import Celery, signature, group, chord

app = Celery(
    "chaintasks",
    backend="redis://localhost:6379",
    broker="pyamqp://guest@localhost//",
)
app.conf.update(task_track_started=True, result_persistent=True)


@app.task
def select_items(items: List[str]):
    print("In select_items, received", items)
    selection = tuple(set(random.choices(items, k=random.randint(1, len(items)))))
    print("In select_items, sending", selection)
    return selection


@app.task
def process_item(item: str):
    print("In process_item, received", item)
    return item + "_processed"


@app.task
def group_items(items: List[str]):
    print("In group_items, received", items)
    return ":".join(items)


@app.task
def dynamic_map_group(iterable: Iterable, callback: Callable):
    # Map a callback over an iterator and return as a group
    # Credit to https://stackoverflow.com/a/13569873/5902284
    callback = signature(callback)
    return group(callback.clone((arg,)) for arg in iterable).delay()


@app.task
def dynamic_map_chord(iterable: Iterable, callback: Callable):
    callback = signature(callback)
    return chord(callback.clone((arg,)) for arg in iterable).delay()

Now, to try this out, let's spin up a Celery worker and use Docker for RabbitMQ and Redis:

$ docker run -p 5672:5672 rabbitmq
$ docker run -p 6379:6379 redis
$ celery -A chaintasks:app worker -E

Now, let's try to illustrate what I need:

items = ["item1", "item2", "item3", "item4"]
print(select_items.s(items).apply_async().get())

This outputs: ['item3'], great.

meta_task1 = select_items.s(items) | dynamic_map_group.s(process_item.s())
meta_task1_result = meta_task1.apply_async().get()
print(meta_task1_result)

It gets trickier here: the output is [['b916f5d3-4367-46c5-896f-f2d2e2e59a00', None], [[['3f78d31b-e374-484e-b05b-40c7a2038081', None], None], [['bce50d49-466a-43e9-b6ad-1b78b973b50f', None], None]]].

I am not sure what the Nones mean, but I guess the UUIDs represent the subtasks of my meta task. But how do I get their results from here?

I tried:

subtasks_ids = [i[0][0] for i in meta_task1_result[1]]
processed_items = [app.AsyncResult(i).get() for i in subtasks_ids]

but this just hangs. What am I doing wrong here? I'm expecting an output looking like ['processed_item1', 'processed_item3'].

What about trying to chord the output of select_items to group_items?

meta_task2 = select_items.s(items) | dynamic_map_chord.s(group_items.s())
print(meta_task2.apply_async().get())

Ouch, this raises an AttributeError that I don't really understand.

Traceback (most recent call last):
  File "chaintasks.py", line 70, in <module>
    print(meta_task2.apply_async().get())
  File "/site-packages/celery/result.py", line 223, in get
    return self.backend.wait_for_pending(
  File "/site-packages/celery/backends/asynchronous.py", line 201, in wait_for_pending
    return result.maybe_throw(callback=callback, propagate=propagate)
  File "/site-packages/celery/result.py", line 335, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/site-packages/celery/result.py", line 328, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/site-packages/vine/promises.py", line 234, in throw
    reraise(type(exc), exc, tb)
  File "/site-packages/vine/utils.py", line 30, in reraise
    raise value
AttributeError: 'NoneType' object has no attribute 'clone'

Process finished with exit code 1

And finally, what I am really trying to do is something like:

ultimate_meta_task = (
    select_items.s(items)
    | dynamic_map_group.s(process_item.s())
    | dynamic_map_chord.s(group_items.s())
)
print(ultimate_meta_task.apply_async().get())

but this leads to:

Traceback (most recent call last):
  File "chaintasks.py", line 77, in <module>
    print(ultimate_meta_task.apply_async().get())
  File "/site-packages/celery/result.py", line 223, in get
    return self.backend.wait_for_pending(
  File "/site-packages/celery/backends/asynchronous.py", line 199, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/site-packages/celery/backends/asynchronous.py", line 265, in _wait_for_pending
    for _ in self.drain_events_until(
  File "/site-packages/celery/backends/asynchronous.py", line 58, in drain_events_until
    on_interval()
  File "/site-packages/vine/promises.py", line 160, in __call__
    return self.throw()
  File "/site-packages/vine/promises.py", line 157, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/site-packages/celery/result.py", line 236, in _maybe_reraise_parent_error
    node.maybe_throw()
  File "/site-packages/celery/result.py", line 335, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/site-packages/celery/result.py", line 328, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/site-packages/vine/promises.py", line 234, in throw
    reraise(type(exc), exc, tb)
  File "/site-packages/vine/utils.py", line 30, in reraise
    raise value
kombu.exceptions.EncodeError: TypeError('Object of type GroupResult is not JSON serializable')

Process finished with exit code 1

What I am trying to achieve here is an output looking like 'processed_item1:processed_item3'. Is this even doable using Celery? Any help is appreciated.
