
Python and GitHub/stackoverflow newbie here, trying to speed up my workflow in Python using joblib and multiprocessing for the first time.

I have defined an empty OrderedDict to store the DataFrames generated by a function (my_function). The function takes elements from a column of a separate DataFrame, performs some operations, and is supposed to return the (hopefully filled) OrderedDict and another DataFrame.

Allow me to provide some pseudo-code to explain this:

from joblib import Parallel, delayed
from collections import OrderedDict
from tqdm import tqdm

import pandas as pd
import multiprocessing

my_dict = OrderedDict()
my_df = DataFrameofvalues

def my_function(k):

  my_dict[k] = someoperationswithpandasresultinginDataFrames
  
  my_df = someooperationswithpandas
  
  return my_dict, my_df
  
num_cores = multiprocessing.cpu_count()
inputs = tqdm(my_df['my_column'])

if __name__ == '__main__':
  my_dict, my_df = Parallel(n_jobs = num_cores)(delayed(my_function)(k) for k in inputs)

This results in the following error:

  File "<ipython-input-52-df771b916ba5>", line 8, in <module>
    my_dict, my_df = Parallel(n_jobs = num_cores)(delayed(my_function)(k) for k in inputs)

ValueError: too many values to unpack (expected 2)

I think I've overlooked something minor, but I just can't find it. Could somebody perhaps take a look and help me out?

I haven't been able to find much online about how to figure out exactly how many values my call is trying to unpack (I'm guessing it's the number of elements in inputs?), or whether it's handing me all the DataFrames that were supposed to go into the OrderedDict at once.

Thanks, much appreciated!

Edit based on further troubleshooting:

I think I know where the problem is coming from: the function iterates through inputs and simply generates DataFrames, which then cannot be unpacked into the dict and DataFrame it expects. I figured this out by setting inputs = tqdm(my_df.loc[0:1, 'my_column']): unpacking works then, but it fails if I set inputs = tqdm(my_df.loc[0:2, 'my_column']). No solutions thus far, though.
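To make the unpacking behaviour concrete, here's a minimal stand-alone illustration (with a toy function standing in for my_function, and a plain list standing in for the list that Parallel returns, one result per input):

```python
# Toy stand-in for my_function; the real one returns DataFrames.
def toy(k):
    return k * 2

# Parallel(...) collects one result per input into a list, so unpacking
# into two names only works if there are exactly two inputs:
results_two = [toy(k) for k in range(2)]
a, b = results_two  # fine: the list happens to have exactly 2 elements

results_three = [toy(k) for k in range(3)]
try:
    a, b = results_three
except ValueError as err:
    print(err)  # too many values to unpack (expected 2)
```

So .loc[0:1] works only by accident: that slice is inclusive and yields exactly two rows.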

whackerama

3 Answers


I believe it's syntax-related: you're not passing the parameters to your function correctly. Try splitting the final line into smaller pieces to figure out which part breaks.

Also, note that this is a generator expression, not a list comprehension:

(delayed(my_function)(k) for k in inputs)

Maybe you want this:

[delayed(my_function(k)) for k in inputs]

Hope this helps you. Good luck!

Joep
  • Thanks Joep, I tried out your solution and it gave me a SyntaxError. I think I know where the problem is coming from: the function is iterating through inputs and simply generating dataframes, which it then cannot put together as a dict. I figured this out by setting inputs = tqdm(my_df.loc[0:1, 'my_column']). It works when I do this, but fails to unpack if I set it to inputs = tqdm(my_df.loc[0:2, 'my_column']) – whackerama May 08 '21 at 12:16
  • That's right, the solution I presented is not complete; I'm trying to guide you in the right direction. Please Google how to pass function parameters in Python. Also, you can't execute two functions right next to each other on one line (e.g., `func1()func2()`); you need to place them on separate lines. – Joep May 08 '21 at 12:22

Figured out how to get what I wanted and thought I'd share.

The following snippet of pseudo-code:

if __name__ == '__main__':
  my_dict, my_df = Parallel(n_jobs = num_cores)(delayed(my_function)(k) for k in inputs)

was actually giving me a list of dataframes. I changed it to:

if __name__ == '__main__':
  my_list = Parallel(n_jobs = num_cores)(delayed(my_function)(k) for k in inputs)

for i in range(len(my_list)):
    if len(my_list[i]) > 0:
        my_list[i] = my_list[i].reset_index(drop = True)
        my_dict[str(my_list[i].loc[0, 'col1'])] = my_list[i]

which now returns a dictionary of dataframes. Not exactly what I was looking for in the first place, but for my purposes, even better.
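For anyone who does want both the dict and the extra DataFrame back, another pattern that should work is to return a tuple from each call and split the resulting list of tuples afterwards. Here's a sketch, with toy dicts standing in for the real DataFrames and a plain list comprehension simulating the list Parallel returns:

```python
from collections import OrderedDict

def my_function(k):
    # toy stand-ins: the real function returns pandas DataFrames
    frame_for_dict = {'data': k * 10}
    other_frame = {'data': k + 1}
    return k, frame_for_dict, other_frame

# Parallel(n_jobs=...)(delayed(my_function)(k) for k in inputs) would
# return a list with one (key, frame, frame) tuple per input; simulated:
results = [my_function(k) for k in range(3)]

# split the list of tuples back into the structures I originally wanted
my_dict = OrderedDict()
other_frames = []
for key, frame_for_dict, other_frame in results:
    my_dict[key] = frame_for_dict
    other_frames.append(other_frame)

print(list(my_dict.keys()))  # [0, 1, 2]
```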

whackerama

TL;DR:

#Python3

from multiprocessing import Process, Manager
from collections import OrderedDict


def update_dict(my_dict, key):
    # Insert your DataFrame calculations here!
    my_dict[key] = {'1st df': 'result_df_1',
                    '2nd df': 'result_df_2'}
    return


if __name__ == "__main__":
    # whatever your inputs are
    inputs = [x for x in range(4)]

    manager = Manager()
    global_dict = manager.dict()
    job = [Process(target=update_dict, args=(global_dict, _input)) for _input in inputs]
    _ = [p.start() for p in job]
    _ = [p.join() for p in job]

    [print(f"{x}") for x in global_dict.items()]
    
    # N.B assumes numeric (sortable) keys:
    # dictionary sorted by key -- OrderedDict(sorted(d.items())) also works
    ordered_global_dict = OrderedDict(sorted(global_dict.items(), key=lambda t: t[0]))
    print(ordered_global_dict.items())

    # accessing the result DataFrames from the dict (using the keys defined above)
    results_df_01 = ordered_global_dict[0]['1st df']
    results_df_02 = ordered_global_dict[0]['2nd df']

#output:
odict_items([(0, {'1st df': 'result_df_1', '2nd df': 'result_df_2'}), (1, {'1st df': 'result_df_1', '2nd df': 'result_df_2'}), (2, {'1st df': 'result_df_1', '2nd df': 'result_df_2'}), (3, {'1st df': 'result_df_1', '2nd df': 'result_df_2'})])


Explanation

Good question, though it's a little ambiguous what you're trying to achieve. For instance, you return my_dict and my_df from each call, but then try to unpack the entire list of results into just two variables: my_dict, my_df = Parallel(....

Going from what I understand, I'll answer as though you need my_function to update a global dictionary of the form {key: {secondary_key: <dataframe>}}.

Let's walk through this bit by bit. Here's an answer I found on a related Stack Overflow question, which I've based my answer on:

You are incurring a good amount of overhead in (1) starting up each process, and (2) having to copy the pandas.DataFrame (and etc) across several processes. If you just need to have a dict filled in parallel, I'd suggest using a shared memory dict. If no key will be overwritten, then it's easy and you don't have to worry about locks.

Here's the solution they provided:

>>> from multiprocess import Process, Manager
>>> 
>>> def f(d, x):
...   d[x] = x**2
... 
>>> manager = Manager()
>>> d = manager.dict()
>>> job = [Process(target=f, args=(d, i)) for i in range(5)]
>>> _ = [p.start() for p in job]
>>> _ = [p.join() for p in job]
>>> print(d)
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16}

With a small amount of tinkering, we can easily repurpose this code for your use.

def f(d, x):
    d[x] = x**2

becomes (using your variable names):

def update_dict(my_dict, key):
    # Insert your DataFrame calculations here!
    my_dict[key] = {'1st df': 'result_df_1',
                    '2nd df': 'result_df_2'}
    return
 

And the remaining code becomes:

inputs = [x for x in range(4)]

manager = Manager()
global_dict = manager.dict()
job = [Process(target=update_dict, args=(global_dict, _input)) for _input in inputs]
_ = [p.start() for p in job]
_ = [p.join() for p in job]
[print(f"{x}") for x in global_dict.items()]

Which prints as:

#python3.9
(1, {'1st df': 'result_df_1', '2nd df': 'result_df_2'})
(0, {'1st df': 'result_df_1', '2nd df': 'result_df_2'})
(2, {'1st df': 'result_df_1', '2nd df': 'result_df_2'})
(3, {'1st df': 'result_df_1', '2nd df': 'result_df_2'})

Note that the dictionary here is unordered. The final step will be for us to order your dict. Assuming your keys are integers, you can use:

from collections import OrderedDict
# dictionary sorted by key -- OrderedDict(sorted(d.items())) also works
ordered_global_dict = OrderedDict(sorted(global_dict.items()))
Joe Boyle