
I would like to have two processes that run in parallel. One gets input from the other, processes the data, and sends the processed data back as output to the other. The other process does the same thing. Obviously there needs to be a starting point and an end point.

How can I communicate between the processes while they're running? I only managed to run the two processes one after the other.

I tried to solve it with multiprocessing:

from multiprocessing import Process, Queue, Array
sentinel = -1

def process1(q, arr):
    # Receives data, modifies it and sends it back
    while True:
        data = q.get() # Receive data

        if data is sentinel:
            break
    data *= 2 # Data modification
    arr.append(data) # Data logging
    q.put(data) # Send data
    q.put(sentinel) # Send end signal

def process2(q, arr):
    # Receives data, increments it and sends data back
    if q.empty():
        data = 1
    else:
        while True:
            data = q.get() # Receive data
            if data == sentinel:
                break

    data += 1
    q.put(data) # Send data
    arr.append(data) # Data logging
    q.put(sentinel) # Send end signal

if __name__ == "__main__":
    q = Queue()
    logbook = Array('i', 0)
    counter = 0
    while counter < 10:
        process_1 = Process(target=process1, args=(q, logbook))
        process_2 = Process(target=process2, args=(q, logbook))
        process_1.start()
        process_2.start()
        q.close()
        q.join_thread()
        process_1.join()
        process_2.join()
        counter += 1
    print(logbook)
John Kugelman
ceske
4 Answers


I tried to understand your need, but it is not fully clear to me, so I propose this producer-consumer version of the code, in which the two processes communicate to reach the final result over a certain number of iterations.

First of all, you need two queues, to prevent the process that puts content into a queue from reading it back before the other process does. Second, you need a mechanism to agree on the end of the computation, in this case a None message acting as a sentinel.

My proposed solution is summarised in the following code:

from multiprocessing import Process, Queue

def process1(in_queue, out_queue):
    # Receives data, modifies it and sends it back
    while True:
        data = in_queue.get() # Receive data
        if data is None:
            out_queue.put(None)  # send feedback about END message
            out_queue.close()
            out_queue.join_thread()
            break

        data *= 2 # Data modification
        out_queue.put(data) # Send data

def process2(in_queue, out_queue, how_many):
    data = 0

    # Receives data, increments it and sends data back
    while how_many > 0:
        data += 1 # Data modification
        out_queue.put(data) # Send data
        how_many -= 1

        data = in_queue.get() # Receive data
        if data is None:
            break

    # send END message
    out_queue.put(None)
    out_queue.close()
    out_queue.join_thread()
    assert in_queue.get() is None


if __name__ == "__main__":
    q1 = Queue()
    q2 = Queue()

    process_1 = Process(target=process1, args=(q1, q2))
    process_2 = Process(target=process2, args=(q2, q1, 10))
    process_1.start()
    process_2.start()

    process_2.join()
    process_1.join()
    q1.close()
    q2.close()
    q1.join_thread()
    q2.join_thread()
Roberto Trani
  • This solution covers what I intended to do! I didn't think about using two queues for this purpose. Thank you very much. – ceske Jan 29 '18 at 16:25
  • Could you briefly explain, why process1 can exit before the end message is sent? Could this be solved by using the keyword finally, so that the end message is sent, even if an error occurred? – ceske Jan 29 '18 at 16:46
  • @ceske I solved this ensuring to call `close` and `join` on `out_queue`. The problem could be that the process exits while the internal thread is writing the None message on the queue. Ensuring to join the internal thread this behaviour cannot happen anymore. – Roberto Trani Jan 29 '18 at 16:53
  • Happy to help, and welcome to Stack Overflow. If this answer or any other one solved your issue, please mark it as accepted. – Roberto Trani Jan 29 '18 at 17:05
  • Done! With your solution I am even able to log the communication between the two processes using a third queue. – ceske Jan 29 '18 at 17:33
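To illustrate the close/join_thread point from the comments above, here is a minimal sketch (not part of the original answer; names are illustrative) of the pattern a producing process can follow before exiting, so that the queue's internal feeder thread has flushed its buffer first:

```python
from multiprocessing import Process, Queue

def producer(q):
    q.put("done")    # hands the item to the queue's internal feeder thread
    q.close()        # declare that no more items will be put
    q.join_thread()  # wait until the feeder thread has flushed its buffer
    # only now is it safe for this process to exit

if __name__ == "__main__":
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()
    print(q.get())   # "done"
    p.join()
```

Without the close/join_thread pair, the process could in principle exit while the feeder thread is still writing, which is exactly the failure mode discussed in the comment.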

You could use sockets for this task, or even a micro-service approach (via REST API calls, for instance).

JonDel
  • Could you explain further what you mean by micro-service approach? I heard about REST and right now I'm trying to figure out, how I can implement this paradigm in Python. – ceske Jan 29 '18 at 15:21
  • Like a web-service, for instance. You provide access to services (functions, methods) inside a module. This module can be accessed via a REST API, using for example a top-down approach as OpenApi specification (https://en.wikipedia.org/wiki/OpenAPI_Specification ). – JonDel Jan 29 '18 at 17:54
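The socket suggestion can be sketched with `socket.socketpair` and an explicit stop message, mirroring the ping-pong from the question. This is a hedged sketch, not code from the answer: all names are invented, and it assumes the fork start method (the Linux default), where the child process inherits the socket.

```python
import socket
from multiprocessing import Process

def doubler(conn):
    # Worker: receives a number, doubles it, sends it back.
    # The literal message b"stop" (or EOF) ends the loop.
    while True:
        raw = conn.recv(64)
        if raw in (b"", b"stop"):
            break
        conn.sendall(str(int(raw) * 2).encode())
    conn.close()

def run_pingpong(rounds=5):
    parent, child = socket.socketpair()  # a connected, bidirectional pair
    worker = Process(target=doubler, args=(child,))
    worker.start()
    child.close()  # the parent keeps only its own end

    data, log = 1, []
    for _ in range(rounds):
        parent.sendall(str(data).encode())  # send the current value
        data = int(parent.recv(64)) + 1     # doubled by the worker, incremented here
        log.append(data)

    parent.sendall(b"stop")  # explicit sentinel, like the None message above
    parent.close()
    worker.join()
    return log

if __name__ == "__main__":
    print(run_pingpong())  # [3, 7, 15, 31, 63]
```

The strict request-reply lockstep means each `recv(64)` sees exactly one short message, so no framing protocol is needed for this toy exchange.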

@Roberto Trani

Starting out from your solution, I was even able to log the communication going on between the two processes, using a third queue.

Thank you, I was really stuck and didn't know how to tackle the problem.

from multiprocessing import Process, Queue

def process1(in_queue, out_queue, log_queue):
    # Receives data, modifies it and sends it back
    while True:
        data = in_queue.get() # Receive data
        if data is None:
            log_queue.put(None) # log END
            out_queue.put(None)  # send feedback about END message
            break

        data *= 2 # Data modification
        print("p1: {}".format(data))
        log_queue.put(data) # Log data
        out_queue.put(data) # Send data

def process2(in_queue, out_queue, how_many, log_queue):
    data = 0

    # Receives data, increments it and sends data back
    while how_many > 0:
        data += 1 # Data modification
        print("p2: {}".format(data))
        log_queue.put(data) # Log Data
        out_queue.put(data) # Send data
        how_many -= 1

        data = in_queue.get() # Receive data
        if data is None:
            break

    # send END message
    log_queue.put(None) # log END
    out_queue.put(None)
    out_queue.close()
    out_queue.join_thread()
    assert in_queue.get() is None


if __name__ == "__main__":
    q1 = Queue()
    q2 = Queue()
    q3 = Queue()
    logbook = []

    process_1 = Process(target=process1, args=(q1, q2, q3))
    process_2 = Process(target=process2, args=(q2, q1, 10, q3))
    process_1.start()
    process_2.start()

    process_2.join()
    process_1.join()
    q1.close()
    q2.close()
    q1.join_thread()
    q2.join_thread()

    while True:
        data = q3.get()
        logbook.append(data)
        if data is None:
            break

    q3.close()
    q3.join_thread()

    print(logbook)
ceske


I'm currently using this kind of micro-service approach: design a high-level interface (modules, the functionality of each module, their hierarchy and interconnections); write that design down as REST endpoints using CRUD (https://en.wikipedia.org/wiki/Create,_read,_update_and_delete) in a YAML/JSON file in an OpenAPI editor (online: https://editor.swagger.io); use the editor's functionality to generate Python (Flask) code; edit the boilerplate code to actually implement the backend functionality; run the server to provide access to your API methods. You can even turn the module into a Docker image for scalability; I'm using this base image: https://github.com/tiangolo/uwsgi-nginx-flask-docker/
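As a rough standard-library stand-in for the generated Flask code (the resource, routes, and handler names here are invented for illustration, not produced by the OpenAPI workflow), a single resource exposed through GET/POST endpoints might look like:

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

counter = {"value": 0}  # the single resource this toy service exposes

class CounterHandler(BaseHTTPRequestHandler):
    def _reply(self, payload, status=200):
        # Serialize the payload and write a complete JSON response
        body = json.dumps(payload).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):  # Read
        if self.path == "/counter":
            self._reply(counter)
        else:
            self._reply({"error": "not found"}, 404)

    def do_POST(self):  # Update
        if self.path == "/counter/increment":
            counter["value"] += 1
            self._reply(counter)
        else:
            self._reply({"error": "not found"}, 404)

    def log_message(self, *args):  # keep the demo quiet
        pass

# To serve: HTTPServer(("127.0.0.1", 8000), CounterHandler).serve_forever()
```

A real service generated from an OpenAPI spec would add validation, error schemas, and proper routing; this only shows the CRUD-over-HTTP shape the paragraph describes.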

JonDel
  • Thank you for the explanation. I might want to come back to this approach later on in the project. Right now I have the impression that it's too much for my current needs. – ceske Jan 29 '18 at 20:24