
I have some expensive long-running functions that I'd like to run on multiple cores. This is easy to do with multiprocessing. But I will also need to periodically run a function that calculates a value based on the state (global variables) of a specific process. I think this should be possible by simply spawning a thread on the subprocess.

Here's a simplified example. Please suggest how I can call process_query_state().

import multiprocessing
import time

def process_runner(x: int):
    global xx
    xx = x
    while True:
        time.sleep(0.1)
        xx += 1  # actually an expensive calculation

def process_query_state() -> int:
    y = xx * 2  # actually an expensive calculation
    return y

def main():
    processes = {}
    for x in range(10):
        p = multiprocessing.get_context('spawn').Process(target=process_runner, args=(x,))
        p.start()
        processes[x] = p
    while True:
        time.sleep(1)
        print(processes[3].process_query_state()) # this doesn't actually work

if __name__ == '__main__':
    main()
  • As I see it, while a process is running `process_runner()` it can't run any other function. If you need a value from the process, you should use a queue to send it to the main process. – furas Jun 23 '21 at 22:29
  • I think you expect too much. `Process` is not `RPC` (Remote Procedure Call); see [What is the current choice for doing RPC in Python?](https://stackoverflow.com/questions/1879971/what-is-the-current-choice-for-doing-rpc-in-python). It would need something more: the process would need one thread to receive messages from other processes and other threads to run functions at the same time. With `Process` you could use a `queue` to send a message to the other process; inside `process_runner` it would have to check whether there is a message in the queue, run `process_query_state`, and send the result back, also using a `queue`. – furas Jun 23 '21 at 23:25

1 Answer


I see two problems:

  1. Process is not RPC (Remote Procedure Call), so you can't call another function such as process_query_state in the child from the main process. You can only use a queue to send a message to the other process, and that process has to check periodically whether a new message has arrived.

  2. A process runs only one function at a time, so it would either have to pause that function when it gets a message to run another one, or it would have to start threads inside the new process to run several functions at the same time.

EDIT: Running threads may cause another problem: if two functions work on the same data at the same time, one can change a value while the other is still using the old one, and this can produce wrong results.
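To illustrate point 2 together with the concern from the EDIT, here is a minimal sketch of the thread-based variant, not a tested solution for the real workload. Each worker starts a listener thread that answers queries while the main loop keeps running, and a `threading.Lock` guards the shared value so that neither side works on a half-updated state. The names `state`, `state_lock`, `answer_queries` and the `steps` parameter are my own assumptions and not part of the question; `requests` and `replies` stand for two queues created in the main process.

```python
import threading
import time

# Hypothetical module-level state; with the 'spawn' context every
# process imports this module and gets its own independent copy.
state = {"xx": 0}
state_lock = threading.Lock()

def process_query_state() -> int:
    # Hold the lock so the main loop cannot change xx mid-calculation.
    with state_lock:
        return state["xx"] * 2  # stands in for the expensive calculation

def answer_queries(requests, replies):
    """Listener thread: block on the request queue and answer 'run' messages."""
    while True:
        message = requests.get()
        if message == "stop":
            break
        if message == "run":
            replies.put(process_query_state())

def process_runner(x: int, requests, replies, steps=None):
    """Worker entry point; `steps` (assumption) limits the loop, None means forever."""
    with state_lock:
        state["xx"] = x
    listener = threading.Thread(
        target=answer_queries, args=(requests, replies), daemon=True
    )
    listener.start()
    done = 0
    while steps is None or done < steps:
        time.sleep(0.1)
        with state_lock:
            state["xx"] += 1  # stands in for the expensive calculation
        done += 1
```

From the main process this would be started with something like `ctx.Process(target=process_runner, args=(x, requests, replies))`, where both queues come from `ctx.Queue()`; a query is then `requests.put('run')` followed by `replies.get()`. Note that in CPython the listener thread and the main loop still share one interpreter lock, so an expensive query slows the main calculation in that process rather than running truly in parallel with it.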


I created an example which uses queues to send a message to process_runner. The worker periodically checks whether there is a message, runs process_query_state, and sends the result back to the main process.

The main process waits for the result from the selected process, which blocks the code. If you want to work with more processes, the code would have to be more complex.

import multiprocessing
import time

def process_query_state():
    y = xx * 2  # actually an expensive calculation
    return y

def process_runner(x: int, queue_in, queue_out):
    global xx
    xx = x

    # reverse direction: main() writes to `queue_out` and reads from `queue_in`
    q_in = queue_out
    q_out = queue_in
    
    while True:
        time.sleep(0.1)
        xx += 1  # actually an expensive calculation

        # run other function - it will block main calculations 
        # but this way it will use correct `xx` (other calculations will not change it)
        if not q_in.empty():
            if q_in.get() == 'run':
                result = process_query_state()
                q_out.put(result)
                
def main():
    processes = {}
    for x in range(4):
        ctx = multiprocessing.get_context('spawn')
        q_in  = ctx.Queue()
        q_out = ctx.Queue()
        p = ctx.Process(target=process_runner, args=(x, q_in, q_out))
        p.start()
        processes[x] = (p, q_in, q_out)
        
    while True:
        time.sleep(1)
        q_in  = processes[3][1]
        q_out = processes[3][2]
        q_out.put('run')
        
        # non blocking version
        #if not q_in.empty():
        #    print(q_in.get())
        
        # blocking version
        print(q_in.get())

if __name__ == '__main__':
    main()
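For the "more processes" case mentioned above, one possible direction is to send the request to every worker first and then collect the replies against a shared deadline, so that one slow or dead worker cannot block the main process forever. This is only a sketch under my own assumptions: `query_all` and the `workers` mapping (key to a `(requests, replies)` pair) are hypothetical names, where `requests` is the queue the main process puts `'run'` on (`q_out` in `main()` above) and `replies` is the queue it reads results from (`q_in` in `main()` above).

```python
import queue
import time

def query_all(workers, timeout=1.0):
    """Ask every worker to run its query, then gather replies until a deadline.

    `workers` maps a key to a (requests, replies) pair of queues.
    Workers that do not answer in time get None as their result.
    """
    # Send all requests first so the workers can compute concurrently.
    for requests, _ in workers.values():
        requests.put("run")

    deadline = time.monotonic() + timeout
    results = {}
    for key, (_, replies) in workers.items():
        remaining = max(0.0, deadline - time.monotonic())
        try:
            results[key] = replies.get(timeout=remaining)
        except queue.Empty:  # also raised by multiprocessing queues
            results[key] = None
    return results
```

With the answer's code, `workers` could be built as `{x: (q_out, q_in) for x, (p, q_in, q_out) in processes.items()}`. A late reply stays in its queue and would be returned by the next call, so real code may want to tag each request with an id and discard stale answers.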
furas