
I want to send a chain of tasks when a worker starts up, as in this answer: https://stackoverflow.com/a/14589445/3922534, but the tasks run out of order.

Logs from the worker:

[2022-07-12 20:51:47,369: INFO/MainProcess] Task task.add_newspapers[5de1f446-65af-472a-a4b6-d9752142b588] received
[2022-07-12 20:51:47,372: WARNING/MainProcess] Now Runing Newspaper Function
[2022-07-12 20:51:47,408: INFO/MainProcess] Task task.check_tasks_are_created[33d6b9d1-660b-4a80-a726-6f167e246480] received
[2022-07-12 20:51:47,412: WARNING/MainProcess] Now Runing Podcast Function
[2022-07-12 20:51:47,427: INFO/MainProcess] Task task.add_newspapers[5de1f446-65af-472a-a4b6-d9752142b588] succeeded in 0.0470000000204891s: 'Now Runing Podcast Function'
[2022-07-12 20:51:47,432: INFO/MainProcess] Task task.add_yt_channels[26179491-2632-46bd-95c1-9e9dbb9e8130] received
[2022-07-12 20:51:47,433: WARNING/MainProcess] None
[2022-07-12 20:51:47,457: INFO/MainProcess] Task task.check_tasks_are_created[33d6b9d1-660b-4a80-a726-6f167e246480] succeeded in 0.0470000000204891s: None
[2022-07-12 20:51:47,463: INFO/MainProcess] Task task.add_podcasts[ad94a119-c6b2-475a-807b-b1a73bef589e] received
[2022-07-12 20:51:47,468: WARNING/MainProcess] Now Runing Check Tasks are Created Function
[2022-07-12 20:51:47,501: INFO/MainProcess] Task task.add_yt_channels[26179491-2632-46bd-95c1-9e9dbb9e8130] succeeded in 0.06299999984912574s: 'Now Runing Check Tasks are Created Function'
[2022-07-12 20:51:47,504: INFO/MainProcess] Task task.add_podcasts[ad94a119-c6b2-475a-807b-b1a73bef589e] succeeded in 0.030999999959021807s: 'Now Runing Yotube Channels Function'

This is how I send the tasks:

from celery import signature
from celery.signals import worker_ready

@worker_ready.connect
def at_start(sender, **k):
    with sender.app.connection() as conn:
        # sender.app.send_task(name='task.print_word', args=["I Send Task On Startup"], connection=conn,)
        # ch = [add_newspapers.s(), add_podcasts.s(), add_yt_channels.s(), check_tasks_are_created.s()]
        ch = [
            signature("task.add_podcasts"),
            signature("task.add_yt_channels"),
            signature("task.check_tasks_are_created"),
        ]
        sender.app.send_task(name='task.add_newspapers', chain=ch, connection=conn)
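A plausible explanation for the log above, assuming the worker treats the chain option of send_task the way Celery's own chain primitive relies on (as far as I can tell from Celery's source): after each task finishes, the next signature is popped off the end of that list, which is why Celery's chain stores the remaining steps in reverse. A list written in natural order would then run back to front after the first task. The sketch below is only a plain-Python model of that assumption (simulate_chain_order is a hypothetical helper, not Celery code).

```python
def simulate_chain_order(first_task, chain_option):
    """Model how a worker might walk the `chain` message option.

    Assumption (not Celery code): after a task finishes, the worker pops
    the next signature off the END of the remaining list.
    """
    executed = [first_task]
    remaining = list(chain_option)  # copy so the caller's list is untouched
    while remaining:
        executed.append(remaining.pop())  # pop() removes the last element
    return executed


# The list from the question, in the order it was written.
ch = ["task.add_podcasts", "task.add_yt_channels", "task.check_tasks_are_created"]

print(simulate_chain_order("task.add_newspapers", ch))
# -> ['task.add_newspapers', 'task.check_tasks_are_created', 'task.add_yt_channels', 'task.add_podcasts']

# Reversing the list before sending gives the intended order in this model.
print(simulate_chain_order("task.add_newspapers", list(reversed(ch))))
# -> ['task.add_newspapers', 'task.add_podcasts', 'task.add_yt_channels', 'task.check_tasks_are_created']
```

The first result matches the order in which the worker received the tasks in the log. If the model is right, passing chain=list(reversed(ch)) to send_task should restore the intended order, while building the workflow with chain(...).apply_async() avoids depending on that internal detail at all.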

Then I tried running the chain the normal way, with apply_async(), but it runs on every worker. I want it to run just once, on one worker.

from celery import chain
from celery.signals import worker_ready

@worker_ready.connect
def at_start(sender, **k):
    chain(add_newspapers.s(), add_podcasts.s(), add_yt_channels.s(), check_tasks_are_created.s()).apply_async()

Then I tried to identify the worker first and only then call .apply_async(), but the if statement never matches.

Documentation: https://docs.celeryq.dev/en/latest/userguide/signals.html#celeryd-init

celery -A celery_app.celery worker --loglevel=INFO -P gevent --concurrency=40 -n celeryworker1

from celery import chain
from celery.signals import worker_ready

@worker_ready.connect
def at_start(sender, **k):
    print("This is host name ", sender.hostname)
    if sender == "celery@celeryworker1":
        with sender.app.connection() as conn:
            chain(add_newspapers.s(), add_podcasts.s(), add_yt_channels.s(), check_tasks_are_created.s()).apply_async()

Am I doing something wrong or is it just a bug?

AcK
Edin.A
  • `chain` promises you the order: `add_podcasts` won't start before `add_newspapers` ends, and so on. It doesn't promise which worker each of them runs on. Do you expect them to run on the same worker? (That's a different issue.) – ItayB Jul 12 '22 at 18:21
  • Yes, but when I run it with app.send_task they run out of order, even on one worker. – Edin.A Jul 12 '22 at 18:42
  • One question: in my third code sample, do you have any idea why the if statement never matches? I did the same as in the documentation. – Edin.A Jul 12 '22 at 18:45
  • Because `sender` is an object and you're comparing it to a `str`. – ItayB Jul 12 '22 at 18:47
  • How am I supposed to do it then? Any idea? – Edin.A Jul 12 '22 at 18:48
  • Is `if sender.hostname == "celeryworker1":` enough? – ItayB Jul 12 '22 at 18:54
  • I posted the log so you can see they run out of order. – Edin.A Jul 12 '22 at 18:54
  • Do you need the return value of each task as an input to the one that comes after it? – ItayB Jul 12 '22 at 18:58
  • I tried it and it won't work. I don't know why, but hostname is empty for me. – Edin.A Jul 12 '22 at 19:02
  • Not really, I don't need the return value at all, but I don't know how chain works without a return value. I just need to run the tasks one after another. – Edin.A Jul 12 '22 at 19:03

2 Answers


Since none of the tasks needs the return value of the previous task, you can run the chain as:

from celery import chain
chain(add_newspapers.si(), add_podcasts.si(), add_yt_channels.si(), check_tasks_are_created.si()).apply_async()

(Note the calls changed from s() to si().)

You can read about immutability in the Celery Canvas documentation.
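To illustrate the difference, here is a plain-Python model of a chain (not Celery's implementation; run_chain and the two stand-in task functions are hypothetical). A mutable step, like .s(), gets the previous task's return value prepended to its arguments; an immutable step, like .si(), runs with only the arguments it was built with, so tasks that take no parameters can still be chained.

```python
def run_chain(steps):
    """Run (func, args, immutable) steps in order, like a simplified chain.

    Mutable steps (modelling .s()) get the previous result prepended to
    their args; immutable steps (modelling .si()) ignore it.
    """
    result = None
    for index, (func, args, immutable) in enumerate(steps):
        if index > 0 and not immutable:
            args = (result,) + tuple(args)
        result = func(*args)
    return result


calls = []

def add_newspapers():  # hypothetical stand-in that takes no arguments
    calls.append("newspapers")
    return "newspapers done"

def add_podcasts():  # hypothetical stand-in that takes no arguments
    calls.append("podcasts")
    return "podcasts done"

# Immutable (.si()-style): each function is called with no arguments.
run_chain([(add_newspapers, (), True), (add_podcasts, (), True)])
print(calls)  # ['newspapers', 'podcasts']

# Mutable (.s()-style): add_podcasts receives the previous result as an
# extra positional argument, which its signature does not accept.
try:
    run_chain([(add_newspapers, (), False), (add_podcasts, (), False)])
except TypeError as exc:
    print("TypeError:", exc)
```

In both cases the functions still run strictly one after another; immutability only controls whether the previous result is passed along.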

ItayB

The @worker_ready.connect handler runs on every worker. So if you have 10 workers, you will send the same task 10 times, once as each of them fires the "worker_ready" signal. Is this intentional?
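A plain-Python model of that effect (FakeConsumer, start_workers, the three handlers and the worker names are hypothetical stand-ins, not Celery objects): each worker calls the same handler, so an unguarded handler sends once per worker, while one that compares the sender's hostname attribute against a single node name sends once. It also shows why the question's check sender == "celery@celeryworker1" never matches: the sender is an object, not a string.

```python
from dataclasses import dataclass


@dataclass
class FakeConsumer:
    """Hypothetical stand-in for the signal sender; only `hostname` matters."""
    hostname: str


def start_workers(hostnames, handler):
    """Simulate every worker firing worker_ready into the same handler."""
    sent = []
    for name in hostnames:
        handler(FakeConsumer(name), sent)
    return sent


def unguarded(sender, sent):
    sent.append(sender.hostname)  # every worker would send the chain


def guarded(sender, sent):
    # Compare the hostname attribute, not the sender object itself.
    if sender.hostname == "celery@celeryworker1":
        sent.append(sender.hostname)


def object_compare(sender, sent):
    # The check from the question: an object is never equal to a str here.
    if sender == "celery@celeryworker1":
        sent.append(sender.hostname)


workers = ["celery@celeryworker1", "celery@celeryworker2", "celery@celeryworker3"]
print(len(start_workers(workers, unguarded)))       # 3
print(len(start_workers(workers, guarded)))         # 1
print(len(start_workers(workers, object_compare)))  # 0
```

With a real worker, the string to compare against should be whatever sender.hostname actually prints (the question's handler already prints it), since the exact node name depends on how -n is passed.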

DejanLekic