
I am new to Flask, WSGI, and APIs in general. I want to write APIs that can start a process multiple times and report its current status. I also want to estimate the total time based on the duration of the first run, and I want to deploy the application to a WSGI server.

I followed these instructions to deploy the Flask application to a WSGI server, although I used the Python mod_wsgi package because I wanted Python 3.6.

The code works fine in a multi-threaded environment but fails to share variables in a multi-processing environment. As I understand it, a WSGI server is a multi-processing environment. I have tried using a multiprocessing Manager, but it doesn't help.
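
To illustrate the kind of isolation described above, here is a minimal sketch (not the deployed setup) showing that a plain module-level global rebound in one process is not seen by another. It assumes a Unix-like system where the `fork` start method is available; `record_first_run` and `value_seen_by_parent` are hypothetical names used only for this illustration.

```python
import multiprocessing as mp

# Plain module-level global, like time_per_run in the API code.
time_per_run = []


def record_first_run(seconds):
    """Runs in the child process and rebinds the child's own copy of the global."""
    global time_per_run
    time_per_run = seconds


def value_seen_by_parent():
    """Start a child that sets the global, then return what the parent still sees."""
    # Assumption: the "fork" start method exists on this platform (Unix only).
    ctx = mp.get_context("fork")
    child = ctx.Process(target=record_first_run, args=(40.0,))
    child.start()
    child.join()
    # The assignment happened in the child's memory, not in this process.
    return time_per_run
```

Calling `value_seen_by_parent()` returns the original empty list, because the assignment took place only in the child's copy of the module.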

Here is my code for the APIs:

from flask import Flask, request, session, copy_current_request_context
from json import dumps
import requests
from threading import Thread as thread
from flask_session import Session
import logging
import time

from multiprocessing import Manager, Process

app = Flask(__name__)

#sess = Session()
#app.secret_key = 'THISISASECRETKEY'
#app.config['SESSION_TYPE'] = 'null'
#sess.init_app(app)
#app.debug =True

logger = logging.getLogger('Gen API Logger')
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler("/tmp/api.log")
formatter = logging.Formatter('%(process)d %(threadName)s - %(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)

man = Manager()
st_dict = man.dict()
cnt = man.Value('i',-1)

time_per_run = []

st_dict['status'] = 'notstarted'
print(f"State of run dict is {st_dict.get('status')}")

class Main:
    def __init__(self):
        self.g = -1
        self.runs = 5

    def run(self):
        st_dict['status'] = 'running'
        for g in range(self.runs):
            self.g = g
            cnt.value = g
            time.sleep(30)
        return 1

m = Main()

def run_running():
    running_done = m.run()
    st_dict['status'] = 'finished'

@app.route('/start-running',methods=['GET'])
def api_start():
    try:
        global time_per_run, startTime
        logger.info('Got request for start running.')
        if st_dict.get('status') == 'running':
            msg = {"success":"running already started"}
            logger.info(f'Start running responded with response: {msg}')
            return dumps(msg)
        startTime = time.time()
        start_time = time.time()
        try:
            logger.debug(f"Value of run status before starting thread: {st_dict.get('status')}")
            #thread(target=run_running).start()
            Process(target=run_running).start()
            st_dict['status'] = 'started'
            logger.debug(f"Value of run status after starting thread: {st_dict.get('status')}")
        except Exception as e:
            logger.error(f"Error while starting thread: {e}")
            raise Exception(e)

        while cnt.get() < 1:
            logger.debug(f"Value of cnt: {cnt.get()}")
            time.sleep(10)
        msg = {"success":"true"}
        time_per_run = time.time() - start_time
        logger.debug(f"Value of run status after finishing first run: {st_dict.get('status')}")

    except Exception as e:
        msg = {"error":str(e)}

    logger.info(f'Start running responded with response: {msg}')
    return dumps(msg)


@app.route('/running-status',methods=['GET'])
def api_running_status():
    logger.info('Got request for running status.')
    logger.debug(f"Value of run status in running-status: {st_dict.get('status')}")
    try:
        if st_dict.get('status') != 'notstarted':
            if st_dict.get('status') == 'started':
                msg = {"timeLeft":"calculating"}
            else:
                if st_dict.get('status') == 'running':
                    time_left = ((m.runs) * time_per_run / float(cnt.get())) - time_per_run
                    time_left = time_left if time_left >= 0 else 0
                    msg = {"timeLeft":str(int(time_left))}
                else:
                    msg = {"timeLeft":"0"}
                    st_dict['status'] = 'finished'
        else:
            msg = {"error":"running has not yet started"}
    except Exception as e:
        msg = {"error":str(e)}
    logger.info(f'Running status responded with response: {msg}')
    return dumps(msg)

if __name__ == "__main__":
    #app.config.from_object(__name__)
    app.config['SESSION_TYPE'] = 'memcache'
    #sess.init_app(app)
    app.debug =True
    app.run(threaded=False,processes=5)
    #app.run(threaded=True)

Here is demo code to test it:

import requests
import time

gen_url = 'http://127.0.0.1:5000'

run_resp = requests.get(f'{gen_url}/running-status')
print(f'Running status response {run_resp.content}')

if run_resp.json().get('error') == 'running has not yet started':

    tr_resp = requests.get(f'{gen_url}/start-running')
    print(f"Running response: {tr_resp.content}")

    run_resp = requests.get(f'{gen_url}/running-status')
    print(f'Running status response {run_resp.content}')

    while(requests.get(f'{gen_url}/running-status').json()['timeLeft'] != '0'):
        print("Entered while")
        time.sleep(10)
    print("While exited")

The logs when it runs correctly in a threaded environment:

4222 Thread-2 - 2018-12-15 12:12:44,265 - Gen API Logger - INFO - Got request for running status.
4222 Thread-2 - 2018-12-15 12:12:44,267 - Gen API Logger - DEBUG - Value of run status in running-status: notstarted
4222 Thread-2 - 2018-12-15 12:12:44,267 - Gen API Logger - INFO - Running status responded with response: {'error': 'running has not yet started'}
4222 Thread-3 - 2018-12-15 12:12:44,273 - Gen API Logger - INFO - Got request for start running.
4222 Thread-3 - 2018-12-15 12:12:44,275 - Gen API Logger - DEBUG - Value of run status before starting thread: notstarted
4222 Thread-3 - 2018-12-15 12:12:44,278 - Gen API Logger - DEBUG - Value of run status after starting thread: started
4222 Thread-3 - 2018-12-15 12:12:44,279 - Gen API Logger - DEBUG - Value of cnt: -1
4222 Thread-3 - 2018-12-15 12:12:54,289 - Gen API Logger - DEBUG - Value of cnt: 0
4222 Thread-3 - 2018-12-15 12:13:04,300 - Gen API Logger - DEBUG - Value of cnt: 0
4222 Thread-3 - 2018-12-15 12:13:14,311 - Gen API Logger - DEBUG - Value of cnt: 0
4222 Thread-3 - 2018-12-15 12:13:24,323 - Gen API Logger - DEBUG - Value of run status after finishing first run: running
4222 Thread-3 - 2018-12-15 12:13:24,323 - Gen API Logger - INFO - Start running responded with response: {'success': 'true'}
4222 Thread-4 - 2018-12-15 12:13:24,333 - Gen API Logger - INFO - Got request for running status.
4222 Thread-4 - 2018-12-15 12:13:24,336 - Gen API Logger - DEBUG - Value of run status in running-status: running
4222 Thread-4 - 2018-12-15 12:13:24,337 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '160'}
4222 Thread-5 - 2018-12-15 12:13:24,344 - Gen API Logger - INFO - Got request for running status.
4222 Thread-5 - 2018-12-15 12:13:24,346 - Gen API Logger - DEBUG - Value of run status in running-status: running
4222 Thread-5 - 2018-12-15 12:13:24,346 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '160'}
4222 Thread-6 - 2018-12-15 12:13:34,363 - Gen API Logger - INFO - Got request for running status.
4222 Thread-6 - 2018-12-15 12:13:34,365 - Gen API Logger - DEBUG - Value of run status in running-status: running
4222 Thread-6 - 2018-12-15 12:13:34,366 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '160'}
4222 Thread-7 - 2018-12-15 12:13:44,378 - Gen API Logger - INFO - Got request for running status.
4222 Thread-7 - 2018-12-15 12:13:44,381 - Gen API Logger - DEBUG - Value of run status in running-status: running
4222 Thread-7 - 2018-12-15 12:13:44,382 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '60'}
4222 Thread-8 - 2018-12-15 12:13:54,400 - Gen API Logger - INFO - Got request for running status.
4222 Thread-8 - 2018-12-15 12:13:54,402 - Gen API Logger - DEBUG - Value of run status in running-status: running
4222 Thread-8 - 2018-12-15 12:13:54,403 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '60'}
4222 Thread-9 - 2018-12-15 12:14:04,418 - Gen API Logger - INFO - Got request for running status.
4222 Thread-9 - 2018-12-15 12:14:04,421 - Gen API Logger - DEBUG - Value of run status in running-status: running
4222 Thread-9 - 2018-12-15 12:14:04,422 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '60'}
4222 Thread-10 - 2018-12-15 12:14:14,439 - Gen API Logger - INFO - Got request for running status.
4222 Thread-10 - 2018-12-15 12:14:14,442 - Gen API Logger - DEBUG - Value of run status in running-status: running
4222 Thread-10 - 2018-12-15 12:14:14,443 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '26'}
4222 Thread-11 - 2018-12-15 12:14:24,461 - Gen API Logger - INFO - Got request for running status.
4222 Thread-11 - 2018-12-15 12:14:24,463 - Gen API Logger - DEBUG - Value of run status in running-status: running
4222 Thread-11 - 2018-12-15 12:14:24,465 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '26'}
4222 Thread-12 - 2018-12-15 12:14:34,483 - Gen API Logger - INFO - Got request for running status.
4222 Thread-12 - 2018-12-15 12:14:34,485 - Gen API Logger - DEBUG - Value of run status in running-status: running
4222 Thread-12 - 2018-12-15 12:14:34,487 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '26'}
4222 Thread-13 - 2018-12-15 12:14:44,504 - Gen API Logger - INFO - Got request for running status.
4222 Thread-13 - 2018-12-15 12:14:44,507 - Gen API Logger - DEBUG - Value of run status in running-status: running
4222 Thread-13 - 2018-12-15 12:14:44,508 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '10'}
4222 Thread-14 - 2018-12-15 12:14:54,526 - Gen API Logger - INFO - Got request for running status.
4222 Thread-14 - 2018-12-15 12:14:54,529 - Gen API Logger - DEBUG - Value of run status in running-status: running
4222 Thread-14 - 2018-12-15 12:14:54,530 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '10'}
4222 Thread-15 - 2018-12-15 12:15:04,544 - Gen API Logger - INFO - Got request for running status.
4222 Thread-15 - 2018-12-15 12:15:04,547 - Gen API Logger - DEBUG - Value of run status in running-status: running
4222 Thread-15 - 2018-12-15 12:15:04,548 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '10'}
4222 Thread-16 - 2018-12-15 12:15:14,566 - Gen API Logger - INFO - Got request for running status.
4222 Thread-16 - 2018-12-15 12:15:14,568 - Gen API Logger - DEBUG - Value of run status in running-status: finished
4222 Thread-16 - 2018-12-15 12:15:14,570 - Gen API Logger - INFO - Running status responded with response: {'timeLeft': '0'}

Logs for the multi-process environment, where it fails:

5029 Thread-1 - 2018-12-15 12:29:04,487 - Gen API Logger - INFO - Got request for running status.
5029 Thread-1 - 2018-12-15 12:29:04,490 - Gen API Logger - DEBUG - Value of run status in running-status: notstarted
5029 Thread-1 - 2018-12-15 12:29:04,490 - Gen API Logger - INFO - Running status responded with response: {'error': 'running has not yet started'}
5031 Thread-1 - 2018-12-15 12:29:04,501 - Gen API Logger - INFO - Got request for start running.
5031 Thread-1 - 2018-12-15 12:29:04,505 - Gen API Logger - DEBUG - Value of run status before starting thread: notstarted
5031 Thread-1 - 2018-12-15 12:29:04,509 - Gen API Logger - DEBUG - Value of run status after starting thread: started
5031 Thread-1 - 2018-12-15 12:29:04,510 - Gen API Logger - DEBUG - Value of cnt: -1
5031 Thread-1 - 2018-12-15 12:29:14,521 - Gen API Logger - DEBUG - Value of cnt: 0
5031 Thread-1 - 2018-12-15 12:29:24,532 - Gen API Logger - DEBUG - Value of cnt: 0
5031 Thread-1 - 2018-12-15 12:29:34,543 - Gen API Logger - DEBUG - Value of cnt: 0
5031 Thread-1 - 2018-12-15 12:29:44,555 - Gen API Logger - DEBUG - Value of run status after finishing first run: running
5031 Thread-1 - 2018-12-15 12:29:44,555 - Gen API Logger - INFO - Start running responded with response: {'success': 'true'}
5071 Thread-1 - 2018-12-15 12:29:44,571 - Gen API Logger - INFO - Got request for running status.
5071 Thread-1 - 2018-12-15 12:29:44,573 - Gen API Logger - DEBUG - Value of run status in running-status: running
5071 Thread-1 - 2018-12-15 12:29:44,573 - Gen API Logger - INFO - Running status responded with response: {'error': "unsupported operand type(s) for /: 'list' and 'float'"}
5073 Thread-1 - 2018-12-15 12:29:44,584 - Gen API Logger - INFO - Got request for running status.
5073 Thread-1 - 2018-12-15 12:29:44,588 - Gen API Logger - DEBUG - Value of run status in running-status: running
5073 Thread-1 - 2018-12-15 12:29:44,589 - Gen API Logger - INFO - Running status responded with response: {'error': "unsupported operand type(s) for /: 'list' and 'float'"}
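
The error in these logs appears consistent with the handling process still seeing the module-level default `time_per_run = []`, since the measured value was assigned in a different process (5031). Below is a small sketch of the same arithmetic used in `api_running_status`. The helper name `estimate_time_left` is hypothetical, and 40.0 seconds is an assumed first-run time, roughly what the threaded logs suggest.

```python
def estimate_time_left(runs, time_per_run, completed):
    """Same expression as in api_running_status, isolated for illustration."""
    return (runs * time_per_run / float(completed)) - time_per_run


# In the process that measured the first run, time_per_run is a float.
print(estimate_time_left(5, 40.0, 1))  # 160.0

# In a process that never ran api_start, time_per_run is still the default [].
# 5 * [] is list repetition (still a list), so dividing it by a float fails.
try:
    estimate_time_left(5, [], 1)
except TypeError as exc:
    print(exc)
```

With the float value the result matches the `timeLeft` of 160 seen in the threaded logs; with the untouched default the division raises the same kind of `TypeError` that the multi-process logs report.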

How can I correctly share the value of st_dict across multiple requests?

EDIT1: I have also tried using Flask's session, as suggested here, but the results were similar. Sometimes the main thread handles the request, which resets st_dict to its default value.

Anand
  • You will need to have a common persistence: database, key-value store (redis for example). For small amounts of data in sessions you can use rich tokens like a JWT. – Klaus D. Dec 15 '18 at 20:55
  • Thanks for the info. I am fairly new to all this. Would it be possible for you to point me to some tutorial or show a small working code? – Anand Dec 15 '18 at 22:49
  • What if the object is not serializable? – Eternal Nov 06 '19 at 09:15
  • For anyone who's looking for a simple task queue using `multiprocessing.managers`, check out this [answer](https://stackoverflow.com/a/57810915/6109537). It should work with multiple workers forked by uwsgi. – laido yagamii Apr 17 '21 at 07:56
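
As a rough sketch of the "common persistence" idea from the first comment, the status and timing values could be kept in a store that every worker process opens by itself. The example below uses SQLite from the standard library as a stand-in for a database; a key-value store such as Redis would follow the same pattern. `DB_PATH`, `set_state`, and `get_state` are hypothetical names, and values are assumed to be JSON-serializable.

```python
import json
import os
import sqlite3
import tempfile

# Hypothetical location; any file that all worker processes can reach would do.
DB_PATH = os.path.join(tempfile.gettempdir(), "api_state.sqlite3")


def _connect(path):
    """Open a connection and make sure the key-value table exists."""
    conn = sqlite3.connect(path, timeout=10)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS state (key TEXT PRIMARY KEY, value TEXT)"
    )
    return conn


def set_state(key, value, path=DB_PATH):
    """Store a JSON-serializable value under key, replacing any old value."""
    conn = _connect(path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT OR REPLACE INTO state (key, value) VALUES (?, ?)",
                (key, json.dumps(value)),
            )
    finally:
        conn.close()


def get_state(key, default=None, path=DB_PATH):
    """Read the value stored under key, or default if it was never set."""
    conn = _connect(path)
    try:
        row = conn.execute(
            "SELECT value FROM state WHERE key = ?", (key,)
        ).fetchone()
    finally:
        conn.close()
    return default if row is None else json.loads(row[0])
```

Under these assumptions, a handler would call something like `set_state("time_per_run", elapsed)` after the first run, and the status endpoint would call `get_state("time_per_run")`, so the value no longer depends on which process serves the request.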
