
I left my computer on overnight with Celery running a task every 50 seconds, and I noticed some skips of about an hour between runs. Apart from these unexpected skips it executes fine. Why is this happening, and how can I resolve it?

Here's an example of the skip in the log from my worker (celery worker -l info):

[2016-11-03 10:13:36,264: INFO/MainProcess] Task core.tasks.sample[8efcedc5-1e08-41c4-80b9-1f82a9ddbaad] succeeded in 1.062010367s: None
[2016-11-03 11:14:19,751: INFO/MainProcess] Received task: core.tasks.sample[ca9d6ef4-2cdc-4546-a9fb-c413541a80ee]

Here's the corresponding skip in the log from beat (celery beat -l info):

[2016-11-03 10:13:35,199: INFO/MainProcess] Scheduler: Sending due task core.tasks.sample (core.tasks.sample)
[2016-11-03 11:14:19,748: INFO/MainProcess] Scheduler: Sending due task core.tasks.sample (core.tasks.sample)

Here is my task code:

# imports implied by the code below; the exact module paths for the project
# models and br_transfer are placeholders
from datetime import datetime, timedelta
from ftplib import FTP
from StringIO import StringIO

import xmltodict
from celery.task import periodic_task
from django.conf import settings
from django.core.cache import cache

from core.models import DataType, Provider, ProviderDataType, Transaction
from core.utils import br_transfer

# module-level date string used by the task (initial value assumed)
GLOBAL_CURRENT_DATE = datetime.now().strftime("%Y-%m-%d")


# 50 seconds
@periodic_task(run_every=timedelta(**settings.XXX_XML_PERIODIC_TASK))
def sample():
    global GLOBAL_CURRENT_DATE
    if cache.get('XXX_xml_today_saved_data') is None:
        cache.set('XXX_xml_today_saved_data', [])
    saved_data = cache.get('XXX_xml_today_saved_data')
    ftp = FTP('xxxxx')
    ftp.login(user='xxxxx', passwd='xxxxx')
    ftp.cwd('XXX')
    date_dir = GLOBAL_CURRENT_DATE.replace("-", "")
    try:
        ftp.cwd(date_dir)
    except:
        # today's directory may not exist yet on the FTP server;
        # fall back to the previous day's directory
        ftp.cwd(str(int(date_dir) - 1))
    _str = StringIO()
    files = ftp.nlst()
    if (GLOBAL_CURRENT_DATE != datetime.now().strftime("%Y-%m-%d") and
            files == saved_data):
        GLOBAL_CURRENT_DATE = datetime.now().strftime("%Y-%m-%d")
        cache.delete('XXX_xml_today_saved_data')
        return
    print files
    print "-----"
    print saved_data
    unsaved = list(set(files) - set(saved_data))
    print "-----"
    print unsaved
    if unsaved:
        file = min(unsaved)
        # modified_time = ftp.sendcmd('MDTM '+ file)
        print file
        ftp.retrbinary('RETR ' + file, _str.write)
        xml = '<root>'
        xml += _str.getvalue()
        xml += '</root>'
        if cache.get('XXX_provider_id') is None:
            cache.set('XXX_provider_id', Provider.objects.get(code="XXX").id)
        _id = cache.get('XXX_provider_id')
        _dict = xmltodict.parse(xml, process_namespaces=True,
                                dict_constructor=dict, attr_prefix="")
        row = _dict['root']['row']
        if type(_dict['root']['row']) == dict:
            _dict['root']['row'] = []
            _dict['root']['row'].append(row)
            row = _dict['root']['row']
        for x in row:
            if cache.get('XXX_data_type_' + x['dataType']) is None:
                obj, created = DataType.objects.get_or_create(code=x['dataType'])
                obj, created = ProviderDataType.objects.get_or_create(provider_id=_id, data_type=obj)
                if created:
                    cache.set('XXX_data_type_' + x['dataType'], obj.id)
            _id = cache.get('XXX_data_type_' + x['dataType'])
            obj, created = Transaction.objects.get_or_create(
                data=x, file_name=file, provider_data_type_id=_id)
            if created:
                if x['dataType'] == "BR":
                    print "Transact"
                    br_transfer(**x)
            else:
                print "Not transacting"

        saved_data.append(file)
        cache.set('XXX_xml_today_saved_data', saved_data)
    ftp.close()

Here are my Celery settings in settings.py:

BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Africa/Nairobi'
XXX_XML_PERIODIC_TASK = {'seconds': 50}

CACHES = {
    'default': {
        'BACKEND': 'redis_cache.RedisCache',
        'LOCATION': 'localhost:6379',
        'TIMEOUT': None,
    },
}

Any explanations or suggestions?

I am using Python 2.7.10 and Django 1.10.

Dean Christian Armada

2 Answers


Celery workers pop tasks from the queue when they're ready, but if a task has a countdown (or eta) the worker will pick up other tasks in the meantime and only execute it once that time has expired. Celery doesn't guarantee that a task runs at exactly that time, only at that time or later.
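
To make that concrete, here is a minimal sketch (the app and task names are made up, not from the question) showing that countdown/eta only set the earliest execution time:

# countdown/eta are a floor, not an exact schedule
from celery import Celery

app = Celery('demo', broker='redis://localhost:6379')

@app.task
def ping():
    print("ping")

# The worker receives this message right away, but only executes it once
# 50 seconds have elapsed AND a worker process is free; if every process is
# still busy with a long-running task, execution slips past the 50 seconds.
ping.apply_async(countdown=50)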

theWanderer4865
  • So what could be my solution? Is there a way to always honor the periodic task interval that is set? – Dean Christian Armada Nov 04 '16 at 01:08
  • use cron for better guarantees – theWanderer4865 Nov 04 '16 at 01:10
  • Sorry, I'm not really familiar with these things. Can you suggest an alternative, or sketch some code to go with mine? – Dean Christian Armada Nov 04 '16 at 01:11
  • cron is a program that does exactly what beat does, but a little better: super stable, been around forever, and not hard to use. I'd go that direction, or you'd have to set up a dead-letter exchange, which still wouldn't get you that consistency (see the sketch after these comments). – theWanderer4865 Nov 04 '16 at 01:17
  • I have no idea - I haven't used it before but I have used cron in these situations plenty of times and never had a problem - it comes installed on most linux distros and there's a task scheduler on windows that works fine as well. – theWanderer4865 Nov 06 '16 at 16:37
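
Following up on the cron suggestion in the comments above, here is a hedged sketch of a Django management command that cron could run instead of celery beat. The command name (run_sample) and file path are assumptions, not from the question, and cron's finest granularity is one minute, so it cannot reproduce the exact 50-second interval.

# hypothetical file: core/management/commands/run_sample.py
# crontab entry (every minute; cron cannot schedule 50-second intervals):
#   * * * * * cd /path/to/project && ./manage.py run_sample
from django.core.management.base import BaseCommand

from core.tasks import sample


class Command(BaseCommand):
    help = "Run core.tasks.sample once (meant to be driven by cron)"

    def handle(self, *args, **options):
        # calling the task function directly runs it synchronously in this
        # process, without going through the Celery broker at all
        sample()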

There could be a couple of problems. The most likely is that your worker is busy when your task is triggered. You can prevent this by running more worker processes: the docs explain the --concurrency option for a single worker, as well as how to run multiple worker instances.
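
For example, assuming the Celery app module is called proj (not from the question), a single worker can be started with more processes via the --concurrency flag:

celery -A proj worker -l info --concurrency=4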

You could also run different workers attached to different queues so that certain tasks are handled by certain workers, i.e., dedicated queues for certain tasks: Starting worker with dynamic routing_key?
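
As a hedged sketch of that idea (the queue name "periodic" is assumed, using the same old-style setting names the question already uses), you could route the periodic task to its own queue and point a dedicated worker at it:

# settings.py: route the periodic task to a dedicated queue (queue name assumed)
CELERY_ROUTES = {
    'core.tasks.sample': {'queue': 'periodic'},
}

# then start a worker that consumes only that queue:
#   celery -A proj worker -Q periodic -l info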

What I've also seen is that a worker can prefetch tasks and hold onto them -- but if the task it is currently running runs past the countdown, your task may be delayed.

You'll want to read up on CELERYD_PREFETCH_MULTIPLIER in the Celery documentation.
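
A hedged example of tightening that in the question's settings.py, using the same old-style setting names shown above:

# each worker process reserves only one task at a time, so a busy worker
# does not sit on queued periodic tasks it cannot run yet
CELERYD_PREFETCH_MULTIPLIER = 1
# optionally, acknowledge tasks only after they finish, so a task is re-queued
# (and re-run by another worker) if the worker crashes before completing it
CELERY_ACKS_LATE = True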

rrauenza