0

According to this thread, the problem is resolved but seems like it is not . Setting Time Limit on specific task with celery

My current Celery version is 3.1.18 (Cipater).

I am trying overwrite default settings of a Task. Objective is to change the softtimelimit and hard time limit of the task because the same task is being used for multiple purposes.

Passing soft_time_limit and time_limit to MyTask constructor to change default settings.

///celery/app/ task.py
class MyTask(task.Task):   
    time_limit = 100
    soft_time_limit = 110
    max_retries = 0

def __init__(self, time_limit=None, soft_time_limit=None,
             max_retries=None, *args, **kwargs):
    if time_limit:
        self.time_limit = time_limit
    if soft_time_limit:
       self.soft_time_limit = soft_time_limit
    if max_retries:
       self.max_retries = max_retries
    task.Task.__init__(self, *args, **kwargs)


t1 = MyTask(time_limit=30, soft_time_limit=20,
        max_retries=5)
or

t1 = MyTask()
t1.time_limit = 30
t1.soft_time_limit = 20

Then pass the t1.si() to task.RetryableChain(...)

job = task.RetryableChain(...)
job.delay()

When the run method is being called by worker, it still receives the old value (time_limit = 100) where as I have set time_limit = 30.

Please let me know if the issue is still exist in 3.1.18 version.

Community
  • 1
  • 1
S.Kar
  • 1
  • 1

1 Answers1

0

I had to fix celery code to make it work. This is definitely a temporary fix but it works. I am not sure when the attributes are set with new values then why those are not transferred to worker.job. I can sense that when we called task.si or s() it creates a Signature instance which does not hold these time_limit attributes, so it takes from the original values stored in class. Just a thought.

t1 = MyTask()
kwargs = {}
kwargs['time_limit'] = 30
kwargs['soft_time_limit'] = 40

t.s(kwargs)

---->>> /celery/worker/job.py

def execute_using_pool(self, pool, **kwargs):
    """Used by the worker to send this task to the pool.

    :param pool: A :class:`celery.concurrency.base.TaskPool` instance.

    :raises celery.exceptions.TaskRevokedError: if the task was revoked
        and ignored.

    """
    uuid = self.id
    task = self.task
    if self.revoked():
        raise TaskRevokedError(uuid)

    hostname = self.hostname
    kwargs = self.kwargs
    if task.accept_magic_kwargs:
        kwargs = self.extend_with_default_kwargs()
    request = self.request_dict
    request.update({'hostname': hostname, 'is_eager': False,
                    'delivery_info': self.delivery_info,
                    'group': self.request_dict.get('taskset')})
    timeout, soft_timeout = request.get('timelimit', (None, None))
    # timeout = timeout or task.time_limit
    # soft_timeout = soft_timeout or task.soft_time_limit
    **# SKAR  request.get(‘time limit’) always returns the original value stored in Task.
    timeout = kwargs.get('time_limit', task.time_limit)
    soft_timeout = kwargs.get('soft_time_limit', task.soft_time_limit)**
    result = pool.apply_async(
        trace_task_ret,
        args=(self.name, uuid, self.args, kwargs, request),
        accept_callback=self.on_accepted,
S.Kar
  • 1
  • 1