
I'm trying to enqueue a list of lists from a child process (the function `proc` below) and then have the process terminate after I call `event.set()`. Judging by the printout, `proc` always finishes, but the process itself keeps running, so `tl1.join()` never returns. It works if I lower the number of lists enqueued per call to `put` (the `batchperq` variable) or make each nested list smaller.

import multiprocessing as mp
import queue
import numpy as np
import time

def main():
    trainbatch_q = mp.Queue(10)

    batchperq = 50  
    event = mp.Event()

    tl1 = mp.Process(target=proc,
                            args=( trainbatch_q, 20, batchperq, event))
    tl1.start()
    time.sleep(3)
    event.set()
    tl1.join()
    print("Never printed..")    

def proc(batch_q, batch_size, batchperentry, the_event):
    nrow = 100000 
    i0 = 0
    to_q = []
    while i0 < nrow:
        rowend = min(i0 + batch_size,nrow)
        somerows = np.random.randint(0,5,(rowend-i0,2))
        to_q.append(somerows.tolist())  
        if len(to_q) == batchperentry:
            print("adding..", i0, len(to_q))
            while not the_event.is_set():
                try: 
                    batch_q.put(to_q, block=False)
                    to_q = []
                    break
                except queue.Full:
                    time.sleep(1)
        i0 += batch_size                    
    print("proc finishes")

When I do a keyboard interrupt, I get the trace below... what could the "lock" it's trying to acquire be? Something to do with the queue?

Traceback (most recent call last):
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/util.py", line 322, in _exit_function
    _run_finalizers()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/util.py", line 262, in _run_finalizers
    finalizer()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/software/Anaconda3-5.0.0.1-el7-x86_64/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
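
For context on the `_finalize_join` frame in the trace: the `multiprocessing` documentation notes that a process which has put items on a `Queue` waits before terminating until its background feeder thread has flushed all buffered items into the underlying pipe. The sketch below assumes that this is what blocks here and that any batches still buffered can be thrown away. It calls `Queue.cancel_join_thread()` in the child so that the exit does not wait on the feeder thread. `producer` and `run_cancel_demo` are hypothetical names, not part of the original program.

```python
import multiprocessing as mp
import queue
import time


def producer(batch_q, stop_event):
    """Hypothetical stand-in for proc: fills the queue, then stops on the event."""
    # One sizeable item, so that ten of them exceed a typical pipe buffer.
    batch = [[i, i + 1] for i in range(20000)]
    while not stop_event.is_set():
        try:
            batch_q.put(batch, block=False)
        except queue.Full:
            time.sleep(0.01)
    # Assumption: losing buffered items is acceptable. Without this call the
    # child would wait at exit for the feeder thread to flush the buffer.
    batch_q.cancel_join_thread()


def run_cancel_demo():
    batch_q = mp.Queue(10)
    stop_event = mp.Event()
    worker = mp.Process(target=producer, args=(batch_q, stop_event))
    worker.start()
    time.sleep(0.5)
    stop_event.set()
    # Nothing is ever read from the queue, yet the child can still exit.
    worker.join(timeout=10)
    return worker.exitcode


if __name__ == "__main__":
    print("exit code:", run_cancel_demo())
```

The trade-off is data loss: items that were still buffered are dropped, so this only fits cases where the consumer no longer needs them.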
phdscm
  • May be a duplicate of https://stackoverflow.com/questions/20742637/understanding-multiprocessing-shared-memory-management-locks-and-queues-in-pyt – mrQWERTY May 23 '18 at 03:22

1 Answer

Your process never exits because nothing tells it to exit. After I added a `return` at the end of your function, the process appeared to exit correctly when I ran it.

def proc(batch_q, batch_size, batchperentry, the_event):
    nrow = 100000
    i0 = 0
    to_q = []
    while i0 < nrow:
        rowend = min(i0 + batch_size,nrow)
        somerows = np.random.randint(0,5,(rowend-i0,2))
        to_q.append(somerows.tolist())  
        if len(to_q) == batchperentry:
            print("adding..", i0, len(to_q))
            while not the_event.is_set():
                try: 
                    batch_q.put(to_q, block=False)
                    to_q = []
                    break
                except queue.Full:
                    time.sleep(1)
        i0 += batch_size                    
    print("proc finishes")
    return            # Added this line. You can have it return whatever is most relevant to you.

Here is the full program I ran including my changes to make it exit successfully.

import multiprocessing as mp
import queue
import numpy as np
import time

def main():
    trainbatch_q = mp.Queue(10)
    batchperq = 50  
    event = mp.Event()

    tl1 = mp.Process(target=proc,
                            args=( trainbatch_q, 20, batchperq, event))
    print("Starting")
    tl1.start()
    time.sleep(3)
    event.set()
    tl1.join()
    print("Never printed..")    

def proc(batch_q, batch_size, batchperentry, the_event):
    nrow = 100000
    i0 = 0
    to_q = []
    while i0 < nrow:
        rowend = min(i0 + batch_size,nrow)
        somerows = np.random.randint(0,5,(rowend-i0,2))
        to_q.append(somerows.tolist())  
        if len(to_q) == batchperentry:
            print("adding..", i0, len(to_q))
            while not the_event.is_set():
                try: 
                    batch_q.put(to_q, block=False)
                    to_q = []
                    break
                except queue.Full:
                    time.sleep(1)
        i0 += batch_size                    
    print("proc finishes")
    return      # Added this line. You can have it return whatever is most relevant to you.

if __name__ == "__main__":
  main()

Hope this helps.

K-Log
  • This just makes the main complete, but it doesn't make the process actually stop, right? I think I could accomplish the same with tl1.daemon = True, right? – phdscm May 23 '18 at 03:35
  • I believe so. Essentially, you are just stopping the process from blocking after the timeout. You could try calling terminate on the process, or checking exitcode to see whether it has exited, but be **careful** when using terminate. – K-Log May 23 '18 at 03:40
  • Thanks, I will definitely do that. But my main question is why my process is not finishing. – phdscm May 23 '18 at 03:42
  • @phdscm Take a look at my new answer. While my previous answer works, this is probably more what you're looking for. – K-Log May 23 '18 at 03:51
  • hm, adding `return True` to the end didn't change anything for me. – phdscm May 23 '18 at 04:52
  • @phdscm I changed my answer to include the full program I ran which may be different to how you are calling your functions so maybe try that and see if it works for you. – K-Log May 23 '18 at 05:14
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/171592/discussion-between-k-log-and-phdscm). – K-Log May 23 '18 at 05:19
  • I tried running it as a script but I get the same result. Feel free to chat me if you have another thought. – phdscm May 23 '18 at 11:47
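
One approach that may be worth trying here follows the `multiprocessing` guideline that all items should be removed from a queue before joining the process that put them there. The sketch below assumes the leftover batches can simply be discarded once the event is set: the parent keeps emptying the queue until the child has exited, and only then joins. `producer`, `drain` and `run_demo` are hypothetical names used for illustration.

```python
import multiprocessing as mp
import queue
import time


def producer(batch_q, stop_event, n_batches=200):
    """Hypothetical stand-in for proc: puts sizeable batches until told to stop."""
    for i in range(n_batches):
        batch = [[i, j] for j in range(1000)]
        while not stop_event.is_set():
            try:
                batch_q.put(batch, block=False)
                break
            except queue.Full:
                time.sleep(0.01)
        if stop_event.is_set():
            break
    # After returning, the child still waits for its queue feeder thread to
    # flush buffered items into the pipe, so somebody has to read them.


def drain(batch_q):
    """Discard whatever is currently readable from the queue; return the count."""
    drained = 0
    while True:
        try:
            batch_q.get(timeout=0.1)
            drained += 1
        except queue.Empty:
            return drained


def run_demo():
    batch_q = mp.Queue(10)
    stop_event = mp.Event()
    worker = mp.Process(target=producer, args=(batch_q, stop_event))
    worker.start()
    time.sleep(0.5)
    stop_event.set()
    # Keep emptying the queue until the child has really exited, then join.
    while worker.is_alive():
        drain(batch_q)
    worker.join()
    return worker.exitcode


if __name__ == "__main__":
    print("exit code:", run_demo())
```

Unlike `tl1.daemon = True` or `terminate()`, this lets the child finish on its own, at the cost of the parent reading and discarding the remaining items.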