
I am building a Flask MethodView-driven API. For a particular endpoint, I use the request data to launch a potentially long-running command. Instead of waiting for the command to finish, I wrap it in a multiprocessing.Process, call start, and then return an HTTP 202 to the user along with a URL they can use to monitor the status of the process.

from multiprocessing import Process

from flask import abort, jsonify, request
from flask.views import MethodView


class EndPointAPI(MethodView):

    def __init__(self):
        """ On init, filter requests missing JSON body."""

        # Check for json payload
        self.except = ["GET", "PUT", "DELETE" ]                                                                                       
        if (request.method not in self.except) and not request.json: 
            abort(400)            

    def _long_running_function(self, json_data):
        """ 
        In this function, I use the input JSON data 
        to write a script to the file system, then 
        use subprocess.run to execute it.
        """
        return

    def post(self):
        """ """

        # Get input data
        json_data = request.json

        # Kick off the long running function
        p = Process(target=self._long_running_function, args=(json_data,))
        p.start()

        response = {
            "result": "job accepted",
            "links": {
                "href": "/monitor_job/",
            }
        }

        return jsonify(response), 202

It looks like the processes launched in the post method are becoming zombies after they finish, but I can't figure out how to properly track and clean them up without blocking the execution of the parent method. I tried implementing a monitoring thread as suggested in Python join a process without blocking parent. As I understand it, it suggests running a separate thread that monitors a FIFO queue, and then putting the process handle in the queue before returning from the parent function. I tried an implementation (below), but it looks like you can't pass the process object to the monitoring thread through the queue because it contains a protected AuthenticationString attribute.

Traceback (most recent call last):
  File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/process.py", line 291, in __reduce__
    'Pickling an AuthenticationString object is '
TypeError: Pickling an AuthenticationString object is disallowed for security reasons

This is my implementation of Python join a process without blocking parent. I have no idea if this would work because the above error shuts the whole system down right from the start. Any thoughts or suggestions on how I can responsibly launch these processes without blocking the calling method are very much appreciated.

from threading import Thread
from multiprocessing import Process, Queue

from flask import abort, jsonify, request
from flask.views import MethodView

class Joiner(Thread):

    def __init__(self, q):
        super().__init__()
        self.__q = q

    def run(self):
        while True:
            child = self.__q.get()
            if child is None:
                return
            child.join()

class EndPointAPI(MethodView):

    def __init__(self):
        """ On init, filter requests missing JSON body."""
        self._jobs = Queue()            
        self._babysitter = Joiner(self._jobs)
        self._babysitter.start()

        # Check for json payload
        self.except = ["GET", "PUT", "DELETE" ]                                                                                       
        if (request.method not in self.except) and not request.json: 
            abort(400)            

    def _long_running_function(self, json_data):
        """ 
        In this function, I use the input JSON data 
        to write a script to the file system, then 
        use subprocess.run to execute it.
        """
        return

    def post(self):
        """ """

        # Get input data
        json_data = request.json

        # Kick off the long running function
        p = Process(target=self._long_running_function, args=(json_data,))
        p.start()
        self._jobs.put(p)

        response = {
            "result": "job accepted",
            "links": {
                "href": "/monitor_job/",
            }
        }

        return jsonify(response), 202

1 Answer


You were so close :) Everything looks just fine except one thing: you're using a multiprocessing.Queue to store running processes in order to join them later on with the Joiner instance. From the docs you'll find the following:

Note: When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe.

That is, the process gets serialized when it is put into the queue, which gives the following error:

TypeError: Pickling an AuthenticationString object is disallowed for security reasons

This happens because of the unique authentication key each process has. This key is a byte string which can be thought of as a password; it is of type multiprocessing.process.AuthenticationString, and it cannot be pickled.
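For illustration (this snippet is my own, not part of the original answer), you can reproduce the same error by pickling a Process object directly, which is essentially the serialization step the queue's background feeder thread performs:

import pickle
from multiprocessing import Process


def noop():
    return


if __name__ == '__main__':
    p = Process(target=noop)
    p.start()
    try:
        # multiprocessing.Queue would perform this same pickling step
        # in its background feeder thread when the Process is put on it
        pickle.dumps(p)
    except TypeError as exc:
        print(exc)  # Pickling an AuthenticationString object is disallowed ...
    p.join()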

The solution is simple: just use a queue.Queue instance to store your long-running processes. Here is a working example:

#!/usr/bin/env python3
import os
import time
from queue import Queue
from threading import Thread
from multiprocessing import Process


class Joiner(Thread):

    def __init__(self):
        super().__init__()
        self.workers = Queue()

    def run(self):
        # Block on the queue, joining each worker as it arrives;
        # a None sentinel tells the thread to exit.
        while True:
            worker = self.workers.get()

            if worker is None:
                break

            worker.join()


def do_work(t):
    pid = os.getpid()
    print('Process', pid, 'STARTED')
    time.sleep(t)
    print('Process', pid, 'FINISHED')


if __name__ == '__main__':
    joiner = Joiner()
    joiner.start()

    for t in range(1, 6, 2):
        p = Process(target=do_work, args=(t,))
        p.start()
        joiner.workers.put(p)

    joiner.workers.put(None)
    joiner.join()

Output:

Process 14498 STARTED
Process 14500 STARTED
Process 14499 STARTED
Process 14498 FINISHED
Process 14499 FINISHED
Process 14500 FINISHED
    Thanks for the great answer! Do you understand why the multiprocessing.Queue class is behaving differently in this respect than the queue.Queue class? I'm having a hard time understanding the difference between the two and their intended use cases. – jeremiahbuddha Oct 09 '19 at 19:12
  • @jeremiahbuddha it's because processes don't share a memory space unlike threads that are running in the same memory space, that's why in order to share an object between processes one has to serialize it first and then transfer it to another process. – constt Oct 10 '19 at 01:21
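To make that difference concrete, here is a small sketch of my own (not from the thread): a queue.Queue hands back the very same object within one process, whereas a multiprocessing.Queue pickles the item and returns a reconstructed copy.

from queue import Queue as ThreadQueue
from multiprocessing import Queue as ProcessQueue

if __name__ == '__main__':
    item = {"job": 1}

    tq = ThreadQueue()
    tq.put(item)
    print(tq.get() is item)  # True: the same in-memory object, nothing pickled

    pq = ProcessQueue()
    pq.put(item)
    print(pq.get() is item)  # False: a pickled copy came back through a pipe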