Questions tagged [celery-canvas]

19 questions
7
votes
2 answers

With Celery Canvas, what is the difference between chord() and chain(group(), task)

We're implementing a workflow with Celery. First we need to run some tasks in parallel, and when they are all finished we need to run a single task. It seems we can use chord, or group and chain: chord(tasks, task) vs group(tasks) | task What is…
RemcoGerlich
  • 30,470
  • 6
  • 61
  • 79
4
votes
0 answers

Celery Canvas: How to distribute the elements of a result list of a task to a chain and chain others afterwards

I'm currently learn celery and try to build a DAG like data processing. My Idea was to create a pipeline with celery canvas this pipeline should contains task that are done to a list of all objects or applied to one object and applied distributed. I…
Bierbarbar
  • 1,399
  • 15
  • 35
4
votes
1 answer

Celery chain performances

I wonder why celery chain is so slow comparing to an ad hoc solution. In the ad hoc solution I forward the task manually, the drawback is I cannot wait for the end of the chain. In the following code, the canvas solution takes 16 seconds and the ad…
ptitpoulpe
  • 684
  • 4
  • 17
4
votes
1 answer

Calling task chunk with keyword arguments

How do I chunk tasks with keyword arguments? For example, this task: @app.task def add(x, y, multiply=1, unit="GB"): return '%s %s' % ((x + y) * multiply, unit) I can call this task as usual with add.apply_async(args=(1, 2), kwargs={'unit':…
Oskar Persson
  • 6,605
  • 15
  • 63
  • 124
3
votes
1 answer

Cloning a celery chain

I have an interesting issue attempting to clone a celery chain for use in a group, my intended use case is something like group([chain.clone(args=args) for args in it]) however it keeps complaining about not having enough arguments. I have broken…
Bjorn Harpe
  • 374
  • 4
  • 13
2
votes
0 answers

When terminate celery chain conditionally, it not return the data

I have chain of tasks, and want to terminate conditionally, I am following steps in https://stackoverflow.com/a/21106596/243031 but after that, we are not getting the output. I have tasks as from __future__ import absolute_import,…
Nilesh
  • 20,521
  • 16
  • 92
  • 148
2
votes
0 answers

How to add chord in middle of the chain using celery canvas

I want to design celery canvas where there might be chord/group in middle of the task and then process further. is_even(num1) get_numbers -> is_even(num2) -> filter_even -> xsum is_even(num3) Directory structure is…
Nilesh
  • 20,521
  • 16
  • 92
  • 148
2
votes
3 answers

Celery canvas behavior differs between async and eager mode

There are some discrepancies on the way the Celery canvas works in async and eager mode. I've noticed that a group followed by a chain in a dynamic task that replaces itself does not send the results along to the next on the chain. Well, that seems…
gutomaia
  • 91
  • 1
  • 7
2
votes
1 answer

Setting a signature of a task as starting task of multipe chain in celery

Suppose A to G are async tasks and we want to implement a task workflow like this canvas /---> B() A() ----> C() -> D() \---> E() -> F() -> G() According to the problem, we need to use the result of A() for starting of multiple…
Vahid Kharazi
  • 5,723
  • 17
  • 60
  • 103
1
vote
1 answer

Celery chain of groups and tasks doesn't call next task if group fails

I have created a celery chain that consists of group of tasks and normal tasks, shown below @app.task def task_initial(id): # do something print(id) @app.task def task_to_group(id): # do something # raise exception try: …
Reema Parakh
  • 1,347
  • 3
  • 19
  • 46
1
vote
0 answers

Celery task workflow message exhausts rabbitmq default message size

I'm designing a celery task workflow that has task test_request_task, test_chord_callback_task, test_chain_of_chord_callback_task and test_chain_of_chords_task. I need to run task test_request_task, n(1<=n<=100) times per second for m…
1
vote
0 answers

How to send all celery chained task to other queue then celery queue?

I am trying the chaining mentioned in canvas. Below is my project structure. proj/ ├── __init__.py ├── celery.py ├── celeryconfig.py ├── module1 │   ├── __init__.py │   └── tasks.py └── module2 └── tasks.py File: celery.py """Main entry module…
Nilesh
  • 20,521
  • 16
  • 92
  • 148
1
vote
0 answers

How to handle errors in celery chords?

i am trying to do some error handling in a celery workflow containing groups and chains. The following example code describes the situation: from .tasks import task_1, task_2, task_3, task_4, task_finalize, handle_error chain_1 = (task_1 |…
Smie85
  • 21
  • 3
0
votes
0 answers

How can I get information of all tasks which are part of a Celery Canvas?

I have multiple tasks in celery. I run them using a mix and match of chains, groups and chords. However, I still don't get how can I access all of the tasks (in order of execution) that were executed in the canvas. I need to access all of the tasks,…
0
votes
0 answers

Celery chained tasks and django transactions

As many places on the internet say, you have to make sure that if you pass around primary keys from django model instances, the task you're passing them to only gets started after the transaction in the current context has completed. You can do that…
thepandaatemyface
  • 5,034
  • 6
  • 25
  • 30
1
2