
I have a basic question regarding Python's multiprocessing module: how can processes that exchange data via queues best be started?

For that I use a simple example where

  1. Data is received
  2. Data is processed
  3. Data is sent

All of the above steps should run in parallel in three different processes.

Here is the example code:

import multiprocessing
import keyboard
import time

def getData(queue_raw):
    for num in range(1000):
        queue_raw.put(num)
        print("getData: put "+ str(num)+" in queue_raw")
    while True:
        if keyboard.read_key() == "s":
            break

def calcFeatures(queue_raw, queue_features):
    while not queue_raw.empty():
        data = queue_raw.get()
        queue_features.put(data**2)
        print("calcFeatures: put "+ str(data**2)+" in queue_features")

def sendFeatures(queue_features):
    while not queue_features.empty():
        feature = queue_features.get()
        print("sendFeatures: put "+ str(feature)+" out")

if __name__ == "__main__":

    queue_raw = multiprocessing.Queue()
    queue_features = multiprocessing.Queue()

    processes = [

        multiprocessing.Process(target=getData, args=(queue_raw,)),
        multiprocessing.Process(target=calcFeatures, args=(queue_raw, queue_features,)),
        multiprocessing.Process(target=sendFeatures, args=(queue_features,))
    ]

    processes[0].start()
    time.sleep(0.1)
    processes[1].start()
    time.sleep(0.1)
    processes[2].start()

    #for p in processes:
    #    p.start()
    for p in processes:
        p.join()

This program works, but my question concerns how the processes are started. Ideally, processes[1] should start only once processes[0] has put data in queue_raw, and processes[2] should start only once processes[1] has put the calculated features in queue_features.

Right now I do that with the time.sleep() calls, which is suboptimal, since I don't necessarily know how long the processes will take. I also tried something like:

processes[0].start()
while queue_raw.empty():
    time.sleep(0.5)
processes[1].start()

But this doesn't work well either, since it only covers the dependency on the first process. Is there a method to start processes that depend on each other like this?
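As the comments below point out, multiprocessing.Queue.get() already blocks until an item is available, so a consumer process can be started before the producer has put anything in the queue. A minimal sketch (the function name echo_square is my own, for illustration):

```python
import multiprocessing

def echo_square(q_in, q_out):
    # get() blocks until an item arrives, so this process can be
    # started before the producer has put anything in the queue.
    q_out.put(q_in.get() ** 2)

if __name__ == "__main__":
    q_in = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    p = multiprocessing.Process(target=echo_square, args=(q_in, q_out))
    p.start()           # consumer starts first and simply waits inside get()
    q_in.put(3)         # data arrives later; the blocked get() wakes up
    print(q_out.get())  # -> 9
    p.join()
```

Because get() handles the waiting, no sleep-based start ordering is needed at all.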

Merk
  • Start all the processes and design them as a loop of infinite sleep unless there is some work to do (data found in the respective queue)? – deponovo Nov 15 '21 at 09:44
  • `Queue.get()` should block if there is no data in the queue. Are you sure you need to sleep there? (https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.get) – moooeeeep Nov 15 '21 at 09:55
  • @moooeeeep Yes, based on the documentation that should not be the case. But if I leave the sleep statements out, the program either does not execute the other processes at all, or it does, but since the queue is still empty those processes finish immediately. The keyboard break then does not work either. I tested the same without the print statements, but it still won't work. So I think I am missing a basic concept of how processes are started when the passed queues are shared between the processes – Merk Nov 15 '21 at 11:04
  • You should change the exit condition to `while True` then, or check some other flag to notify processes to exit. – moooeeeep Nov 15 '21 at 11:49
  • Example: https://stackoverflow.com/questions/48569731/detect-when-multiprocessing-queue-is-empty-and-closed – moooeeeep Nov 15 '21 at 11:53
  • Thanks for your comment @moooeeeep. That solved the problem. I put the answer in pseudo code below – Merk Nov 15 '21 at 12:30

1 Answer


@moooeeeep's comment pointed in the right direction: checking with `while not queue.empty():` does not wait until data is actually in the queue!

An approach via a sentinel object (here None) and a blocking loop ensures that each process waits until the previous process has put data in the queue:

FLAG_STOP = False
while not FLAG_STOP:
    data = queue_raw.get()  # get() blocks until data is available
    if data is None:        # sentinel received: finish analysis
        FLAG_STOP = True
    else:
        ...                 # work with data
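Applied to the original three-stage example, the full pipeline could look like the sketch below. This is only one possible version: the count parameter is my addition, and the keyboard-based stop from the question is replaced by a fixed number of items so the sentinel can be sent deterministically.

```python
import multiprocessing

SENTINEL = None  # marks the end of the data stream

def get_data(queue_raw, count=1000):
    for num in range(count):
        queue_raw.put(num)
    queue_raw.put(SENTINEL)  # tell the next stage we are done

def calc_features(queue_raw, queue_features):
    while True:
        data = queue_raw.get()            # blocks until data is available
        if data is SENTINEL:
            queue_features.put(SENTINEL)  # pass the signal downstream
            break
        queue_features.put(data ** 2)

def send_features(queue_features):
    while True:
        feature = queue_features.get()
        if feature is SENTINEL:
            break
        print("sendFeatures: put", feature, "out")

if __name__ == "__main__":
    queue_raw = multiprocessing.Queue()
    queue_features = multiprocessing.Queue()

    processes = [
        multiprocessing.Process(target=get_data, args=(queue_raw,)),
        multiprocessing.Process(target=calc_features, args=(queue_raw, queue_features)),
        multiprocessing.Process(target=send_features, args=(queue_features,)),
    ]

    # All processes can be started at once; the blocking get() calls
    # take care of the ordering, no sleep() needed.
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```

Each stage forwards the sentinel once it has drained its input, so the whole chain shuts down cleanly.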
Merk