
Here is my python-3.6 code for simulating a 1D reflected random walk, using the joblib module to generate 400 realizations concurrently across K workers on a Linux cluster machine.

I note, however, that the runtime for K=3 is worse than for K=1, and that the runtime for K=5 is even worse!

Can anyone please see a way to improve my use of joblib?

from math import sqrt
import numpy as np
import joblib as jl
import os

K = int(os.environ['SLURM_CPUS_PER_TASK'])

def f(j):
    N = 10**6
    p = 1/3
    np.random.seed(None)
    X = 2*np.random.binomial(1,p,N)-1   # X = 1 with probability p
    s = 0                               # X =-1 with probability 1-p
    m = 0 
    for t in range(0,N):
        s = max(0,s+X[t])
        m = max(m,s)
    return m

pool = jl.Parallel(n_jobs=K)
W = np.asarray(pool(jl.delayed(f)(j) for j in range(0,400)))
W      
asked by S. Finch
  • Would you please include the times you get for your various K values? – Stefan Collier Jun 14 '18 at 15:55
  • For K=1, CPUTime=00:10:52. For K=3, CPUTime=00:11:36. For K=5, CPUTime=00:12:58. – S. Finch Jun 14 '18 at 16:18
  • @S.Finch would you mind also posting the NUMA-system's actual `hwloc` / **`lscpu`** picture ? – user3666197 Jun 14 '18 at 16:25
  • @S.Finch would you mind kindly cooperating with the people who volunteered their time & knowledge to help you? The solution depends heavily on how the available hardware resources get used and scheduled into the flow of processing: a pure-`[SERIAL]` scheduling with **standard python tools can run the whole block of 400**-RandomWalks **`x 1E6`**-drunken-sailor-steps **in less than `1.45 [s]`**, instead of your posted ~11-13 [min](!). If the intention was to teach students on {R|MATLAB|Julia|Stata} while also adding a python approach, **do post the `lstopo` map of the NUMA-Cluster-under-test** – user3666197 Jun 15 '18 at 11:09
  • I am afraid that NUMA clusters & python's GlobalInterpreterLock are far beyond the skill levels of my students (in an elementary seminar on statistical programming). Thank you for your kindness & expertise. – S. Finch Jun 16 '18 at 13:37

2 Answers


> a way to improve my use of joblib ?

joblib can and will help, but only for code that can benefit from being distributed across some pool of resources, and only if the costs of doing so stay below the effective speedup.

Tweaking joblib's pre-dispatch and batch-size parameters starts to make sense only after the to-be-distributed code has been performance-polished.
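
For reference, this is roughly where those knobs live in the joblib API ( a minimal sketch re-using the O/P's K and f(); the concrete values are illustrative, not tuned recommendations ):

import joblib as jl

pool = jl.Parallel( n_jobs       = K,                # number of worker processes
                    backend      = 'multiprocessing',
                    pre_dispatch = '2 * n_jobs',     # how many tasks get queued ahead of the workers
                    batch_size   = 10                # how many tasks each dispatch bundles together
                    )
W    = pool( jl.delayed( f )( j ) for j in range( 400 ) )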

Some effort on this, as demonstrated below, has shown a core speedup of ~ 8x, achieving still pure-[SERIAL] run-times of ~ 217,000 [us] per one Random Walk ( instead of the ~ 1,640,000 [us] per item implied by the times reported above ).

Only after that may some harder work begin on Cluster-resources related optimisation ( avoiding performance devastation ) for the originally postulated intention of organising a distributed work-flow over the 400 repetitions defined above.

That makes sense if and only if:

  • CPU-starvation can be avoided, and
  • no excessive add-on costs of distributed job-scheduling have to be paid.

Perhaps a long, but important story about where the performance gets saved or lost:

Executive summary

There could hardly be a better illustration of Dr. Gene AMDAHL's argument.

The inner structure of the above defined task is heavily [SERIAL]:

  • random number genesis is principally a pure-[SERIAL] process, due to PRNG-design
  • 1E6 iterator atop a 1D-vector of pre-calc'd drunken-sailor-steps is pure-[SERIAL]

Yes, the "outer" scope of work ( the 400 repetitions of the same process ) can easily be converted into a "just"-[CONCURRENT] ( not a true-[PARALLEL], whatever professors and wannabe gurus may tell you ) process-scheduling, but the add-on costs of doing so get added to the run-time ( and worse than just linearly ). Given that the [SERIAL] part was not performance re-engineered, the net effect of such an effort can easily devastate the original good intention ( Q.E.D. above, as the posted run-times grew from 10:52 for K == 1 towards ~ 13 minutes for even small values of K ).


A brief test has proved that the whole task could, after using standard python tools, be run in a pure-[SERIAL] fashion in < 1.45 [s] ( instead of the reported ~ 12 - 13 minutes ), even on a rather stone-age archaic desktop device ( some in-cache computing effects were possible, yet more as an accidental side-effect than as an intentional, HPC-motivated code-refactoring for HPC-cluster-specific performance ):

u@amd64FX:~$ lstopo --of ascii
+-----------------------------------------------------------------+
| Machine (7969MB)                                                |
|                                                                 |
| +------------------------------------------------------------+  |
| | Package P#0                                                |  |
| |                                                            |  |
| | +--------------------------------------------------------+ |  |
| | | L3 (8192KB)                                            | |  |
| | +--------------------------------------------------------+ |  |
| |                                                            |  |
| | +--------------------------+  +--------------------------+ |  |
| | | L2 (2048KB)              |  | L2 (2048KB)              | |  |
| | +--------------------------+  +--------------------------+ |  |
| |                                                            |  |
| | +--------------------------+  +--------------------------+ |  |
| | | L1i (64KB)               |  | L1i (64KB)               | |  |
| | +--------------------------+  +--------------------------+ |  |
| |                                                            |  |
| | +------------++------------+  +------------++------------+ |  |
| | | L1d (16KB) || L1d (16KB) |  | L1d (16KB) || L1d (16KB) | |  |
| | +------------++------------+  +------------++------------+ |  |
| |                                                            |  |
| | +------------++------------+  +------------++------------+ |  |
| | | Core P#0   || Core P#1   |  | Core P#2   || Core P#3   | |  |
| | |            ||            |  |            ||            | |  |
| | | +--------+ || +--------+ |  | +--------+ || +--------+ | |  |
| | | | PU P#0 | || | PU P#1 | |  | | PU P#2 | || | PU P#3 | | |  |
| | | +--------+ || +--------+ |  | +--------+ || +--------+ | |  |
| | +------------++------------+  +------------++------------+ |  |
| +------------------------------------------------------------+  |
|                                                                 |
+-----------------------------------------------------------------+
+-----------------------------------------------------------------+
| Host: amd64FX                                                   |
| Date: Fri 15 Jun 2018 07:08:44 AM                               |
+-----------------------------------------------------------------+

< 1.45 [s] ?
Why ? How ? That is the whole story about ...
( Proper HPC efforts could push it further, well under 1 [s] )


Dr. Gene AMDAHL's argument, even in its original, add-on-overhead-agnostic form in his well cited report, showed that any composition of [SERIAL] and [PARALLEL] blocks of work has a principally limited benefit from an increasing amount of processing units harnessed for the [PARALLEL]-part ( a.k.a. the Law of Diminishing Returns, towards an asymptotically limited speedup even for an infinite number of processors ), whereas any improvement introduced for the [SERIAL]-part will continue to increase the speedup additively ( in a purely linear fashion ). Let me skip here the adverse effects ( also affecting the speedup, some in a similarly linear fashion, but in the adverse sense - the add-on overheads ), as these will get discussed below.

Step No. 1:
Repair the code, so as to make something useful.

Given the code as-is above ( when run under python-2.x, as in the demonstration below ), there is no random-walk at all.

Why ?

>>> [ op( np.random.binomial( 1, 1 /3,  1E9 ) ) for op in ( sum, min, max, len ) ]
[0, 0, 0, 1000000000]

So,
the code as-is produces a rather expensive list of a-priori known constants. No randomness at all. Damned python-2.x rounding of integer division. :o) ( Under python-3.6, as posted, `1 / 3` already yields a float, so this particular trap only bites if the code ever gets run under python-2.x; the explicit `1./3.` form below is safe under both. )

>>> [ op( np.random.binomial( 1, 1./3., 1E9 ) ) for op in ( sum, min, max, len ) ]
[333338430, 0, 1, 1000000000]

So this is fixed.


Step No. 2:
Understand the overheads ( and ideally also any hidden performance-blockers )

Any attempt to instantiate a distributed process ( for each one of the K instructed joblib-spawned processes, calling multiprocessing with the subprocess backend, not the threading one ) makes you pay a cost. Always ...

This means your code execution gains additional [SERIAL] add-on code, which has to run before any ... still just theoretical ... ( 1 / n_jobs ) split-effect may start to take place.
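
A minimal sketch of how one may expose that cost on a given system ( `noop` is a hypothetical, deliberately empty payload; re-uses the O/P's K; absolute numbers will differ per platform ):

import time
import joblib as jl

def noop( j ):                       # a payload doing (almost) no useful work at all
    return j

t0       = time.time()
_        = [ noop( j ) for j in range( 400 ) ]                                # pure-[SERIAL] baseline
t_serial = time.time() - t0

t0       = time.time()
_        = jl.Parallel( n_jobs = K )( jl.delayed( noop )( j ) for j in range( 400 ) )
t_joblib = time.time() - t0          # the difference is dominated by spawn / dispatch / collect overheads

print( t_serial, t_joblib )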

A closer look onto the "useful" work:

def f( j ):                                            # T0
    #pass;   np.random.seed( None )                    #  +       ~ 250 [us]
    prnGEN = np.random.RandomState()                   #  +       ~ 230 [us]
    # = 2 * np.random.binomial( 1, 1./3., 1E6 ) - 1    #  + ~ 465,000 [us]
    X =        prnGEN.binomial( 1, 1./3., int( 1E6 ) ) #  + ~ 393,000 [us]
    X*= 2                                              #  +   ~ 2,940 [us]
    X-= 1                                              #  +   ~ 2,940 [us]
    s = 0; m = 0                                       #  +       ~ 3 [us]
    for t in range( 0, int( 1E6 ) ):                   #    ( py3+ does not pre-allocate a range() list, it works as an xrange()-like generator )
        s = max( 0, s + X[t] )                         #  +      ~ 15 [us] cache-line friendly consecutive { hit | miss }-rulez here, heavily ...
        m = max( m, s )                                #  +      ~  5 [us]
    return  m                                          # = ~ 2,150,000 [us] @  i5/2.67 GHz
#                                                      # = ~ 1,002,250 [us] @ amd/3.6  GHz

For this sort of workpackage, the best demonstration-purpose speedups will come from non-interpreted, GIL-free, Cython-ised code-packages, cdef-ed with a nogil directive and run from a threading-backend or a multiprocessing.Pool-spawned worker. One may expect such code to execute in about ~ 217,000 [us] per one pure-[SERIAL] Random Walk of 1E6 steps, at which point it starts to make sense to harness a pool of code-execution nodes, with some pre-load tweaking so as not to let them starve. Yet, all premature-optimisation warnings remain due and valid in this simplified context, and proper engineering practices are needed for achieving a professional-grade result.
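
As a hedged, pure-numpy alternative to the Cython route just described ( not the code behind the ~ 217,000 [us] figure above ): the inner loop is a Lindley-type recursion, and a walk reflected at zero satisfies s_t = C_t - min( 0, min_{k<=t} C_k ) for the un-reflected cumulative sum C, so the 1E6-step python-level loop can be removed entirely. A sketch, to be cross-checked against the loop version before being trusted:

import numpy as np

def f_vec( j, N = 10**6, p = 1. / 3. ):
    prnGEN      = np.random.RandomState()             # a private, per-call PRNG instance
    X           = 2 * prnGEN.binomial( 1, p, N ) - 1  # +1 with probability p, -1 otherwise
    C           = np.cumsum( X )                      # the un-reflected walk
    running_min = np.minimum( np.minimum.accumulate( C ), 0 )
    return int( ( C - running_min ).max() )           # running maximum of the reflected walk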

Some tools may help you see, down to the assembly level, how many instructions were actually added by each respective high-level language syntax-constructor element ( or concurrency / parallelisation #pragma masquerade ), to "smell" the add-on processing-costs one will pay during the finalised code-execution:


Given these add-on processing costs and only a "small" ( thin ) amount of work "inside" the "just"-concurrently executed block ( beware, a true [PARALLEL]-scheduling is not automatically possible ), these add-on costs may make you pay way more than you will ever receive back from the split.

The blockers:

Any add-on communication / synchronisation may further devastate the theoretical speedup of the flow of code-execution. Locks, the GIL ( avoided only if not using the threading-backend ), semaphores, socket-comms, sharing et al. are the common blockers.

For a carefully designed source-of-randomness, any draw from such a "device" must also get centrally re-synchronised, so as to keep the quality of such randomness. This may cause additional trouble behind the curtain ( a common issue on systems with some authority certified sources of randomness ).


The next step?

Read more details on Amdahl's Law, best in the contemporary re-formulated version, where both the setup and termination overheads are added in the overhead-strict mode and where the atomicity-of-processing is also taken into account for a practical evaluation of realistic speedup limitations.
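
As an illustration only ( a simplified overhead-extended form, not necessarily the exact formulation behind the re-formulation referred to above ): with a parallelisable fraction p, N workers and a setup + termination overhead o, both expressed as fractions of the original single-worker run-time, the achievable speedup is roughly bounded as sketched below.

def overhead_extended_amdahl_speedup( p, N, o ):
    # p : fraction of the original run-time that can be split across workers
    # N : number of workers
    # o : add-on setup + termination overhead, as a fraction of the original run-time
    return 1. / ( ( 1. - p ) + p / N + o )

# e.g. a 95%-parallelisable workload on 5 workers, paying a 30% overhead:
print( overhead_extended_amdahl_speedup( 0.95, 5, 0.30 ) )   # ~ 1.85 x, despite the 5 workers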

Next: measure the net-duration costs of your code, and you indirectly get the add-on costs of the setup + termination overheads of your in-vivo system execution.

import time

def f( j ):
    ts = time.time()
    #------------------------------------------------------<clock>-ed SECTION
    N = 10**6
    p = 1./3.
    np.random.seed( None )                    # RandomState coordination ...
    X = 2 * np.random.binomial( 1, p, N ) - 1 # X = 1 with probability p
    s = 0                                     # X =-1 with probability 1-p
    m = 0 
    for t in range( 0, N ):
        s = max( 0, s + X[t] )
        m = max( m, s )
    #------------------------------------------------------<clock>-ed SECTION
    return ( m, time.time() - ts )            # tuple
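
A sketch of how the returned tuples may then be used ( same joblib instrumentation as the O/P's; the exact numbers are system-specific, the net-vs-lumpsum split is the point ):

import time
import numpy as np
import joblib as jl

t0      = time.time()
results = jl.Parallel( n_jobs = K )( jl.delayed( f )( j ) for j in range( 400 ) )
lumpsum = time.time() - t0                 # incl. process-instantiation, distribution, collection, termination

W       = np.asarray( results )            # shape (400, 2): column 0 = m, column 1 = net per-walk duration
net     = W[:, 1].sum()                    # the "useful" computing time actually spent inside f()
print( "lumpsum %.1f [s]   net %.1f [s]   overhead ~ %.1f [s] ( assuming ideal load-balancing across %d workers )"
       % ( lumpsum, net, lumpsum - net / K, K ) )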

> For classroom tutorials, I've successfully parallelized my random walk code using special modules within R, Matlab, Julia & Stata. (By "successfully", I mean it is abundantly clear that 20 workers do at least 10 times as much work as 1 worker in the same time interval.) Is such internal parallelization not feasible in Python ?

Well, this last comment seems to show some sort of displeasure towards the people who tried to help and who brought in the reasons why the posted code, as-is, behaves exactly as was observed. It was not our choice to define the processing strategy this exact way, was it?

So, again: given the original decision was to use the python-3.6 + joblib.Parallel() + joblib.delayed() instrumentation, simply Alea Iacta Est ...

What might have worked ( as cited ) for { R | MATLAB | Julia | Stata } simply does not mean that it will work the same way in a GIL-stepped, let alone a joblib.Parallel()-spawned, eco-system.

The first cost one will ALWAYS pay for a joblib.Parallel()-spawned job is the cost of re-constructing a whole, 1:1 copy of the current python-interpreter state. Given that the current state contains way more object-instances than a stripped-down MCVE-script ( as @rth demonstrated ), the whole, many-times-replicated memory-image first has to get copied + transported + re-constructed onto all the distributed-processing nodes across the SLURM-managed cluster-footprint, all of which adds ( non-productive ) overhead time. If in doubt, add a few GB-sized numpy-arrays into the state of the python interpreter, put the measured timestamps for the respective duration calculations into the first and last array cells, and finally return ( m, aFatArray ). The overall execution times will jump higher, as both the initial 1:1-copy and the returning path will have to move a much larger amount of data there and back ( again, details on instantiation-related add-on costs have been posted in many places here, including templates for systematic benchmarking of the respective add-on costs ).
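
A sketch of that "fat-state" experiment ( aFatArray and its size are purely illustrative; it re-uses the time import and the instrumented f() defined above ):

import numpy as np

aFatArray = np.zeros( int( 1E8 ) )        # ~ 0.8 GB of float64 ballast held in the parent interpreter state

def f_fat( j ):
    ts            = time.time()
    m, dt         = f( j )                # the instrumented per-walk function defined above
    aFatArray[ 0] = ts
    aFatArray[-1] = time.time()
    return ( m, aFatArray )               # returning the ballast forces it back over the IPC channel too

# running f_fat instead of f through joblib.Parallel() should visibly inflate the lumpsum wall-clock time,
# while the per-walk "useful" computing time ( dt ) stays about the same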

Exactly this was the reason for advising the kind O/P to indeed measure the effective amount of computing time ( the "benefit" part of the elementary cost/benefit argument ), which is cheap to get in the trivialised experiment above and which will show the scale, the sum, and the actual proportion of useful work inside the "remote"-executed computing payload(s). Ref. the code modification proposed above, which returns the values so that W[:, 1] will tell the actual "net" computing costs of the "useful work" spent during the efficient-workpackage-computing time, once these have finally arrived and got activated inside the respective "remote" code-execution eco-system(s) ( here, in the form of the joblib.Parallel()-spawned binary full-scale replicas of the original python interpreter ), whereas the flow of time between the start and the end of the main-code execution shows the actual expenses - the lumpsum amount of time spent, i.e. including all the "remote"-process-instantiation(s) + all the respective workpackage(s)-distribution(s) + all the "remote"-process-termination(s).


A final remark for those who have not spent time reading about the source-of-randomness-related problems :

Good practice should rather avoid any blocking logic hidden behind "shared randomness". Better to use individually configured PRNG-sources. If interested, or in need of a certifiable PRNG-robustness, feel free to read more on this in a discussion here.
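
A minimal sketch of one such individually configured setup ( the explicit per-job seeding is an illustrative choice, adequate for a classroom demo, not a certified guarantee of stream independence ):

import numpy as np

def f_seeded( j, N = 10**6, p = 1. / 3. ):
    prnGEN = np.random.RandomState( 1234 + j )      # a private, per-job PRNG stream ( seed choice is illustrative )
    X      = 2 * prnGEN.binomial( 1, p, N ) - 1
    s = m  = 0
    for t in range( N ):
        s = max( 0, s + X[t] )
        m = max( m, s )
    return m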

answered by user3666197

@user3666197 wrote a very nice answer about overhead, with a lot of bolded text ;) However, I want to draw your attention to the point that, when you run your code with K=1, you do only one random walk at a time, while with K=3 or 5 you perform 3 or 5 random walks simultaneously (it seems). So you would need to multiply the K=1 runtime by 3 or 5 to get the runtime required to do the same amount of work, and I guess that runtime would be much bigger than the one you got.

Well, to provide a useful answer and not just a note (the correction in the comments below is right): it seems the multiprocessing module is a better choice here. Here is your code:

from math import sqrt
import numpy as np

from multiprocessing import Pool
import os

K = int(os.environ['NTASK'])


def f(j):
    N = 10**6
    p = 1./3.
    np.random.seed(None)
    X = 2*np.random.binomial(1,p,N)-1   # X = 1 with probability p
    s = 0                               # X =-1 with probability 1-p
    m = 0 
    for t in range(0,N):
        s = max(0,s+X[t])
        m = max(m,s)
    return m

pool = Pool(processes=K) 
print(pool.map(f, range(40)))

and the performance

$ time NTASK=1 python stof.py                                                                                                
[21, 19, 17, 17, 18, 16, 17, 17, 19, 19, 17, 16, 18, 16, 19, 22, 20, 18, 16, 17, 17, 16, 18, 18, 17, 17, 19, 17, 19, 19, 16, 16, 18, 17, 18, 18, 19, 20, 16, 19]

real    0m30.367s
user    0m30.064s
sys     0m 0.420s

$ time NTASK=2 python stof.py                                                                                                
[18, 16, 16, 17, 19, 17, 21, 18, 19, 21, 17, 16, 15, 25, 19, 16, 20, 17, 15, 19, 17, 16, 20, 17, 16, 16, 16, 16, 17, 23, 17, 16, 17, 17, 19, 16, 17, 16, 19, 18]

real    0m13.428s
user    0m26.184s
sys     0m 0.348s

$ time NTASK=3 python stof.py 
[18, 17, 16, 19, 17, 18, 20, 17, 21, 16, 16, 16, 16, 17, 22, 18, 17, 15, 17, 19, 18, 16, 15, 16, 16, 24, 20, 16, 16, 16, 22, 19, 17, 18, 18, 16, 16, 19, 17, 18]

real    0m11.946s
user    0m29.424s
sys     0m 0.308s

$ time NTASK=4 python stof.py
[16, 19, 17, 16, 19, 17, 17, 16, 18, 22, 16, 21, 16, 18, 15, 16, 20, 17, 22, 17, 16, 17, 17, 20, 22, 21, 17, 17, 16, 17, 19, 16, 19, 16, 16, 18, 25, 21, 19, 18]

real    0m 8.206s
user    0m26.580s
sys     0m 0.360s
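
As a hedged extension closer to the O/P's original 400-walk, SLURM-driven setting ( untested at that scale here; it re-uses the f() defined above, and the runtime should scale roughly linearly with the number of walks per worker ):

from multiprocessing import Pool
import os

K = int(os.environ['SLURM_CPUS_PER_TASK'])           # or NTASK, as in the timing runs above

if __name__ == '__main__':
    pool = Pool(processes=K)
    W = pool.map(f, range(400), chunksize=400 // K)  # all 400 walks, handed out in ~K large chunks
    pool.close()
    pool.join()
    print(W)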
answered by rth
  • Negative, Sir. In either case, the **`def`**-ed process loops always exactly **`1E6` times** "inside" any of the `joblib`-spawned **`400`-walks**, no matter how these got finally distributed among the `K == { 1, 3, 5, ... }`-code-execution paths. Plus, sure, the Overheads do matter here **`:o)`** – user3666197 Jun 14 '18 at 16:37
  • @user3666197, of course, I totally agree with you! The overhead, and in the OP's case the slurm distribution time, do matter. But it is funny to think that when we distribute processes we expect the work done within the same time simply to multiply. As far as I remember there is a very nice note about this presumption in the bible of parallel programming [DBPP](http://www.mcs.anl.gov/~itf/dbpp/) – rth Jun 14 '18 at 17:05
  • For classroom tutorials, I've successfully parallelized my random walk code using special modules within R, Matlab, Julia & Stata. (By "successfully", I mean it is abundantly clear that 20 workers do at least 10 times as much work as 1 worker in the same time interval.) Is such internal parallelization _not_ feasible in Python? Is some module other than "joblib" preferable? I'd enjoy seeing a concrete revision of my code that succeeds. Thanks! – S. Finch Jun 14 '18 at 17:28
  • @rth Just for clarity, the OP has explicitly mentioned **python-3.6**, which means your code will throw a `NameError: name 'xrange' is not defined`, while python-2.x will have problems with memory upon allocating all the `range()`-instance(s) memory-blocks **`:o)`** >>> https://tio.run/##K6gsycjPM/7/Py2/SCFTITNPoaIoMS89VUPB0EBB04pLAQgy//8HAA – user3666197 Jun 14 '18 at 19:03
  • @user3666197, oops, thanks for the comment. I'm stuck with python-2.x. Hope `range()` works in python-3.x as well as in python-2.x. I've corrected the answer. BTW, inside `joblib.pool` there is the actual `multiprocessing` module, but `joblib.pool` slows down somewhere (a lock, I think), while `multiprocessing.Pool` doesn't. – rth Jun 14 '18 at 19:36
  • I appreciate your revised code greatly -- "multiprocessing" indeed seems better than "joblib" -- does anyone know why this is true? Thank you! – S. Finch Jun 14 '18 at 20:24
  • @rth after re-reading your testing, there is a cardinal error -- your test was run just K-times, **instead of 400-times**, so your time readouts should be scaled up by a large factor ... **Would you kindly re-run the tests and post observations coherent with the O/P's problem definition?** – user3666197 Jun 15 '18 at 00:47
  • @user3666197 Oh! my bad! Thank you for spotting my error. I've updated the answer, but ran it only for 40 walks, not for 400. I hope that is ok, as I have no time to wait 12-15 minutes. I can confirm there is a problem with joblib in the python-2.x version. – rth Jun 15 '18 at 16:50
  • ( What kind of evidence has led you to guess that the problem is with **`joblib`** per se? You may have started to see the memory-expensive costs of how python-2.x implemented **`range()`** ( `xrange()` does not show this, being itself a sort of a `[SERIAL]`-iterator of its own, serving just one number per loop ), as `range()` will demand a large block of RAM to get memalloc-ed per each `[CONCURRENTLY]`-hosted and executed run, which may soon saturate your platform's CPU/RAM communication lines if trying to run 3+ concurrently, co-located on your desktop-class localhost ) – user3666197 Jun 15 '18 at 17:27
  • @user3666197 I should have backed that up with confirmation. Well, it was my bug. Sorry **(-.-)** – rth Jun 15 '18 at 18:41