I am building a Flask MethodView-driven API. For a particular endpoint, I use the request data to launch a potentially long-running command. Instead of waiting for the command to finish, I wrap it in a multiprocessing.Process, call start(), and then return an HTTP 202 to the user along with a URL they can use to monitor the status of the process.
from flask import abort, jsonify, request
from flask.views import MethodView
from multiprocessing import Process


class EndPointAPI(MethodView):

    def __init__(self):
        """ On init, filter requests missing a JSON body. """
        # Methods that are allowed to arrive without a JSON payload
        self.exempt_methods = ["GET", "PUT", "DELETE"]
        if (request.method not in self.exempt_methods) and not request.json:
            abort(400)

    def _long_running_function(self, json_data):
        """
        In this function, I use the input JSON data
        to write a script to the file system, then
        use subprocess.run to execute it.
        """
        return

    def post(self):
        """ Accept the job and launch it in a background process. """
        # Get input data
        json_data = request.json
        # Kick off the long running function without waiting for it
        p = Process(target=self._long_running_function, args=(json_data,))
        p.start()
        response = {
            "result": "job accepted",
            "links": {
                "href": "/monitor_job/",
            }
        }
        return jsonify(response), 202
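For context, the body of _long_running_function is roughly along these lines (shown here as a standalone function; the script path and contents are simplified placeholders, not the real thing):

import json
import subprocess
import tempfile

def long_running_function(json_data):
    """ Write the payload to a throwaway script and run it. """
    # Placeholder script contents; the real script is more involved.
    with tempfile.NamedTemporaryFile("w", suffix=".sh", delete=False) as fh:
        fh.write("#!/bin/bash\n")
        fh.write("echo '{}'\n".format(json.dumps(json_data)))
        script_path = fh.name
    # subprocess.run blocks until the script finishes, which is why the
    # whole thing gets wrapped in a Process in post().
    subprocess.run(["bash", script_path], check=True)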
It looks like the processes launched in the post method are becoming zombies after they finish, but I can't figure out how to properly track and clean them up without blocking the execution of the parent method. I tried implementing a monitoring thread as suggested in Python join a process without blocking parent. As I understand it, that approach runs a separate thread that watches a FIFO queue, and the parent puts the process handle on that queue before returning. I tried an implementation (below), but it looks like you can't put the Process object on the multiprocessing.Queue, because it contains a protected AuthenticationString attribute that can't be pickled.
Traceback (most recent call last):
| File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
| obj = _ForkingPickler.dumps(obj)
| File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
| cls(buf, protocol).dump(obj)
| File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/process.py", line 291, in __reduce__
| 'Pickling an AuthenticationString object is '
| TypeError: Pickling an AuthenticationString object is disallowed for security reasons
This is my implementation of Python join a process without blocking parent. I have no idea whether it would work, because the above error shuts the whole system down right from the start. Any thoughts or suggestions on how I can responsibly launch these processes without blocking the calling method are very much appreciated.
from threading import Thread
from multiprocessing import Process, Queue

from flask import abort, jsonify, request
from flask.views import MethodView


class Joiner(Thread):

    def __init__(self, q):
        super().__init__()
        self.__q = q

    def run(self):
        # Keep joining finished children until a None sentinel arrives
        while True:
            child = self.__q.get()
            if child is None:
                return
            child.join()


class EndPointAPI(MethodView):

    def __init__(self):
        """ On init, filter requests missing a JSON body. """
        self._jobs = Queue()
        self._babysitter = Joiner(self._jobs)
        self._babysitter.start()
        # Methods that are allowed to arrive without a JSON payload
        self.exempt_methods = ["GET", "PUT", "DELETE"]
        if (request.method not in self.exempt_methods) and not request.json:
            abort(400)

    def _long_running_function(self, json_data):
        """
        In this function, I use the input JSON data
        to write a script to the file system, then
        use subprocess.run to execute it.
        """
        return

    def post(self):
        """ Accept the job, launch it, and hand the process to the Joiner. """
        # Get input data
        json_data = request.json
        # Kick off the long running function
        p = Process(target=self._long_running_function, args=(json_data,))
        p.start()
        # Hand the process handle to the monitoring thread for joining
        self._jobs.put(p)
        response = {
            "result": "job accepted",
            "links": {
                "href": "/monitor_job/",
            }
        }
        return jsonify(response), 202