I am trying to build an application that lets you schedule and execute multiple long-running jobs in a background thread using APScheduler. To control the job schedules and view the live output of the jobs, I want to send messages (using Blinker) to the Flask application that runs in the same process, so that I can stream them to a web client using Flask-SocketIO.
I came up with the following code, but it seems send_log_update() is not being called at all. Please note that I have not yet added Flask-SocketIO to this example. I first wanted to make sure I could communicate with the Flask application before complicating things further.
Is this a sensible way to go about things? And if so, am I doing something wrong here? I am not married to any of the libraries used here, but I do need something like APScheduler to schedule jobs at specific times (instead of just at intervals, as in this example).
I have considered also using websockets for the communication between the background job and the rest of the application, but that would be too unreliable. I have to process all output coming from the background process (to send it to a log ingester) in addition to streaming it to a web client, and I would like to keep the background job as agnostic of any databases and logging frameworks as possible.
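The requirement above (every line of job output reaches both a log ingester and a web client, while the job itself knows about neither) can be sketched with only the standard library. This is a minimal sketch, assuming a plain `queue.Queue` as the hand-off rather than Blinker. The names `emit`, `fan_out`, `ingester_lines`, and `client_lines` are hypothetical stand-ins and not part of the code in this question.

```python
import queue
import threading

# Thread-safe hand-off between the job thread and the consumer thread.
log_queue = queue.Queue()
_DONE = object()  # sentinel telling the consumer to stop


def background_job(emit):
    """The job only knows about a callable that accepts one log line."""
    emit('starting job')
    emit('job done')


def fan_out(q, sinks):
    """Read lines from the queue and pass each one to every sink."""
    while True:
        line = q.get()
        if line is _DONE:
            break
        for sink in sinks:
            sink(line)


# Hypothetical sinks: stand-ins for a log ingester and a web client.
ingester_lines = []
client_lines = []

consumer = threading.Thread(
    target=fan_out,
    args=(log_queue, [ingester_lines.append, client_lines.append]),
)
consumer.start()

worker = threading.Thread(target=background_job, args=(log_queue.put,))
worker.start()
worker.join()

log_queue.put(_DONE)  # no more output will arrive
consumer.join()
```

Because the queue is unbounded and the consumer processes lines in order, no output is dropped even if a sink is temporarily slower than the job.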
# pip install flask apscheduler sqlalchemy blinker
from time import sleep

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from blinker import signal
from flask import Flask
from pytz import utc

# initialize Flask+SocketIO
app = Flask(__name__)

# signal to communicate between background thread and Flask
logsignal = signal('log')


# handle signals coming from background thread and emit them
# over the websocket
@logsignal.connect_via(app)
def send_log_update(sender, log_line, context, **extra):
    # eventually I want to send this to the web client using
    # Flask-SocketIO
    print('received signal: ' + log_line)


# Background job that will run in the scheduler thread
def background_job():
    print('starting background job')
    logsignal.send('starting job')
    sleep(3)
    logsignal.send('job done')


# configure APScheduler
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///scheduler.sqlite')
}
job_defaults = {
    'coalesce': False,
    'max_instances': 1
}

# create and start scheduler
scheduler = BackgroundScheduler(
    job_defaults=job_defaults, jobstores=jobstores, timezone=utc)

if __name__ == '__main__':
    scheduler.add_job(background_job, 'interval', seconds=5,
                      replace_existing=True, id='sample_job',
                      args=[])
    scheduler.start()
    app.run()