
I work on a machine learning input pipeline. I wrote a data loader that reads data from a large .hdf file and returns slices; producing one slice takes roughly 2 seconds. I would therefore like to use a queue that is filled by several data loaders and hands out single objects via a next function (like a generator). The processes that fill the queue should run in the background and refill the queue whenever it is not full. I cannot get it to work properly. It did work with a single data loader, but it gave me the same slices 4 times.

import multiprocessing as mp

class Queue_Generator():
    def __init__(self, data_loader_list):
        self.pool = mp.Pool(4)
        self.data_loader_list = data_loader_list
        self.queue = mp.Queue(maxsize=16)
        self.pool.map(self.fill_queue, self.data_loader_list)

    def fill_queue(self, gen):
        self.queue.put(next(gen))

    def __next__(self):
        yield self.queue.get()

What I get from this: NotImplementedError: pool objects cannot be passed between processes or pickled. Thanks in advance.

2 Answers


Your specific error means that you cannot keep the pool as an attribute of your class while passing that class's bound methods to the pool: pickling the method also pickles self, and pool objects cannot be pickled. What I would suggest is the following:

import multiprocessing as mp
from queue import Empty


class QueueGenerator(object):
    def __init__(self, data_loader_list):
        self.data_loader_list = data_loader_list
        self.queue = mp.Queue(maxsize=16)

    def __iter__(self):
        processes = list()
        for _ in range(4):
            pr = mp.Process(target=fill_queue, args=(self.queue, self.data_loader_list))
            pr.start()
            processes.append(pr)
        return self

    def __next__(self):
        try:
            # timeout should have a value, otherwise this loop will never stop.
            # Make it long enough that the processes have time to refill the
            # queue, but not so long that the program freezes for an extended
            # period after all the data has been processed.
            return self.queue.get(timeout=1)
        except Empty:
            raise StopIteration

# have fill queue as a separate function
def fill_queue(queue, gen):
    while True:
        try:
            value = next(gen)
            queue.put(value)
        except StopIteration: # assumes the given data_loader_list is an iterator
            break
    print('stopping')


gen = iter(range(70))

qg = QueueGenerator(gen)


for val in qg:
    print(val)
# test if it works several times:
for val in qg:
    print(val)

The next issue for you to solve, I think, is to make data_loader_list something that provides new information in every separate process. Since you have not given any information about it, I can't help you with that, but the above does give you a way to have the processes fill your queue, which is then handed out as an iterator.
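For example, assuming data_loader_list really is a list of independent loader iterators, one way could be to start one process per loader instead of passing the whole list to every worker. This is only a sketch that reuses the fill_queue function above; the make_processes name is illustrative:

def make_processes(queue, data_loader_list):
    processes = []
    for loader in data_loader_list:
        # one worker per loader, so each process draws from its own iterator
        pr = mp.Process(target=fill_queue, args=(queue, loader))
        pr.start()
        processes.append(pr)
    return processes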

Marc
  • That looks good. I'll try it today, thank you so much. The data_loader objects return random crops of a 3D image, so they should provide new information in each process. – Lorenz Rumberger Nov 13 '19 at 16:10
  • Works like a charm. Thanks for the fast solution. The next problem I ran into was that the processes gave back identical image crops, since np.random.randint() generates a pseudo-random number based on time and the processes start synchronously. Therefore a different seed must be set for each RNG, as described in https://stackoverflow.com/questions/24345637/why-doesnt-numpy-random-and-multiprocessing-play-nice – Lorenz Rumberger Nov 13 '19 at 17:03
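A minimal sketch of that seeding fix, assuming each worker re-seeds NumPy once when it starts; the seed_and_fill name and the use of the process id as a seed are only illustrative:

import os
import numpy as np

def seed_and_fill(queue, gen):
    # give every forked worker its own RNG stream so they do not
    # all return identical "random" crops
    np.random.seed(os.getpid())
    fill_queue(queue, gen)

Each mp.Process would then be started with target=seed_and_fill instead of target=fill_queue.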

I'm not quite sure why you are yielding in __next__; that doesn't look right to me. __next__ should return a value, not a generator object.
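A quick illustration of that point (the class names here are purely for demonstration):

class Yielding:
    def __next__(self):
        yield 1  # the yield turns __next__ into a generator function

class Returning:
    def __next__(self):
        return 1

print(Yielding().__next__())   # <generator object ...>, the body never runs
print(Returning().__next__())  # 1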

Here is a simple way to return the results of parallel functions as a generator. It may or may not meet your specific requirements, but it can be tweaked to suit. It will keep processing data_loader_list until it is exhausted, which may use more memory than keeping, say, 4 items in a queue at all times.

import multiprocessing as mp


def read_lines(data_loader):
    from time import sleep
    sleep(2)
    return f'did something with {data_loader}'


def make_gen(data_loader_list):
    with mp.Pool(4) as pool:
        for result in pool.imap(read_lines, data_loader_list):
            yield result


if __name__ == '__main__':
    data_loader_list = [i for i in range(15)]
    result_generator = make_gen(data_loader_list)
    print(type(result_generator))

    for i in result_generator:
        print(i)

Using imap means that the results can be processed as they are produced. map and map_async would block in the for loop until all results were ready. See this question for more.
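A small sketch of that difference, using an illustrative slow_square worker in place of read_lines:

import multiprocessing as mp
import time


def slow_square(x):
    time.sleep(1)
    return x * x


if __name__ == '__main__':
    with mp.Pool(4) as pool:
        start = time.time()
        # imap: each result can be printed as soon as it is ready
        for result in pool.imap(slow_square, range(8)):
            print(f'imap {result} after {time.time() - start:.1f}s')

        start = time.time()
        # map: blocks here until every result is ready, then the loop runs at once
        for result in pool.map(slow_square, range(8)):
            print(f'map  {result} after {time.time() - start:.1f}s')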

FiddleStix