I've read process-or-pool-for-what-i-am-doing and when-to-use-apply-apply-async-or-map, and I hoped I understood the differences between apply and apply_async. But the following code returns the desired output only when apply is used; when apply_async is used, the output is very short:

#!/usr/bin/env python
import multiprocessing
import time
import os

semaphore = multiprocessing.Semaphore(1)
# semaphore = multiprocessing.Manager().Semaphore(1)

def producer(num, len, output):
    time.sleep(1)
    element = "PROCESS: %d PID: %d, PPID: %d, QSIZE: %d" % (num, os.getpid(), os.getppid(), output.qsize())
    semaphore.acquire()
    print "PID: %s WRITE -> %s" % (os.getpid(), element)
    if (num == len - 1):
        print "PID: %d WRITE -> Everything was written inside queue, no more apply_async calling, just reading!" % os.getpid()
    output.put(element)
    semaphore.release()
    time.sleep(1)

def consumer(output):
    while True:
        try:
            print "PID: %d READ  <- %s" % (os.getpid(), output.get())
            break
        except:
            print "PID: %d READ  <- NOTHING IN BUFFER" % os.getpid()
            # pass
        time.sleep(1)

if __name__ == '__main__':
    """
    MULTIPLE PRODUCERS AND MULTIPLE CONSUMERS
    """
    output    = multiprocessing.Manager().Queue()
    pool      = multiprocessing.Pool(4)
    lst       = range(40)

    print "Calling apply*!"
    for i in lst:
        pool.apply_async(producer, (i, len(lst), output))
    print "Do not wait until apply* finishes!"

    for i in lst:
        # RETURNS OUTPUT
        # pool.apply(consumer, (output,))

        # DOES NOT RETURN OUTPUT
        pool.apply_async(consumer, (output,))

Output when pool.apply is used:

Calling apply*!
Do not wait until apply* finishes!
PID: 18348 WRITE -> PROCESS: 1 PID: 18348, PPID: 18341, QSIZE: 0
PID: 18346 WRITE -> PROCESS: 0 PID: 18346, PPID: 18341, QSIZE: 1
PID: 18349 WRITE -> PROCESS: 2 PID: 18349, PPID: 18341, QSIZE: 2
PID: 18347 WRITE -> PROCESS: 3 PID: 18347, PPID: 18341, QSIZE: 3
PID: 18346 WRITE -> PROCESS: 4 PID: 18346, PPID: 18341, QSIZE: 4
PID: 18348 WRITE -> PROCESS: 5 PID: 18348, PPID: 18341, QSIZE: 5
PID: 18349 WRITE -> PROCESS: 6 PID: 18349, PPID: 18341, QSIZE: 6
PID: 18347 WRITE -> PROCESS: 7 PID: 18347, PPID: 18341, QSIZE: 7
...

Output when pool.apply_async is used:

Calling apply*!
Do not wait until apply* finishes!

It seems that producer is evaluated only when it is called from apply, but not when it is called from apply_async. Why?

Wakan Tanka

1 Answer

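Your code is evaluated in either case; however, it runs in another process. The difference is that apply is blocking, while apply_async is not. In your code, you sent the work off to another process and then never collected it back to the main process. Because apply_async returns immediately, your main process reaches the end of the script right after queuing the jobs, and the pool's worker processes are shut down when it exits, so the jobs (and their print statements) may never get a chance to run.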
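A minimal sketch of one way to make the apply_async version wait, assuming Python 3 (it should also run on 2.7) and a simplified, hypothetical producer that only prints and writes its number to the queue. The jobs are still submitted with apply_async, but the main process calls pool.close() and pool.join() before it reads the queue or exits, so every queued job runs first.

```python
import multiprocessing
import os


def producer(num, output):
    # Hypothetical simplified producer; runs in a worker process.
    # Its print only appears if the job actually gets to run.
    print("PID: %d WRITE -> PROCESS: %d" % (os.getpid(), num))
    output.put(num)


def run(n_jobs=8):
    manager = multiprocessing.Manager()  # keep a reference so the manager stays alive
    output = manager.Queue()
    pool = multiprocessing.Pool(4)

    for i in range(n_jobs):
        # Returns an AsyncResult immediately; the job is only queued for a worker.
        pool.apply_async(producer, (i, output))

    pool.close()  # no more tasks will be submitted
    pool.join()   # block until every queued job has run and the workers exit

    # All jobs are done, so the queue holds exactly n_jobs items.
    written = sorted(output.get() for _ in range(n_jobs))
    manager.shutdown()
    return written


if __name__ == "__main__":
    print(run())
```

Note that close() and join() only wait; if a job raises, the exception is silently dropped. Calling get() on each AsyncResult, as described in the answer, also re-raises such errors in the main process.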

Note that apply returns a value, while apply_async returns a result object. You have to call get on the result object to get the result. Here's a distilled example:

>>> import multiprocessing
>>> import math
>>> 
>>> p = multiprocessing.Pool() 
>>> p.apply(math.sin, (.5,))
0.479425538604203
>>> result = p.apply_async(math.sin, (.5,))
>>> result 
<multiprocessing.pool.ApplyResult object at 0x103edc350>
>>> result.get()
0.479425538604203
>>> 
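When there are several jobs, a common pattern is to submit them all first and only then call get() on each result object, so the jobs run in parallel while the main process still waits for every one. A minimal sketch, assuming Python 3 (the helper name run is only for illustration):

```python
import math
import multiprocessing


def run(values):
    pool = multiprocessing.Pool(4)
    try:
        # Submit everything first; each call returns an AsyncResult right away.
        pending = [pool.apply_async(math.sin, (x,)) for x in values]
        # get() blocks until that particular job is done and returns its value
        # (or re-raises the exception the job raised in the worker).
        return [r.get(timeout=10) for r in pending]
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(run([0.0, 0.5, 1.0]))
```

The results come back in submission order because the list of AsyncResult objects preserves it, regardless of which worker finishes first.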

If you are calling apply or apply_async in a for loop, consider using map or map_async instead.

>>> p.map(math.sin, range(5))
[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282]
>>> result = p.map_async(math.sin, range(5))
>>> result.get()
[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282]
Mike McKerns
  • Thank you for your answer. I'm aware of the map* functions, but the main problem with them is that they can't easily handle multiple arguments. May I ask what happens to the processes when I don't call `result.get()`? Are they cleaned up somehow? – Wakan Tanka Sep 04 '15 at 13:35
  • @WakanTanka: No, if you don't call `get`, the jobs in the other processes finish and their results wait in a queue. They are only cleared when you `close` or `terminate` the other processes. If you want a `map` that takes multiple arguments, check out my fork of `multiprocessing` (called `pathos.multiprocessing`) -- see: http://stackoverflow.com/a/28001397/2379433. It also uses much better serialization than the standard library version. – Mike McKerns Sep 04 '15 at 15:15
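On the multiple-arguments point: with only the standard library, one common workaround is to pack the arguments into tuples and give map a small wrapper that unpacks them; on Python 3.3 and later, Pool.starmap does the unpacking for you. A minimal sketch, where producer is a hypothetical two-argument task standing in for the question's function (this is not the pathos API):

```python
import multiprocessing


def producer(num, total):
    # Hypothetical two-argument task, standing in for the question's producer.
    return "%d of %d" % (num + 1, total)


def producer_star(args):
    # map() passes exactly one item per call, so unpack the tuple here.
    return producer(*args)


def run(total=5):
    jobs = [(i, total) for i in range(total)]
    pool = multiprocessing.Pool(4)
    try:
        via_wrapper = pool.map(producer_star, jobs)   # works on Python 2 and 3
        if hasattr(pool, "starmap"):                  # Pool.starmap exists on Python 3.3+
            via_starmap = pool.starmap(producer, jobs)
        else:
            via_starmap = via_wrapper
    finally:
        pool.close()
        pool.join()
    return via_wrapper, via_starmap


if __name__ == "__main__":
    print(run())
```

Both approaches return results in the order of the input list, since map and starmap preserve ordering.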