0

After browsing through many discussions on the same/similar topics, I still can't solve my problem, hence I would like to post it below.

The following is a MWE for what I would like to parallelize, which is to solve a set of independent linear equations (nI+mat)x=y parametrized by n=0,1,2 with fixed arrays mat and y. Note that the arrays are declared to be global with the hope that they can be accessed by different processes/pools (see below). But I don't think it works and this is the core of the question: How to share big numpy arrays for different processes/pools to avoid communication overhead?

import numpy as np 
import time 
import os


N = 2000
num = 3 
global mat 
mat = np.random.rand(N, N)
global y 
y = np.random.rand(N,1)

# Functions to be parallelized num of times
def fun(n): 
    print(f"{n}-th job is run on child {os.getpid()} of parent {os.getppid()}")
    newmat = n * np.eye(N) + mat
    
    return np.linalg.solve(newmat, y)

# Approach 1: no parallel
def main():
    start_time = time.time()
    res = []
    for i in range(num):
        res.append(fun(i))
    print(f"Approach 1: Time elapsed = {time.time()-start_time} sec")
    return res
main()

I tried the following three approaches to parallelize it: Pool, Process and Process with Array and numpy.frombuffer. See below.

from multiprocessing import Process, set_start_method, Queue, Pool, cpu_count, Array, RawArray
set_start_method('fork') 

# Approach 2: with pool 
def main2():
    start_time = time.time()
    pool = Pool(cpu_count())
    res = pool.map(fun, range(num))
    print(f"Approach 2: Time elapsed = {time.time()-start_time} sec")
    pool.close()
    return res    
main2()

# Approach 3: with process
def fun2(i, output):
    output.put(fun(i))

def main3():
    start_time = time.time()
    output = Queue()
    processes = [Process(target=fun2, args=(i, output)) for i in range(num)]
    # Run processes
    for p in processes:
        p.start()

    # Exit the completed processes
    for p in processes:
        p.join()            

    res = [output.get() for _ in processes]
    
    print(f"Approach 3: Time elapsed = {time.time()-start_time} sec")
    
    return res       
main3()

# Approach 4: with process with Array, numpy.frombuffer, 
def fun3(n, output, mat, y):
    print(f"{n}-th job is run on child {os.getpid()} of parent {os.getppid()}")
    mat2 = np.frombuffer(mat.get_obj())
    newmat = n * np.eye(N) + mat2.reshape((N, N))
    output.put(np.linalg.solve(newmat, y))

def main4():
    mat2 = Array('d', mat.flatten())
    y2 = Array('d', y)
    start_time = time.time()
    output = Queue()
    processes = [Process(target=fun3, args=(i, output, mat2, y2)) for i in range(num)]

    # Run processes
    for p in processes:
        p.start()

    # Exit the completed processes
    for p in processes:
        p.join()            

    res = [output.get() for _ in processes]
    
    print(f"Approach 4: Time elapsed = {time.time()-start_time} sec")
    
    return res
main4()

Neither of these approaches works and I got

0-th job is run on child 8818 of parent 3421
1-th job is run on child 8818 of parent 3421
2-th job is run on child 8818 of parent 3421
Approach 1: Time elapsed = 0.2891273498535156 sec
0-th job is run on child 8819 of parent 8818
1-th job is run on child 8820 of parent 8818
2-th job is run on child 8821 of parent 8818
Approach 2: Time elapsed = 3.6278929710388184 sec
0-th job is run on child 8832 of parent 8818
1-th job is run on child 8833 of parent 8818
2-th job is run on child 8834 of parent 8818
Approach 3: Time elapsed = 4.243804931640625 sec
0-th job is run on child 8843 of parent 8818
1-th job is run on child 8844 of parent 8818
2-th job is run on child 8845 of parent 8818
Approach 4: Time elapsed = 4.745251893997192 sec

This summarizes all the approaches I have seen so far. I am aware of that there is a SharedMemory in Multiprocessing, which it is not available to python 3.7.2. If that could solve the problem, I would be very happy to see how it works.

Really thanks for anyone to read through the whole post, and any helps are appreciated. And in case it is important, I am using a Mac with Apple M1 chip, macOS Monterey.

Update 1: per @AKX's point, I removed the print(n-th job) line, and make N=10000, and the results are

Approach 1: Time elapsed = 23.812573194503784 sec
Approach 2: Time elapsed = 126.91087889671326 sec

for Approach 3, it has taken for around 5 minutes which I have to cut it off. So the time overhead is pretty large for large N.

fagd
  • 103
  • 5
  • Why do you need to share `mat` and `y`? They are not modified by `fun`. – Roland Smith Jul 24 '22 at 19:08
  • You're measuring the time it takes to spawn the subprocesses too, not the time it takes for them to do the work. You'd probably get different numbers for `n=1000000`. You should also note the result is necessarily serialized even if the inputs would get forked. – AKX Jul 24 '22 at 19:08
  • @RolandSmith So they can be _read_ in the other processes too without being explicit function arguments (and as such be serialized and deserialized). – AKX Jul 24 '22 at 19:09
  • @RolandSmith Ah sorry forgot to address that, the reason is that the approach 2 (with `Pool`) is slow because the big matrices will be passed to other processes. Thus I am thinking to share them to avoid the overhead. That is what I read somewhere. – fagd Jul 24 '22 at 19:10
  • @AKX Good point. I did see that the lines (0-th job, 1-th job etc), say with approach 4, pop up immediately, and that is how I convinced myself that they are running in parallel. But the problem is that they will stuck for a while, before printing out the last line (Approach 4: time ...). Then to your point, let me ask this: How I should parallelize it in such a way that I can *get back the result* as early as possible? – fagd Jul 24 '22 at 19:12
  • Sure, but my bet is that most of the "time elapsed" is spent setting things up, not doing work. Could you rerun things (probably without the "nth job" `print`s) with a much, much larger `num` and add the results? – AKX Jul 24 '22 at 19:14
  • "How I should parallelize it in such a way that I can get back the result as early as possible?" Use `Pool.imap_unordered` (or start reading your Queue before you `join` all of the processes) – AKX Jul 24 '22 at 19:15
  • @AKX, I tried `N=10000` (5 times larger) and remove the print statement, but still the `Pool`, `Process` is still much slower than without them. I will pose the results when they are ready. I can figure out how to use `Pool.imap_unordered` and order the results if needed, but could you elaborate on how to "start reading your Queue"? – fagd Jul 24 '22 at 19:21
  • @AKX, so am I sharing the `mat` and `y` correctly in approach 4? That is what I have been worried about. – fagd Jul 24 '22 at 19:25
  • I'd say yes, you are sharing them correctly there too. By the way - make sure you only do `mat = np.random.rand(N, N)` etc. in `main()` - otherwise that code could be run by the subprocesses too! – AKX Jul 24 '22 at 19:32
  • Yeah, they used to be in the `main`s. I moved them out because I read somewhere if you put them there they will be automatically shared by different processes. Not sure if is the case, but anyway let me move them back. – fagd Jul 24 '22 at 19:34
  • So long as they're global names, they will be shared to forked processes, sure. But don't initialize them in the global scope. – AKX Jul 24 '22 at 19:35
  • @fagd I see you mentioned `shared_memory`, which could be useful here and might be what you are looking for. Is there a reason you don't want to upgrade to python 3.9? – Charchit Agarwal Jul 24 '22 at 19:37
  • @Charchit, there isn't any particular reason, simply feel that it should be doable with the functionalities in 3.7.2. It should be the case right, given my use case is pretty simple and all those discussions suggest so. If that is not possible (which would really surprise me), then I will try `shared_memory`. – fagd Jul 24 '22 at 19:41
  • Just to add on, I don't think copy-on-write would be any use here. Variables are only shared between parent and child if they are not edited from within the child. However, because of how garbage collection works in python, every time you create a reference to the variable (or part of it, since its a numpy array here), you are incrementing the reference count for that object, which means it is being modified. So theoretically, a copy *should* be made if you intend to use the object in any meaningful way. – Charchit Agarwal Jul 24 '22 at 19:43
  • @fagd Can you see how e.g. https://gist.github.com/akx/d70ca59c2d209ab83eb36d4f56e5049b fares? It's using `imap_unordered` so you start getting results almost immediately, and `chunksize=8` so each process gets sent 8 jobs at a time for greater serialization efficiency. – AKX Jul 24 '22 at 19:43
  • 1
    @Charchit Yes, the page containing the object header will be copied when the refcount is touched, but that's probably pretty irrelevant if the array data is larger than the page size (e.g. 64KB). – AKX Jul 24 '22 at 19:44
  • @AKX. Thanks so much for writing this. The code is taking quite a while to run probably because `num=3000` is too big. Did you get back the result immediately? In my use case, I am trying to solve 3 (a small number of) linear equations with Big matrix. Let me try `imap_unordered` in my code. – fagd Jul 24 '22 at 19:51
  • Not quite immediately, but not all at once either. Depends on `num` and `N`, I guess... :) – AKX Jul 24 '22 at 19:56
  • @AKX. You know what, `imap_unordered` seems working! I will update my question once I confirmed it. Thanks! – fagd Jul 24 '22 at 19:57
  • @AKX. `imap_unordered` returns the `multiprocessing.pool.IMapUnorderedIterator object` object immediately even the run is not finished. The actual result will be returned after the run is done, and I see no advantage here. Let me know if I am wrong. I am actually convinced by the answer by Jerome Richard below, that there is no way to parallelize the use case since `np.linalg.solve` has been parallelized. – fagd Jul 24 '22 at 20:41

2 Answers2

3

np.linalg.solve should already be executed in parallel function implemented in LAPACK. In fact, this is the case on my (Linux + Windows) machine. Indeed, it calls LAPACK functions like dtrsm and dlaswp and the main computational function, dgemm, implemented in BLAS libraries. This last function should take >90% of the time and is heavily optimized and parallelized as long as you use a fast BLAS implementation. Numpy use OpenBLAS by default on most systems which is very good (and parallel). The Intel MKL is also a good alternative supporting LAPACK (certainly better on Intel hardware). If the computation is not parallel on your machine, this is a problem and you should check your BLAS implementation as it may be very inefficient.

The thing is parallelizing a code already parallel make it slower because running more threads than available core put a lot of pressure on the OS scheduler and the BLAS functions are not optimized for such a case. More specifically, profiling results shows parallelizing a parallel OpenBLAS function cause some synchronization function to wait for a while because of the work imbalance (certainly due to a static schedule of the computing kernels). This is the main source of slowdown of the approach 2/3/4 compared to the first sequential approach.

If you really want to parallelize the function you need to configure the BLAS so to use 1 thread (with OMP_NUM_THREADS=1 on OpenBLAS) but this is likely be less efficient than letting the BLAS does the parallelization. Indeed, BLAS makes use of optimized multi-threaded kernels (working in shared memory) meanwhile Python nearly prevent such design to be fast because of the global interpreter lock (GIL) in multi-threaded codes. Multi-threading is also limited by the overhead of pickling or the one of forking. That being said such overheads are small in the approach 2 and 3 (not in 4 certainly due to the queue). This generally is why Python is often not great for writing fast parallel applications (unless for long-lasting embarrassingly parallel works with small data transfers).

Here are average timing on my 6-core i5-9600KF machine with OpenBLAS:

Default OpenBLAS configuration:
- Approach 1:      0.129 seconds (stable)
- Approach 2:      1.623 seconds (highly-variable)
- Approach 3:      1.407 seconds (highly-variable)
- Approach 4:      1.532 seconds (highly-variable)

Sequential configuration of OpenBLAS:
- Approach 1:      0.338 seconds (stable)   <--- Now sequential
- Approach 2:      0.152 seconds (quite stable)
- Approach 3:      0.151 seconds (stable)
- Approach 4:      0.177 seconds (stable)   <--- >10% time in CPython overheads
- Parallel Numba:  0.144 seconds (stable)   <--- Multi-threaded

Note that the best speed up is pretty bad (ie. ~2.6) on a 6-core machine. I guess this might be because the computation is memory-bound (on my PC, the RAM throughput is about 20 GiB/s, while it can reach 32-35 GiB/s at most for such access pattern).

Jérôme Richard
  • 41,678
  • 6
  • 29
  • 59
  • 1
    Thanks. I think this makes sense in particular that np.linalg.solve should already be executed in parallel. After several trials, I am convinced that there should be no way to speed up further for my use case. I am gonna wait for a couple of hours, in case someone has better idea, before accepting this answer. – fagd Jul 24 '22 at 20:44
  • I think this answer is demonstratable-y correct in the sense that the cause for slowdown is trying to parallelize `np.linalg.solve`, and not because of any shenanigans of multiprocessing in python. You can check this by running the same code as `Approach 2` in the OP, but starting the pool with lesser number of workers. In my case, the lower the number of workers spawned, the greater was the speed of each run. – Charchit Agarwal Jul 24 '22 at 21:33
  • 1
    @Charchit Both are important. Actually, I first wrote a Numba parallel code that resulted to no speed up (actually a small performance decrease) and then discovered that it was due to the BLAS calls. The BLAS calls explains why the proposed approaches 2/3/4 cannot be faster but this does not explain why the code is much slower. On my machine the multiprocessing codes are >=10 times slower than the sequential while Numba succeed to be only 10%~20% time slower. This is certainly due to pickling (dependent of the number of workers) and more generally the overhead of multiprocessing. – Jérôme Richard Jul 25 '22 at 02:12
  • @Charchit Actually, when running the Numba code again I saw that the timings are very variables and I was lucky the first times. A low-level profiling shows that some unnamed optimized OpenBLAS functions are the source of the slow-down. I guess this is functions doing active waits appearing only in this case because of the load imbalance caused by the other threads/processes (this often happens with GOMP for example). The Python version has bigger kernel overheads (certainly due to the process) but not as big as the OpenBLAS suspicious functions. – Jérôme Richard Jul 25 '22 at 02:31
  • "mainly because Python is not great for writing fast parallel applications due to pickling and the global interpreter lock" - the GIL doesn't matter with multiprocessing (as is being done here) and pickling is also side-stepped by using fork to share the matrices. The rest I agree with - OP should check whether the non-MP version actually saturates their CPU(s). – AKX Jul 25 '22 at 07:29
  • @AKX I was thinking about multi-threading when writing about the GIL and multiprocessing about pickling. I reformulated this in a dedicated paragraph since it was not matching with the OP use-case. The GIL is not the problem in the OP code indeed. Pickling surprisingly turned out to not to be a problem in approaches 2/3 but is for approach 4. Pickling is used in all multiprocessing approaches of the OP though is is not necessary for (big) arrays, hence a variable overhead. Forking also introduce an overhead but it is small on my machine. Thank you for pointing this out. – Jérôme Richard Jul 25 '22 at 13:17
0

To answer this question, we need to talk about how multiprocessing works.

On UNIX-like platforms, multiprocessing uses the fork system call. This creates an almost perfect copy of the parent process, copying mat and y for you. In this case, sharing them doesn't make much sense because modern UNIX-like operating systems tend to use copy-on-write for memory pages where possible.

On platforms like macOS and ms-windows, it starts a new Python instance and imports your code into it by default. Although you can use fork on macOS. So there it wil re-create mat and y in every instance.

If you want to share data in this case, multiprocessing has several mechanisms for that, like Array and Manager. The latter should be able to share numpy arrays with some glue. And you have to keep in mind that there is some overhead associated with using them; their use case is geared toward modification of shared data, so it has mechanisms to deal with that. Which you don't need in your case.

Since you are using fork, I don't think that sharing the data will be much faster. And if the data is larger than a page of memory the operating system should use copy-on-write sharing, so it would not save you memory either.

As an alternative, you could write mat and y to a file in the parent process, and read them in the worker processes.

Or you could use a read-only mmap. But in that case you would still have to convert it to a numpy array.

Roland Smith
  • 42,427
  • 3
  • 64
  • 94
  • Erm. macOS is an UNIX-like platform; the `fork` start method is available there (but not the default method due to certain possible macOS library incompatibilities) and OP _is_ using it. – AKX Jul 24 '22 at 19:28
  • Yeah, I used the `fork` method for other reasons which could be lengthy to explain here. I thought about writing the `mat` and `y` onto a file and read them in child processes, but that should create larger time overhead than any of the above methods, I think. – fagd Jul 24 '22 at 19:32
  • @fagd See updated answer. – Roland Smith Jul 24 '22 at 19:41
  • 1
    Manager can support any datatype given that the data it contains is picklable. Relevant link: https://stackoverflow.com/a/72901934/16310741 – Charchit Agarwal Jul 24 '22 at 19:44