I want to schedule two tasks in Python.

The first task, periodic_exctract_urls, works fine.

But the second task, periodic_check_urls, prints:

Execution of job "periodic_check_urls (trigger: interval[0:05:00], next run at: 2022-08-12 01:34:46 MSK)" skipped: maximum number of running instances reached (1)
Execution of job "periodic_check_urls (trigger: interval[0:05:00], next run at: 2022-08-12 01:39:46 MSK)" skipped: maximum number of running instances reached (1)
Execution of job "periodic_check_urls (trigger: interval[0:05:00], next run at: 2022-08-12 01:44:46 MSK)" skipped: maximum number of running instances reached (1)

I don't want the task to be skipped. I want the existing task to be replaced with a new one every 5 minutes. I tried replace_existing=True, but it didn't help. How can I avoid the task being skipped?

Do I understand correctly that the two tasks (periodic_exctract_urls and periodic_check_urls) run in parallel?

My code:

from apscheduler.schedulers.background import BackgroundScheduler
import datetime

def schedule_task():
    scheduler = BackgroundScheduler()
    first_run_time = datetime.datetime.now() + datetime.timedelta(seconds=60)

    scheduler.add_job( # works OK
        periodic_exctract_urls,
        'interval',
        minutes=15,
        next_run_time=first_run_time
    )
    scheduler.add_job(periodic_check_urls, 'interval', minutes=5, replace_existing=True)
    scheduler.start()
mascai
  • What version of Python are you using? And what version of APScheduler are you using? – Life is complex Aug 19 '22 at 10:49
  • I'm very interested in how you solved this problem with @DialFrost's answer, because I couldn't get that answer to work. Please post your working code in your question, along with your Python and APScheduler versions, so that it helps others who use Stack Overflow. Thanks in advance. – Life is complex Aug 22 '22 at 14:28

2 Answers

UPDATED ANSWER (WORKING) -- 08-18-2022

As I previously stated, I could not get @DialFrost's recommendations in the other answer to work using Python 3.9 and APScheduler versions 3.9.x, 3.8.x, 3.7.x, 3.6.x and 3.5.x.

I decided to look at APScheduler 4.0.0a1, which has been completely redesigned. BackgroundScheduler and BlockingScheduler were both removed in this new version.

Here is working code for Scheduler:

from datetime import datetime
from apscheduler.schedulers.sync import Scheduler
from apscheduler.triggers.interval import IntervalTrigger


first_run_time = datetime.now()


def periodic_extract_urls():
    print(f'Extraction time: {datetime.now()}')
    print(datetime.now() - first_run_time)
    print('\n')


def periodic_check_urls():
    print(f'Checking time: {datetime.now()}')
    print(datetime.now() - first_run_time)
    print('\n')


with Scheduler() as scheduler:
    scheduler.add_schedule(func_or_task_id=periodic_extract_urls,
                           trigger=IntervalTrigger(minutes=15),
                           id="extract_urls")

    scheduler.add_schedule(func_or_task_id=periodic_check_urls,
                           trigger=IntervalTrigger(minutes=5),
                           id="check_urls"
                           )

    scheduler.wait_until_stopped()

This produces the following output:

Extraction time: 2022-08-18 09:03:31.797858
0:00:00.009066


Checking time: 2022-08-18 09:03:31.797992
0:00:00.009181


Checking time: 2022-08-18 09:08:31.799883
0:05:00.011096

You can also use AsyncScheduler in APScheduler 4.0.0a1:

from asyncio import run
from datetime import datetime, timedelta
from apscheduler.schedulers.async_ import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger

first_run_time = datetime.now()


def periodic_extract_urls():
    print(f'Extraction time: {datetime.now()}')
    print(datetime.now() - first_run_time)
    print('\n')


def periodic_check_urls():
    print(f'Checking time: {datetime.now()}')
    print(datetime.now() - first_run_time)
    print('\n')


async def jobs():
    async with AsyncScheduler() as scheduler:
        await scheduler.add_schedule(func_or_task_id=periodic_extract_urls,
                                     trigger=IntervalTrigger(minutes=15, start_time=first_run_time),
                                     id="extract_urls"
                                     )

        await scheduler.add_schedule(func_or_task_id=periodic_check_urls,
                                     trigger=IntervalTrigger(minutes=5,
                                                             start_time=first_run_time + timedelta(seconds=300)),
                                     id="check_urls"
                                     )

        await scheduler.wait_until_stopped()


run(jobs())

Here is the output:

Extraction time: 2022-08-18 12:03:54.617456
0:00:00.015132


Checking time: 2022-08-18 12:08:54.615003
0:05:00.012665


Checking time: 2022-08-18 12:13:54.616444
0:10:00.014104

UPDATED ANSWER -- 08-17-2022

I have been trying to get misfire_grace_time to work with Python 3.9 and APScheduler versions 3.9.x, 3.8.x, 3.7.x, 3.6.x and 3.5.x.

I have also tried using coalesce and max_instances to solve the BackgroundScheduler issues. So far, nothing has worked and new errors have popped up.

I looked at the APScheduler issues. I noted that the following issue, which relates to BackgroundScheduler, was raised in 2018 and is resolved in 4.0.0a1. Based on the release notes for 4.x, which is still in alpha, BackgroundScheduler is being replaced.

Repeating job with very short interval triggers exception on shutdown

I still recommend using BlockingScheduler either as an interval or as a cron.

Here is one of my old answers on using BlockingScheduler with cron

Here is the code that I'm currently testing with Python 3.9 and APScheduler versions 3.9.x, 3.8.x, 3.7.x, 3.6.x and 3.5.x.

from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

executors = {
        'default': ThreadPoolExecutor(20),
        'processpool': ProcessPoolExecutor(5)
    }
# 15*60
jobDefaults = {
        'coalesce': True,
        'max_instances': 100,
        'misfire_grace_time': 15*60
    }

scheduler = BackgroundScheduler(daemon=False, executors=executors, job_defaults=jobDefaults, timezone='UTC')
first_run_time = datetime.now()


def periodic_extract_urls():
    print(f'Extraction time: {datetime.now()}')
    print(datetime.now() - first_run_time)


def periodic_check_urls():
    print(f'Checking time: {datetime.now()}')
    print(datetime.now() - first_run_time)


def schedule_task():
    print(f'Start time: {first_run_time}')

    scheduler.add_job(func=periodic_extract_urls,
                      id="extract_urls",
                      trigger='interval',
                      minutes=15,  # the interval trigger takes 'minutes', not 'minute'
                      next_run_time=first_run_time,
                      replace_existing=True,
                      )

    scheduler.add_job(func=periodic_check_urls,
                      id="check_urls",
                      trigger='interval',
                      minutes=5,  # the interval trigger takes 'minutes', not 'minute'
                      next_run_time=datetime.now() + timedelta(seconds=300),
                      replace_existing=True,
                      )

    try:
        scheduler.start()
        scheduler.print_jobs()  # prints the job list itself and returns None
    except (KeyboardInterrupt, SystemExit):
        print("shutdown scheduler")
        scheduler.shutdown()


schedule_task()

ORIGINAL ANSWER -- 08-16-2022

I'm still trying to get the BackgroundScheduler to work correctly. I'm using Python 3.9, which required me to modify apscheduler util.py, which has a non-compliant time zone function for Python 3.9.

I was able to get something working using BlockingScheduler:

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

# The original snippet used these two names without defining them.
scheduler = BlockingScheduler()
first_run_time = datetime.datetime.now()

def periodic_extract_urls():
    print(f'Extraction time: {datetime.datetime.now()}')
    print(datetime.datetime.now() - first_run_time)

def periodic_check_urls():
    print(f'Checking time: {datetime.datetime.now()}')
    print(datetime.datetime.now() - first_run_time)

def schedule_task():
    print(f'Start time: {first_run_time}')

    # Runs immediately, then every 15 minutes.
    scheduler.add_job(
        periodic_extract_urls,
        'interval',
        minutes=15,
        next_run_time=first_run_time
    )

    # First run is delayed by 5 minutes, then repeats every 5 minutes.
    scheduler.add_job(
        periodic_check_urls,
        'interval',
        minutes=5,
        replace_existing=True,
        next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=300)
    )

    # Blocks the current thread until the scheduler is shut down.
    scheduler.start()

# run function
schedule_task()

This code outputs the following:

Start time: 2022-08-16 15:19:12.727035
Extraction time: 2022-08-16 15:19:12.786244
0:00:00.059298
Checking time: 2022-08-16 15:24:12.796427
0:05:00.069420
Checking time: 2022-08-16 15:29:12.796830
0:10:00.069823
Extraction time: 2022-08-16 15:34:12.737923
0:15:00.010918
Checking time: 2022-08-16 15:34:12.790511
0:15:00.063520
Checking time: 2022-08-16 15:39:12.796448
0:20:00.069444
----------------------------------------
My system information
----------------------------------------
Platform:               macOS
Python:                 3.9.0
apscheduler:            3.9.1
----------------------------------------
Life is complex

APScheduler enforces a grace period during which a job is still allowed to start. If the scheduler is busy or the load on the host is too high, APScheduler might fail to start the job on time, in which case the job either starts late or is discarded.

In some cases, the job will be discarded if it could not be started during the grace time.

You should try using misfire_grace_time according to SO - python apscheduler not consistent:

scheduler.add_job(periodic_check_urls, 'interval', minutes=5, replace_existing=True, misfire_grace_time=None)

If that does not help, you may want to either increase the number of threads/processes in the executor, or set misfire_grace_time to a higher value.
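Note that the log message in the question ("maximum number of running instances reached (1)") comes from a different limit than the grace time: the per-job max_instances setting, which defaults to 1. The sketch below is a simplified pure-Python model of that check, not APScheduler's own code. The function name simulate_runs and its arguments are hypothetical, and times are plain minutes.

```python
def simulate_runs(interval, duration, max_instances, ticks):
    """Model which scheduled runs start and which are skipped.

    interval and duration are in minutes. A run that starts at time t
    is treated as busy until t + duration.
    """
    running_until = []  # end times of runs that are still in progress
    outcome = []
    for i in range(ticks):
        now = i * interval
        # Forget runs that have already finished by this tick.
        running_until = [end for end in running_until if end > now]
        if len(running_until) >= max_instances:
            # Too many instances of this job are still running: skip.
            outcome.append((now, "skipped"))
        else:
            running_until.append(now + duration)
            outcome.append((now, "run"))
    return outcome


if __name__ == "__main__":
    # A job that takes 12 minutes, scheduled every 5 minutes.
    for limit in (1, 3):
        print(limit, simulate_runs(5, 12, limit, 5))
```

In this model, a 12-minute job on a 5-minute interval loses most of its runs with a limit of 1 and none with a limit of 3. If periodic_check_urls really does take longer than 5 minutes, raising max_instances or shortening the job is likely to matter more than the grace time.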

According to the APScheduler docs:

Sometimes the scheduler may be unable to execute a scheduled job at the time it was scheduled to run. The most common case is when a job is scheduled in a persistent job store and the scheduler is shut down and restarted after the job was supposed to execute. When this happens, the job is considered to have "misfired". The scheduler will then check each missed execution time against the job's misfire_grace_time option (which can be set on per-job basis or globally in the scheduler) to see if the execution should still be triggered. This can lead into the job being executed several times in succession.

If this behavior is undesirable for your particular use case, it is possible to use coalescing to roll all these missed executions into one. In other words, if coalescing is enabled for the job and the scheduler sees one or more queued executions for the job, it will only trigger it once. No misfire events will be sent for the "bypassed" runs.
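The two rules quoted above can be sketched as a small pure-Python model. This is an illustration only, not APScheduler's implementation: runs_to_fire and its arguments are hypothetical names, and the model assumes that coalescing keeps the most recent missed run time.

```python
from datetime import datetime, timedelta


def runs_to_fire(due_times, now, misfire_grace_time, coalesce):
    """Return the scheduled run times that would still be executed.

    due_times: run times that are already due, oldest first.
    misfire_grace_time: seconds of allowed lateness, or None for no limit.
    coalesce: if True, roll all due runs into a single one.
    """
    if coalesce and due_times:
        # Assumption: only the latest due run survives coalescing.
        due_times = due_times[-1:]
    if misfire_grace_time is None:
        return list(due_times)
    grace = timedelta(seconds=misfire_grace_time)
    # A run that is later than the grace time counts as misfired.
    return [t for t in due_times if now - t <= grace]


if __name__ == "__main__":
    base = datetime(2022, 8, 12, 12, 0)
    due = [base + timedelta(minutes=m) for m in (0, 5, 10, 15)]
    now = base + timedelta(minutes=17)
    print(runs_to_fire(due, now, misfire_grace_time=300, coalesce=False))
    print(runs_to_fire(due, now, misfire_grace_time=None, coalesce=True))
```

With a 300-second grace time, only the run that is 2 minutes late survives in this model. With no grace limit and coalescing enabled, the four missed runs collapse into one.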

DialFrost