
I'm using Flask 1.0.2 with Python 3.6 on Ubuntu 18.04. My app should use asyncio and asyncio.create_subprocess_exec() to launch a background script, read stdout from it, and then return a status when the script is done.

I am basically trying to implement an answer from this post: Non-blocking read on a subprocess.PIPE in python

The script is successfully launched, and I get all of my expected output from it, but the problem is that it never returns (meaning the "Killing subprocess now" line is never reached). When I check the process list (ps) from the Linux terminal, the background script has exited.

What am I doing wrong and how can I successfully break out of the async for line in process.stdout loop?

At the top of my file after my imports I create my event loop:

# Create a loop to run all the tasks in.
global eventLoop ; asyncio.set_event_loop(None)
eventLoop = asyncio.new_event_loop()
asyncio.get_child_watcher().attach_loop(eventLoop)

I define my async coroutine above my route:

async def readAsyncFunctionAndKill(cmd):
    # Use global event loop
    global eventLoop

    print("[%s] Starting async Training Script ..." % (os.path.basename(__file__)))
    process = await asyncio.create_subprocess_exec(cmd,stdout=PIPE,loop=eventLoop)
    print("[%s] Starting to read stdout ..." % (os.path.basename(__file__)))
    async for line in process.stdout:
        line = line.decode(locale.getpreferredencoding(False))
        print("%s"%line, flush=True)
    print("[%s] Killing subprocess now ..." % (os.path.basename(__file__)))
    process.kill()
    print("[%s] Training process return code was: %s" % (os.path.basename(__file__), process.returncode))
    return await process.wait()  # wait for the child process to exit

And my (abbreviated) route is here:

@app.route("/train_model", methods=["GET"])
def train_new_model():
    # Use global event loop
    global eventLoop   

    with closing(eventLoop):        
        eventLoop.run_until_complete(readAsyncFunctionAndKill("s.py"))

    return jsonify("done"), 200

The "s.py" script called is marked as executable and is in the same working directory. The abbreviated script is shown here ( it contains several subprocesses and instantiates PyTorch classes ):

def main():

    # Ensure that swap is activated since we don't have enough RAM to train our model otherwise
    print("[%s] Activating swap now ..." % (os.path.basename(__file__)))
    subprocess.call("swapon -a", shell=True)

    # Need to initialize GPU
    print("[%s] Initializing GPU ..." % (os.path.basename(__file__)))
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    defaults.device = torch.device("cuda")
    with torch.cuda.device(0):
        torch.tensor([1.]).cuda()

    print("[%s] Cuda is Available: %s - with Name: %s ..." % (os.path.basename(__file__),torch.cuda.is_available(),torch.cuda.get_device_name(0)))

    try:

        print("[%s] Beginning to train new model and replace existing model ..." % (os.path.basename(__file__)))


        # Batch size
        bs = 16
        #bs = 8

        # Create ImageBunch
        tfms = get_transforms(do_flip=True,
                              flip_vert=True,
                              max_rotate=180.,
                              max_zoom=1.1,
                              max_lighting=0.5,
                              max_warp=0.1,
                              p_affine=0.75,
                              p_lighting=0.75)

        # Create databunch using folder names as class names
        # This also applies the transforms and batch size to the data
        os.chdir(TRAINING_DIR)
        data = ImageDataBunch.from_folder("TrainingData", ds_tfms=tfms, train='.', valid_pct=0.2, bs=bs)

        ...    

        # Create a new learner with an early stop callback
        learn = cnn_learner(data, models.resnet18, metrics=[accuracy], callback_fns=[
            partial(EarlyStoppingCallback, monitor='accuracy', min_delta=0.01, patience=3)])

        ... 

        print("[%s] All done training ..." % (os.path.basename(__file__)))

        # Success
        sys.exit(0)

    except Exception as err:

        print("[%s] Error training model [ %s ] ..." % (os.path.basename(__file__),err))
        sys.exit(255)

if __name__== "__main__":
  main()
PhilBot
  • I've tried it myself on a minimal example; it works as expected. Does your `s.py` script actually end and exit without error if you run it yourself from the command line? – frankie567 Oct 29 '19 at 08:55
  • Why are you closing the event loop after each `run_until_complete()`? You never re-open the loop. You really don't need to manage the loop to this extent, however. – Martijn Pieters Oct 29 '19 at 11:50
  • Can you please share your output that you **do** see? Is `s.py` made executable? What is the current working directory when you try to run this, does it match the directory where `s.py` is located? – Martijn Pieters Oct 29 '19 at 11:51
  • As for your debug print statements, why not just use `print("[%s] Starting async Training Script ..." % (__name__,))` instead of extracting the base name each time? Or better, use the `logging` module and `logging.getLogger(__name__)` to attach the current module name to the logged records? – Martijn Pieters Oct 29 '19 at 12:14
  • Thanks for the comments. The logger is the way to go - I'll be changing that. I added the s.py script above, it gets all the way down to the final print statement "All done training ..." and then it never returns. The next line is "sys.exit(0)". – PhilBot Oct 29 '19 at 12:49
  • I tried with and without closing the event loop - it did not work either way. It shouldn't be closed each time. – PhilBot Oct 29 '19 at 12:51

1 Answer


There are several concerns here:

  • You are creating a new event loop on import, once, but close the event loop in your view. There is no need to close the loop at all, and doing so means a second request will fail because the loop is already closed.

  • The asyncio event loop is not thread safe, and should not be shared between threads. The vast majority of Flask deployments use threads to handle incoming requests. Your code carries echoes of how this should be handled instead, but unfortunately it is not the correct approach. E.g. asyncio.get_child_watcher().attach_loop(eventLoop) is mostly redundant, because eventLoop = asyncio.new_event_loop(), if run on the main thread, already does exactly that.

    This is the main candidate for the issues you are seeing.

  • Your code assumes that the executable is in fact present and executable. You should be handling OSError exceptions (and subclasses), because an unqualified s.py would only work if it is made executable, starts with a #! shebang line and is found on the PATH. It won't work just because it is in the same directory, nor would you want to rely on the current working directory anyway.

  • Your code assumes that the process closes stdout at some point. If the subprocess never closes stdout (something that happens automatically when the process exits), then your async for line in process.stdout: loop will wait forever too. Consider adding timeouts to the code to avoid getting blocked on a faulty subprocess; a minimal sketch follows this list.
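For example, here is a minimal sketch of such a timeout guard (my own illustration, not the original code; the 10-second per-line limit is an arbitrary placeholder), wrapping each readline() call in asyncio.wait_for():

import asyncio

async def read_with_timeout(process, per_line_timeout=10.0):
    # Read lines from process.stdout, but give up if no line (or EOF)
    # arrives within per_line_timeout seconds.
    while True:
        try:
            line = await asyncio.wait_for(
                process.stdout.readline(), per_line_timeout)
        except asyncio.TimeoutError:
            # The subprocess looks stuck; stop waiting and kill it.
            process.kill()
            break
        if not line:  # b'' signals EOF: the subprocess closed stdout
            break
        print(line.decode(), flush=True)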

There are two sections in the Python asyncio documentation that you really want to read when using asyncio subprocesses in a multi-threaded application:

  • Concurrency and Multithreading (in the asyncio development documentation)
  • Subprocess and Threads (in the asyncio subprocess documentation)

For your use case, there really is no need to run a new event loop per thread. Run a single loop, in a separate thread as needed. If you use a loop in a separate thread, depending on your Python version, you may need to have a running loop on the main thread as well, or use a different process watcher. Generally speaking, running an asyncio loop on the main thread in a WSGI server is not going to be easy or even possible.

So you need to run a loop, permanently, in a separate thread, and you need to use a child process watcher that works without a main thread loop. Here is an implementation for just that, and this should work for Python versions 3.6 and newer:

import asyncio
import itertools
import logging
import time
import threading

try:
    # Python 3.8 or newer has a suitable process watcher
    asyncio.ThreadedChildWatcher
except AttributeError:
    # backport the Python 3.8 threaded child watcher
    import os
    import warnings

    # Python 3.7 preferred API
    _get_running_loop = getattr(asyncio, "get_running_loop", asyncio.get_event_loop)

    class _Py38ThreadedChildWatcher(asyncio.AbstractChildWatcher):
        def __init__(self):
            self._pid_counter = itertools.count(0)
            self._threads = {}

        def is_active(self):
            return True

        def close(self):
            pass

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

        def __del__(self, _warn=warnings.warn):
            threads = [t for t in list(self._threads.values()) if t.is_alive()]
            if threads:
                _warn(
                    f"{self.__class__} has registered but not finished child processes",
                    ResourceWarning,
                    source=self,
                )

        def add_child_handler(self, pid, callback, *args):
            loop = _get_running_loop()
            thread = threading.Thread(
                target=self._do_waitpid,
                name=f"waitpid-{next(self._pid_counter)}",
                args=(loop, pid, callback, args),
                daemon=True,
            )
            self._threads[pid] = thread
            thread.start()

        def remove_child_handler(self, pid):
            # asyncio never calls remove_child_handler() !!!
            # The method is no-op but is implemented because
            # abstract base class requires it
            return True

        def attach_loop(self, loop):
            pass

        def _do_waitpid(self, loop, expected_pid, callback, args):
            assert expected_pid > 0

            try:
                pid, status = os.waitpid(expected_pid, 0)
            except ChildProcessError:
                # The child process is already reaped
                # (may happen if waitpid() is called elsewhere).
                pid = expected_pid
                returncode = 255
                logger.warning(
                    "Unknown child process pid %d, will report returncode 255", pid
                )
            else:
                if os.WIFSIGNALED(status):
                    returncode = -os.WTERMSIG(status)
                elif os.WIFEXITED(status):
                    returncode = os.WEXITSTATUS(status)
                else:
                    returncode = status

                if loop.get_debug():
                    logger.debug(
                        "process %s exited with returncode %s", expected_pid, returncode
                    )

            if loop.is_closed():
                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            else:
                loop.call_soon_threadsafe(callback, pid, returncode, *args)

            self._threads.pop(expected_pid)

    # add the watcher to the loop policy
    asyncio.get_event_loop_policy().set_child_watcher(_Py38ThreadedChildWatcher())

__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]

logger = logging.getLogger(__name__)

class EventLoopThread(threading.Thread):
    loop = None
    _count = itertools.count(0)

    def __init__(self):
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)

    def __repr__(self):
        loop, r, c, d = self.loop, False, True, False
        if loop is not None:
            r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={r} closed={c} debug={d}>"
        )

    def run(self):
        self.loop = loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            loop.run_forever()
        finally:
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            loop.close()
            asyncio.set_event_loop(None)

    def stop(self):
        loop, self.loop = self.loop, None
        if loop is None:
            return
        loop.call_soon_threadsafe(loop.stop)
        self.join()

_lock = threading.Lock()
_loop_thread = None

def get_event_loop():
    global _loop_thread
    if _loop_thread is None:
        with _lock:
            if _loop_thread is None:
                _loop_thread = EventLoopThread()
                _loop_thread.start()
                # give the thread up to a second to produce a loop
                deadline = time.time() + 1
                while not _loop_thread.loop and time.time() < deadline:
                    time.sleep(0.001)

    return _loop_thread.loop

def stop_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is not None:
            _loop_thread.stop()
            _loop_thread = None

def run_coroutine(coro):
    return asyncio.run_coroutine_threadsafe(coro, get_event_loop())

The above is the same general 'run async with Flask' solution as I posted for Make a Python asyncio call from a Flask route, but with the addition of the ThreadedChildWatcher backport.
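If you want the background loop and its thread shut down cleanly when the interpreter exits, one option (my suggestion, not part of the original module) is to register stop_event_loop() with atexit:

import atexit

# Stop the shared background loop and join its thread at interpreter exit.
atexit.register(stop_event_loop)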

You can then use the loop returned from get_event_loop() to run child processes, by calling run_coroutine_threadsafe():

import asyncio
import concurrent.futures
import locale
import logging

logger = logging.getLogger(__name__)


def get_command_output(cmd, timeout=None):
    encoding = locale.getpreferredencoding(False)

    async def run_async():
        try:
            process = await asyncio.create_subprocess_exec(
                cmd, stdout=asyncio.subprocess.PIPE)
        except OSError:
            logger.exception("Process %s could not be started", cmd)
            return
        
        async for line in process.stdout:
            line = line.decode(encoding)
            # TODO: actually do something with the data.
            print(line, flush=True)

        process.kill()
        returncode = await process.wait()
        logger.debug("Process for %s exited with return code %s", cmd, returncode)

        return returncode

    future = run_coroutine(run_async())
    result = None
    try:
        result = future.result(timeout)
    except concurrent.futures.TimeoutError:
        logger.warning('The child process took too long, cancelling the task...')
        future.cancel()
    except Exception:
        logger.exception('The child process raised an exception')
    return result

Note that the above function takes an optional timeout, in seconds: the maximum amount of time you'll wait for the subprocess to complete.
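Tying this back to the original route, here is a hypothetical sketch of how train_new_model() could use get_command_output() (the absolute script path and the 600-second timeout are placeholder assumptions; app and jsonify come from the Flask setup in the question):

@app.route("/train_model", methods=["GET"])
def train_new_model():
    # Runs the training script on the shared background loop; this request
    # thread blocks for up to 10 minutes waiting for it to finish.
    returncode = get_command_output("/opt/app/s.py", timeout=600)
    if returncode == 0:
        return jsonify("done"), 200
    return jsonify("training failed"), 500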

Martijn Pieters
  • Great answer but running Python3.6 on Windows10 I get - AttributeError: module 'asyncio' has no attribute 'AbstractChildWatcher' – Sam Redway Jul 22 '20 at 15:04
  • @SamRedway: correct, `AbstractChildWatcher` is POSIX-specific, because [Windows support for subprocesses uses a different mechanism](https://docs.python.org/3/library/asyncio-platforms.html#asyncio-windows-subprocess). – Martijn Pieters Jul 25 '20 at 15:57
  • I got an error like: "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py", line 725, in run_coroutine_threadsafe loop.call_soon_threadsafe(callback) AttributeError: 'NoneType' object has no attribute 'call_soon_threadsafe' – Menglong Li May 26 '21 at 07:15
  • @MenglongLi: I checked, and I have indeed updated the `get_event_loop()` implementation in production use to add a timeout; there is a race condition where the thread isn't quite booted up yet. I've updated the code in my answer here to account for that. – Martijn Pieters May 27 '21 at 20:08