
My code has the following scheme:

class A():
    def evaluate(self):
        b = B()
        for i in range(30):
            b.run()

class B():
    def run(self):
        pass

if __name__ == '__main__':
    a = A()
    for i in range(10):
        a.evaluate()

And I want to have two levels of concurrency: the first on the evaluate method and the second on the run method (nested concurrency). The question is: how do I introduce this concurrency using the Pool class of the multiprocessing module? Should I pass the number of cores explicitly? The solution should not create more processes than multiprocessing.cpu_count().

Note: assume that the number of cores is greater than 10.

Edit: I have seen a lot of comments saying that Python does not have true concurrency due to the GIL. This is true for Python multi-threading, but for multiprocessing it is not quite correct (look here). I have also timed it, as this article did, and the results show that it can run faster than sequential execution.
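One way to stay under the cpu_count() cap without nesting pools is to flatten the two loops into a single task list and let one Pool schedule everything. A minimal sketch of that idea, where run_one is a hypothetical stand-in for the work B.run would do (not part of the code above):

import multiprocessing as mp

def run_one(args):
    # stand-in for one B.run() call; (outer, inner) identify the task
    outer, inner = args
    return outer * 30 + inner

if __name__ == '__main__':
    # Flatten both loops into one task list so a single Pool, capped at
    # cpu_count() workers, schedules all 10 * 30 units of work.
    tasks = [(i, j) for i in range(10) for j in range(30)]
    with mp.Pool(processes=mp.cpu_count()) as pool:
        results = pool.map(run_one, tasks)

Because pool.map preserves input order, results comes back in the same order as tasks, regardless of which worker ran which item.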

adnanmuttaleb
  • You would create nested processes. The manager should handle maintaining the maximum number of processes. – juanpa.arrivillaga Dec 31 '18 at 21:06
  • If your program is compute-bound, you might not get any benefit from concurrency in Python anyway. – Ned Batchelder Dec 31 '18 at 21:06
  • According to the documentation of Pool, the Pool will create a number of workers equal to cpu_count() if none is explicitly passed, so if I create a Pool object twice it may create too many workers, according to my understanding @juanpa.arrivillaga. – adnanmuttaleb Dec 31 '18 at 21:16
  • Why would you create the Pool object twice? – juanpa.arrivillaga Dec 31 '18 at 21:16
  • Yes, it is compute-bound, but why might I not get the benefits of concurrency? Can you explain more please @NedBatchelder. – adnanmuttaleb Dec 31 '18 at 21:17
  • For each level of concurrency I have to create a Pool object. Actually this is one of my problems: I do not know how to code it without creating two Pools. @juanpa.arrivillaga – adnanmuttaleb Dec 31 '18 at 21:21
  • Maybe you should include your solution for the *nested processes* - have you tried something and gotten it to work? – wwii Dec 31 '18 at 21:31
  • I have successfully coded it for the outer level (evaluate), but not for the nested one @wwii – adnanmuttaleb Dec 31 '18 at 21:34
  • @adnanmuttaleb CPython (the usual Python implementation) can only execute Python bytecode in one thread at a time, due to the GIL (Global Interpreter Lock). – Ned Batchelder Dec 31 '18 at 23:53
  • Is multiprocessing also subject to the GIL problem? If yes, how do I bypass it, since all my code is Python and it is very slow without parallelism? @NedBatchelder – adnanmuttaleb Jan 01 '19 at 08:53
  • Using multiple processes (with multiprocessing or some other way) is the usual solution to this. Each process has its own GIL, so they can all run at the same time. – Ned Batchelder Jan 01 '19 at 13:23

1 Answer


Your comment touches on a possible solution. In order to have "nested" concurrency you could have two separate pools. This results in a "flat" program structure instead of a nested one. It also decouples A from B: A now knows nothing about B, it just publishes to a generic queue. The example below uses a single process per stage to illustrate wiring up concurrent workers communicating across a queue, but each stage could easily be replaced with a pool:

import multiprocessing as mp


class A():
    def __init__(self, in_q, out_q):
        self.in_q = in_q
        self.out_q = out_q

    def evaluate(self):
        """
        Read jobs from the input queue, do the work, and publish
        the results to the output queue.
        """
        while True:
            job = self.in_q.get()
            if job is None:          # sentinel: stop, and tell B to stop too
                self.out_q.put(None)
                break
            for i in range(30):
                self.out_q.put(i)


class B():
    def __init__(self, in_q):
        self.in_q = in_q

    def run(self):
        """
        Loop over the queue and process items; optionally configure
        with another queue to "sink" the processing pipeline.
        """
        while True:
            job = self.in_q.get()
            if job is None:          # sentinel: stop
                break


if __name__ == '__main__':
    # create the queues to wire up our concurrent worker stages
    A_q = mp.Queue()
    AB_q = mp.Queue()

    a = A(in_q=A_q, out_q=AB_q)
    b = B(in_q=AB_q)

    p = mp.Process(target=a.evaluate)
    p.start()

    p2 = mp.Process(target=b.run)
    p2.start()

    for i in range(10):
        A_q.put(i)
    A_q.put(None)  # sentinel so both workers exit and the joins return

    p.join()
    p2.join()

This is a common pattern in Go, where goroutines communicate over channels.
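To use more than one process per stage while still respecting cpu_count(), the single Process on a queue can be replaced by several identical workers. A minimal sketch of that extension, under the same sentinel convention as above (consumer, n_workers, and the shared counter are illustrative names, not part of the answer's code):

import multiprocessing as mp

def consumer(q, counter):
    # each consumer drains the shared queue until it sees the sentinel
    while True:
        job = q.get()
        if job is None:
            q.put(None)              # re-publish the sentinel for the others
            break
        with counter.get_lock():
            counter.value += 1       # stand-in for real per-job work

if __name__ == '__main__':
    q = mp.Queue()
    done = mp.Value('i', 0)                  # shared count of processed jobs
    n_workers = max(1, mp.cpu_count() - 1)   # leave a core for the producer
    workers = [mp.Process(target=consumer, args=(q, done))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for i in range(300):
        q.put(i)
    q.put(None)          # one sentinel; each consumer puts it back on exit
    for w in workers:
        w.join()

Since the producer enqueues the sentinel after all the jobs and the queue is FIFO, every job is consumed before any worker shuts down.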

dm03514