
I am trying to build an application that lets you schedule and execute multiple long-running jobs in a background thread using APScheduler. To control the job schedules and view the (live) output of the jobs, I want to send messages (using Blinker) to the Flask application running in the same process, so I can stream them to a web client using Flask-SocketIO.

I came up with the following code, but it seems send_log_update() is not being called at all. Please note that I have not yet added Flask-SocketIO to this example; I first wanted to make sure I could communicate with the Flask application before complicating things further.

Is this a sensible way to go about things? And if so, am I doing something wrong here? I am not married to any of these solutions specifically, but I do need something like APScheduler to schedule jobs at specific times (instead of just at intervals, as in this example).

I have considered using websockets for the communication between the background job and the rest of the application as well, but that would be too unreliable. I have to process all output coming from the background process (to send to a log ingester) in addition to streaming it to a web client, and I would like to keep the background job as agnostic of databases and logging frameworks as possible.

# pip install flask apscheduler sqlalchemy blinker

from time import sleep

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from blinker import signal
from flask import Flask
from pytz import utc

# initialize Flask (SocketIO not added yet, see above)
app = Flask(__name__)

# signal to communicate between background thread and Flask
logsignal = signal('log')


# handle signals coming from background thread and emit them
# over the websocket
@logsignal.connect_via(app)
def send_log_update(sender, log_line, context, **extra):
    # eventually I want to send this to the web client using
    # Flask-SocketIO
    print('received signal: ' + log_line)


# Background job that will run in the scheduler thread
def background_job():
    print('starting background job')
    logsignal.send('starting job')
    sleep(3)
    logsignal.send('job done')


# configure APScheduler
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///scheduler.sqlite')
}
job_defaults = {
    'coalesce': False,
    'max_instances': 1
}

# create and start scheduler
scheduler = BackgroundScheduler(
    job_defaults=job_defaults, jobstores=jobstores, timezone=utc)


if __name__ == '__main__':
    scheduler.add_job(background_job, 'interval', seconds=5,
                      replace_existing=True, id='sample_job',
                      args=[])

    scheduler.start()
    app.run()
1 Answer


The answer was quite simple: I was using @logsignal.connect_via(app), which restricts the send_log_update() handler to signals sent with the Flask app as the sender. After switching to the regular @logsignal.connect decorator, the handler got executed. I turned it into a fully working example with a web interface that shows the log being streamed.

# Runs a scheduled job in a background thread using APScheduler and streams
# its output to a web client using websockets. Communication between the Flask
# thread and the APScheduler thread is done through (blinker) signals.
#
# Install dependencies (preferably in your virtualenv)
#   pip install flask apscheduler sqlalchemy blinker flask-socketio simple-websocket
# and then run with:
#   python this_script.py

from time import sleep

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from blinker import signal
from flask import Flask
from flask_socketio import SocketIO
from pytz import utc

# initialize Flask+SocketIO
app = Flask(__name__)
socketio = SocketIO(app)

# signal to communicate between background thread and Flask
logsignal = signal('log')


# handle signals coming from the background thread and emit them
# over the websocket; blinker passes the sender (here, the log line
# given to send()) as the first positional argument
@logsignal.connect
def send_log_update(log_line):
    socketio.emit('logUpdate', log_line)


# Background job that will run in the scheduler thread
def background_job():
    logsignal.send('starting job')
    sleep(3)
    logsignal.send('job done')


# configure APScheduler
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///scheduler.sqlite')
}
job_defaults = {
    'coalesce': False,
    'max_instances': 1
}

# create and start scheduler
scheduler = BackgroundScheduler(
    job_defaults=job_defaults, jobstores=jobstores, timezone=utc)


# simple websocket client for testing purposes
@app.route("/")
def info():
    return """
    <html>
    <head>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js" integrity="sha512-q/dWJ3kcmjBLU4Qc47E4A9kTB4m3wuTY7vkFJDTZKjTs8jhyGQnaUrxa0Ytd0ssMZhbNua9hE+E7Qv1j+DyZwA==" crossorigin="anonymous"></script>
    </head>
    <body>
    <h1>Streaming log</h1>
    <pre id="log"></pre>
    <script type="text/javascript" charset="utf-8">
        var socket = io();

        socket.on('logUpdate', function(msg) {
            let log = document.getElementById('log');
            log.append(msg + '\\n');
        });
    </script>
    </body>
    </html>
"""


if __name__ == '__main__':
    scheduler.add_job(background_job, 'interval', seconds=5,
                      replace_existing=True, id='sample_job',
                      args=[])

    scheduler.start()
    socketio.run(app)