103

Before I ask: cron jobs and Task Scheduler are my last resort. This script will be used across Windows and Linux, and I'd prefer a coded-out way of doing this rather than leaving it to the end user to set up.

Is there a library for Python that I can use to schedule tasks? I need to run a function once every hour. However, if I run the script once an hour using .sleep, "once every hour" will gradually land at a different part of the hour than it did the previous day, because of the delay inherent in executing the script and/or function.

What is the best way to schedule a function to run at a specific time of day (more than once) without using a Cron Job or scheduling it with Task Scheduler?

Or if this is not possible, I would like your input as well.

APScheduler fit my needs exactly.

Version < 3.0

import datetime
import time
from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.daemonic = False
sched.start()

def job_function():
    print("Hello World")
    print(datetime.datetime.now())
    time.sleep(20)

# Schedules job_function to be run once each minute
sched.add_cron_job(job_function,  minute='0-59')

out:

>Hello World
>2014-03-28 09:44:00.016492
>Hello World
>2014-03-28 09:45:00.014110

Version > 3.0

(From Animesh Pandey's answer below)

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

@sched.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')

@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')

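# options_from_ini_file is a placeholder for a dict of scheduler options;
# drop this call to use the defaults.
sched.configure(options_from_ini_file)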
sched.start()
Community
sunshinekitty
  • 2
    The only sensible way would be to use a cron job or Windows scheduled task. There should be no reason you leave it up to the end user to do this: script the creation of the cron job/task on installation. – Duncan Mar 28 '14 at 14:12
  • 5
    Things to consider: 1. What happens if your function takes longer than a minute/hour: do you start the second instance or do you wait for the first instance to complete or do you cancel the second instance completely (skip the interval)? 2. What happens if the computer time jumps back/forward (DST or a manual change): do you repeat/skip the corresponding executions? 3. What happens if the computer wakes up from hibernation: do you start the task immediately or wait for the next cycle to run? 4. Do you know how cron, Task Scheduler, APScheduler behave in those cases? – jfs Mar 28 '14 at 16:02
  • Did you trigger the script yourself at first, or does it run on its own? If it keeps running, doesn't that result in a lot of memory consumption and CPU usage? – NightOwl19 Aug 24 '18 at 10:57

14 Answers

108

Maybe this can help: Advanced Python Scheduler

Here's a small piece of code from their documentation:

from apscheduler.schedulers.blocking import BlockingScheduler

def some_job():
    print("Decorated job")

scheduler = BlockingScheduler()
scheduler.add_job(some_job, 'interval', hours=1)
scheduler.start()
LoveGandhi
Unknown
  • 4
    This is the answer to my problems it looks like, so simple and easy to use. I've appended some test code to my reply. Thanks again. – sunshinekitty Mar 28 '14 at 14:49
  • 3
    This syntax is no longer up to date – Jared Beach Nov 09 '15 at 18:25
  • 4
    This works with apscheduler version < 3.x Could someone give us a simple example like this but with 3.x version? – toscanelli Dec 30 '15 at 10:51
  • 2
    OK, so the scheduler started after I run the command `python test_scheduler.py` from DOS, so how do I stop it? Only by pressing Ctrl+C? Is there anyway to make the scheduler stop after running 100 times? Or at a specific time say at 18:00? How do you specify it in the script. I searched but found no one mentions how to stop it. – StayFoolish Sep 08 '17 at 09:02
  • @StayFoolish did you find a way to stop a job midway during an execution or just stopping a job from running again. – lukik Feb 10 '20 at 02:57
  • Got this TypeError: Only timezones from the pytz library are supported – Karl Feb 04 '22 at 15:31
  • @lukik The API has a section on removing jobs. For example, `job = scheduler.add_job(myfunc, 'interval', minutes=2)` then `job.remove()`. https://apscheduler.readthedocs.io/en/3.x/userguide.html#removing-jobs – himty Jun 23 '23 at 18:49
34

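To run something at 10 minutes past every hour:

import time
from datetime import datetime, timedelta

while True:
    print('Run something..')

    # Next target: 10 minutes past the following hour, on the exact second
    dt = datetime.now() + timedelta(hours=1)
    dt = dt.replace(minute=10, second=0, microsecond=0)

    # Poll once a second until the target time is reached
    while datetime.now() < dt:
        time.sleep(1)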
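
The loop above always adds a full hour before setting the minute, so a script started at 9:05 waits until 10:10 rather than 9:10. A minimal sketch of one way to pick the nearest upcoming slot instead is below; `next_run_at_minute` is a hypothetical helper name, not part of any library, and it assumes naive local datetimes with no DST handling.

```python
from datetime import datetime, timedelta


def next_run_at_minute(now, minute=10):
    """Return the first datetime strictly after `now` whose minute is `minute`."""
    candidate = now.replace(minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # This hour's slot has already passed (or is exactly now): use the next hour
        candidate += timedelta(hours=1)
    return candidate


now = datetime.now()
target = next_run_at_minute(now, 10)
print("now:", now, "next run:", target)
# In a real loop you would then wait with something like:
#     time.sleep((target - datetime.now()).total_seconds())
```

Sleeping for the computed difference once, instead of polling every second, also keeps the process idle between runs.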
Shane Davies
28

For apscheduler < 3.0, see Unknown's answer.

For apscheduler > 3.0

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

@sched.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')

@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')

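# options_from_ini_file is a placeholder for a dict of scheduler options;
# drop this call to use the defaults.
sched.configure(options_from_ini_file)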
sched.start()

Update:

apscheduler documentation.

This is for apscheduler-3.3.1 on Python 3.6.2.

"""
Following configurations are set for the scheduler:

 - a MongoDBJobStore named “mongo”
 - an SQLAlchemyJobStore named “default” (using SQLite)
 - a ThreadPoolExecutor named “default”, with a worker count of 20
 - a ProcessPoolExecutor named “processpool”, with a worker count of 5
 - UTC as the scheduler’s timezone
 - coalescing turned off for new jobs by default
 - a default maximum instance limit of 3 for new jobs
"""

from pytz import utc
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor

"""
Method 1:
"""
jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

"""
Method 2 (ini format):
"""
gconfig = {
    'apscheduler.jobstores.mongo': {
        'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
}

sched_method1 = BlockingScheduler() # uses overrides from Method1
sched_method2 = BlockingScheduler() # uses same overrides from Method2 but in an ini format


@sched_method1.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')


@sched_method2.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')


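sched_method1.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
sched_method1.start()  # blocks until the scheduler is shut down

# Not reached while sched_method1 is running; use one method or the other.
sched_method2.configure(gconfig=gconfig)
sched_method2.start()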
Animesh Pandey
13

The simplest option I can suggest is the schedule library.

In your question, you said "I will need to run a function once every hour". The code to do this is very simple:

    import time

    import schedule

    def thing_you_wanna_do():
        ...
        ...
        return


    schedule.every().hour.do(thing_you_wanna_do)

    while True:
        schedule.run_pending()
        time.sleep(1)  # avoid a busy loop that pins a CPU core

You also asked how to do something at a certain time of day. Some examples of how to do this:

    import time

    import schedule


    def thing_you_wanna_do():
        ...
        ...
        return


    schedule.every().day.at("10:30").do(thing_you_wanna_do)
    schedule.every().monday.do(thing_you_wanna_do)
    schedule.every().wednesday.at("13:15").do(thing_you_wanna_do)
    # If you would like some randomness / variation you could also do something like this
    schedule.every(1).to(2).hours.do(thing_you_wanna_do)

    while True:
        schedule.run_pending()
        time.sleep(1)  # avoid a busy loop that pins a CPU core

90% of the code used is the example code of the schedule library. Happy scheduling!

Hexiro
3

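To run something on every quarter hour (:00, :15, :30, :45), for example to fetch stock price quotes that are updated every 15 minutes:

import time
from datetime import datetime

# run_strategy() is your own function; define it before this loop.
while True:
    print("Update data:", datetime.now())
    # Minutes until the next quarter-hour mark (15 when currently on a mark)
    wait_minutes = 15 - datetime.now().minute % 15
    if wait_minutes == 15:
        run_strategy()
    time.sleep(wait_minutes * 60)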
2
   #For scheduling task execution
import schedule
import time

def job():
    print("I'm working...")

schedule.every(1).minutes.do(job)
#schedule.every().hour.do(job)
#schedule.every().day.at("10:30").do(job)
#schedule.every(5).to(10).minutes.do(job)
#schedule.every().monday.do(job)
#schedule.every().wednesday.at("13:15").do(job)
#schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
2

Perhaps Rocketry suits your needs. It's a powerful scheduler that is very easy to use, has a lot of built-in scheduling options and it is easy to extend:

from rocketry import Rocketry
from rocketry.conds import daily, every, after_success

app = Rocketry()

@app.task(every("1 hour 30 minutes"))
def do_things():
    ...    

@app.task(daily.between("12:00", "17:00"))
def do_daily_afternoon():
    ...

@app.task(daily & after_success(do_things))
def do_daily_after_task():
    ...

if __name__ == "__main__":
    app.run()

It has much more though:

  • String based scheduling syntax
  • Logical statements (AND, OR, NOT)
  • A lot of built-in scheduling options
  • Easy to customize (custom conditions, parameters etc.)
  • Parallelization (run on separate thread or process)
  • Parametrization (execution order and input-output)
  • Persistence: put the logs anywhere you like
  • Modify scheduler on runtime (i.e. build API on top of it)

Disclaimer: I'm the author

miksus
1

The Python standard library does provide sched and threading for this task. But this means your scheduler script will have to be running all the time instead of leaving its execution to the OS, which may or may not be what you want.
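
As a rough illustration of the sched route, here is a minimal sketch that re-arms each run from the planned time instead of from "now", so the job's own runtime does not pile up as drift. `run_every` and `FakeClock` are hypothetical names made up for this demo; the fake clock only exists so the example finishes instantly instead of waiting three hours.

```python
import sched
import time


def run_every(scheduler, interval, job, runs):
    """Call job(planned_time) every `interval` seconds, `runs` times in total."""
    def fire(target, remaining):
        job(target)
        if remaining > 1:
            # Re-arm from the planned time, not the current time, to avoid drift
            nxt = target + interval
            scheduler.enterabs(nxt, 1, fire, (nxt, remaining - 1))

    first = scheduler.timefunc() + interval
    scheduler.enterabs(first, 1, fire, (first, runs))
    scheduler.run()


class FakeClock:
    """Stand-in for time.time/time.sleep so the demo does not really wait."""
    def __init__(self, start):
        self.now = start

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock(0.0)
fired = []


def slow_job(planned_time):
    fired.append(planned_time)
    clock.sleep(20)  # pretend the job itself takes 20 seconds


run_every(sched.scheduler(clock.time, clock.sleep), 3600, slow_job, runs=3)
print(fired)  # [3600.0, 7200.0, 10800.0]
```

For real use you would pass `sched.scheduler(time.time, time.sleep)` and your own job function. The process still has to stay alive the whole time, as noted above.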

Emilia Bopp
  • 2
    would be nice to see a demo. – Russia Must Remove Putin Mar 28 '14 at 15:05
  • 1
    There are examples in the docs I linked. And I bet googling them yields even more. – Emilia Bopp Mar 28 '14 at 15:06
  • 1
    Ah, so you don't want upvotes, and you'd prefer the possibility of being flagged for link-only answers? – Russia Must Remove Putin Mar 28 '14 at 15:07
  • 7
    Uhm, no I'm not really hunting upvotes, just wanted provide some pointers. It's just that I don't quite see the need to provide a completely fleshed out example, when there are so many available. Especially since OP has already settled on another solution. – Emilia Bopp Mar 28 '14 at 15:10
  • Yes, some code here would've been helpful, but here: http://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds import threading def printit(): threading.Timer(5.0, printit).start() print "Hello, World!" printit() – FredFury Sep 25 '16 at 10:22
  • This should be a comment rather than an answer – Josh Noe Sep 14 '19 at 18:20
1

For the version posted by sunshinekitty under "Version < 3.0", you may need to pin apscheduler 2.1.2. I accidentally had version 3 on my Python 2.7 install, so I ran:

pip uninstall apscheduler
pip install apscheduler==2.1.2

It worked correctly after that. Hope that helps.

Arkham Angel
1

clock.py

from apscheduler.schedulers.blocking import BlockingScheduler
import pytz

sched = BlockingScheduler(timezone=pytz.timezone('Africa/Lagos'))

@sched.scheduled_job('cron', day_of_week='mon-sun', hour=22)
def scheduled_job():
    print('This job is run every day at 10pm.')
    #your job here


sched.start()

Procfile

clock: python clock.py

requirements.txt

APScheduler==3.0.0

After deployment, the final step is to scale up the clock process. This is a singleton process, meaning you’ll never need to scale up more than 1 of these processes. If you run two, the work will be duplicated.

$ heroku ps:scale clock=1

Source: https://devcenter.heroku.com/articles/clock-processes-python

0

You probably have a solution already, @lukik, but if you want to remove a scheduled job, you can use:

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

or

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

if you need to use an explicit job ID.

For more information, you should check: https://apscheduler.readthedocs.io/en/stable/userguide.html#removing-jobs

0

I found that the scheduler loop has to wake the program up every second. On a hosted server that could be costly, so I use the following instead.

It runs every minute at the 5th second, and you can change it to hours or days by recalculating the waiting period in seconds.

import time
import datetime

print("Initiate")
print(datetime.datetime.now())
while True:
    # Sleep until 5 seconds past the next minute; recomputing the wait
    # on every pass keeps the schedule from drifting.
    time.sleep(60 - time.time() % 60 + 5)
    print("working")
    print(datetime.datetime.now())
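
The same modulo calculation can be generalized to other periods. The sketch below is one possible way to write it; `seconds_until_next` is a hypothetical helper name, and it assumes alignment to Unix epoch time, so hour and day boundaries follow UTC rather than local time.

```python
import time


def seconds_until_next(period, offset=0, now=None):
    """Seconds until (epoch_time - offset) is next a multiple of `period`."""
    if now is None:
        now = time.time()
    return period - (now - offset) % period


# Every minute at the 5th second, evaluated at a fixed example timestamp
print(seconds_until_next(60, offset=5, now=120.0))  # 5.0
# Every hour on the hour, from the current time
print(seconds_until_next(3600))
```

In a loop, `time.sleep(seconds_until_next(3600))` before each run would wake once per hour. When the current time is exactly on a boundary, the function returns a full period rather than zero.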
0

This method worked for me using relativedelta and datetime and a modulo boolean check for every hour. It runs every hour from the time you start it.

import time
from datetime import datetime
from dateutil.relativedelta import relativedelta

# Track the next run time outside the loop and update it inside the loop
nxt_run = datetime.now()

# The loop wakes once a second, so compare against nxt_run to decide when to run
while True:
    cnow = datetime.now()  # track the current time
    time.sleep(1)  # good to have so the CPU doesn't spike
    # cnow.hour % 1 is always 0; a larger divisor restricts runs to every Nth hour
    if cnow.hour % 1 == 0 and cnow >= nxt_run:
        print(f"start @{cnow}: next run @{nxt_run}")
        nxt_run = cnow + relativedelta(hours=1)  # add an hour to the next run
    else:
        print(f"next run @{nxt_run}")
Lelouch
-2

One option is to write a C/C++ wrapper that executes the python script on a regular basis. Your end-user would run the C/C++ executable, which would remain running in the background, and periodically execute the python script. This may not be the best solution, and may not work if you don't know C/C++ or want to keep this 100% python. But it does seem like the most user-friendly approach, since people are used to clicking on executables. All of this assumes that python is installed on your end user's computer.

Another option is to use cron job/Task Scheduler but to put it in the installer as a script so your end user doesn't have to do it.

ubadub