I use a list of processes, each with its own queues. A separate thread fills these queues one after the other, and the processes fetch the data from them. The problem is that after a while the queues raise an Empty exception from within the processes, while the thread gets a Full exception. When I check the queue size on each side, it is consistent with the exception raised there. To make it worse, this behavior can only be reproduced as part of a large code base; I can't write a small program that reproduces it. Has anyone had similar issues with multiprocessing queues not being consistent across processes?
Edit
To add more to the description of the pipeline: I have multiple worker objects. Each worker has an input queue (multiprocessing.Queue), a worker queue (multiprocessing.Queue), an output queue (queue.Queue), a worker process (multiprocessing.Process) and a manager thread (threading.Thread).
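A minimal sketch of one worker's parts, for illustration only and not the actual code base. The names Worker, worker_main, manager_main and the maxsize value are hypothetical; bounded queues are assumed because the feeder sees Full exceptions.

```python
import multiprocessing
import queue
import threading


def worker_main(input_queue, worker_queue):
    """Placeholder for the worker process body (hypothetical)."""


def manager_main(worker_queue, output_queue):
    """Placeholder for the manager thread body (hypothetical)."""


class Worker:
    """Bundles the three queues, the process and the manager thread."""

    def __init__(self, maxsize=16):
        # Cross-process queues: fed by the feeder thread, filled by the process.
        self.input_queue = multiprocessing.Queue(maxsize)
        self.worker_queue = multiprocessing.Queue(maxsize)
        # In-process, thread-safe queue read by the main process.
        self.output_queue = queue.Queue(maxsize)
        self.process = multiprocessing.Process(
            target=worker_main,
            args=(self.input_queue, self.worker_queue),
            daemon=True,
        )
        self.manager = threading.Thread(
            target=manager_main,
            args=(self.worker_queue, self.output_queue),
            daemon=True,
        )

    def start(self):
        self.process.start()
        self.manager.start()
```

Nothing is started in the constructor, so building a Worker only allocates the queues and the (not yet running) process and thread objects.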
Alongside all these workers, I have a single feeder thread (threading.Thread) that adds sample identifiers to the input queues of all workers, one by one. The sample identifiers are very small (file paths), so the feeder thread can keep up with the processes.
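The feeder loop could be sketched roughly as below. This assumes a plain round-robin order over the input queues and a retry on Full; the function name feed_round_robin, the stop_event parameter and the timeout value are hypothetical and not taken from the real code.

```python
import itertools
import queue


def feed_round_robin(sample_ids, input_queues, put_timeout=1.0, stop_event=None):
    """Put each sample identifier on the next input queue in turn.

    Returns the number of identifiers that were successfully queued.
    """
    targets = itertools.cycle(input_queues)
    fed = 0
    for sample_id in sample_ids:
        target = next(targets)
        while True:
            # Allow a clean shutdown while a queue stays full.
            if stop_event is not None and stop_event.is_set():
                return fed
            try:
                target.put(sample_id, timeout=put_timeout)
                break
            except queue.Full:
                continue  # queue still full after the timeout, try again
        fed += 1
    return fed
```

Both queue.Queue and multiprocessing.Queue raise queue.Full from a timed put(), so the same loop works with either kind of queue.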
The worker gets the sample identifiers from the input queue, reads these samples, processes them and puts them into the worker queue one by one. The manager thread reads the data from the worker queue and puts it into the output queue, because reading from a multiprocessing.Queue is slower.
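The manager thread's job can be sketched as a simple drain loop. This is an illustration under assumptions: the name drain, the stop_event parameter and the timeout value are hypothetical.

```python
import queue
import threading


def drain(worker_queue, output_queue, stop_event, get_timeout=0.1):
    """Move items from worker_queue to output_queue until stop_event is set."""
    while not stop_event.is_set():
        try:
            item = worker_queue.get(timeout=get_timeout)
        except queue.Empty:
            continue  # nothing arrived in time, check stop_event again
        # Blocks if output_queue is bounded and currently full.
        output_queue.put(item)
```

It would run in a thread, for example threading.Thread(target=drain, args=(worker_queue, output_queue, stop_event)), so the main process only ever touches the faster in-process output queue.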
All .get() and .put() calls have timeouts, and I keep track of the time it takes to get new data from this pipeline. I also have mechanisms for closing and reopening it: joining all processes and threads (and the queues as well), then recreating all of them from scratch. When everything is working, the main process goes over the workers and reads the data off their output queues one by one. Most of the time, reading new data takes only a few ms.
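The timing around each read could look something like this sketch. The helper name timed_get and the default timeout are hypothetical; the real code may measure this differently.

```python
import queue
import time


def timed_get(q, timeout=1.0):
    """Return (item, seconds_waited); raises queue.Empty if the timeout expires."""
    start = time.monotonic()
    item = q.get(timeout=timeout)
    return item, time.monotonic() - start
```

A wait that is usually a few ms but suddenly hits the timeout on every call is the symptom that shows the pipeline has stalled.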
This whole pipeline exists twice in my code (used for machine learning with TensorFlow). One instance is used for training and is created near the start of the program; the other is used for testing. The second instance is created after training has run for a while, goes over my whole dataset, and then resets. The second time the second instance runs, it gets stuck after 1000 samples or so. When it is stuck and I break in debug mode in the main process, I see that the input queue is full and the worker and output queues are empty. When I then break inside one of the worker processes, I see that its input queue is empty. It seems that, for some reason, the worker process sees a different input queue than it should. Note that this does not look like a race condition, because the result is stable.
Edit 2
I zeroed in on the point where the program hangs. It seems to be the call to json.loads() on data read from a file. This means the problem is different from what I originally described: the processes hang; they do not actually see an empty queue.
Code for opening the file:
    import json

    with open(json_path, 'r') as f:
        data = f.read()
    json_data = json.loads(data)  # <== program hangs at this line
I tried using signal.alarm to pinpoint where in json.loads() the program hangs, but the exception is never raised. The problem also reproduces with a single multiprocessing.Process, but not when all processing is done in the main process.
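For reference, the signal.alarm attempt was along the lines of the sketch below. This is a reconstruction under assumptions, not the exact code: LoadTimeout, loads_with_alarm and the 5 second limit are hypothetical names and values. It is Unix-only and must be called from the main thread of the process.

```python
import json
import signal


class LoadTimeout(Exception):
    """Raised by the SIGALRM handler when parsing takes too long."""


def _on_alarm(signum, frame):
    raise LoadTimeout("json.loads() did not finish in time")


def loads_with_alarm(text, seconds=5):
    """Parse JSON, raising LoadTimeout if SIGALRM fires first."""
    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(seconds)  # schedule SIGALRM
    try:
        return json.loads(text)
    finally:
        signal.alarm(0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, previous)  # restore the old handler
```

One possible reason the exception never shows up: Python-level signal handlers only run in the main thread between bytecode instructions, so if the process is stuck inside a C call that never returns, the handler may never get a chance to fire. That is a guess and has not been verified here.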
Does this ring a bell for anyone?