
Is there a way to access the callable Python object from a Celery task, either directly or through a lookup method (like Django's `get_model()`)? I've got a dot-notation reference to where the callable is declared, but I need the actual object so I can access its attributes before sending or calling the task.

I've scoured through the public and private methods but none of them seem obviously what I'm looking for:

$ dir(celery_task_instance)

> ['AsyncResult', 'MaxRetriesExceededError', 'OperationalError', 'Strategy',
'__bound__', '__call__', '__class__', '__delattr__', '__dict__', '__dir__',
'__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__',
'__hash__', '__header__', '__init__', '__le__', '__lt__', '__module__',
'__name__', '__ne__', '__new__', '__qualname__', '__reduce__',
'__reduce_ex__', '__repr__', '__self__', '__setattr__', '__sizeof__',
'__str__', '__subclasshook__', '__trace__', '__v2_compat__', '__weakref__',
'__wrapped__', '_app', '_backend', '_decorated', '_default_request',
'_exec_options', '_get_app', '_get_exec_options', '_get_request',
'abstract', 'acks_late', 'add_around', 'add_to_chord', 'add_trail',
'after_return', 'annotate', 'app', 'apply', 'apply_async', 'autoregister',
'backend', 'bind', 'chunks', 'default_retry_delay', 'delay', 'expires',
'from_config', 'ignore_result', 'map', 'max_retries', 'name', 'on_bound',
'on_failure', 'on_retry', 'on_success', 'pop_request', 'push_request',
'rate_limit', 'reject_on_worker_lost', 'replace', 'request',
'request_stack', 'resultrepr_maxsize', 'retry', 'run', 's', 'send_event',
'send_events', 'serializer', 'shadow_name', 'si', 'signature', 'signature_from_request',
'soft_time_limit', 'starmap', 'start_strategy', 'store_errors_even_if_ignored', 'subtask',
'subtask_from_request', 'throws', 'time_limit', 'track_started', 'trail', 'update_state']

Background

I'm using django-celery-beat to create scheduled tasks for Celery within a Django project.

I'd like to validate the schema of the args and kwargs fields when the scheduled task is created, to anticipate and prevent future runtime issues. For example, API parameters supplied via the kwargs field.

I've been able to access the list of registered Celery tasks in my Django forms.ModelForm.clean() method. I thought that I could store the validation schema as an attribute of the target @shared_task and then access that during task creation. The schema is defined as a marshmallow.Schema in case it's relevant, but this could equally be hardcoded.
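To make the idea concrete, here is a minimal sketch of storing a schema as an attribute on the callable and checking kwargs against it. It deliberately avoids Celery and marshmallow so that it runs on its own: `fetch_report` is a hypothetical plain function standing in for the `@shared_task`, and `kwargs_schema` and `validate_kwargs` are names made up for illustration, with a dict of expected types standing in for a `marshmallow.Schema`.

```python
from typing import Any, Callable, Dict, List


def validate_kwargs(func: Callable[..., Any], kwargs: Dict[str, Any]) -> List[str]:
    """Return a list of error messages; an empty list means kwargs are valid."""
    # Hypothetical convention: the schema lives on the callable itself.
    schema = getattr(func, "kwargs_schema", None)
    if schema is None:
        return []  # nothing to validate against

    errors = []
    for key, expected_type in schema.items():
        if key not in kwargs:
            errors.append(f"missing key: {key}")
        elif not isinstance(kwargs[key], expected_type):
            errors.append(
                f"{key}: expected {expected_type.__name__}, "
                f"got {type(kwargs[key]).__name__}"
            )
    for key in kwargs:
        if key not in schema:
            errors.append(f"unexpected key: {key}")
    return errors


# Stand-in for the task function (an assumption, not a real Celery task).
def fetch_report(account_id: int, fmt: str) -> str:
    return f"report {account_id} as {fmt}"


# Attach the schema as an attribute of the callable.
fetch_report.kwargs_schema = {"account_id": int, "fmt": str}

print(validate_kwargs(fetch_report, {"account_id": 1, "fmt": "csv"}))
print(validate_kwargs(fetch_report, {"account_id": "1"}))
```

In a form's clean() method, a non-empty list of errors could be turned into a validation error before the scheduled task is saved.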

Phil Sheard

2 Answers


I was looking in the wrong place - importlib makes this fairly trivial. I was thinking it was something that I needed to get from Celery.

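import importlib

# Split "package.module.callable" into the module path and the attribute name
split_path = dot_notation_path.split(".")
module_name, callable_name = ".".join(split_path[:-1]), split_path[-1]
module_object = importlib.import_module(module_name)
# Look the callable up on the imported module
callable_object = getattr(module_object, callable_name)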
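The same lookup can be wrapped in a small helper. This is only a sketch: `resolve_dotted_path` is a hypothetical name, and `json.dumps` is used as the target purely so the example runs without Celery installed. With a real task path, the object found this way would typically be the task object (or a proxy to it), so any custom attributes should be readable from it.

```python
import importlib
from typing import Any


def resolve_dotted_path(dotted_path: str) -> Any:
    """Import the module part of 'pkg.module.attr' and return the attribute."""
    # rpartition splits on the last dot: ("pkg.module", ".", "attr")
    module_name, _, attr_name = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.attribute', got {dotted_path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Example with a standard-library callable standing in for the task.
dumps = resolve_dotted_path("json.dumps")
print(dumps({"a": 1}))  # prints {"a": 1}
```

An ImportError is raised if the module cannot be imported, and an AttributeError if the module has no attribute with that name, so both are worth catching when the path comes from user input.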
Phil Sheard

Given a task declared like this:

@app.task(name='some_task')
def some_function_with_task_decorator(params: dict):
    do_sth()

some_function_with_task_decorator is of type PromiseProxy. To get the name of the callable, use some_function_with_task_decorator._get_current_object().__name__

some_function_with_task_decorator._get_current_object() gets the Task out of the PromiseProxy.

Since the Task is created by the decorator, its __name__ gives the name of the decorated function. You can then call the function dynamically, as suggested in "Calling a function of a module by using its name (a string)".

dbow