I have a flask app with waitress that gets some data in post-request, then it runs some long computations in long_function
and returns result. These computations are parallel and I'm using pebble
because I need a timeout option. Also I want the user to be able to send a request to restart the server (i.e. he want to change the number of threads for waitress
)
I've found this solution https://gist.github.com/naushadzaman/b65534d912f1551c7d8366b326b7a151
It mostly works, but it doesn't interact well with my pebble
pool. I'm having trouble reloading the server while it is in the pool. If I use long_function_without_pool
, that doesn't use any multiprocessing, I can reload server even if it is currently does some job (results are lost, of course, but this is what I want). But with long_function
I have to wait for the pool to be closed and only then can I restart the server. If I try to send restart request while the pool is still open, I get an error:
OSError: [Errno 98] Address already in use
So I suppose that p.terminate()
doesn't work if there is a Pool
running.
How can I fix this code, or maybe should I use a different solution?
Brief instructions on how to replicate this error:
start the app
send POST-request with empty body to http://localhost:5221/
before you get a response (you'll have 5 seconds) send GET-request without variables to http://localhost:5221/restart/
enjoy. Server is stuck now and is not responding to anything
import subprocess from flask import Flask from flask_restful import Api, Resource from flask_cors import CORS from webargs.flaskparser import parser, abort import json import time import sys from waitress import serve from multiprocessing import Process, Queue from concurrent.futures import TimeoutError from pebble import ProcessPool, ProcessExpired import functools some_queue = None APP = Flask(__name__) API = Api(APP) CORS(APP) @APP.route('/restart/', methods=['GET'], endpoint='start_flaskapp') def restart(): try: some_queue.put("something") print("Restarted successfully") return("Quit") except: print("Failed in restart") return "Failed" def start_flaskapp(queue): global some_queue some_queue = queue API.add_resource(FractionsResource, "/") serve(APP, host='0.0.0.0', port=5221, threads=2) def long_function(): with ProcessPool(5) as pool: data = [0, 1, 2, 3, 4] future = pool.map(functools.partial(add_const, const=1), data, timeout=5) iterator = future.result() result=[] while True: try: result.append(next(iterator)) except StopIteration: break except TimeoutError as error: print("function took longer than %d seconds" % error.args[1]) return(result) def long_function_without_pool(): data = [0, 1, 2, 3, 4] result = list(map(functools.partial(add_const, const=1), data)) return(result) def add_const(number, const=0): time.sleep(5) return number+const class FractionsResource(Resource): @APP.route('/', methods=['POST']) def post(): response = long_function() return(json.dumps(response)) if __name__ == "__main__": q = Queue() p = Process(target=start_flaskapp, args=(q,)) p.start() while True: #wathing queue, if there is no call than sleep, otherwise break if q.empty(): time.sleep(1) else: break p.terminate() #terminate flaskapp and then restart the app on subprocess args = [sys.executable] + [sys.argv[0]] subprocess.call(args)