
First of all, I apologize if the title is a bit weird, but I literally could not think of how to put the problem I am facing into a single line.

So I have the following code:

import time
from multiprocessing import Process, current_process, Manager
from multiprocessing import JoinableQueue as Queue

# from threading import Thread, current_thread
# from queue import Queue


def checker(q):
    count = 0
    while True:
        if not q.empty():
            data = q.get()
            # print(f'{data} fetched by {current_process().name}')
            # print(f'{data} fetched by {current_thread().name}')
            q.task_done()
            count += 1
        else:
            print('Queue is empty now')
            print(current_process().name, '-----', count)
            # print(current_thread().name, '-----', count)


if __name__ == '__main__':
    t = time.time()
    # m = Manager()
    q = Queue()
    # with open("/tmp/c.txt") as ifile:
    #     for line in ifile:
    #         q.put((line.strip()))
    for i in range(1000):
        q.put(i)
    time.sleep(0.1)
    procs = []
    for _ in range(2):
        p = Process(target=checker, args=(q,), daemon=True)
        # p = Thread(target=checker, args=(q,))
        p.start()
        procs.append(p)
    q.join()
    for p in procs:
        p.join()

Sample outputs

1: When the process just hangs

Queue is empty now
Process-2 ----- 501
output hangs at this point

2: When everything works just fine.

Queue is empty now
Process-1 ----- 515
Queue is empty now
Process-2 ----- 485

Process finished with exit code 0

Now the behavior is intermittent: it happens sometimes, but not always.

I have tried using Manager().Queue() in place of multiprocessing.Queue() as well, but with no success; both exhibit the same issue.

I tested this with both multiprocessing and multithreading and I get exactly the same behavior, with one slight difference: with multithreading the behavior occurs much less often than with multiprocessing.

So I think there is something I am missing conceptually, or doing wrong, but I am not able to catch it now since I have spent way too much time on this, and my mind may be overlooking something very basic.

So any help is appreciated.

Rohit
  • Apparently if you call `join()` before the queue `q` is empty, there is potential deadlock: https://stackoverflow.com/questions/31665328/python-3-multiprocessing-queue-deadlock-when-calling-join-before-the-queue-is-em – TuanDT Feb 04 '19 at 13:05
  • Even if I add a `q.join()` before joining the processes, it still does not work as expected. – Rohit Feb 04 '19 at 13:58
  • Maybe that's because `multiprocessing.Queue` doesn't have any method called `join()`. From the doc: `Queue implements all the methods of queue.Queue except for task_done() and join().` The first "Queue" refers to `multiprocessing.Queue` which you use in the above example. – TuanDT Feb 04 '19 at 14:22
  • I tried with `JoinableQueue` – Rohit Feb 04 '19 at 14:23
  • @TuanDT For your information, I just updated the question. – Rohit Feb 04 '19 at 14:49

1 Answer


I believe you have a race condition in the checker method. You check whether the queue is empty and then dequeue the next task in separate steps. It's usually not a good idea to separate these two kinds of operations without mutual exclusion or locking, because the state of the queue may change between the check and the pop. It may be non-empty, but another process may then dequeue the waiting work before the process which passed the check is able to do so.
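For comparison, a race-free variant of the polling loop folds the emptiness check into `get()` itself by giving it a timeout, so that check and dequeue happen as one atomic operation. This is a sketch, not part of the original post; the 1-second timeout and the item count are illustrative choices:

```python
import queue
from multiprocessing import JoinableQueue, Process


def checker(q):
    # Instead of "if not q.empty(): q.get()", ask get() to do both
    # steps atomically: block for up to 1 second, then treat a
    # timeout as "the queue has been drained".
    count = 0
    while True:
        try:
            data = q.get(timeout=1)
        except queue.Empty:  # multiprocessing queues raise queue.Empty on timeout
            break
        q.task_done()
        count += 1
    print(count, 'items processed')
    return count


if __name__ == '__main__':
    q = JoinableQueue()
    for i in range(100):
        q.put(i)
    p = Process(target=checker, args=(q,))
    p.start()
    q.join()   # returns once all 100 items have been task_done()'d
    p.join()
```

Note that the timeout is a heuristic: a worker that times out merely assumes no more work is coming, which is why the sentinel approach below is usually cleaner.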

However, I generally prefer communication over locking whenever possible; it's less error-prone and makes one's intentions clearer. In this case, I would send a sentinel value (such as None) to the worker processes to indicate that all work is done. Each worker then just dequeues the next object (a single get() call is thread-safe), and, if the object is None, the sub-process exits.

The example code below is a simplified version of your program, and should work without races:

from multiprocessing import Process, Queue, current_process

def checker(q):
    while True:
        data = q.get()
        if data is None:
            print(f'process {current_process().name} ending')
            return
        else:
            pass # do work with data

if __name__ == '__main__':
    q = Queue()
    for i in range(1000):
        q.put(i)
    procs = []
    for _ in range(2):
        q.put(None) # sentinel value, one per worker
        p = Process(target=checker, args=(q,), daemon=True)
        p.start()
        procs.append(p)
    for proc in procs:
        proc.join()
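If you would rather keep the `JoinableQueue` from the question, so that `q.join()` can still be used to wait for all work to finish, the sentinel combines with `task_done()` like this (again a sketch; `NUM_WORKERS` is just an illustrative constant):

```python
from multiprocessing import Process, current_process
from multiprocessing import JoinableQueue as Queue

NUM_WORKERS = 2


def checker(q):
    while True:
        data = q.get()
        q.task_done()   # account for every get(), including the sentinel
        if data is None:
            print(f'process {current_process().name} ending')
            return
        # do work with data here


if __name__ == '__main__':
    q = Queue()
    for i in range(1000):
        q.put(i)
    for _ in range(NUM_WORKERS):
        q.put(None)     # one sentinel per worker
    procs = [Process(target=checker, args=(q,)) for _ in range(NUM_WORKERS)]
    for p in procs:
        p.start()
    q.join()            # returns once every item (and sentinel) is task_done()'d
    for p in procs:
        p.join()
```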
bnaecker
  • Can you explain what you mean by queues being thread-safe, because that means we don't have to put locks around queues when used in a thread function. Also, I did fetch the value from the queue in the same step in which I check that the queue is not empty. So how is it a separate step? – Rohit Feb 04 '19 at 15:42
  • *Thread-safe* means that an operation can be used from multiple threads (or processes) without a race condition. So no explicit locking is required. And you *do* have separate steps: You first called `if not q.empty()`, and on the next line called `q.get()`. In between these two lines, another process may retrieve the next item from the queue, meaning this process would "think" the queue has data on it, but in fact it's empty by the time it tries to retrieve it. – bnaecker Feb 04 '19 at 16:23
  • I don't get it: if queues are thread-safe, then how do `q.empty` and `q.get` result in race conditions, since these are methods on queues? – Rohit Feb 04 '19 at 16:44
  • The *individual methods* on queues are thread-safe, but that doesn't mean any sequence of operations on them is thread-safe. The latter idea could not possibly work, because we can put together operations in different sequences, right? How could the queue "know" that some set of operations constitutes a thread-safe sequence? There would have to be some sort of lock which says, "Until I unlock, all operations are thread-safe". But that is by definition a mutual exclusion mechanism, which is separate from the queue itself. – bnaecker Feb 04 '19 at 16:56
  • But as far as I know, queues have a mutex implemented in them, which makes them thread-safe. Also, one would only get items from the queue after ensuring the queue is not empty, so I assume that's the correct sequence. – Rohit Feb 04 '19 at 17:04
  • Oh, I see, so you are saying that if q.empty and q.get are used in tandem they might end up creating a race condition; then why use q.empty at all, since q.get has to be used almost every time? – Rohit Feb 04 '19 at 17:08
  • 1
    Yes, that's correct. `q.empty()` can return `True`, but a subsequent `q.get()` call can still block, because the queue has been emptied by another process in between them. This is a textbook race condition. You *could* add mutual exclusion, but it will most likely be faster, clearer, and less error-prone to add a sentinel, as I've done in my example. – bnaecker Feb 04 '19 at 18:48