In your code, `self.worker.work(self.args)` inside `run` blocks until the whole loop finishes. If you just terminate the process, the part that sends anything back to the parent will never run.
Instead, we need a way to let the process finish gracefully, so it can send the object back to the parent. `worker.run` is not allowed to block for that, so my code below wraps it in an extra thread. The main thread in the child process starts this thread and runs a while-loop, checking for any message sent over a pipe and checking whether the worker thread is still alive. This loop breaks when your worker finishes naturally or when the parent sends a "poison pill". When that happens, saving and sending can take place, and the parent can call `.get()` to retrieve the instance.
import time
import logging
from threading import Thread
from multiprocessing import Process, Pipe
def init_logging(level=logging.DEBUG):
    """Configure the root logger with a uniform, process-aware format."""
    log_format = ('[%(asctime)s %(levelname)-8s %(processName)s'
                  ' %(funcName)s()] --- %(message)s')
    logging.basicConfig(format=log_format, level=level)
class Worker:
    """Toy CPU-bound workload: counts ``n`` down, one unit per iteration."""

    def __init__(self, n):
        self.n = n

    def run(self):
        """Decrement ``n`` once per whole unit it started with; return self."""
        steps = int(self.n)  # snapshot the iteration count up front
        for _ in range(steps):
            self.n -= 1
        return self

    def __str__(self):
        return f'{self.n}'

    def __repr__(self):
        return f'Worker(n={self.n})'
class MyProcess(Process):
    """Process that can hand its Worker instance back to the parent.

    The child wraps ``Worker.run`` in a daemon thread and watches both the
    thread and a pipe. Either a natural finish or any message from the
    parent (a "poison pill") ends the watch loop, after which the worker
    object is shipped back over the pipe for the parent to ``get()``.
    """

    def __init__(self, n, log_level=logging.DEBUG):
        super().__init__()
        self.args = n
        self.log_level = log_level
        self.worker = None
        self.worker_thread = None
        # Duplex pipe: the parent keeps one end, the child inherits the other.
        self.parent_conn, self.child_conn = Pipe()
        logging.getLogger().debug('process instantiated')

    def run(self):
        init_logging(self.log_level)
        logging.getLogger().debug('process started')
        self.worker = Worker(self.args)
        self.worker_thread = Thread(target=self.worker.run)
        self.worker_thread.daemon = True
        self.worker_thread.start()
        # Heartbeat loop: wake every 0.5 s to check for a parent message
        # and for the worker thread having finished on its own.
        while True:
            if self.child_conn.poll():
                break  # poison pill (or any message) from the parent
            if not self.worker_thread.is_alive():
                break  # worker finished naturally
            self.worker_thread.join(0.5)
        self._save()

    def _save(self):
        """Ship the (possibly partially-done) worker to the parent."""
        logging.getLogger().debug('sending instance to parent')
        self.child_conn.send(self.worker)
        self.child_conn.close()

    def close(self):
        """Ask the child to stop early by sending a poison pill.

        NOTE(review): this shadows ``multiprocessing.Process.close()``
        (added in Python 3.7), which releases the process object's
        resources — the two have different semantics.
        """
        logging.getLogger().debug('closing process')
        # The value sent does not matter: the loop in `run` breaks on
        # receipt of any object.
        self.parent_conn.send('POISON')

    def get(self):
        """Block until the child's worker instance arrives; return it."""
        logging.getLogger().debug('get result from child')
        self.worker = self.parent_conn.recv()
        return self.worker
I tested this under Linux, but with the start method set to 'spawn' — the default on Windows — so I expect it to run there as well.
if __name__ == '__main__':
    init_logging()
    logger = logging.getLogger()

    proc = MyProcess(100e6)  # try 10 vs 100e6 to toggle behaviour
    proc.start()
    proc.join(2)  # give the worker up to 2 seconds

    if proc.is_alive():
        # Timed out: tell the child to stop and collect what it has so far.
        proc.close()
        proc.get()
        logger.info('killed worker')
    else:
        # Finished within the deadline; the result is waiting in the pipe.
        proc.get()
        logger.info('worker was in time')

    time.sleep(0.1)  # just to keep stdout in order
    print('shared object ' + repr(proc.worker))
    assert isinstance(proc.worker, Worker)
    proc.join()
Example Output:
[2018-09-08 05:27:46,316 DEBUG MainProcess __init__()] --- process instantiated
[2018-09-08 05:27:46,370 DEBUG MyProcess-1 run()] --- process started
[2018-09-08 05:27:48,321 DEBUG MainProcess close()] --- closing process
[2018-09-08 05:27:48,322 DEBUG MainProcess get()] --- get result from child
[2018-09-08 05:27:48,396 DEBUG MyProcess-1 _save()] --- sending instance to parent
[2018-09-08 05:27:48,402 INFO MainProcess <module>()] --- killed worker
shared object Worker(n=82683682.0)
Process finished with exit code 0
Note that `worker.n` counted down from 100M to 82.68M within the 2 seconds before the `.close()` call.