I have a Python Flask app that waits for requests from a user app and then spawns a process that runs a job based on each request it receives. It keeps the status and the queue of the jobs in memory. Requests to this service always follow this pattern:

  1. Submit job
  2. (Optional) Upload additional data
  3. Check every 5 seconds whether the job is finished
  4. Download results
  5. Delete job data
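
For illustration, step 3 of this pattern could look roughly like the sketch below on the client side. The function name `wait_until_done`, its parameters and the timeout are assumptions made for the example; `check_status` stands in for whatever call performs the GET request and returns the status string.

```python
import time


def wait_until_done(check_status, interval=5.0, timeout=600.0, sleep=time.sleep):
    """Poll check_status() until it returns something other than QUEUED/RUNNING.

    check_status is a hypothetical callable that wraps the HTTP status request.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = check_status()
        if status not in ("QUEUED", "RUNNING"):
            return status  # e.g. COMPLETED or FAILED
        if time.monotonic() >= deadline:
            raise TimeoutError("job did not finish in time")
        sleep(interval)


# Usage with a canned sequence of answers instead of real HTTP calls.
answers = iter(["QUEUED", "RUNNING", "RUNNING", "COMPLETED"])
result = wait_until_done(lambda: next(answers), sleep=lambda seconds: None)
print(result)
```

Passing `sleep` as a parameter is only there so the loop can be exercised without actually waiting.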

I have simply created lists and dicts for the queued, running and finished jobs. The submit request comes first and says whether the job has to wait for additional data. If no data is needed, the job is started in a new process if a CPU is free; otherwise it is put in the queue.

If additional data is needed, the job is put into a separate waiting dict, and once the request with the additional data has completed, the job is started (or put into the queue). Since every user app that has submitted a job checks every few seconds whether it is done, I use these check requests to detect finished jobs, move them to the finished list and start the next job from the queue.
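
Stripped of the Flask routes, the bookkeeping described above can be sketched as follows. This is a simplified illustration, not the code of the service: `JobScheduler`, `FakeHandle` and `launch` are hypothetical names, the waiting-for-data step is left out, and a `deque` is assumed so that the oldest queued job starts first.

```python
from collections import deque


class JobScheduler:
    """In-memory bookkeeping for queued, running and finished jobs."""

    def __init__(self, max_running, launch):
        self.max_running = max_running
        self.launch = launch      # callable(job_id, job_data) -> handle with is_alive()
        self.queue = deque()      # FIFO: the oldest queued job is started first
        self.running = {}
        self.finished = set()

    def submit(self, job_id, job_data):
        if len(self.running) >= self.max_running:
            self.queue.append((job_id, job_data))
            return "QUEUED"
        self.running[job_id] = self.launch(job_id, job_data)
        return "RUNNING"

    def reap(self):
        """Move dead jobs to finished and fill the free slots from the queue."""
        dead = [job_id for job_id, handle in self.running.items() if not handle.is_alive()]
        for job_id in dead:
            del self.running[job_id]
            self.finished.add(job_id)
        while self.queue and len(self.running) < self.max_running:
            job_id, job_data = self.queue.popleft()
            self.running[job_id] = self.launch(job_id, job_data)

    def status(self, job_id):
        self.reap()  # every status check also advances the queue
        if job_id in self.running:
            return "RUNNING"
        if any(job_id == queued_id for queued_id, _ in self.queue):
            return "QUEUED"
        if job_id in self.finished:
            return "COMPLETED"
        return "FAILED"


class FakeHandle:
    """Stand-in for multiprocessing.Process so the sketch runs without real jobs."""

    def __init__(self):
        self.alive = True

    def is_alive(self):
        return self.alive


handles = {}


def launch(job_id, job_data):
    handles[job_id] = FakeHandle()
    return handles[job_id]


scheduler = JobScheduler(max_running=1, launch=launch)
first = scheduler.submit("a", "data-a")
second = scheduler.submit("b", "data-b")
third = scheduler.submit("c", "data-c")
handles["a"].alive = False            # pretend job "a" has exited
status_a = scheduler.status("a")
status_b = scheduler.status("b")
status_c = scheduler.status("c")
print(first, second, third, status_a, status_b, status_c)
```

In the real service `launch` would create and start a `multiprocessing.Process`; the fake handle only makes the state transitions easy to follow.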

Once the check request reports that the job is done, the user app downloads the results and then requests deletion of the job data.

What surprised me was that the Flask app would start again with each spawned process unless it is protected by `if __name__ == '__main__':`, even though the spawned job is in a different file and does not reference anything from the Flask app file.
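
The behaviour can be reproduced without Flask or real processes. With the spawn start method (the default on Windows), the child re-runs the parent's main script under the module name `__mp_main__`, so unguarded top-level code executes again while the guarded block does not. The sketch below imitates that with `runpy`; the script text and the `run_as` helper are made up for the illustration and only approximate what `multiprocessing` does internally.

```python
import os
import runpy
import tempfile
import textwrap

# Hypothetical stand-in for the service script; the contents are illustrative only.
SCRIPT = textwrap.dedent('''
    events = []
    events.append("top-level code ran")      # e.g. app = Flask(__name__)
    if __name__ == "__main__":
        events.append("guarded code ran")    # e.g. app.run()
''')


def run_as(run_name):
    """Execute SCRIPT as a file with the given value of __name__."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "service.py")
        with open(path, "w") as fh:
            fh.write(SCRIPT)
        return runpy.run_path(path, run_name=run_name)["events"]


parent_events = run_as("__main__")      # like starting the script directly
child_events = run_as("__mp_main__")    # like the re-import in a spawned child
print(parent_events)
print(child_events)
```

The second call skips only the guarded block, which is why anything with side effects (starting the server, downloading a certificate) has to sit behind the guard, while plain definitions can stay at module level.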

Since I'm relatively new to Flask and multiprocessing, is there anything else I should be worried about?

This is the code:

import os
import uuid
from os.path import isfile, isdir

from flask import Flask, request, Response, send_from_directory
from multiprocessing import Process
from werkzeug.utils import secure_filename

import config  # local settings module; config.threads is used below
from helpers import str2bool
from temporary_test_task import run_task

if __name__ == '__main__':
    app = Flask(__name__)
    running = {}
    finished = []
    queue = []
    waiting_for_data = {}
    queue_list = set()


    @app.route('/status', methods=['GET', 'POST'])
    def status():
        return "OK", 200


    def finish_job(job_id):
        # Record the job as finished and release its process handle.
        finished.append(job_id)
        last = running.pop(job_id)
        last.close()
        # A slot is free now, so start the oldest queued job (FIFO).
        if len(queue) > 0:
            next_job = queue.pop(0)
            queue_list.remove(next_job[0])
            start_job(next_job)


    def start_job(job=None):
        if job is None:
            job = queue.pop(0)
            queue_list.remove(job[0])
        task_cb = Process(target=run_task, args=(job[0], job[1]))
        task_cb.start()
        print('started process')
        running[job[0]] = task_cb


    def remove_finished():
        for j in list(running.keys()):
            if not running[j].is_alive():
                finish_job(j)


    @app.route("/Simulation", methods=['POST'])
    def submit_job():
        # create id
        job_id = str(uuid.uuid4())
        job_data = request.data.decode('utf-8')

        # check if waiting for data
        if str2bool(request.headers.get('UseMlData', False)):
            waiting_for_data[str(job_id)] = job_data
            status = 'WAITING_FOR_DATA'
        else:
            status = submit_job_local(job_id, job_data)
        return status, 200


    def submit_job_local(job_id, job_data):
        # check not too many processing jobs
        if len(running) >= config.threads:
            queue.append((job_id, job_data))
            queue_list.add(job_id)
            status = 'QUEUED'
        else:
            start_job((job_id, job_data))
            status = 'RUNNING'
        return status


    @app.route("/Simulation/<uuid:job_id>", methods=['GET'])
    def check_status(job_id: uuid.UUID):
        job_id = str(job_id)
        remove_finished()
        if job_id in running:
            r = 'RUNNING'
        elif job_id in queue_list:
            r = 'QUEUED'
        elif job_id in finished:
            r = 'COMPLETED'
        else:
            r = 'FAILED'

        return r, 200


    @app.route('/Simulation/<uuid:job_id>/UploadData', methods=['POST'])
    def upload_file(job_id):
        job_id = str(job_id)
        if job_id not in waiting_for_data:
            return 'uuid not in waiting for data mode', 400
        number_of_files = 0
        base_path = os.path.join('uploadedData', job_id)
        if not os.path.exists(base_path):
            os.makedirs(base_path)
        for file in request.files.values():
            if file.filename == '':
                return 'no file name', 400
            if file:
                filename = secure_filename(file.filename)
                path = os.path.join(base_path, filename)
                file.save(path)
                number_of_files += 1
        submit_job_local(job_id, waiting_for_data.pop(job_id))
        return str(number_of_files) + ' files uploaded', 200


    @app.route('/Simulation/<uuid:job_id>/Results', methods=['GET'])
    def download_results(job_id):
        if str(job_id) not in finished:
            return 'job id not found', 404
        base_path = os.path.join('results', str(job_id))
        file_path = os.path.join(base_path, 'result.xml')
        if not isfile(file_path):
            return 'file not found', 404

        return send_from_directory(base_path, 'result.xml')


    app.run()
  • I think it should be enough to put only the last line in the protected `if __name__...` statement. Other than that, I don't spot anything unusual. You might get more answers on https://codereview.stackexchange.com/ – Dschoni Nov 03 '20 at 13:23
  • Well, the thing is I don't understand why it is called at all in the new process. I also have some more initialization functions (download certificate), so that's why I put everything in that if. And thanks for the hint about Code Review; I didn't know about that one. – Ondrej Nov 03 '20 at 14:04
  • Please see this question (and answers) for an explanation of the `__name__` construct on Windows: https://stackoverflow.com/questions/20222534/python-multiprocessing-on-windows-if-name-main TL;DR: On Windows the calling module is imported by the child when the subprocess is created, as there is no `fork`. – Dschoni Nov 03 '20 at 16:56

0 Answers