
I want to use Pool to split a task among n workers. When I use map with a task function that takes a single argument, all the cores are used and all the tasks are launched simultaneously.

On the other hand, when I use starmap, the tasks are launched one by one and I never reach 100% CPU load.

I want to use starmap because I need to pass a second argument to the task function, but it's of no use to me if it doesn't actually take advantage of multiprocessing.
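
For reference, this is the map/starmap pattern I mean, as a minimal standalone example with toy data (the function names here are placeholders, not my real code):

import time
from itertools import repeat
from multiprocessing import Pool

def square(x):
    time.sleep(1)
    return x * x

def multiply(x, y):
    time.sleep(1)
    return x * y

if __name__ == "__main__":
    with Pool(4) as pool:
        t0 = time.time()
        pool.map(square, range(4))                         # one argument per task
        print("map took", time.time() - t0)

        t0 = time.time()
        pool.starmap(multiply, zip(range(4), repeat(10)))  # two arguments per task
        print("starmap took", time.time() - t0)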

This is the code that works:

import numpy as np
from multiprocessing import Pool

# df_a = just a pandas dataframe which I split in n parts and I 
#        feed each part to a task. Each one may have a few 
#        thousand rows
n_jobs = 16

def run_parallel(df_a):
    dfs_a = np.array_split(df_a, n_jobs)
    print("done split")
    pool = Pool(n_jobs)
    result = pool.map(task_function, dfs_a)
    return result

def task_function(left_df):
    print("in task function")
    # execute task...
    return result

result = run_parallel(df_a)

In this case, "in task function" is printed 16 times, all at the same time.

This is the code that doesn't work:

from itertools import repeat

n_jobs = 16

# df_b: a big pandas dataframe (~1.7M rows, ~20 columns) which I 
#        want to send to each task as is

def run_parallel(df_a, df_b):
    dfs_a = np.array_split(df_a, n_jobs)
    print("done split")
    pool = Pool(n_jobs)
    result = pool.starmap(task_function, zip(dfs_a, repeat(df_b)))
    return result

def task_function(left_df, right_df):
    print("in task function")
    # execute task
    return result

result = run_parallel(df_a, df_b)

Here, "in task function" is printed sequentially and the processors never reach 100% capacity. I also tried workarounds based on this answer: https://stackoverflow.com/a/5443941/6941970

but no luck. Even when I used map in this way:

from functools import partial

pool.map(partial(task_function, right_df=df_b), dfs_a)

Considering that repeat(*very big df*) might be introducing memory issues, I expected this to help, but there still wasn't any real parallelization.
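
The only other idea I have, in case the cost of copying df_b to every task is the real problem, is to hand df_b to each worker exactly once through the Pool initializer and keep using map with a single argument. A sketch of what I mean (the task body is just a placeholder):

import numpy as np
from multiprocessing import Pool

n_jobs = 16
_df_b = None

def init_worker(df_b):
    # runs once in every worker process; keeps df_b as a worker-global
    global _df_b
    _df_b = df_b

def task_function(left_df):
    print("in task function")
    # execute task against the worker-global _df_b...
    return len(left_df) * len(_df_b)  # placeholder for the real work

def run_parallel(df_a, df_b):
    dfs_a = np.array_split(df_a, n_jobs)
    with Pool(n_jobs, initializer=init_worker, initargs=(df_b,)) as pool:
        return pool.map(task_function, dfs_a)

With the default fork start method on Linux this should avoid pickling df_b through the task queue entirely, so it would at least separate the transfer cost from the map/starmap difference. Is something like this the right direction, or is something else making starmap behave differently for me?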

tzoukritzou
  • This code is not a reproducible example. Edit it to include the import statements and a sample `df_a` and `df_b`. – Charchit Agarwal Aug 04 '22 at 08:05
  • `starmap` is basically identical to `map` except for the argument unpacking, so if one works and the other doesn't, there's something else going on. – Aaron Aug 04 '22 at 16:32
  • Also, depending on the size of your `df`s, you could just be spending a bunch of time making a copy and sending each of your tables to each child process. It can sometimes be faster to open a df as a mmapped file from each child process, and only pass the indices of the data you want to work on. This can drastically reduce how much data needs to be copied, pickled, and sent to the child. – Aaron Aug 04 '22 at 16:39

0 Answers