[Note: using Python 2.7 and jupyter notebook environment]
I would like to be able to schedule instance methods as jobs using APscheduler and store these jobs in a persistent DB [in this case, mongodb].
When trying to do so however, I hit the following error: unbound method use_variable() must be called with Job instance as first argument (got NoneType instance instead)
Previous to this i've successfully: (a) scheduling instance methods as jobs (b) storing jobs in mongodb
However, I can't get these two to work in conjunction.
What works:
A basic example of (a) scheduling instance methods as jobs...
from apscheduler.schedulers.background import BackgroundScheduler
class Job:
def __init__(self, config):
self.variable = config['variable']
def use_variable(self):
print(self.variable)
job=Job({'variable': 'test'})
scheduler = BackgroundScheduler()
scheduler.add_job(job.use_variable, trigger='interval', seconds=5)
scheduler.start()
Runs as expected, printing 'test' every 5 seconds.
What breaks:
However, as soon as I add a jobstore...
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
class Job:
def __init__(self, config):
self.variable = config['variable']
def use_variable(self):
print(self.variable)
job=Job({'variable': 'test'})
jobstores = {'default': MongoDBJobStore(
database='apscheduler', collection='jobs')}
scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.add_job(job.use_variable, trigger='interval', seconds=5)
scheduler.start()
The job successfully runs the first time, but on all subsequent calls errors due to 'unbound method'. I'd guess that the job instance is not being retrieved from the jobstore, so that no 'self' is passed to the use_variable method...
From the docs...
From the docs: In case of a bound method, passing the unbound version (YourClass.method_name) as the target function to add_job() with the class instance as the first argument (so it gets passed as the self argument)
As such, I tried:
scheduler.add_job(Job.use_variable, args=[job] trigger='interval', seconds=5)
No luck either.
Current workaround
I'm currently using the following workaround, however it's pretty hacky and I'd like to find a more elegant solution!
def proxy(job):
job.use_variable()
scheduler.add_job(proxy, args=[job], trigger='interval', seconds=5)
Update with blocking scheduler as suggested by Alex
Minimal example code:
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
class Job:
def __init__(self, config):
self.variable = config['variable']
def use_variable(job):
print(job.variable)
job = Job({'variable': 'test'})
jobstores = {'default': MongoDBJobStore(
database='apscheduler', collection='jobs')}
scheduler = BlockingScheduler(jobstores=jobstores)
def my_listener(event):
if (event.exception):
print('Exception: {}'.format(event.exception))
scheduler.add_listener(my_listener)
scheduler.add_job(job.use_variable, trigger='interval',seconds=5)
// Also tried:
// scheduler.add_job(Job.use_variable, args=[job] trigger='interval',seconds=5)
scheduler.start()
Reproduces error. First call successfully prints 'test', but subsequent calls hit the unbound error [at least in a jupyter notebook environment]...