
I am making a project that collects data from clients' sensors, processes the gathered data, and sends results back to the clients. There can be multiple clients requesting data from our server at the same time, so I had to implement multiprocessing. I can't use threads because certain variables must be client-independent; if I did, my code would probably become very complicated to read and maintain, and I don't want that. So I decided to use processes, but now some data needs to be shared between the parent and child processes. After some research, I found that Pipe communication would satisfy my requirements.

The following code successfully sends data from the parent to the child process; the child updates the data and sends it back to the parent. But it only works because of the sleep() call, which stops the parent from using the pipe at the same time as the child.

How can it be changed to do the same thing without the sleep() call, which I believe will most likely cause problems in the future?

from multiprocessing import Process, Pipe
import time

def update_data(pipe):
    p_out, p_in = pipe
    L = []
    while True:
        message = p_out.recv()
        if message=='FINISHED':
            break       
        L.append(message)      

    L.append(['new data'])       #updating received data
    writer(L, p_in)              #sending received data to parent Process
    p_in.close()

def writer(i, p_in):
    p_in.send(i)
    p_in.send('FINISHED')

L = ['0' for i in range(10)]     #current data
if __name__=='__main__':
    p_out, p_in = Pipe()
    update_data_process = Process(target=update_data, args=((p_out, p_in),))
    update_data_process.start()    
    writer(L, p_in)              #sending current data to child Process
    time.sleep(3)                #needs to be changed
    while True:
        message = p_out.recv()
        if message != 'FINISHED':
            L = message
        else:
            break
    print(L)
    p_in.close()
    update_data_process.join()
general_418
  • I think your decision to use multiple processes may have been misguided. Try using a data structure like a map/dictionary to look up each client (by id/guid), then store the actual client details in a class / a nested dictionary. – eddiewould Mar 11 '19 at 22:24
  • The multiprocessing library is usually used when you have compute-bound work that you want to run in parallel. – eddiewould Mar 11 '19 at 22:26
  • I think I didn't explain the problem as well as I could have. The data gathered from a client's sensors is processed only during the period when that client's trigger is activated. This period can last for more than a few hours. If there are 2 or more triggers activated, the program has to do the same work, but with different data because of the different clients. That's the reason for the multiprocessing implementation. I thought it would be better to run the processes in parallel rather than one by one. Hope this changes things, still a newbie in this area. – general_418 Mar 11 '19 at 23:20

1 Answer


You have this issue because you are treating the connections as if they were simplex, but Pipe() returns duplex (two-way) connections by default. This means that when you call parent_conn, child_conn = Pipe(), you get one connection object that only the parent should use for reads and writes, and another such connection object for the child. Parent and child operate only upon their own connection objects.

from multiprocessing import Process, Pipe
from datetime import datetime

SENTINEL = 'SENTINEL'


def update_data(child_conn):

    result = []

    for msg in iter(child_conn.recv, SENTINEL):
        print(f'{datetime.now()} child received {msg}')
        result.append(msg)

    print(f'{datetime.now()} child received sentinel')
    result.append(['new data'])
    writer(child_conn, result)
    child_conn.close()


def writer(conn, data):
    conn.send(data)
    conn.send(SENTINEL)


if __name__=='__main__':

    parent_conn, child_conn = Pipe()  # default is duplex!
    update_data_process = Process(target=update_data, args=(child_conn,))
    update_data_process.start()

    data = ['0' for i in range(3)]
    writer(parent_conn, data)

    for msg in iter(parent_conn.recv, SENTINEL):
        print(f'{datetime.now()} parent received {msg}')

    print(f'{datetime.now()} parent received sentinel')
    parent_conn.close()
    update_data_process.join()

Example Output:

2019-03-12 00:09:52.920375 child received ['0', '0', '0']
2019-03-12 00:09:52.920512 child received sentinel
2019-03-12 00:09:52.920702 parent received [['0', '0', '0'], ['new data']]
2019-03-12 00:09:52.920764 parent received sentinel

Process finished with exit code 0

In case you are unfamiliar with the use of iter(callable, sentinel), look here.
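As a quick standalone illustration (not part of the answer's code), the two-argument form of iter() calls the callable repeatedly and stops as soon as it returns the sentinel value, without yielding the sentinel itself:

```python
# Two-argument iter(): call the callable until it returns the sentinel.
# The sentinel terminates the iteration and is not yielded.
items = ['a', 'b', 'STOP', 'c']
it = iter(items)

received = list(iter(lambda: next(it), 'STOP'))
print(received)  # ['a', 'b'] — iteration ended at the sentinel
```

This is exactly the pattern used with conn.recv above: the loop body only ever sees real messages, and the sentinel cleanly ends the loop.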

Darkonaut
  • I only need parent to pass data to child. How should I do it? @Darkonaut – tomeda Mar 23 '20 at 15:23
  • @tomeda `r_conn, w_conn = Pipe(False)` will give you a simplex connection ([docs](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Pipe)), then give the child the read end. – Darkonaut Mar 23 '20 at 16:05
  • Thanks a lot. Is there a way to keep the connection open? It doesn't solve my problem, because I have to close the connection in order for the while block to keep running. But if I close the connection, I will no longer be able to pass data in the future. If I don't close it, it will be stuck waiting for new data. – tomeda Mar 24 '20 at 02:38
  • @tomeda Instead of closing the connection you can send some special value like the `SENTINEL` in my answer, the child checks that value and decides what to do. – Darkonaut Mar 24 '20 at 10:42