
I use a list of processes, with a queue for each one. Another thread fills these queues one after the other, and the processes fetch the data from them. The problem is that after a while the queues raise an Empty exception from within the processes, while the thread gets a Full exception. When I check the queue sizes, they are consistent with the exceptions. To make it worse, this behavior can only be reproduced as part of a large codebase; I can't write a small program that reproduces it. Has anyone had similar issues with multiprocessing queues looking inconsistent from different processes?

Edit

To add more to the description of the pipeline. I have multiple worker objects, each worker has an input queue (multiprocessing.Queue), a worker queue (multiprocessing.Queue), an output queue (threading.Queue), a worker process (multiprocessing.Process) and a manager thread (threading.Thread).

Against all these workers, I have a single feeder thread (threading.Thread) that adds sample identifiers to the input queues of all workers, one by one. The sample identifiers are very small (file paths), so the feeder thread can keep up with the processes.

The worker gets the sample identifiers from the input queue, reads these samples, processes them, and puts the results into the worker queue one by one. The manager thread reads the data from the worker queue and puts it into the output queue, because multiprocessing.Queue is slower to read from.
All .get() and .put() calls have timeouts, and I keep track of how long it takes to get new data from this pipeline. I also have mechanisms for closing and reopening the pipeline, by joining all processes and threads (even the queues' internal threads) and then recreating all of them from scratch. When everything is working, the main process goes over the workers and reads the data off their output queues one by one. Most of the time it takes only a few ms to read new data.
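For reference, one such worker unit can be sketched roughly like this. This is a minimal sketch, not my actual code: the names `worker_loop`, `manager_loop`, `run_pipeline`, and the `SENTINEL` value are made up for illustration, and the real processing and error handling are omitted.

```python
import multiprocessing as mp
import queue
import threading

# I'm on Linux; pinning the "fork" start method keeps the sketch
# deterministic (this is not portable to Windows).
ctx = mp.get_context("fork")
SENTINEL = None  # marks the end of the stream

def worker_loop(input_q, worker_q):
    # Runs in a child process: read sample identifiers, "process"
    # them, and push the results to the worker queue one by one.
    while True:
        sample_id = input_q.get(timeout=10)   # blocking get with timeout
        if sample_id is SENTINEL:
            worker_q.put(SENTINEL, timeout=10)
            break
        worker_q.put(("processed", sample_id), timeout=10)

def manager_loop(worker_q, output_q):
    # Runs as a thread in the main process: drain the slower
    # multiprocessing queue into a cheap threading queue.
    while True:
        item = worker_q.get(timeout=10)
        output_q.put(item)
        if item is SENTINEL:
            break

def run_pipeline(sample_ids):
    input_q = ctx.Queue(maxsize=2)    # feeder thread  -> worker process
    worker_q = ctx.Queue(maxsize=2)   # worker process -> manager thread
    output_q = queue.Queue()          # manager thread -> main process

    proc = ctx.Process(target=worker_loop, args=(input_q, worker_q))
    proc.start()  # start the process before any helper threads exist
    manager = threading.Thread(target=manager_loop, args=(worker_q, output_q))
    manager.start()

    # The feeder role: push identifiers one by one, then the sentinel.
    for sid in sample_ids:
        input_q.put(sid, timeout=10)
    input_q.put(SENTINEL, timeout=10)

    results = []
    while True:
        item = output_q.get(timeout=10)
        if item is SENTINEL:
            break
        results.append(item)

    manager.join()
    proc.join()
    return results
```

Note that in this sketch the worker process is started before any helper threads exist, which avoids the fork-while-multithreaded hazard raised in the comments.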

This whole pipeline exists twice in my code (used for machine learning with TensorFlow). One instance is used for training and is created close to the beginning of the program; the other is used for testing. The second instance is created after a while of training, goes over my entire dataset, and then resets. When the second instance runs for the second time, it gets stuck after 1000 samples or so. When it is stuck and I break in debug mode in the main process, I see that the input queue is full and the worker and output queues are empty. When I then break inside one of the worker processes, I see that its input queue is empty. It seems like, for some reason, the worker process sees a different input queue than it should. Note that this is not some race issue, because this result is stable.

Edit 2

I zeroed in on the point where the program hangs. It seems to hang while performing json.loads() on data read from a file. This means the problem is different from what I originally described: the processes hang and don't actually see an empty queue. Code for opening the file:

with open(json_path, 'r') as f:
    data = f.read()
json_data = json.loads(data)  # <== program hangs at this line

I tried using signal.alarm to pinpoint where in json.loads() the program hangs, but the alarm never raises its exception. The problem reproduces with a single multiprocessing.Process as well, but not when all processing is done in the main process. Does this ring a bell for anyone?
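Roughly, the alarm-based probe looked like this (sketched for illustration; `load_json_with_deadline` and the concrete timeout are made-up names, not my actual code). One caveat I'm aware of: SIGALRM is only delivered to the main thread of a process, and a hang inside C code that never returns to the interpreter will not be interrupted, which could explain why the alarm never fires.

```python
import json
import signal

def alarm_handler(signum, frame):
    # Turn the pending alarm into a Python exception with a traceback,
    # so we can see where the code was stuck.
    raise TimeoutError("operation timed out")

# Must be installed from the main thread of the (child) process.
signal.signal(signal.SIGALRM, alarm_handler)

def load_json_with_deadline(json_path, seconds=30):
    signal.alarm(seconds)            # arm the alarm
    try:
        with open(json_path, 'r') as f:
            data = f.read()
        return json.loads(data)      # the line that hangs
    finally:
        signal.alarm(0)              # disarm on success or failure
```

(signal.alarm is Unix-only, which is fine here since I'm on Linux.)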

  • Like I said, it is part of a very large codebase, and I can’t reproduce it without running a lot of it. It is part of an ML training script using Tensorflow. – Guy Zohar Oct 30 '18 at 18:54
  • It's not clear from what you write how you synchronize the feeding of the queues. If you use separate queues for every process, why shouldn't it be possible that one queue is empty while another is full? If some processes just take longer to process their input but you're feeding all queues with the same thread and (supposedly) with the same frequency, this sounds like a very likely scenario. – Darkonaut Oct 30 '18 at 19:16
  • I have a different queue for every process and a single feeder thread. The processing of each element takes much longer than what the feeder thread needs to keep up. What I meant is that the same queue looks full from the feeder thread but empty from its corresponding process (the one that extracts the data from it). – Guy Zohar Oct 30 '18 at 19:21
  • You are using `multiprocessing.Queue`? Did you specify `maxsize`? – Darkonaut Oct 30 '18 at 19:27
  • Yes I did. – Guy Zohar Oct 30 '18 at 19:29
  • Why not ask for the queue size and avoid pushing to it if it has reached its max? Just put the "pushing" thread/worker to sleep until some space becomes available in the queue. – Raydel Miranda Oct 30 '18 at 19:32
  • The problem is that the process that tries to get the data from the queue sees it as empty. Looks like something with the connection between the two processes breaks. – Guy Zohar Oct 30 '18 at 19:34
  • Would you let us know what size you set `maxsize` to? ;) If it's a very low number, it could be a reason. – Darkonaut Oct 30 '18 at 19:41
  • Why don't you let the worker block-wait with `.get()` instead of `.get_nowait()` for input? You could also catch the errors and handle them. You would need to import them from the `queue` module (`queue.Empty` and `queue.Full`). – Darkonaut Oct 30 '18 at 19:41
  • I used .get() with timeout. The problem is that the worker blocks forever. The size of the queue is 2 but again, at the same time, the feeder sees 2 elements in the queue while the worker sees 0. – Guy Zohar Oct 30 '18 at 19:44
  • Are you using multiprocessing.Queue? – Raydel Miranda Oct 30 '18 at 19:45
  • Yes I do. – Guy Zohar Oct 30 '18 at 19:46
  • I would recommend debugging to make sure the queue that looks empty is the same one that's full. I can only guess (based on the question and comments) that you have more than one queue. – Raydel Miranda Oct 30 '18 at 19:48
  • I have multiple queues. All of them look empty from worker processes and full from feeder thread. I’m past looking at queues in debug mode. – Guy Zohar Oct 30 '18 at 19:52
  • There is no "at the same time" for status-reads on queues from different processes. The time will always differ and the state can change in between. Also note the multiprocessing docs contain this warning "After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty." – Darkonaut Oct 30 '18 at 20:04
  • You would use a sentinel value within `iter(queue.get, SENTINEL)` for breaking a blocking `get()` in case you want to shut down the worker. [example](https://stackoverflow.com/a/52349697/9059420) – Darkonaut Oct 30 '18 at 20:05
  • The problem I see is not something that comes and goes, it doesn't look like a race or something like that. I also use `.get()` and `.put()` with timeouts. See the edit I added to further explain the system. – Guy Zohar Oct 31 '18 at 15:59
  • "The manager thread reads the data in the worker queues and puts it into the output queue because multiprocessing.Queue is slower on read." This doesn't make sense to me if I understand you right. The output queue is a `threading.Queue`, as you say, and you cannot pass messages to another process over such a queue. – Darkonaut Nov 01 '18 at 18:07
  • If your OS additionally uses "fork" for process creation and you are already multi-threaded at that point, this would be another recipe for chaos. – Darkonaut Nov 01 '18 at 18:15
  • Note that there is a worker queue, which is a multiprocessing.Queue, and an output queue, which is a threading.Queue. The manager thread reads data from the worker queue and puts it into the output queue. The manager thread lives in the main process, as does the code that reads data from the output queue. – Guy Zohar Nov 01 '18 at 18:15
  • I’m on Linux, so forking is used. Can you explain further? – Guy Zohar Nov 01 '18 at 18:17
  • I think I understand. You say that the order of starting the threads and processes matters. I should start threads after all processes are up. I’ll see if that might be the issue and update. – Guy Zohar Nov 01 '18 at 18:20
  • Forking creates a clone of your whole process. I've written an answer on this [here](https://stackoverflow.com/questions/52247576/attributeerror-dupfd-in-multiprocessing-resource-sharer-python-multiproces/52249401#52249401). Yes, starting threads after all processes are up would be one option. – Darkonaut Nov 01 '18 at 18:25
  • Seems like that doesn't solve the issue, though I got a little further with the problem. See the edit. – Guy Zohar Nov 04 '18 at 14:02
  • Cannot replicate and I don't see how it could hang on `json.loads`. If you add `print(data, flush=True)` at the last line within the open-block, will it print? – Darkonaut Nov 04 '18 at 15:49

0 Answers