0

I am running into some deadlock with the following code.

import time
from multiprocessing import Process, Queue
import numpy as np


def f(q):
    q.put([(np.random.rand(50000), 0.993) for _ in range(10000)])

def g(q):
    time.sleep(3)  # to make both functions take approximately the same amount of time
    q.put('X' * 100000)


if __name__ == '__main__':

    print([(np.random.rand(50000), 0.993) for _ in range(10000)])

    queue = Queue()
    p = Process(target=g, args=(queue,))
    p.start()
    obj = queue.get()
    p.join()   # after get() as suggested in the docs
    print("Done g")
    print(obj)

    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    obj = queue.get()
    p.join()  # after get() as suggested in the docs
    print("Done f")
    print(obj)

The function g seems to be computed easily in the subprocess whereas running the process computing f results in a deadlock (it seems).

I am aware that this is similar to enter link description here. However, the accepted answer suggests either switching q.get() and p.join() or removing p.join() entirely. Doing either does not solve my problem.

Furthermore, the print statement in the beginning shows that doing whatever f is doing should not take a long time.

UPDATE: I realise that the reason for this might be that the result of f is too big in size. If that is the case I am wondering what the standard way of exchanging large data between processes is.

julian
  • 73
  • 7
  • 1
    what OS and python version? I cannot replicate... the code does not hang, and the object is properly transferred as you make sure to `get` before `join`. You're not wrong though about this being a huge amount of data (over 4 billion bytes according to a [recursive `sys.getsizeof`](https://code.activestate.com/recipes/577504/)) – Aaron Sep 07 '22 at 21:49
  • I will take note: the call to `get` did take a few extra seconds given the size of `obj`, and I have a decently fast computer... – Aaron Sep 07 '22 at 21:57

1 Answers1

0

I can tell you that on my 8M Windows desktop the machine freezes even if the numpy array contains only 20,000 elements.

The following is the way I would code this so that it runs more quickly with less memory utilization:

import time
from multiprocessing import Process, Queue
import numpy as np

def f(q):
    # Instead of putting a list with 10,000 elements,
    # do 10,000 puts, which can be retrieved by the main process
    # one by one to aggregate the results. This should reduce the
    # amount of memory taken up by the queue since elements are
    # being taken off the queue by the main process as the child process is
    # putting them on the queue:
    for _ in 10000:
        q.put((np.random.rand(50000), 0.993))

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    t = time.time()
    p.start()
    obj = []
    for _ in 1000:
        obj.append(queue.get())
    p.join()   # after get() as suggested in the docs
    print("Done f", time.time() - t)
    #print(obj)

If you cannot know how many items will be put on the queue by f, then f should put a final sentinel item on the queue that signifies that there is no more data to be retrieved. This sentinel just has to be an object that cannot be mistaken for a actual data item. In this case None makes for an ideal sentinel:

import time
from multiprocessing import Process, Queue
import numpy as np

def f(q):
    # Instead of putting a list with 10,000 elements,
    # do 10,000 puts, which can be retrieved by the main process
    # one by one to aggregate the results. This should reduce the
    # amount of memory taken up by the queue since elements are
    # being taken off the queue by the main process as the child process is
    # putting them on the queue:
    for _ in 10000:
        q.put((np.random.rand(50000), 0.993))
    # Put a sentinel:
    q.put(None)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    t = time.time()
    p.start()
    obj = []
    while True:
        item = queue.get()
        # Sentinel?
        if item is None:
            break
        obj.append(item)
    p.join()   # after get() as suggested in the docs
    print("Done f", time.time() - t)
    #print(obj)
Booboo
  • 38,656
  • 3
  • 37
  • 60