
So I've looked at the documentation of the multiprocessing module and at the other questions asked here, and none seem similar to my case, hence this new question.

For simplicity, I have a piece of code of the form:

import pandas as pd
from collections import defaultdict

# simple dataframe of some users and their properties.
data = {'userId': [1, 2, 3, 4],
        'property': [12, 11, 13, 43]}
df = pd.DataFrame.from_dict(data)

# a function that generates permutations of the above users, in the form of a list of lists
# such as [[1,2,3,4], [2,1,3,4], [2,3,4,1], [2,4,1,3]]
user_perm = generate_permutations(nr_perm=4)

# a function that computes some relation between users
def comp_rel(df, permutation, user_dict):
    # isin expects a list-like, so the single ids are wrapped in lists;
    # df1 and df2 are unused in this toy example
    df1 = df.userId.isin([permutation[0]])
    df2 = df.userId.isin([permutation[1]])
    user_dict[permutation[0]] += permutation[1]
    return user_dict


# and finally a loop: 
user_dict = defaultdict(int)
for permutation in user_perm:
    user_dict = comp_rel(df, permutation, user_dict)    

I know this code makes very little (if any) sense right now, but it's just a small example close to the structure of the actual code I am working on. In the end, user_dict should map userIds to some accumulated value.

I have the actual code, and it works fine, gives the correct dict and everything, but... it runs on a single thread. And it's painfully slow, keeping in mind that I have another 15 threads totally free.

My question is: how can I use Python's multiprocessing module to change the last for loop so it runs on all available threads/cores? I looked at the documentation, and it's not very easy to understand.

EDIT: I am trying to use pool as:

p = multiprocessing.Pool(multiprocessing.cpu_count())
p.map(comp_rel(df, permutation, user_dict), user_perm)
p.close()
p.join()

however this breaks, because the initial code uses the line:

user_dict = comp_rel(df, permutation, user_dict) 

and I don't know how these dictionaries should be merged after the pool is done.
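
To make the merging problem concrete: if each worker returned its own dict, I suppose I would need to combine them afterwards with something like this (just a sketch; partial_dicts is a made-up name for whatever the pool would return):

from collections import defaultdict

merged = defaultdict(int)
for d in partial_dicts:  # one partial dict per worker
    for k, v in d.items():
        merged[k] += v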

  • You definitely need to know about [GIL](https://docs.python.org/3/glossary.html#term-global-interpreter-lock). – Olvin Roght Feb 04 '20 at 09:51
  • @OlvinRoght I know there is some lock, but the docs also say: "However, some extension modules, either standard or third-party, are designed so as to release the GIL when doing computationally-intensive tasks such as compression or hashing." Multiprocessing seems to be such a module. – Qubix Feb 04 '20 at 09:53
  • @OlvinRoght assuming he does use `multiprocessing`, that would not be a real issue (despite him saying he has 15 "threads" free, he means cores) – GPhilo Feb 04 '20 at 09:53
  • @GPhilo, my machine has 4 cores with 4 threads each, as far as I know. If I use htop, I see 16 free "threads". Are we talking threads or cores? – Qubix Feb 04 '20 at 09:54
  • The examples in the multiprocessing modules show how you can do that: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool You can use a pool for firing off each invocation of `comp_rel`. Since you're firing off multiple python processes, the GIL will not be an issue. – rdas Feb 04 '20 at 09:54
  • @rdas, pool is precisely the function I was looking at in the docs; however, I did not manage to use it properly, it seems, as my code does not run. Could you please copy-paste my above code and add pool and whatever else, in an answer, so I can try and run it? – Qubix Feb 04 '20 at 09:56
  • @Qubix a thread, similarly to a process, is an OS concept. Your machine has 4 cores that use a version of "hyper-threading" (that's the intel name, can't remember the proper name of the technique right now) to expose 16 "virtual" cores. Threads are a different thing. The GIL is concerned with the OS threads, nothing to do with virtual CPUs. For all practical purposes, you can imagine your machine has 16 cores (won't be as performant as a real 16 cores, but we're talking details now) – GPhilo Feb 04 '20 at 09:56
  • As per an example of using pool: https://stackoverflow.com/questions/5442910/python-multiprocessing-pool-map-for-multiple-arguments – GPhilo Feb 04 '20 at 09:57
  • @GPhilo oh, thanks for making that clear :) – Qubix Feb 04 '20 at 09:57
  • Not working yet, could anyone please post an answer? – Qubix Feb 04 '20 at 10:07

2 Answers


After a short discussion in the comments I've decided to post a solution using ProcessPoolExecutor:

import concurrent.futures
from collections import defaultdict

def comp_rel(df, perm):
    ...
    return perm[0], perm[1]

user_dict = defaultdict(int)
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = {executor.submit(comp_rel, df, perm): perm for perm in user_perm}
    for future in concurrent.futures.as_completed(futures):
        try:
            k, v = future.result()
        except Exception as e:
            print(f"{futures[future]} throws {e}")
        else:
            user_dict[k] += v

It works the same as @tzaman's answer, but it gives you the possibility to handle exceptions per task. There are also more interesting features in this module; check the docs.
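
If per-task exception handling isn't needed, executor.map allows an even shorter variant; a minimal sketch, using itertools.repeat to pair the constant df with every permutation:

import concurrent.futures
from itertools import repeat
from collections import defaultdict

user_dict = defaultdict(int)
with concurrent.futures.ProcessPoolExecutor() as executor:
    # executor.map zips its argument iterables, so repeat(df) supplies
    # the same dataframe for every permutation in user_perm
    for k, v in executor.map(comp_rel, repeat(df), user_perm):
        user_dict[k] += v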

– Olvin Roght
  • throws `comp_rel() missing 1 required positional argument: 'user_dict'`, is there a way to feed the user dict there as in the original code? – Qubix Feb 04 '20 at 13:47
  • @Qubix, quote from docs "If `max_workers` is `None` or not given, it will default to the number of processors on the machine." – Olvin Roght Feb 04 '20 at 13:52
  • Thanks, but I have the problem that comp_rel needs to take the df, perm and user_dict arguments. How can I add this in the executor.submit line? – Qubix Feb 04 '20 at 13:54
  • @Qubix, just add one more positional argument `executor.submit(comp_rel, df, perm, user_dict)`. – Olvin Roght Feb 04 '20 at 13:54
  • I did, but now I get exceptions for all permutations, of the form: throws `too many values to unpack (expected 2)`. – Qubix Feb 04 '20 at 13:58
  • 1
    @Qubix, `future.result()` returns exactly the same as `comp_rel()`. In example function returns 2 values, that's why I unpacked it `k, v = future.result()`. If you function has different return - you should patch code – Olvin Roght Feb 04 '20 at 14:01
  • @Qubix, but I would recommend you patch the function's return as in the example, and don't patch the dict in the threads. – Olvin Roght Feb 04 '20 at 14:05

There are two parts to your comp_rel that need to be separated: the first is the calculation itself, which computes some value for some userId; the second is the "accumulation" step, which adds that value to the user_dict result.

You can separate out the calculation itself so that it returns a tuple of (id, value), farm it out with multiprocessing, and then accumulate the results afterwards in the main process:

from multiprocessing import Pool
from functools import partial
from collections import defaultdict

# We make this a pure function that just returns a result instead of mutating anything
def comp_rel(df, perm):
    ...
    return perm[0], perm[1]

comp_with_df = partial(comp_rel, df) # df is always the same, so factor it out
with Pool(None) as pool: # Pool(None) uses cpu_count automatically
    results = pool.map(comp_with_df, user_perm)

# Now add up the results at the end:
user_dict = defaultdict(int)
for k, v in results:
    user_dict[k] += v

Alternatively you could also pass a Manager().dict() object into the processing function directly, but that's a little more complicated and likely won't get you any additional speed.
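
For illustration, a minimal sketch of that Manager-based alternative, assuming a module-level accumulate helper (so it can be pickled) and an explicit lock, since the read-modify-write on the proxy dict is not atomic:

from multiprocessing import Manager, Pool

def accumulate(shared, lock, df, perm):
    k, v = comp_rel(df, perm)
    with lock:  # get + set on a DictProxy are two separate operations
        shared[k] = shared.get(k, 0) + v

with Manager() as manager:
    shared = manager.dict()
    lock = manager.Lock()
    with Pool(None) as pool:
        pool.starmap(accumulate, [(shared, lock, df, perm) for perm in user_perm])
    user_dict = dict(shared)  # copy back into a plain dict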


Based on @Masklinn's suggestion, here's a slightly better way to do it that avoids the memory overhead:

user_dict = defaultdict(int)
with Pool(None) as pool:
    for k, v in pool.imap_unordered(comp_with_df, user_perm):
        user_dict[k] += v

This way we add up the results as they complete, instead of having to store them all in an intermediate list first.
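
If each call to comp_with_df is cheap relative to the inter-process communication, the chunksize parameter of imap_unordered can reduce that overhead; a possible variant (the value 64 is an arbitrary example):

user_dict = defaultdict(int)
with Pool(None) as pool:
    # larger chunks mean fewer round-trips between the processes
    for k, v in pool.imap_unordered(comp_with_df, user_perm, chunksize=64):
        user_dict[k] += v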

– tzaman
  • 1
    Also since the order of the result doesn't seem to matter at all, you probably want to use `imap_unordered`, and do the accumulation within the pool. That way you can consume results as they're produced and python doesn't need a large resequencing buffer to return elements in-order. – Masklinn Feb 04 '20 at 11:24
  • @Masklinn [`ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) with [`as_completed()`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed) could be a bit more "nice" option. – Olvin Roght Feb 04 '20 at 11:56
  • It seems like a much lower-level process: with `imap_unordered` there's a single pass of userland code; with `as_completed` you have to first submit all the tasks, possibly keep track of the futures it returns, then process the futures you get from `as_completed`. – Masklinn Feb 04 '20 at 12:01
  • @Masklinn actually it's 2 lines of code in total. Also it doesn't require "hacks" like `partial()`, and you can consume results immediately after processing finishes. – Olvin Roght Feb 04 '20 at 12:47
  • 1
    "actually it's 2 lines of code totally." which is double the LOCs of imap_unordered. "Also it doesn't require "hacks" like partitial()" partial is a hack in literally no sense of the word. "and you can consume results immediately after processing finished" so... like imap_unordered except still not as good? – Masklinn Feb 04 '20 at 12:53
  • @Masklinn, performance is same, code is shorter, why not as good?) – Olvin Roght Feb 04 '20 at 13:01
  • @Masklinn Agreed with all your points; I've edited my answer to include your method. Thanks for the suggestion! – tzaman Feb 04 '20 at 13:01
  • @OlvinRoght code is longer not shorter, and it's not as good because it's more complex for no gain or point and OP uses multiprocessing already – Masklinn Feb 04 '20 at 13:06
  • One problem: `user_dict` is also an argument of `comp_rel`; how can I add this to the code? – Qubix Feb 04 '20 at 13:55
  • 1
    @Qubix the whole point of this answer is to **not** have `user_dict` be an argument. You return only the results of each individual computation, and then create the dict in the main thread. – tzaman Feb 04 '20 at 14:34