This question is similar to "Retrieve list of tasks in a queue in Celery", but I'd like to obtain the actual Task objects (cf. http://docs.celeryproject.org/en/latest/reference/celery.events.state.html#celery.events.state.Task) rather than a dictionary representation.

When I do

from celery.task.control import inspect
i = inspect()

and then do i.scheduled() from the shell, I get a result like

In [75]: i.scheduled()
Out[75]: 
{'celery@Kurts-MacBook-Pro-3.local': [{'eta': '2019-08-01T01:31:37.843141+00:00',
   'priority': 6,
   'request': {'acknowledged': False,
    'args': "[126, 'Business Signup', {'actualCompanyName': 'Gilmore and Beck LLC', 'gigsEnabledRegionMapping': True, 'companyName': 'Gilmore and Beck LLC', 'companyRegionMapping': 'Lesliechester', 'companyId': 'ow9DMA8'}]",
    'delivery_info': {'exchange': '',
     'priority': 0,
     'redelivered': None,
     'routing_key': 'celery'},
    'hostname': 'celery@Kurts-MacBook-Pro-3.local',
    'id': '4ecdc400-8421-4a06-babc-98493362ec67',
    'kwargs': '{}',
    'name': 'backend.tasks.task_send_event_to_iterable',
    'time_start': None,
    'type': 'backend.tasks.task_send_event_to_iterable',
    'worker_pid': None}},
  {'eta': '2019-08-01T01:39:21.205879+00:00',
   'priority': 6,
   'request': {'acknowledged': False,
    'args': "('hyjomuz@mailinator.net',)",
    'delivery_info': {'exchange': '',
     'priority': 0,
     'redelivered': None,
     'routing_key': 'celery'},
    'hostname': 'celery@Kurts-MacBook-Pro-3.local',
    'id': '294910a3-2323-4fcf-9768-115c1a8c5e06',
    'kwargs': '{}',
    'name': 'backend.tasks.task_send_business_lead_notification',
    'time_start': None,
    'type': 'backend.tasks.task_send_business_lead_notification',
    'worker_pid': None}}]}

I would like to search through these tasks and conditionally revoke one. However, I would like to iterate over the results and have the actual tasks handy, as in the example in "How to inspect and cancel Celery tasks by task name". But if I instantiate celery.events.state.State(), it contains no events:

In [76]: celery.events.state.State()
Out[76]: <State: events=0 tasks=0>

Is it possible to obtain the actual Task objects for scheduled tasks? In particular, I'm interested in obtaining the args without having to parse or try an ast.literal_eval().

Kurt Peek

1 Answer

I am not sure you can get the actual Task instance, but you can easily create an AsyncResult by instantiating it with the ID of the task you want to inspect. You also need to pass it the Celery application object.

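A minimal example (my.project.celeryapp and myapp are placeholders for your own module and Celery app):

from celery.result import AsyncResult
from my.project.celeryapp import myapp  # placeholder: import your own Celery app

# Build a result handle from the task ID reported by inspect()
task_res = AsyncResult("9ed888fe-f6b6-4443-85d3-787c5c1b26b0", app=myapp)
print(task_res.state)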
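
For the search-and-revoke part of the question, here is a rough sketch that works directly on the dictionary returned by i.scheduled(). The helper names (parse_repr, find_scheduled, revoke_matching) are made up for illustration and are not part of Celery. It assumes the output has the shape shown in the question, where args is a string repr, so it still falls back to ast.literal_eval; I don't know of a way to get the args in native form from this output. Parsing may fail if the repr was truncated, in which case the sketch skips that task rather than guessing.

```python
import ast


def parse_repr(value):
    """Turn the string repr from inspect() back into a Python object.

    Returns the value unchanged if it is not a string, and None if the
    string cannot be parsed (for example, if the repr was truncated).
    """
    if not isinstance(value, str):
        return value
    try:
        return ast.literal_eval(value)
    except (ValueError, SyntaxError):
        return None


def find_scheduled(scheduled, task_name):
    """Yield (task_id, args) for scheduled entries named task_name.

    `scheduled` is the dict returned by inspect().scheduled(); it may be
    None when no worker replies, which is treated as "no tasks".
    """
    for entries in (scheduled or {}).values():
        for entry in entries:
            request = entry["request"]
            if request["name"] == task_name:
                yield request["id"], parse_repr(request["args"])


def revoke_matching(app, task_name, predicate):
    """Revoke scheduled tasks whose parsed args satisfy predicate(args).

    `app` is your Celery application; tasks whose args could not be
    parsed are skipped. Returns the list of revoked task IDs.
    """
    scheduled = app.control.inspect().scheduled()
    revoked = []
    for task_id, args in find_scheduled(scheduled, task_name):
        if args is not None and predicate(args):
            app.control.revoke(task_id)
            revoked.append(task_id)
    return revoked
```

With the output from the question, something like revoke_matching(myapp, 'backend.tasks.task_send_business_lead_notification', lambda args: args[0] == 'hyjomuz@mailinator.net') would revoke the second scheduled task. Each returned ID can also be passed to AsyncResult as above if you want a result handle for it.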
DejanLekic