
Let me start by saying that I'm not using a Queue, so this question is not a duplicate of this one and I'm not using a process pool, so it's not a duplicate of this one.

I have a Process object that uses a pool of thread workers to accomplish some task. For the sake of an MCVE, this task is just constructing a list of the integers from 0 to 9. Here's my source:

#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import Process
from sys import stdout

class Quest():
    def __init__(self):
        pass

    def doIt(self, i):
        return i

class Test(Process):

    def __init__(self, arg):
        super(Test, self).__init__()
        self.arg = arg
        self.pool = Pool()

    def run(self):
        quest = Quest()
        done = self.pool.map_async(quest.doIt, range(10), error_callback=print)
        stdout.flush()

        self.arg = [item for item in done.get()]

    def __str__(self):
        return str(self.arg)

    # I tried both with and without this method
    def join(self, timeout=None):
        self.pool.close()
        self.pool.join()
        super(Test, self).join(timeout)


test = Test("test")

print(test) # should print 'test' (and does)

test.start()

# this line hangs forever
_ = test.join()

print(test) # should print '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'

This is a pretty rough model of what I want my actual program to do. The problem, as indicated in the comments, is that Test.join always hangs forever. That's totally independent of whether or not that method is overridden in the Test class. It also never prints anything, but the output when I send a KeyboardInterrupt signal indicates that the problem lies in getting the results from the workers:

test
^CTraceback (most recent call last):
  File "./test.py", line 44, in <module>
Process Test-1:
    _ = test.join()
  File "./test.py", line 34, in join
    super(Test, self).join(timeout)
  File "/path/to/multiprocessing/process.py", line 124, in join
    res = self._popen.wait(timeout)
  File "/path/to/multiprocessing/popen_fork.py", line 51, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/path/to/multiprocessing/popen_fork.py", line 29, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Traceback (most recent call last):
  File "/path/to/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "./test.py", line 25, in run
    self.arg = [item for item in done.get()]
  File "/path/to/multiprocessing/pool.py", line 638, in get
    self.wait(timeout)
  File "/path/to/multiprocessing/pool.py", line 635, in wait
    self._event.wait(timeout)
  File "/path/to/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/path/to/threading.py", line 295, in wait
    waiter.acquire()
KeyboardInterrupt

Why doesn't the stupid process exit? The only thing a worker does is a single dereference and a function call that executes one operation, so it should be really simple.

I forgot to mention: This works fine if I make Test a subclass of threading.Thread instead of multiprocessing.Process. I'm really not sure why this breaks it in half.

anoneemus

1 Answer

  1. Your goal is to do this work asynchronously. Why not start the asynchronous pool workers from your main process WITHOUT spawning a child process (class Test)? The results will be available in your main process and no fancy stuff needs to be done. You can stop reading here if you choose to do this. Otherwise, read on.

  2. Your join hangs forever because there are two separate pools: one created when you construct the process object (local to your main process), and another that exists once you fork the process by calling start() (local to the child process).

For example, this doesn't work:

def __init__(self, arg, shared):
    super(Test, self).__init__()
    self.arg = arg
    self.quest = Quest()
    self.shared = shared
    self.pool = Pool()

def run(self):
    iterable = list(range(10))
    self.shared.extend(self.pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
    print("1" + str(self.shared))
    self.pool.close()

However, this works:

def __init__(self, arg, shared):
    super(Test, self).__init__()
    self.arg = arg
    self.quest = Quest()
    self.shared = shared

def run(self):
    pool = Pool()
    iterable = list(range(10))
    self.shared.extend(pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
    print("1" + str(self.shared))
    pool.close()

This has to do with the fact that when you fork a process, the code, stack, and heap segments of your process are cloned into the child, so your main process and the subprocess have separate contexts.

So, your overridden join() closes and joins the pool object that lives in your main process. Meanwhile, run() uses the copy of the pool that was cloned into the subprocess when start() was called; that copy was never closed and cannot be joined in the way you're doing it. Simply put, your main process has no reference to the cloned pool object in the subprocess. Note also that fork() copies only the calling thread, so the cloned pool has no running worker threads, which is why done.get() in your traceback never returns, with or without the join() override.
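To make the point about the cloned pool concrete, here is a minimal sketch (not part of the original code) that counts live threads in the parent and in a forked child after a ThreadPool has been created. It assumes a POSIX system and forces the fork start method; the names report_thread_count and thread_counts_across_fork are made up for illustration.

```python
import multiprocessing as mp
import threading
from multiprocessing.pool import ThreadPool


def report_thread_count(queue):
    # Runs in the child: report how many threads are alive there.
    queue.put(threading.active_count())


def thread_counts_across_fork():
    # Assumption: POSIX with the fork start method, as in the question's traceback.
    ctx = mp.get_context("fork")
    pool = ThreadPool(4)  # starts worker and handler threads in this process
    try:
        parent_count = threading.active_count()
        queue = ctx.SimpleQueue()
        child = ctx.Process(target=report_thread_count, args=(queue,))
        child.start()
        child_count = queue.get()  # read before join so the child never blocks
        child.join()
    finally:
        pool.close()
        pool.join()
    return parent_count, child_count


if __name__ == "__main__":
    parent_count, child_count = thread_counts_across_fork()
    print("threads in parent:", parent_count)
    print("threads in child: ", child_count)
```

The child should report a single thread: the pool object was copied, but the threads that would service its task queue were not.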

"This works fine if I make Test a subclass of threading.Thread instead of multiprocessing.Process. I'm really not sure why this breaks it in half."

Makes sense, because threads differ from processes in that they have independent call stacks but share the other segments of memory, so any update one thread makes to an object is visible to every other thread in the same process, including your main thread, and vice versa.
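As a small illustration of that difference (a sketch, assuming POSIX and the fork start method; append_values and compare_thread_and_process are hypothetical names), the same function mutates a plain list once from a thread and once from a child process:

```python
import multiprocessing as mp
import threading


def append_values(target):
    # Mutates whatever list object it is handed.
    target.extend(range(3))


def compare_thread_and_process():
    # A thread shares the parent's heap, so the mutation is visible afterwards.
    seen_by_thread = []
    thread = threading.Thread(target=append_values, args=(seen_by_thread,))
    thread.start()
    thread.join()

    # A forked child mutates its own copy; the parent's list stays empty.
    seen_by_process = []
    ctx = mp.get_context("fork")  # assumption: POSIX fork
    process = ctx.Process(target=append_values, args=(seen_by_process,))
    process.start()
    process.join()

    return seen_by_thread, seen_by_process


if __name__ == "__main__":
    print(compare_thread_and_process())  # ([0, 1, 2], [])
```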

The resolution is to create the pool object local to the run() function, close the pool in the subprocess context, and join the subprocess in the main process. Which brings us to the next point: shared state.

  3. Shared state: There are these multiprocessing.Manager() objects that allow for some sort of magical process-safe shared state between processes. It doesn't seem like the manager allows for reassignment of object references, which makes sense: if you reassign the managed value in a subprocess, then when the subprocess terminates, that process context (code, stack, heap) disappears and your main process never sees the assignment (since it was made to a name local to the context of the subprocess). It may work for ctypes primitive values, though.
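A rough sketch of that distinction, assuming POSIX and the fork start method (extend_in_place, rebind_name and manager_demo are illustrative names, not from the original): mutating a managed list through its proxy reaches the parent, while rebinding the name in the child does not.

```python
import multiprocessing as mp


def extend_in_place(shared):
    # Method calls on the proxy are forwarded to the manager's server process.
    shared.extend([1, 2, 3])


def rebind_name(shared):
    # This only rebinds the child's local name; the managed list is untouched.
    shared = [1, 2, 3]


def manager_demo():
    ctx = mp.get_context("fork")  # assumption: POSIX fork
    with ctx.Manager() as manager:
        mutated = manager.list()
        rebound = manager.list()
        for func, shared in ((extend_in_place, mutated), (rebind_name, rebound)):
            child = ctx.Process(target=func, args=(shared,))
            child.start()
            child.join()
        # Copy to plain lists while the manager is still running.
        return list(mutated), list(rebound)


if __name__ == "__main__":
    print(manager_demo())  # ([1, 2, 3], [])
```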

If someone more experienced with Manager() wants to chime in on its innards, that'd be cool. But, the following code gives you your expected behavior:

#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import Process, Manager
from sys import stdout

class Quest():
    def __init__(self):
        pass

    def doIt(self, i):
        return i

class Test(Process):

    def __init__(self, arg, shared):
        super(Test, self).__init__()
        self.arg = arg
        self.quest = Quest()
        self.shared = shared

    def run(self):
        with Pool() as pool:
            iterable = list(range(10))
            self.shared.extend(pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
            print("1" + str(self.shared)) # can remove, just to make sure we've updated state

    def __str__(self):
        return str(self.arg)

with Manager() as manager:
    res = manager.list()
    test = Test("test", res)

    print(test) # should print 'test' (and does)

    test.start()
    test.join()

    print("2" + str(res)) # should print '2[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'

Outputs:

rpg711$ python multiprocess_async_join.py 
test
1[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
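For comparison, option 1 from the top of this answer (using the thread pool directly from the main process, with no Test subclass) might look like the sketch below. The function name collect_in_main_process is made up for illustration; Quest is the class from the question.

```python
from multiprocessing.pool import ThreadPool


class Quest:
    def doIt(self, i):
        return i


def collect_in_main_process():
    quest = Quest()
    # The pool lives and dies in the process that uses it, so nothing
    # has to be shared or sent back across a process boundary.
    with ThreadPool() as pool:
        return pool.map_async(quest.doIt, range(10)).get()


if __name__ == "__main__":
    print(collect_in_main_process())  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```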
TTT
  • Thanks for the answer, I'm still checking it out and I'll get back to this once I know my problem is solved. You asked why I bothered spawning another process: this MCVE doesn't quite capture the full scope of what I'm trying to accomplish. I'm creating about 500 complex connections using 6 sockets each, and performing some calculations on the data sent and received. When I tried this using only threads, I found that with a high enough latency, I could never print results, because lower-latency hosts would always take precedence for the GIL. Hence a compute-bound process that uses I/O-bound workers – anoneemus Mar 21 '18 at 16:24
  • For anyone stumbling across this: I can't testify to the use of `Manager`s - because I opted to use `Pipe`s instead - but the bit about the pool duplication between process led me to my solution. – anoneemus Mar 21 '18 at 17:19
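The commenter did not post their Pipe-based code, so the following is only a guess at what such an approach might look like: the pool is created inside run(), and the result is sent back over a multiprocessing Pipe instead of a Manager list. It assumes POSIX and the fork start method; PipeTest and collect_through_pipe are hypothetical names.

```python
import multiprocessing as mp
from multiprocessing.pool import ThreadPool

FORK = mp.get_context("fork")  # assumption: POSIX fork, as in the question


class Quest:
    def doIt(self, i):
        return i


class PipeTest(FORK.Process):  # hypothetical stand-in for the Test class
    def __init__(self, conn):
        super().__init__()
        self.conn = conn
        self.quest = Quest()

    def run(self):
        # Create the pool in the child so its threads actually exist here.
        with ThreadPool() as pool:
            results = pool.map(self.quest.doIt, range(10))
        self.conn.send(results)
        self.conn.close()


def collect_through_pipe():
    parent_conn, child_conn = FORK.Pipe()
    worker = PipeTest(child_conn)
    worker.start()
    child_conn.close()             # the parent does not use the child's end
    results = parent_conn.recv()   # receive before join so the child never blocks
    worker.join()
    parent_conn.close()
    return results


if __name__ == "__main__":
    print(collect_through_pipe())  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```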