
I'm implementing a multi-process web crawler. I use Manager().Queue() to create a queue of file names that the crawler will download. All processes share the same queue, and each of them calls both put() and get() on it. From what I've read about the Queue() class, it is thread-safe and blocking, which solves the problem of a process trying to get() an item from an empty queue. However, another problem occurs: when the queue has only a few items left, one process, say process 1, checks that the queue is nonempty; then another process, say process 2, pops all the remaining items, and when process 1 finally calls get() it finds the queue empty, so its get() call blocks forever.
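The race can be replayed deterministically in a single thread with a plain `queue.Queue` (the type that `Manager().Queue()` proxies), interleaving the two processes' steps by hand. This is only an illustration: the file name is made up, and `get_nowait()` stands in for the blocking `get()` so the demo raises `queue.Empty` instead of hanging.

```python
import queue

q = queue.Queue()
q.put("page.html")  # hypothetical file name; only one item is left

# Process 1 checks the queue and sees an item...
p1_sees_item = not q.empty()

# ...but before process 1 calls get(), process 2 takes that item.
p2_item = q.get()

# Process 1 now acts on its stale check. A plain get() would block forever
# here (nobody puts anything else); get_nowait() raises instead, for the demo.
try:
    q.get_nowait()
    p1_result = "got item"
except queue.Empty:
    p1_result = "queue was empty"

print(p1_sees_item, p2_item, p1_result)
```

The check and the get() are two separate operations, so another consumer can run between them; that gap is exactly the window described above.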

Attempted solution: give get() a timeout, so that when it expires get() raises a "queue empty" exception (queue.Empty), and then make the process exit.

But this solution doesn't seem to work, because sometimes the program gets stuck at the process.join() stage.
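For reference, the timeout approach can be sketched roughly as below, assuming a `Manager().Queue()` that the parent fills before starting the workers. The names `worker` and `run`, the `timeout` values, and the `.upper()` call (a stand-in for the actual download) are hypothetical. Because the manager's queue is a `queue.Queue`, the exception to catch is `queue.Empty`. Note a weakness of this pattern for a crawler: a worker can time out while another worker is still downloading a page that would have added new file names, so the crawl may end early.

```python
import multiprocessing as mp
import queue  # Manager().Queue() proxies a queue.Queue, so get() raises queue.Empty


def worker(tasks, results, timeout):
    """Consume items until get() times out (the timeout-based approach)."""
    while True:
        try:
            item = tasks.get(timeout=timeout)
        except queue.Empty:
            # Nothing arrived within `timeout` seconds: assume the work is done.
            return
        results.put(item.upper())  # hypothetical stand-in for downloading `item`


def run(items, num_workers=3, timeout=1.0):
    with mp.Manager() as manager:
        tasks = manager.Queue()
        results = manager.Queue()
        for item in items:
            tasks.put(item)
        procs = [mp.Process(target=worker, args=(tasks, results, timeout))
                 for _ in range(num_workers)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        out = []
        while not results.empty():  # safe here: every producer has exited
            out.append(results.get())
        return sorted(out)


if __name__ == "__main__":
    print(run(["a.html", "b.html", "c.html"]))
```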

Could anyone help me with this? My English is not that good, but I would really appreciate it if you could think about this problem. Thank you.

  • Don't use `queue.empty` checks for multiple consumer scenarios. It's an anti-pattern because checking and then acting upon the result of this check is not an atomic operation without a lock around it. My answer [here](https://stackoverflow.com/a/53132779/9059420) describes how to `get()` from a `multiprocessing.Queue`. The question there is not a perfect fit because `Manager.Queue` is a threading-queue (`queue.Queue`) under the hood, but the solution is the same... – Darkonaut Dec 14 '18 at 23:43
  • Don't "check() and get()", just "get()" and pass a sentinel as final value for breaking the get-loop. For x consumers, pass x sentinels. – Darkonaut Dec 14 '18 at 23:44
  • Thank you! I solved the problem in your way. – vicissitude1999 Dec 20 '18 at 00:17
  • You're welcome and thanks for your feedback! – Darkonaut Dec 20 '18 at 00:19
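A minimal sketch of the sentinel approach from the comments, adapted to this crawler where the workers themselves put() new items. Because workers keep producing work, the parent can't simply enqueue the sentinels up front; as an added assumption (not part of the comments' suggestion), this sketch uses `task_done()`/`join()`, which the proxied `queue.Queue` exposes, to detect when all work, including newly discovered links, is finished, and only then sends one sentinel per worker. `fake_download`, `crawl`, and the depth rule are hypothetical placeholders.

```python
import multiprocessing as mp

SENTINEL = None  # end-of-work marker; None unpickles as the same singleton, so `is` works


def fake_download(url):
    """Hypothetical stand-in for downloading a page; returns the links found on it."""
    if url.count("/") >= 2:  # stop after two levels so the demo terminates
        return []
    return [f"{url}/{i}" for i in range(2)]


def worker(tasks, results):
    while True:
        url = tasks.get()        # just block; no empty() check beforehand
        if url is SENTINEL:
            break                # one sentinel per worker ends its get-loop
        try:
            for link in fake_download(url):
                tasks.put(link)  # enqueue new work BEFORE marking this item done
            results.put(url)
        finally:
            tasks.task_done()    # lets tasks.join() in the parent track progress


def crawl(seeds, num_workers=3):
    with mp.Manager() as manager:
        tasks = manager.Queue()
        results = manager.Queue()
        for seed in seeds:
            tasks.put(seed)
        procs = [mp.Process(target=worker, args=(tasks, results))
                 for _ in range(num_workers)]
        for p in procs:
            p.start()
        tasks.join()             # returns once every put() has a matching task_done()
        for _ in procs:
            tasks.put(SENTINEL)  # x consumers -> x sentinels
        for p in procs:
            p.join()
        done = []
        while not results.empty():  # safe here: all workers have exited
            done.append(results.get())
        return sorted(done)


if __name__ == "__main__":
    print(crawl(["site"]))
```

Since each worker puts its new links before calling `task_done()` for the current item, the unfinished-task count cannot reach zero while a link is still pending, so `tasks.join()` returns only when the crawl is really complete.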
