
Is there any option to have a multiprocessing Queue where each value can be accessed twice?

My problem is that I have one "generator" process creating a constant flux of data, and I would like two different processes to access this data, each doing its own thing with it.

A minimal example of the issue:

import multiprocessing as mp
import numpy as np

class Process1(mp.Process):
    def __init__(self,Data_Queue):
        mp.Process.__init__(self)
        self.Data_Queue = Data_Queue

    def run(self):
        while True:
            self.Data_Queue.get()
            # Do stuff with the data
            self.Data_Queue.task_done()

class Process2(mp.Process):
    def __init__(self,Data_Queue):
        mp.Process.__init__(self)
        self.Data_Queue = Data_Queue

    def run(self):
        while True:
            self.Data_Queue.get()
            # Do stuff with the data
            self.Data_Queue.task_done()

if __name__ == "__main__":

    data_Queue = mp.JoinableQueue()  # JoinableQueue because the workers call task_done()

    P1 = Process1(data_Queue)
    P1.start()
    P2 = Process2(data_Queue)
    P2.start()

    while True: # Generate data
        data_Queue.put(np.random.rand(1000))

The idea is that both Process1 and Process2 should see all of the generated data. With a single queue, each of them only receives some random portion of it.

Thanks for the help!

Update 1: As pointed out in some of the comments and answers, this becomes a little more complicated for two reasons I did not include in the initial question.

  • The data is generated externally on a non-constant schedule (I may receive tons of data for a few seconds, then wait minutes for more to come).
  • As such, data may arrive faster than it can be processed, so it needs to be queued in some way while it waits for its turn (see the sketch after this list).
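
For reference, below is a minimal sketch of the simplest workaround, assuming one queue per consuming process: the generator puts every item on both queues, so each process sees the full stream, and the queues themselves buffer any bursts. The consumer function and the loop count are placeholders; note that each put pickles the array, so the data is effectively copied once per consumer.

import multiprocessing as mp
import numpy as np

def consumer(queue):
    while True:
        data = queue.get()
        if data is None:        # sentinel: no more data
            break
        # ... do stuff with data ...

if __name__ == "__main__":
    queue_1 = mp.Queue()
    queue_2 = mp.Queue()
    p1 = mp.Process(target=consumer, args=(queue_1,))
    p2 = mp.Process(target=consumer, args=(queue_2,))
    p1.start()
    p2.start()

    for _ in range(100):                # stand-in for the external data source
        data = np.random.rand(1000)
        queue_1.put(data)               # every item goes on both queues,
        queue_2.put(data)               # so both consumers see everything

    queue_1.put(None)
    queue_2.put(None)
    p1.join()
    p2.join()
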
Sicks
  • What's wrong with using two queues? Also reduces contention. – Homer512 Nov 03 '22 at 17:23
  • In my specific case the "generated data" is a very large 3D array that arrives roughly every 200 ms. I believe copying all this data twice would make things even slower than they already are. – Sicks Nov 03 '22 at 21:46
  • In that case you might not want to put the array in there in the first place but put it into shared memory and use the queues just to pass small handles/notifiers. There are several posts on that; here is one at random: https://stackoverflow.com/questions/17785275/share-large-read-only-numpy-array-between-multiprocessing-processes?noredirect=1&lq=1 – Homer512 Nov 03 '22 at 22:16
  • That's an interesting suggestion, thanks! However, this would create a couple of other problems, with data arriving either too slowly (the same batch would be used multiple times, but that's easy to fix) or, more likely, too fast while the last batch is still being processed. On another note, shared memory may be a good solution in my case, as this data is actually already a ctypes pointer, so it may be faster to turn it into shared memory? Maybe I could just share the pointer around the processes? Not sure if that even makes sense. (A rough sketch of this shared-memory idea follows these comments.) – Sicks Nov 03 '22 at 23:52
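
Following up on that exchange, here is a rough sketch of the handle-passing idea, assuming Python 3.8+ and its multiprocessing.shared_memory module (the names, sizes and batch count below are illustrative): the generator writes each array into a shared-memory block and puts only a small (name, shape, dtype) tuple on each consumer's queue, so the array bytes themselves are never pickled. Deciding exactly when a block can be unlinked, i.e. when both consumers are done with it, is glossed over here and would need real bookkeeping in practice.

import multiprocessing as mp
from multiprocessing import shared_memory

import numpy as np

def consumer(queue):
    while True:
        handle = queue.get()
        if handle is None:                            # sentinel: no more data
            break
        name, shape, dtype = handle
        shm = shared_memory.SharedMemory(name=name)   # attach to the block by name
        arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
        # ... do stuff with arr (copy it out if it must outlive this block) ...
        shm.close()

if __name__ == "__main__":
    q1, q2 = mp.Queue(), mp.Queue()
    p1 = mp.Process(target=consumer, args=(q1,))
    p2 = mp.Process(target=consumer, args=(q2,))
    p1.start()
    p2.start()

    blocks = []
    for _ in range(10):                           # stand-in for the data source
        data = np.random.rand(100, 100, 100)      # "large 3D array"
        shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
        np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)[:] = data
        blocks.append(shm)                        # keep the block alive
        handle = (shm.name, data.shape, str(data.dtype))
        q1.put(handle)                            # only the small handle is queued
        q2.put(handle)

    q1.put(None)
    q2.put(None)
    p1.join()
    p2.join()
    for shm in blocks:                            # release the memory at the end
        shm.close()
        shm.unlink()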

1 Answer


One way to solve your problem is, first, to use a multiprocessing.Array to share, let's say, a numpy array with your data between the worker processes. Second, use a multiprocessing.Barrier to synchronize the main process and the workers when generating and processing data batches. And, finally, give each worker process its own queue to signal it when the next data batch is ready for processing. Below is a complete working example just to show you the idea:

#!/usr/bin/env python3
import os
import time
import ctypes
import multiprocessing as mp

import numpy as np


WORKERS = 5
DATA_SIZE = 10
DATA_BATCHES = 10


def process_data(data, queue, barrier):
    proc = os.getpid()
    print(f'[WORKER: {proc}] Started')

    while True:
        data_batch = queue.get()

        if data_batch is None:
            break

        arr = np.frombuffer(data.get_obj())
        print(f'[WORKER: {proc}] Started processing data {arr}')
        time.sleep(np.random.randint(0, 2))
        print(f'[WORKER: {proc}] Finished processing data {arr}')

        barrier.wait()

    print(f'[WORKER: {proc}] Finished')


def generate_data_array(i):
    print(f'[DATA BATCH: {i}] Start generating data... ', end='')
    time.sleep(np.random.randint(0, 2))
    data = np.random.randint(0, 10, size=DATA_SIZE)
    print(f'Done! {data}')

    return data


if __name__ == '__main__':
    data = mp.Array(ctypes.c_double, DATA_SIZE)
    data_barrier = mp.Barrier(WORKERS + 1)
    workers = []

    # Start workers:
    for _ in range(WORKERS):
        data_queue = mp.Queue()
        p = mp.Process(target=process_data, args=(data, data_queue, data_barrier))
        p.start()
        workers.append((p, data_queue))

    # Generate data batches in the main process:
    for i in range(DATA_BATCHES):
        arr = generate_data_array(i + 1)
        data_arr = np.frombuffer(data.get_obj())
        np.copyto(data_arr, arr)

        for _, data_queue in workers:
            # Signal workers that the new data batch is ready:
            data_queue.put(True)

        data_barrier.wait()

    # Stop workers:
    for worker, data_queue in workers:
        data_queue.put(None)
        worker.join()

Here, you start by defining the shared data array data and the barrier data_barrier used for process synchronization. Then, in a loop, you instantiate a queue data_queue and create and start a worker process p, passing it the shared data array, the queue instance, and the shared barrier instance data_barrier as parameters. Once the workers have been started, you generate data batches in a loop, copy each generated numpy array into the shared data array, and signal the processes via their queues that the next data batch is ready for processing. Then you wait on the barrier until all the worker processes have finished their work before generating the next data batch. In the end, you send a None signal to all the processes in order to make them quit their infinite processing loops.

constt
  • Thanks! I will try it out, but compared to putting the data in a queue this requires data generation to be stopped while the data is being processed (so it is not applicable to external data sources where you don't control when the data is sent). Also, doing that kind of takes the purpose away from writing a multiprocessing application. – Sicks Nov 04 '22 at 09:57
  • Nope, the data generation doesn't necessarily have to be stopped while the current batch is being processed. The data generation can take place in a different process and produce data independently. Once a batch has been processed by all the workers, just request the next one from a generator queue and feed it to the workers. – constt Nov 04 '22 at 11:36
  • Using two arrays for double buffering may also help: the producer can fill one array while the other is read by the consumers, then flip the association (a rough sketch of this idea follows below). – Homer512 Nov 04 '22 at 15:08
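
A rough sketch of that double-buffering idea, reusing the Array/Barrier structure from the answer above (the worker count, array size and batch count are illustrative): two shared arrays and two barriers, so the producer can fill one buffer while the workers are still reading the other, and it waits on the matching barrier before overwriting a buffer.

import ctypes
import multiprocessing as mp

import numpy as np

WORKERS = 2
DATA_SIZE = 10
DATA_BATCHES = 8

def worker(buffers, queue, barriers):
    while True:
        idx = queue.get()
        if idx is None:                      # sentinel: no more batches
            break
        arr = np.frombuffer(buffers[idx].get_obj())
        _ = arr.sum()                        # stand-in for real processing
        barriers[idx].wait()                 # tell the producer this buffer is free again

if __name__ == '__main__':
    buffers = [mp.Array(ctypes.c_double, DATA_SIZE) for _ in range(2)]
    barriers = [mp.Barrier(WORKERS + 1) for _ in range(2)]

    procs, queues = [], []
    for _ in range(WORKERS):
        q = mp.Queue()
        p = mp.Process(target=worker, args=(buffers, q, barriers))
        p.start()
        procs.append(p)
        queues.append(q)

    for i in range(DATA_BATCHES):
        idx = i % 2
        if i >= 2:
            # Before overwriting this buffer, wait until every worker has
            # finished the batch that was previously stored in it.
            barriers[idx].wait()
        np.frombuffer(buffers[idx].get_obj())[:] = np.random.rand(DATA_SIZE)
        for q in queues:
            q.put(idx)                       # notify the workers: buffer idx is ready

    # Drain the still-outstanding batches (oldest first) before shutting down:
    for i in range(max(DATA_BATCHES - 2, 0), DATA_BATCHES):
        barriers[i % 2].wait()

    for q in queues:
        q.put(None)
    for p in procs:
        p.join()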