3

I have df_fruits, which is a dataframe of fruits.

index      name
1          apple
2          banana
3          strawberry

and, its market prices are in mysql database like below,

category      market      price
apple         A           1.0
apple         B           1.5
banana        A           1.2
banana        A           3.0
apple         C           1.8
strawberry    B           2.7        
...

During the iteration in df_fruits, I'd like to do some processes.

The code below is a non-parallel version.

def process(fruit):
   # make DB connection
   # fetch the prices of fruit from database
   # do some processing with fetched data, which takes a long time
   # insert the result into DB
   # close DB connection

for idx, f in df_fruits.iterrows():
    process(f)

What I want to do is to do process on each row in df_fruits in parallel, since df_fruits has plenty of rows and the table size of the market prices is quite large (fetching data takes a long time).

As you can see, the order of execution between rows does not matter and there's no sharing data.

Within iteration in df_fruits, I'm confused about where to locate `pool.map(). Do I need to split the rows before parallel execution and distribute chunks to each process? (If so, a process which finished its job earlier than other process would be idle?)

I've researched of pandarallel but I can't use it (my os is windows).

Any help would be appreciated.

Shaido
  • 27,497
  • 23
  • 70
  • 73
cointreau
  • 864
  • 1
  • 10
  • 21
  • How big is the data? – Yash Nov 10 '20 at 06:27
  • @Yash well I can't exactly say it but just fetching market prices sometimes take over than several minutes. the problem is not only the size of the data but also poor structure of database(this makes fetching takes longer time). – cointreau Nov 10 '20 at 06:33

3 Answers3

3

There is no need to use pandas at all. You can simply use Pool from the multiprocessing package. Pool.map() takes two inputs: one function and one list of values.

So you can do:

from multiprocessing import Pool

n = 5  # Any number of threads
with Pool(n) as p:
    p.map(process, df_fruits['name'].values)

This will go through all fruits in the df_fruits dataframe one-by-one. Note that there is no result returned here since the process function is designed to write the result back to the database.


If you have multiple columns that you want to consider in each row, you can change df_fruits['name'].values to:

df_fruits[cols].to_dict('records')

this will give a dictionary as the input to preprocess, e.g.:

{'name': 'apple', 'index': 1, ...}
Shaido
  • 27,497
  • 23
  • 70
  • 73
  • Thank you for answer. in `process` function I need to access other column values for processing. Can I access to the other columns in `process` though I passed only `df_fruits['name'].values` as param? – cointreau Nov 10 '20 at 06:50
  • 1
    @W.Cointreau: No, you can't. However, instead of using `df_fruits['name'].values`, you can use `df_fruits[cols].values` (with `cols` being the columns you want to consider). Or you can simply use `df_fruits.values`. This will give you the values as a list. Usually it's easier to use a dictionary, so you can try with `df_fruits.to_dict('records')` instead. – Shaido Nov 10 '20 at 06:51
  • 1
    Since `process` function has more parameters, I referred to https://stackoverflow.com/questions/25553919/passing-multiple-parameters-to-pool-map-function-in-python as well. Finally, I needed to use `partial` like `func = partial(process, param1, param2 ...)` and then `pool.map(func, df_fruits.to_dict('records')`. Thanks a lot! – cointreau Nov 10 '20 at 08:32
  • @Shaido what is the procedure if the function has two argument and there exist two columns to be refereed to function – RF1991 Jun 28 '23 at 08:28
  • @RF1991: You can change the last line to use `starmap` instead: `p.starmap(process, df_fruits[cols].values)`. – Shaido Jun 28 '23 at 09:12
  • @Shaido you mean `p.starmap(process, df_fruits['index','name'].values)` ? – RF1991 Jun 28 '23 at 09:36
  • @RF1991: Close, you need to use `p.starmap(process, df_fruits[['index', 'name']].values)` (note the double `[[` and `]]`). No problems~ – Shaido Jun 28 '23 at 09:43
  • @Shaido I appreciate the help best regards – RF1991 Jun 28 '23 at 09:52
1

Yeah its possible, although not really provided in the pandas library straight out of the box.

Maybe you can attempt something like this:

def do_parallel_stuff_on_dataframe(df, fn_to_execute, num_cores):
    # create a pool for multiprocessing
    pool = Pool(num_cores)

    # split your dataframe to execute on these pools
    splitted_df = np.array_split(df, num_cores)

    # execute in parallel:
    split_df_results = pool.map(fn_to_execute, splitted_df)

    #combine your results
    df = pd.concat(split_df_results)

    pool.close()
    pool.join()
    return df
Serial Lazer
  • 1,667
  • 1
  • 7
  • 15
1

You might to be able do something like:

with Pool() as pool:
    # create an iterator that just gives you the fruit and not the idex
    rows = (f for _, f in df_fruits.iterrows())
    pool.imap(process, rows)

You may want to use one of the other pool primitives other than map if you don't care for the results, or are willing to get the results in any order or don't care about the results.

Frank Yellin
  • 9,127
  • 1
  • 12
  • 22
  • ahh, I've never heard of `imap`. I found some knowledges from here: https://stackoverflow.com/questions/11338044/python-multiprocessing-whats-the-difference-between-map-and-imap/11338089. Thank you. – cointreau Nov 10 '20 at 06:58
  • Actually, I just tried an experiment, and `map` seemed to be faster than `imap` for a fast operation on 100,000 elements. All of them allow iterators as the argument. Experiment and see which one works best for you. Of course, we don't know if you need the results of process or not. – Frank Yellin Nov 10 '20 at 18:14
  • OK, I'll try it. I expect `imap` can be faster than `map` since fetching data in `process` is the main bottleneck and each job does not influence to other. When I tried with `map` with several workers, it seems like some of them wait each other to finish though they don't need to wait... – cointreau Nov 12 '20 at 00:36