14

I want to create the following flow using the Celery configuration/API:

  • Send TaskA(argB) only if the Celery queue has no TaskA(argB) already pending

Is it possible? How?

arikg

5 Answers

8

You can make your job aware of other tasks by some sort of memoization. If you use a cache control key (redis, memcached, /tmp, whatever is handy), you can make execution depend on that key. I'm using redis as an example.

from redis import Redis

# `app` is your Celery application and `perform_task` is the actual work
redis_client = Redis()
SENTINEL_KEY = "run_only_one_instance_sentinel"

@app.task
def run_only_one_instance(params):
    # INCR is atomic, so only one concurrent caller sees the value 1
    sentinel = redis_client.incr(SENTINEL_KEY)
    try:
        if sentinel == 1:
            # I am the legitimate running task
            perform_task()
        else:
            # Do you want to do something else on a duplicate task?
            pass
    except Exception:
        # potentially log the error with Sentry, or re-raise it
        pass
    finally:
        # always decrement the counter exactly once to ensure tasks can run
        redis_client.decr(SENTINEL_KEY)
  • This will prevent task from running if the same task is *still running*. If same task present 3 times in the queue it will be executed three times as long as they are not being executed simultaneously – Ramast Jul 15 '15 at 11:00
  • 1
    Doesn't change the fact that my suggestion is not in line with the question (at least anymore). I think the question has gone through changes since I answered. The accepted answer is heavy runtimewise, but will do what is asked. – Árni St. Steinunnarson Jul 15 '15 at 11:07
  • 1
    I think Ramast might be right, but the main idea is there. It'd be better to reset counter to 0 in the if branch after `perform_task` and remove all `decr` calls. This way all duplicate calls won't run again until the reset call. Currently it could happen that `task1` calls incr and runs task, `task2` calls incr but doesn't run since `sentinel !=1` ; `task1` calls decr. So basically any tasks can run now since there are no more duplicate tasks running `perform_task`, but that's not going to happen when a new `task3` comes now, because `task2` hasn't yet called decr and counter is 1 now, not 0. – Andrei-Niculae Petre Oct 11 '16 at 13:10
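As the comments point out, the sentinel only guards tasks that are running, not tasks that are still pending. One possible way to cover the pending case is to take an atomic marker when the task is sent and clear it when the task starts. The following is a minimal sketch under assumptions: `pending_key`, `send_if_not_pending`, `mark_started` and the `send` callable are hypothetical names, and `redis_client` is expected to behave like redis-py's `Redis` (`set(..., nx=True, ex=...)` and `delete`).

```python
import hashlib
import json


def pending_key(task_name, args):
    """Build a stable key for one (task name, arguments) combination."""
    payload = json.dumps(list(args), sort_keys=True, default=str)
    digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()
    return "pending:{}:{}".format(task_name, digest)


def send_if_not_pending(redis_client, send, task_name, args, ttl=3600):
    """Call send() only if no identical task is marked as pending.

    `send` is a hypothetical callable that enqueues the task, for example
    lambda: task_a.apply_async(args=args).
    """
    key = pending_key(task_name, args)
    # SET with nx=True is atomic: only the first caller gets a truthy result.
    # The ttl is a safety net in case the marker is never cleared.
    if redis_client.set(key, 1, nx=True, ex=ttl):
        send()
        return True
    return False


def mark_started(redis_client, task_name, args):
    """Call at the top of the task body so the same task can be queued again."""
    redis_client.delete(pending_key(task_name, args))
```

If `send()` fails after the marker is set, the marker stays until the `ttl` expires, so pick a `ttl` that fits how long a task can reasonably wait in the queue.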
1

I cannot think of a way other than to:

  1. Retrieve all executing and scheduled tasks via celery inspect

  2. Iterate through them to see if your task is there.

Check this SO question to see how the first point is done.

Good luck.
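The two steps above could be sketched roughly as follows. This is not a definitive implementation: `iter_requests` and `is_task_known` are hypothetical helper names, and the sketch matches on the task name only, because the representation of the `args` field may vary between Celery versions.

```python
def iter_requests(inspect_reply):
    """Yield task request dicts from one inspect reply.

    A reply maps worker names to lists of tasks; it is None when no worker
    answers. Scheduled entries wrap the task under a 'request' key.
    """
    for tasks in (inspect_reply or {}).values():
        for item in tasks:
            yield item.get("request", item)


def is_task_known(replies, task_name):
    """Return True if any reply contains a task with the given name."""
    return any(
        request.get("name") == task_name
        for reply in replies
        for request in iter_requests(reply)
    )


# Possible usage with a real app (assumes workers are reachable):
#   i = app.control.inspect()
#   is_task_known([i.active(), i.reserved(), i.scheduled()], "tasks.task_a")
```

Note that inspection only sees tasks that workers have already received; messages still waiting in the broker are not included.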

Community
srj
1

I don't know whether this will help you more than the other answers, but here is my approach, following the same idea given by srj. I needed a way to stop my server from queuing tasks with the same id, so I wrote a general function to help me.

def is_task_active_or_registered(app, task_id):

    i = app.control.inspect()

    # inspect calls return None when no worker replies
    active_dict = i.active() or {}
    scheduled_dict = i.scheduled() or {}
    tasks_ids_set = set()

    for _dict in [active_dict, scheduled_dict]:
        for tasks in _dict.values():
            for task in tasks:
                # scheduled entries wrap the task under the 'request' key
                tasks_ids_set.add(task.get('request', task)['id'])

    return task_id in tasks_ids_set

So, I use it like this:

In the context where my celery-app object is available, I define:

def check_task_can_not_run(task_id):
    return is_task_active_or_registered(app=celery, task_id=task_id)

Then, when handling a client request, I call check_task_can_not_run(...) and refuse to launch the task if it returns True.
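For this check to catch TaskA(argB) specifically, the task id has to be the same every time the same task is sent with the same arguments. One way to get that, sketched here as an assumption rather than as part of the answer above, is to derive the id from the task name and arguments and pass it to `apply_async` through its `task_id` option. `dedup_task_id` is a hypothetical helper name.

```python
import hashlib
import json


def dedup_task_id(task_name, args=(), kwargs=None):
    """Derive a deterministic task id from the task name and its arguments."""
    payload = json.dumps(
        [task_name, list(args), kwargs or {}],
        sort_keys=True,
        default=str,  # fall back to str() for values JSON cannot encode
    )
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()


# Possible usage (task_a and arg_b are placeholders):
#   task_id = dedup_task_id(task_a.name, (arg_b,))
#   if not check_task_can_not_run(task_id):
#       task_a.apply_async(args=(arg_b,), task_id=task_id)
```

The check and the send are two separate steps, so two clients racing each other can still both send the task; the check narrows the window but does not close it.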

1

My answer assumes that:

  1. The same job is running multiple times
  2. The cause may be that the message could not be acked
  3. Consumers somehow got disconnected, so unacked messages became ready again

Solution

  1. First pass bind=True to the task so that we can get task uuid as self.request.id
import logging

from redis import Redis

logger = logging.getLogger(__name__)
redis_url = "redis://localhost:6379/0"  # assumption: point this at your Redis instance

@app.task(bind=True)
def your_task(self, *args, **kwargs):
    task_id = self.request.id
    redis_conn = Redis.from_url(redis_url, decode_responses=True)

    # expire in 1 day
    ex_1_day = 24*60*60
    # check and mark in one atomic step: with nx=True the SET only succeeds
    # if the key does not exist yet, so two deliveries cannot both pass
    if not redis_conn.set(task_id, 1, nx=True, ex=ex_1_day):
        logger.info(f"{task_id} is a duplicate job")
        raise Exception('Duplicate job') # Handle this accordingly

    # From here run your job now
    # ...

This way, a job with a given task id never runs in parallel with itself and never runs a second time while the key exists (one day here).

kedar nath
0

I was facing a similar problem: Beat was creating duplicates in my queue. I wanted to use expires, but this feature isn't working properly: https://github.com/celery/celery/issues/4300.

So here is a scheduler which checks whether a task has already been enqueued (based on the task name).

# -*- coding: UTF-8 -*-
from __future__ import unicode_literals

import json
from heapq import heappop, heappush

from celery.beat import event_t
from celery.schedules import schedstate
from django_celery_beat.schedulers import DatabaseScheduler
from typing import List, Optional
from typing import TYPE_CHECKING

from your_project import celery_app

if TYPE_CHECKING:
    from celery.beat import ScheduleEntry


def is_task_in_queue(task, queue_name=None):
    # type: (str, Optional[str]) -> bool
    queues = [queue_name] if queue_name else celery_app.amqp.queues.keys()

    for queue in queues:
        if task in get_celery_queue_tasks(queue):
            return True
    return False


def get_celery_queue_tasks(queue_name):
    # type: (str) -> List[str]
    # Relies on the Redis transport, where each queue is a Redis list
    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)

    decoded_tasks = []
    for task in tasks:
        j = json.loads(task)
        task = j['headers']['task']
        if task not in decoded_tasks:
            decoded_tasks.append(task)

    return decoded_tasks


class SmartScheduler(DatabaseScheduler):
    """
    Smart means that it prevents duplicate tasks in queues.
    """
    def is_due(self, entry):
        # type: (ScheduleEntry) -> schedstate
        is_due, next_time_to_run = entry.is_due()

        if (
            not is_due or  # duplicate wouldn't be created
            not is_task_in_queue(entry.task)  # not in queue so let it run
        ):
            return schedstate(is_due, next_time_to_run)

        # Task should be run (is_due) and it is present in queue (is_task_in_queue)

        H = self._heap
        if not H:
            return schedstate(False, self.max_interval)

        event = H[0]
        verify = heappop(H)
        if verify is event:
            next_entry = self.reserve(entry)
            heappush(H, event_t(self._when(next_entry, next_time_to_run), event[1], next_entry))
        else:
            heappush(H, verify)
            next_time_to_run = min(verify[0], next_time_to_run)

        return schedstate(False, min(next_time_to_run, self.max_interval))
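The decoding step in get_celery_queue_tasks can be tried without a broker. The sketch below uses a hypothetical helper, `task_names_in_messages`, and hand-written sample messages; like the code above, it assumes a message format in which the task name is stored under `headers.task`.

```python
import json


def task_names_in_messages(raw_messages):
    """Return the distinct task names found in raw queue messages, in order."""
    names = []
    for raw in raw_messages:
        message = json.loads(raw)  # accepts str as well as bytes
        name = message.get("headers", {}).get("task")
        # messages without headers.task are skipped rather than raising
        if name is not None and name not in names:
            names.append(name)
    return names


# Hand-written stand-ins for what LRANGE would return from the queue list
sample = [
    b'{"headers": {"task": "tasks.task_a"}, "body": "..."}',
    b'{"headers": {"task": "tasks.task_a"}, "body": "..."}',
    b'{"headers": {"task": "tasks.task_b"}, "body": "..."}',
]
queued = task_names_in_messages(sample)
```

Because only the task name is compared, TaskA(argB) and TaskA(argC) count as the same task here; telling them apart would also require decoding the message body.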
Petr Přikryl