I've read about implementing multiprocessing (reference) and am wondering what the best practice is when one function needs to be offset because it depends on the output of the other process.

The first process records sensor data. The second process analyzes that data. I want the first process to run continuously so I don't have gaps in my sensor readings. The analyzing takes a little less time than the recording. My problem is that I can't kick off the second process until at least one iteration of the recording function has finished or else there won't be data to read.

Should I offset the loop or should I make some kind of queue for the analyzing function?

This would be my offsetting approach:

from multiprocessing import Process

if __name__ == '__main__':
    
    X = variableHoldingOutputFileNamefromRecordingFunc

    recordingFunc()         # first loop
    
    while True:
        p1 = Process(target=recordingFunc)
        p1.start()
        p2 = Process(target=analyzingFunc, args=(X,))  # args must be a tuple
        p2.start()
        p1.join()
        p2.join()

I am not sure how to make a queue, but I suppose I could have recordingFunc add its output file names to a list and then have analyzingFunc go through that list...

Sorry I am a noob. I am told this question should be more focused but I am not sure in what way the moderators mean.

rfii

1 Answer

The most basic implementation would indeed use a queue, which the multiprocessing module provides. Here's a relatively simple example I made. It simulates your "sensor reader" by generating a random number every second. The analyzer, which runs in the main process, pulls sensor readings off the queue as they become available.

import random
import time
from multiprocessing import Process, Queue


def reads_sensor_data(sensor_data_queue):
    # Suppose we add a sensor reading every second; this simulates that. It runs 10 iterations. You could modify this
    # to run forever, or until something tells it to quit.

    for iteration in range(10):
        sensor_data_queue.put(random.random())  # Generate a random number.
        time.sleep(1)  # Sleep for 1 second

    sensor_data_queue.put(None)  # None means we're done.


def analyze_sensor_data(sensor_data_queue):
    while True:
        data = sensor_data_queue.get(block=True)  # Wait until a reading is available.
        if data is None:
            break
        else:
            print(f'Analyzing {data}... Beep, beep, boop... {data * 100}')
    print('All done!')


if __name__ == '__main__':
    # Create the queue here and pass it to the child explicitly. With the "spawn" start method
    # (the default on Windows), a module-level queue is re-created in the child, not shared.
    sensor_data_queue = Queue()

    # Run the reader process in the background...
    reader_process = Process(target=reads_sensor_data, args=(sensor_data_queue,))
    reader_process.start()
    try:
        analyze_sensor_data(sensor_data_queue)
    finally:
        reader_process.join()

If you try running that, one process will generate 10 random numbers, while the other will multiply those "sensor readings" by 100:

# python3 sensors.py
Analyzing 0.043564774215778646... Beep, beep, boop... 4.356477421577864
Analyzing 0.7373508496315736... Beep, beep, boop... 73.73508496315736
Analyzing 0.1261496911219001... Beep, beep, boop... 12.61496911219001
Analyzing 0.42168268032346623... Beep, beep, boop... 42.168268032346624
Analyzing 0.5781951078143707... Beep, beep, boop... 57.81951078143707
Analyzing 0.5887940456986528... Beep, beep, boop... 58.87940456986528
Analyzing 0.9427267891363492... Beep, beep, boop... 94.27267891363492
Analyzing 0.7163872833606556... Beep, beep, boop... 71.63872833606555
Analyzing 0.4673419692094539... Beep, beep, boop... 46.734196920945394
Analyzing 0.7920286810885665... Beep, beep, boop... 79.20286810885665
All done!
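
For the file-name case described in the question, the same pattern might look roughly like the sketch below. The function names `record` and `analyze`, the `recording_N.dat` file names, and the sleep standing in for the actual recording are all placeholders I made up for illustration; the only real API used is `multiprocessing.Process` and `multiprocessing.Queue`. The recorder puts each path on the queue only after that file is finished, so the analyzer never sees a file that isn't ready and no manual offsetting of the loops is needed.

```python
import time
from multiprocessing import Process, Queue


def record(path_queue, n_files=3, seconds_per_file=0.1):
    """Placeholder recorder: pretends to write a file, then announces its path."""
    for i in range(n_files):
        time.sleep(seconds_per_file)          # stands in for the real recording
        path_queue.put(f"recording_{i}.dat")  # hypothetical output file name
    path_queue.put(None)                      # sentinel: no more files


def analyze(path_queue):
    """Consume paths until the sentinel arrives; return the paths handled."""
    processed = []
    while True:
        path = path_queue.get()               # blocks until a path is available
        if path is None:
            break
        processed.append(path)                # real analysis would open the file here
    return processed


if __name__ == "__main__":
    q = Queue()
    recorder = Process(target=record, args=(q,))
    recorder.start()
    try:
        print(analyze(q))
    finally:
        recorder.join()
```

Because the recorder never waits on the analyzer, recording continues without gaps as long as analysis keeps up on average; otherwise the queue simply grows.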

That covers the basics. If you intend to run this in a more scalable way, for instance across multiple servers and over a long period of time, look into a more comprehensive tool such as Celery.

Ken Kinder
  • Thanks!!! I appreciate all the detail. Am I correctly interpreting that the analyze func is not running in the background, which means that the script will not proceed until it detects the `None` flag? I think that's fine for my purpose, but I just wanted to check my understanding. I see that the for loop in the reader function can be altered to make it go forever and that the `block=True` option in the analyze function makes it wait until it sees the `None` flag. – rfii Jul 15 '20 at 00:54
  • Right, as I wrote it, there are only two processes. You can think of the sensor data reader as the background job and the analyzer as the foreground job. But if you wanted another, unrelated foreground job, you could use three processes instead, by creating an analyzer process the same way I created the background process. You are correct in your understanding though. – Ken Kinder Jul 15 '20 at 14:38
  • `sensor_data_queue.get` could also take a timeout argument, which does what it sounds like. It could wait, for example, 60 seconds and then give up. – Ken Kinder Jul 15 '20 at 14:39
  • Thanks again! This is tremendously helpful. When I tried to use your structure in my code, I could put objects (paths) into the queue and get them. However, when I try to do so from another function in the same program, it hangs (no error message). I declared the queue as you did in the first line of my program, before I even define main(). Do I need to do pip install XXXXX to make this work? Is the queue only accessible from a background process perhaps? – rfii Jul 15 '20 at 20:17
  • I tried to run your code (the only thing I changed was adding two lines at the end, `if __name__ == "__main__":` followed by `main()`), but it seems to crash before any print lines. – rfii Jul 15 '20 at 21:27
  • Hi Ken Kinder, could you please check out `https://stackoverflow.com/questions/62924158/python-multiprocessing-queue-seemingly-unaccessible` because your code seems to work for others, but in my case global variables changed by one process don't appear to be visible to the other process. – rfii Jul 15 '20 at 22:06
  • I am reading that global variables are not truly shared between different processes: `https://blog.ruanbekker.com/blog/2019/02/19/sharing-global-variables-in-python-using-multiprocessing/` Each process gets passed a copy, and the tests I am doing on my own Win10 machine seem to reflect that; however, I copied and pasted your same code into an online Linux-based IDE and it works. I am kind of befuddled. – rfii Jul 16 '20 at 00:27
  • OK, this seems to be OS dependent. For Win10 on Python 3.7.8, it seems to work if I pass the queue as an argument to the background process as `args=(sensor_data_queue,)`. I have no idea why, but that seemingly extraneous comma is critical. The foreground function does not require the queue to be passed as an argument. I would really like to understand why this is the case. – rfii Jul 16 '20 at 00:52
  • Sure thing, I followed up and included an explanation about that comma. It appears Windows is a corner case on the multiprocessing module I wasn't aware of. – Ken Kinder Jul 16 '20 at 10:25
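
To tie together the points raised in the comments, here is a rough sketch (not the code from the answer) that runs both the reader and the analyzer as background processes, passes the queue through `args`, and uses the `timeout` argument of `get` instead of a `None` sentinel. The parameter names `readings` and `timeout` and their values are assumptions chosen for illustration. On the comma: `args` expects a tuple, and `(q,)` is a one-element tuple, whereas `(q)` is just `q` in parentheses. When `get` times out it raises `queue.Empty`, which comes from the standard `queue` module.

```python
import queue  # only needed for the queue.Empty exception
import time
from multiprocessing import Process, Queue


def reads_sensor_data(q, readings=3):
    """Put a few fake readings on the queue; no sentinel is sent."""
    for i in range(readings):
        q.put(i * 0.5)
        time.sleep(0.1)


def analyze_sensor_data(q, timeout=1.0):
    """Handle readings until none arrives within `timeout` seconds."""
    handled = 0
    while True:
        try:
            data = q.get(timeout=timeout)  # raises queue.Empty on timeout
        except queue.Empty:
            break
        print(f"Analyzing {data}")
        handled += 1
    return handled


if __name__ == "__main__":
    q = Queue()
    # (q,) is a one-element tuple; without the comma, (q) is just q.
    reader = Process(target=reads_sensor_data, args=(q,))
    analyzer = Process(target=analyze_sensor_data, args=(q,))
    reader.start()
    analyzer.start()
    # The main process is free to do unrelated work here.
    reader.join()
    analyzer.join()
```

Passing the queue explicitly also sidesteps the platform difference noted above: under the "spawn" start method each child re-imports the script, so a queue created at module level in the child is a different object from the one in the parent.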