3

To get a better understanding about parallel, I am comparing a set of different pieces of code.

Here is the basic one (code_piece_1).

for loop

import time

# setup
problem_size = 1e7
items = range(9)

# serial
def counter(num=0):
    junk = 0
    for i in range(int(problem_size)):
        junk += 1
        junk -= 1
    return num

def sum_list(args):
    print("sum_list fn:", args)
    return sum(args)

start = time.time()
summed = sum_list([counter(i) for i in items])
print(summed)
print('for loop {}s'.format(time.time() - start))

This code ran a time consumer in a serial style (for loop) and got this result

sum_list fn: [0, 1, 2, 3, 4, 5, 6, 7, 8]
36
for loop 8.7735116481781s

multiprocessing

Could multiprocessing style be viewed as a way to implement parallel computing?

I assume a Yes, since the doc says so.

Here is code_piece_2

import multiprocessing
start = time.time()
pool = multiprocessing.Pool(len(items))
num_to_sum = pool.map(counter, items)
print(sum_list(num_to_sum))
print('pool.map {}s'.format(time.time() - start))

This code ran the same time consumer in multiprocessing style and got this result

sum_list fn: [0, 1, 2, 3, 4, 5, 6, 7, 8]
36
pool.map 1.6011056900024414s

Obviously, the multiprocessing one is faster than the serial in this particular case.

Dask

Dask is a flexible library for parallel computing in Python.

This code (code_piece_3) ran the same time consumer with Dask (I am not sure whether I use Dask the right way.)

@delayed
def counter(num=0):
    junk = 0
    for i in range(int(problem_size)):
        junk += 1
        junk -= 1
    return num
@delayed
def sum_list(args):
    print("sum_list fn:", args)
    return sum(args)

start = time.time()
summed = sum_list([counter(i) for i in items])
print(summed.compute())
print('dask delayed {}s'.format(time.time() - start))

I got

sum_list fn: [0, 1, 2, 3, 4, 5, 6, 7, 8]
36
dask delayed 10.288054704666138s

my cpu has 6 physical cores

Question

Why does Dask perform so slower while multiprocessing perform so much faster?

Am I using Dask the wrong way? If yes, what is the right way?

Note: Please discuss with this particular case or other specific and concrete cases. Please do NOT talk generally.

  • **For record** : the text of the O/P has been changed ( incl. the tail "**Note** warning ) after an answer has been already provided, adding several other full sections of code and rewording the original question, copied from the end of the O/P. Still the add-on costs of both the instantiations and running the spawned threads / processes during running the code-samples ( the more at small, if not tiny scales of [SPACE]-domain footprints ) seem to be underestimated in the expectations of speedups. – user3666197 Sep 06 '19 at 15:48
  • **Dask documentation** is explicit both on the range of built-in process-scheduling options: *"The dask library **contains schedulers** for single-threaded, multi-threaded, **multi-process**, and distributed execution."* (https://conference.scipy.org/proceedings/scipy2015/pdfs/matthew_rocklin.pdf) and on differences on the ways,how the work is organised & what are known to be the Best Practices so as to achieve results as reasonably close to the theoretical maximum speedups. With indeed computing-intensive workloads,like **`np.math.factorial(1000+i)`**,a full landscape will start showing teeth – user3666197 Sep 06 '19 at 16:02
  • The original question, before changes, was this: https://stackoverflow.com/revisions/57820724/1 – user3666197 Sep 06 '19 at 16:04
  • Does this answer your question? [why is multiprocessing slower than a simple computation in Pandas?](https://stackoverflow.com/questions/49837539/why-is-multiprocessing-slower-than-a-simple-computation-in-pandas) – demongolem Dec 29 '20 at 14:38

3 Answers3

5

In your example, dask is slower than python multiprocessing, because you don't specify the scheduler, so dask uses the multithreading backend, which is the default. As mdurant has pointed out, your code does not release the GIL, therefore multithreading cannot execute the task graph in parallel.

Have a look here for a good overview over the topic: https://docs.dask.org/en/stable/scheduler-overview.html

For your code, you could switch to the multiprocessing backend by calling: .compute(scheduler='processes').

If you use the multiprocessing backend, all communication between processes still needs to pass through the main process. You therefore might also want to check out the distributed scheduler, where worker processes can directly communicate with each other, which is beneficial especially for complex task graphs. Also, the distributed scheduler supports work-stealing to balance work between processes and has a webinterface providing some diagnostic information about running tasks. It often makes sense to use the distributed scheduler rather than the multirpocessing scheduler even if you only want to compute on a local machine.

Arco Bast
  • 3,595
  • 2
  • 26
  • 53
  • Thanks for your answer. it seems that "your code does not release the GIL" indicates the GIL could be released? So far as I know, Python does not support this, except for using c [extension](https://docs.python.org/3/c-api/init.html#releasing-the-gil-from-extension-code) right? –  Sep 07 '19 at 10:42
  • 1
    The GIL can be released, if you write a c extension or use Cython. There are also libraries like numba, which can help. Also, many libraries like pandas or numpy release the GIL for many operations. – Arco Bast Sep 07 '19 at 10:46
  • @ArcoBast **You've forgotten the most important type of GIL-avoidance** - using process-based backend, where(at an increased instantiation cost)whole python-session is replicated into full-copies of the main-python and the "team" of the spawned copies is used for GIL-independent processing.Numba-tricks were for years limited to a subset of data-objects (+ they bear a big,one-time,cost of JIT-compilation,that has to get justified by many re-uses of such function,otherwise you'll pay more than receive back).Cython-modules are hard to fuse to make strict-typed language safely match untyped python – user3666197 Sep 10 '19 at 19:29
  • Dask-level directive (scheduler = 'processes') depends on the Dask-level implementation. Native Python-level module (the multiprocessing module) has **version-specific behaviour and different versions have different defaults for different O/S**. So there is a big **chain-of-dependencies** to say, that a single Dask-level directive will (or will not) force the process-based **spawned** copies of the Python session, where GIL does not have a code-execution parallelism devastating blocking side-effects, or **{ fork | forkserver }** where "*safely forking a multithreaded process is problematic.*" – user3666197 Sep 10 '19 at 20:09
2

Q : Why did parallel computing take longer than a serial one?

Because there are way more instructions loaded onto CPU to get executed ( "awfully" many even before a first step of the instructed / intended block of calculations gets first into the CPU ), then in a pure-[SERIAL] case, where no add-on costs were added to the flow-of-execution.

For these (hidden from the source-code) add-on operations ( that you pay both in [TIME]-domain ( duration of such "preparations" ) and in [SPACE]-domain ( allocating more RAM to contain all involved structures needed for [PARALLEL]-operated code ( well, most often a still just-[CONCURRENT]-operated code, if we are pedantic and accurate in terminology ), which again costs you in [TIME], as each and every RAM-I/O costs you about more than 1/3 of [us] ~ 300~380 [ns] )

The result?

Unless your workload-package has "sufficient enough" amount of work, that can get executed in parallel ( non-blocking, having no locks, no mutexes, no sharing, no dependencies, no I/O, ... indeed independent having minimum RAM-I/O re-fetches ), it is very easy to "pay way more than you ever get back".

For details on the add-on costs and things that have such strong effect on resulting Speedup, start reading the criticism of blind using the original, overhead naive formulation of the Amdahl's law here.

user3666197
  • 1
  • 6
  • 50
  • 92
  • Thanks man. Would you please give some pieces of code to illustrate your answer. I believe that will get this answer more upvotes. –  Sep 06 '19 at 21:49
1

The code you have requires the GIL, so only one task is running at a time, and all you are getting is extra overhead. If you use, for example, the distributed scheduler with processes, then you get much better performance.

mdurant
  • 27,272
  • 5
  • 45
  • 74
  • Let me ask you for a kind provision of explicit evidence of Dask not using a full process-based multiprocessing (where no GIL-stepped dancing takes place)- i.e. where did you get sure a thread-only "backend" was run? If Dask were a just GIL-stepped(still re-[SERIAL]-ised by a centrally enforced GIL-lock policy),the library would never (except for a few use-cases with I/O-latency-masking) deliver any remarkable value for parallel-processing speedups, which seems contradicting all the smart and dedicated efforts of Scientific Python community. Sure, any code using Dask,must be designed correctly – user3666197 Sep 06 '19 at 15:45
  • 1
    With Dask you have a choice ( https://docs.dask.org/en/latest/scheduling.html ). The default is threads only, because it has much fewer install dependencies, and can be appropriate for the right workloads, such as calling functions which release the GIL, which many array and dataframe computations do. – mdurant Sep 06 '19 at 16:20
  • With all respect, this was **not correct**, @mdurant, the O/P code was using just a **`@delayed`**-decorated syntax-sugar trick, that permits Dask to ***lazily** defer a such decorated execution of code-segments ALAP*, where some forms of possible concurrent / parallel code-execution is **not related** to Dask `{ dask.Array | dask.Bag | dask.DataFrame }`-storage-abstraction archetypes with their respective Schedulers (some thread-based w/GIL,others not), but uses another way of using a fine-grain parallelism, if possible, over the DAG-dependency-tree of the **lazy**-ALAP-evaluation strategy – user3666197 Sep 06 '19 at 17:03
  • 1
    I am afraid you are mistaken, whether you use delayed or the higher-level API, you create a graph which then gets executed by whichever scheduler you want, in parallel. The execution stage is the same. – mdurant Sep 06 '19 at 18:14
  • @user3666197 How to check whether a thread-only "backend" was running? –  Sep 06 '19 at 20:59
  • 1
    @singularli the threaded scheduler is the default. You could try `.compute(scheduler='processes')` to test the performance of the multiprocessing scheduler. – Arco Bast Sep 06 '19 at 21:02
  • @ArcoBast Thanks a lot! Please move your comments to answer, I'll accept it. –  Sep 06 '19 at 21:23
  • @singularli Easily - check processes in process-monitor or via pid / parent-PID and their respective amounts ot RAM, threads and CPU-core-affinities. Also be warned that "defaults" are prone for version dependencies, so rather be explicit. Dask documentation is clear on other Best Practice Do-s and Dont-s for achieving better performance ( like the python multiprocessing documentation is for how to select process-based backend for escape from GIL-lock and Windows v/s Linux v/s Mac defaults are not the same and version by version do differ )…. Using this since py-2.3 so for a while ... – user3666197 Sep 06 '19 at 23:56