5

Here is some code I wrote for parallelizing the creation of a dictionary

parallelize.py

if __name__ == "__main__":
  import time

  from multiprocessing import Pool

  def assign_dict(alist):
    return {x:x for x in alist}


  my_dict = {}
  size = 10000000
  threshold=10000000
  my_list=list(range(size))

  start=time.time()
  my_dict=assign_dict(my_list)
  end=time.time()
  print("check seq",end-start, " sec")

  my_dict = {}

  chunks = [my_list[i*threshold:(i+1)*threshold] for i in range(int(size/threshold))]
  process_count = 7   
  pool = Pool(processes=process_count)

  start = time.time()
  inter_list = pool.map_async(assign_dict, chunks)
  inter_list.wait()
  inter_list=inter_list.get()

  for some_dict in inter_list:
    print("Combining...", time.time()-start, " sec elapsed")
    my_dict = {**my_dict, **some_dict}

  print("check 152002 as key ",my_dict[152002])
  end=time.time()
  print("check parallel",end-start, " sec")

Here is the output for size 1 mil with threshold 1 mil

check seq 0.6559352874755859  sec
Combining... 4.751460790634155  sec elapsed
check 152002 as key  152002
check parallel 5.064720869064331  sec

Here is output for size = 10mil with threshold 1 mil

check seq 0.668889045715332  sec
Combining... 1.6871337890625
Combining... 1.7269806861877441
Combining... 1.860083818435669
Combining... 2.0794677734375
Combining... 2.266465663909912
Combining... 2.47836971282959
Combining... 2.8915648460388184
Combining... 3.2443037033081055
Combining... 3.6063129901885986
Combining... 3.9933629035949707
check 115202 as key  1152002
check parallel 4.406447887420654  sec

Here is the output for size 100 mil with threshold 10 mil, the worst part here is even before the combining, the map_async still takes 55 secs compared to the 19 secs in sequential.

check seq 19.182615041732788  sec
Combining... 55.18172788619995
Combining... 56.38586497306824
Combining... 58.534785747528076
Combining... 61.805513858795166
Combining... 64.75091290473938
Combining... 71.34392070770264
Combining... 76.02847385406494
Combining... 81.2545096874237
Combining... 87.75674867630005
Combining... 109.01232576370239
check 115202 as key  1152002
check parallel 126.1939218044281  sec

Even though I tried various combinations of size and threshold the code with pool is always slower, so it is not that the threshold is too big as the sequential version runs very quickly. Even when the size is the same as the threshold, the code with pool is many seconds slower.

And even for long running jobs like size = 1000 million, the parallelized version is way slower than sequential execution, meaning that there is no benefit of parallelization. I have 8 cores and 16GB RAM, I am running MacOSX, I even verified that my cores were running in parallel in the activity monitor to execute the task, yet it is slower. The combine phase does not take much time as shown. By the time the inter_list.get() command ends, the parallel part is already done. So it cannot interfere with the dictionaries combining.

Can anyone please parallelize this code to be faster than the sequential version in Python 3 or at least help me understand why this is happening?

devssh
  • 1,184
  • 12
  • 28
  • Why is there only one `Combining...` output? Shouldn't it occur as many times as there are chunks? Are you sure that you have split up the work in multiple chunks? – mkrieger1 Oct 04 '18 at 19:56
  • Since the threshold and size are same, yes I have verified with multiple chunks, the chunks are working – devssh Oct 04 '18 at 19:58
  • But if there is only one chunk, isn't that basically the same as the "sequential" version, only with the multiprocessing overhead? – mkrieger1 Oct 04 '18 at 19:59
  • My problem is that the multiprocessing overhead does not go away, even on larger loads like 1000 million. – devssh Oct 04 '18 at 20:01
  • the processing module calls the entire file in a new process, you are doing the whole thing mutiple times. use the if __name__ == "__main__": to reduce it to just the bits that you need to process. – JamesD Oct 04 '18 at 20:01
  • It is executed only once, why would the whole thing happen multiple times? The `chunks` take care of that. Just run it with `python file.py` on python 3. I am using Python 3.6. The whole file would be in the `main` function then. – devssh Oct 04 '18 at 20:05
  • @devssh On Windows there is no fork, which means that `multiprocessing` will spawn a new python process and import your main file. In your case this will cause a new pool to be started etc... basically a "forkbomb". This is clearly documented in the documentation for `multiprocessing`. That's why the documentation says guard the "executable code" in `if __name__ == "__main__":` since that prevents the importing of the module to re-execute it incorrectly. – Bakuriu Oct 04 '18 at 20:10
  • 1
    @Bakuriu But if that were the case, wouldn't that mean that there was an infinite loop and the program would never finish? As opposed to just taking longer than expected? – mkrieger1 Oct 04 '18 at 20:13
  • Did you check how much time it takes for the `map_async` to complete vs how much it takes to combine the resulting dicts? I believe the time taken to combine the dicts is bigger, which means that parallelizing the first part is not really helping that much. – Bakuriu Oct 04 '18 at 20:14
  • 1
    @mkrieger1 Yes, I said *on Windows*. The OP is probably on a Unix system which supports `fork` and hence no import is necessary – Bakuriu Oct 04 '18 at 20:14
  • I have updated my code to add `if __name__=="__main__"` yet there is no change, can you confirm that? Did this thing "forkbomb" really!! Even after updating it does not improve the benchmarks. Yes, I will update my OS. The combining is not the source of the trouble. I went to great lengths to verify that. You can check it for yourself, I even added a benchmark on it. I even made it `map_async` to leave any doubt. The code works just as bad on `map` as well. – devssh Oct 04 '18 at 20:15
  • 1
    @devssh You should state in your post which operating system you are using to avoid any further speculation. – mkrieger1 Oct 04 '18 at 20:15
  • 1
    @Bakuriu The first `Combining...` output should correspond to the time taken by `map_async`. – mkrieger1 Oct 04 '18 at 20:17

1 Answers1

5

Your multiprocessing version is slower than the sequential version because of the inter-process communication required to pass the results from the Pool.map from the workers back to the process from which the workers were forked.

Because of the GIL, Python's multiprocessing library is the suggested way to cpu intensive parallel tasks. However, this means that the virtual memory address spaces of each worker in the Pool are different, and therefore the results of the Pool.map must be serialized and passed between processes. Because the result of your Pool.map is such a large array, this means your program is spending a large amount of time serializing/deserializing the answers and passing them between processes. In the sequential version, there is only a single process and thus the result never needs to be serialized and passed between processes and then deserialized, which is probably why it runs faster in this case.

If you want to avoid this bottleneck, you will want to try using Python's Shared Memory array which can avoid the inter-process communication bottleneck, since the array will be in the same virtual address space of all worker processes. If you really need a key-value pair mapping, then look into Python's multiprocessing.Manager.dict.

In general, Pool.map is good when you can parallelize some computation that does not produce a large quantity of data.

Jon Deaton
  • 3,943
  • 6
  • 28
  • 41
  • Since this example is fairly simple and self contained with no other complications can you edit to add how to use multiprocessing.Array? I'll read up more about this, but this seems to be severely restrictive, completely breaking the immutability. Why do the processes need to talk to each other when they have tasks that are self contained and the structures I am using are immutable? I saw another post describing the same just now but the logic behind this is quite fishy. The combining step should be separate right? Why does the independent job need to be given shared memory – devssh Oct 04 '18 at 21:11
  • Hmm, so `Pool.map` is not a map in the true sense, it should give smaller quantities of data. This does help a bit. The only thing worse than bad parallelization and IO calls is network calls which take too much time and have latency, even Spark tries to aggressively minimize them and this makes it 10x - 100x more performant than MapReduce2.0. Why did this happen here when my system is not distributed across systems! – devssh Oct 04 '18 at 21:13
  • 1
    Indeed this can be severely restrictive, and is one of the few places where python pays a fairly high price for it's general convenience and high level of abstraction. Alas no programming language is perfect. – Jon Deaton Oct 04 '18 at 21:23
  • There is nothing wrong with the general convenience and high level of abstraction. IMHO they got too involved with the low level details to prioritize immutability or were restricted by the OS process. `pool.map` should be able to parallelize immutable self-contained functions and eliminate interprocess communication when being explicitly provided `chunks` of data by their users from their API. Maybe `process` level is wrong and we should be looking at `multithreading` or something else. I hope they add more parallel processing to python, it's embarrassing to do things like map sequentially. – devssh Oct 04 '18 at 21:52