I want to send a chain task at startup o worker like in this https://stackoverflow.com/a/14589445/3922534 question, but task run out of order.
Logs from worker
[2022-07-12 20:51:47,369: INFO/MainProcess] Task task.add_newspapers[5de1f446-65af-472a-a4b6-d9752142b588] received
[2022-07-12 20:51:47,372: WARNING/MainProcess] Now Runing Newspaper Function
[2022-07-12 20:51:47,408: INFO/MainProcess] Task task.check_tasks_are_created[33d6b9d1-660b-4a80-a726-6f167e246480] received
[2022-07-12 20:51:47,412: WARNING/MainProcess] Now Runing Podcast Function
[2022-07-12 20:51:47,427: INFO/MainProcess] Task task.add_newspapers[5de1f446-65af-472a-a4b6-d9752142b588] succeeded in 0.0470000000204891s: 'Now Runing Podcast Function'
[2022-07-12 20:51:47,432: INFO/MainProcess] Task task.add_yt_channels[26179491-2632-46bd-95c1-9e9dbb9e8130] received
[2022-07-12 20:51:47,433: WARNING/MainProcess] None
[2022-07-12 20:51:47,457: INFO/MainProcess] Task task.check_tasks_are_created[33d6b9d1-660b-4a80-a726-6f167e246480] succeeded in 0.0470000000204891s: None
[2022-07-12 20:51:47,463: INFO/MainProcess] Task task.add_podcasts[ad94a119-c6b2-475a-807b-b1a73bef589e] received
[2022-07-12 20:51:47,468: WARNING/MainProcess] Now Runing Check Tasks are Created Function
[2022-07-12 20:51:47,501: INFO/MainProcess] Task task.add_yt_channels[26179491-2632-46bd-95c1-9e9dbb9e8130] succeeded in 0.06299999984912574s: 'Now Runing Check Tasks are Created Function'
[2022-07-12 20:51:47,504: INFO/MainProcess] Task task.add_podcasts[ad94a119-c6b2-475a-807b-b1a73bef589e] succeeded in 0.030999999959021807s: 'Now Runing Yotube Channels Function'
Code How i send the task:
@worker_ready.connect
def at_start(sender, **k):
with sender.app.connection() as conn:
#sender.app.send_task(name='task.print_word', args=["I Send Task On Startup"],connection=conn,)
#ch = [add_newspapers.s(),add_podcasts.s(),add_yt_channels.s(),check_tasks_are_created.s()]
ch = [
signature("task.add_podcasts"),
signature("task.add_yt_channels"),
signature("task.check_tasks_are_created"),
]
sender.app.send_task(name='task.add_newspapers',chain=ch,connection=conn,)
Then I try it to run chain task like normally run apply_async(), but it runs at every worker. I want to run just once at one worker
@worker_ready.connect
def at_start(sender, **k):
chain(add_newspapers.s(),add_podcasts.s(),add_yt_channels.s(),check_tasks_are_created.s()).apply_async()
Then I try it to recognize the worker then apply .apply_async(), but it does not catch the if statment.
Documentation https://docs.celeryq.dev/en/latest/userguide/signals.html#celeryd-init
celery -A celery_app.celery worker --loglevel=INFO -P gevent --concurrency=40 -n celeryworker1
@worker_ready.connect
def at_start(sender, **k):
print("This is host name ", sender.hostname)
if sender == "celery@celeryworker1":
with sender.app.connection() as conn:
chain(add_newspapers.s(),add_podcasts.s(),add_yt_channels.s(),check_tasks_are_created.s()).apply_async()
Am I doing something wrong or is it just a bug?