
I wrote a simple parallel python program

import multiprocessing as mp
import time

def test_function(i):
    print("function starts" + str(i))
    time.sleep(1)
    print("function ends" + str(i))

if __name__ == '__main__':
    pool = mp.Pool(mp.cpu_count())
    pool.map(test_function, [i for i in range(4)])
    pool.close()
    pool.join()

What I expect to see in the output:

function starts0
function starts2
function starts1
function starts3
function ends1
function ends3
function ends2
function ends0

What I actually see:

function starts1
function ends1
function starts3
function ends3
function starts2
function ends2
function starts0
function ends0

Looking at the output, it seems as if pool.map runs one function, waits until it is done, and only then runs the next. But the whole program takes about 2 seconds, which would be impossible unless test_function were running in parallel.


Edit:

This code works well on macOS and Linux, but it does not show the expected output on Windows 10. The Python version is 3.6.4.

user3666197
SMMousaviSP
    Your code gives expected output, are you sure your `cpu_count()` is 4? – Mohd Nov 06 '19 at 21:35
  • @Mohd My `cpu_count()` is 8 ! and why do you think the output is normal? – SMMousaviSP Nov 06 '19 at 21:51
    I meant it runs fine on my pc – Mohd Nov 06 '19 at 21:52
  • @Mohd May I ask what is your OS and which version of python do you use? I'm in windows 10 and using python 3.6.4 – SMMousaviSP Nov 06 '19 at 21:57
    Ubuntu 18.04, python 3.6.8 – Mohd Nov 06 '19 at 21:59
  • @Mohd You're right, I've tested it on my MacBook and it worked as I expected. Actually this is not my real problem, I have a much more complex code that I'm trying to parallelize and it didn't work well and I thought it was the code! – SMMousaviSP Nov 06 '19 at 22:13
  • You may be running into an issue where the output isn't deterministic and depends on the whim of the thread scheduler. You have to be very careful not to assume any part of one thread will finish before another unless you explicitly enforce it. – DeadChex Nov 06 '19 at 22:55

2 Answers


The multiprocessing.Pool() documentation (unchanged since Python 2.7) is clear that Pool.map() intentionally blocks while it processes the queue of calls built from the iterable — here, the four calls produced sequentially in the posted example.

The multiprocessing-module documentation says this about its Pool.map() method:

map(func, iterable[, chunksize])

A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.

This blocking is the expected behaviour; the different process-instantiation methods merely add different (process-copying-related) overhead costs.

Anyway, mp.cpu_count() need not be the number of CPU cores on which any such dispatched .Pool() workers' tasks actually get executed, because of the O/S affinity settings (user/process-level restriction policies):

Your code will have to "obey" the subset of CPU cores that any such multiprocessing-requested sub-process is permitted to harness,
the number of which is never higher than: len( os.sched_getaffinity( 0 ) )
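Sizing the pool by the permitted subset rather than the raw core count could be sketched like this (the helper name is illustrative, and note that os.sched_getaffinity() is unavailable on Windows and macOS, hence the fallback):

```python
import multiprocessing as mp
import os

def n_usable_cores():
    # Prefer the affinity-restricted subset where the O/S exposes it;
    # fall back to the raw core count elsewhere
    if hasattr(os, 'sched_getaffinity'):
        return len(os.sched_getaffinity(0))
    return mp.cpu_count()

if __name__ == '__main__':
    print(n_usable_cores())
```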


The Best Next Step : re-evaluate your whole code-execution eco-system

import multiprocessing as mp                                            # file: ~/p/StackOverflow_multiprocessing.py
import time, os, platform, inspect                                      # https://stackoverflow.com/questions/58738716/python-multiprocessing-pool-map-doesnt-work-parallel/58755642

def test_function( i = -1 ):
    thisframerecord   = inspect.stack()[0]                              # this line's own frame
    callerframerecord = inspect.stack()[1]                              # the caller's frame
    _INFO_   = inspect.getframeinfo(   thisframerecord[0] )
    _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),
           "PID:{0:} with PPID:{1:} runs".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i ),
           "invoked from {0:}()-LINE[{1:_>4d}]".format(                 _CALLER_.function, _CALLER_.lineno )
            )
    time.sleep( 10 )
    thisframerecord = inspect.stack()[0]
    _INFO_ = inspect.getframeinfo( thisframerecord[0] )
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),
           "PID:{0:} with PPID:{1:} ends".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i )
            )

if __name__ == '__main__':
    print( "{0:} cores reported by {1:}".format( mp.cpu_count(), "mp.cpu_count()" ) )
    if hasattr( os, 'sched_getaffinity' ):                              # not available on Windows / macOS
        print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity(0), "os.sched_getaffinity(0)" ) )
    print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
    print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
    print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )
    pool = mp.Pool( mp.cpu_count() )
    print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )
    pool.map( test_function, range( 4 ) )
    print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )
    pool.close()
    print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )
    pool.join()
    print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic()          ) )
    print( "EXECUTED on {0:}".format(              platform.version()        ) )
    print( "USING: python-{0:}:".format(           platform.python_version() ) )

might look something like this on a Linux-class O/S:

(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0, 1, 2, 3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork', 'spawn', 'forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],
____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],
____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],
____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],
____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d

EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:

Check the hidden detail of what your O/S uses to invoke test_function(): here mapstar() (not a universal choice across platforms) was the local SMP Linux O/S's choice, performed via its default sub-process instantiation method, 'fork'.
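Since the start method differs per platform, a Linux user can reproduce the Windows-style behaviour by requesting 'spawn' explicitly — a hedged sketch:

```python
import multiprocessing as mp

def worker(i):
    return 2 * i

if __name__ == '__main__':
    # 'spawn' is the only start method Windows offers; forcing it on
    # Linux reproduces the Windows-style process creation for testing
    ctx = mp.get_context('spawn')
    with ctx.Pool(2) as pool:
        out = pool.map(worker, range(4))
    print(out)
```

Note that with 'spawn' each child re-imports the main module, which is why the `if __name__ == '__main__':` guard is mandatory.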

user3666197

I suspect you may be experiencing a common gotcha in multiprocessing:

Printing to shared logs/screen from multiple threads (or processes) of execution (simultaneously) can produce confusing results!

This also explains why you see different behavior depending on the OS. Different OSes resolve this in slightly different ways; the underlying buffering scheme, access control, etc. make a difference.

You probably are getting the multiprocessing you expect, but your printout may be misleading.

I know you provided this code as an example in order to demonstrate a real world problem. So, just return to your original code and consider the above oft-overlooked fact again: printing (or logging to file) is accessing a shared resource. You may need locking or queueing or other techniques. Without knowing the details of your real problem, nothing more can be suggested.

Bill Huneke
  • Are you sure? If a thread-based backend were used ( **not** seen in the source code ), Python **would never permit `[PARALLEL]` code execution** due to the central GIL lock, which re-introduces a pure-`[SERIAL]` one-step-after-another ordering for any number of `mp.Pool` thread-based workers ( re-sequenced by GIL-lock policing to principally avoid any concurrency-related collision — this was Guido van Rossum's design decision and will remain so ). With a process-based backend, Windows spawns full copies of the Python interpreter state, while Linux/macOS use fork, forkserver, or other means, depending on the version. – user3666197 Nov 07 '19 at 12:32
  • I'm never really sure of anything.... :) But from your comment, I'm not sure which part you are questioning. Based on the documentation of the multiprocessing module, I do believe that mp provides parallel execution, unencumbered by the GIL. Or, are you wondering whether Windows' implementation (with "full copies" of interpreter) would really constitute parallel access of a shared resource? If I had a Windows box, maybe I would write up a toy script to test this, but even that would not be conclusive... docs: https://docs.python.org/3.8/library/multiprocessing.html – Bill Huneke Nov 07 '19 at 17:47
  • Ouch. I guess I've learned my lesson on SO - never say you are not sure. LOL, my answer was unaccepted and down voted. Your call o/c, but what I said was fully accurate and is the correct answer. The details mentioned in the new answer (how many cores/cpus do you have, does Python on your OS use fork or spawn, etc) are all irrelevant. "Affinity" is especially irrelevant. Lastly, the comment about blocking completely misunderstands. Yes the map call blocks. Of course it does. It joins all the subs. But the subs run in parallel, not in series. If in series, you would see 4 second run time – Bill Huneke Nov 11 '19 at 03:56