
I have a dataframe in which one of the columns holds a list and another holds a dictionary. However, this is not consistent: the value could also be a single element or NULL.

import numpy as np
import pandas as pd

df = pd.DataFrame({
    'item_id': [1, 1, 1, 2, 3, 4, 4],
    'shop_id': ['S1', 'S2', 'S3', 'S2', 'S3', 'S1', 'S2'],
    'price_list': ["{'10':['S1','S2'], '20':['S3'], '30':['S4']}",
                   "{'10':['S1','S2'], '20':['S3'], '30':['S4']}",
                   "{'10':['S1','S2'], '20':['S3'], '30':['S4']}",
                   '50',
                   'NaN',
                   "{'10':['S1','S2','S3'],'25':['S4']}",
                   "{'10':['S1','S2','S3'],'25':['S4']}"]
})


+---------+---------+--------------------------------------------------+
| item_id | shop_id |                      price_list                  |
+---------+---------+--------------------------------------------------+
|       1 | S1      | {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']} |
|       1 | S2      | {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']} |
|       1 | S3      | {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']} |
|       2 | S2      | 50                                               |
|       3 | S3      | NaN                                              |
|       4 | S1      | {'10': ['S1', 'S2', 'S3'], '25': ['S4']}         |
|       4 | S2      | {'10': ['S1', 'S2', 'S3'], '25': ['S4']}         |
+---------+---------+--------------------------------------------------+

I would like this to be expanded to this:

+---------+---------+-------+
| item_id | shop_id | price |
+---------+---------+-------+
|       1 | S1      | 10    |
|       1 | S2      | 10    |
|       1 | S3      | 20    |
|       2 | S2      | 50    |
|       3 | S3      | NaN   |
|       4 | S1      | 10    |
|       4 | S2      | 10    |
+---------+---------+-------+

I have tried with apply :

def get_price(row):
    # price_list is either a stringified dict, a single price, or 'NaN'
    if row['price_list'][0] == '{':
        prices = eval(row['price_list'])
        price = np.nan
        # keys are prices, values are the lists of shops offering that price
        for key, value in prices.items():
            if str(row['shop_id']) in value:
                price = key
                break
    else:
        price = row['price_list']
    return price


df['price'] = df.apply(get_price, axis=1)

(The dictionary elements in the price_list column are actually strings, so I need them to be evaluated as dicts first.)
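For example, evaluating a single cell gives this (just illustrating the stored format):

eval("{'10':['S1','S2'], '20':['S3'], '30':['S4']}")
# -> {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']}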

But the above approach takes a lot of time since my dataframe is pretty large.

Hence I tried using multiprocessing. My approach using multiprocessing is as below:

import multiprocessing as mp
from functools import partial

# get_price is the same function as above

def parallelize(data, get_price, num_of_processes):
    data_split = np.array_split(data, num_of_processes)
    pool = mp.Pool(num_of_processes)
    data = pd.concat(pool.map(get_price, data_split))
    pool.close()
    pool.join()
    return data

def run_on_subset(get_price, data_subset):
    data_subset['price'] = data_subset.apply(get_price, axis=1)
    return data_subset

def parallelize_on_rows(data, get_price, num_of_processes):
    return parallelize(data, partial(run_on_subset, get_price), num_of_processes)

num_processes = 4  # one worker per core
df = parallelize_on_rows(df, get_price, num_processes)

When I run apply on a single core, the script runs all the way through. But after switching to multiprocessing with all 4 cores, I get an out-of-memory error and the kernel dies.

I have 16 GB of RAM and 4 cores. When the script starts, I'm already using 8 GB. I'm running 64-bit Python 3.6.

I'm running Linux and just using Pool from multiprocessing.

How can I run the script all the way through using multiprocessing without using fewer cores?

charlie_boy
  • Your dataframe is being copied into each process. You can try to figure out a way to share state, but that is not trivial in multiprocessing. – juanpa.arrivillaga Dec 08 '22 at 21:11
  • As an aside, you should consider converting this to a list comprehension, but the fundamental problem is that your operation is super slow because of the way your data is organized. Having to `eval` millions of things is not going to be performant; the fact that you are reaching for `eval` at all is a problem – juanpa.arrivillaga Dec 08 '22 at 21:13
  • And even more fundamentally, the way you've organized your data requires *a linear scan*. So, you should have some data structure that is *indexed/keyed by product*, then you just query that data structure with the product. This could literally just be a giant `dict`. Just throwing more cores is probably not the first way I would handle performance here – juanpa.arrivillaga Dec 08 '22 at 21:21
  • On the topic of performance: [json.loads is faster than eval for reading string dictionaries](https://stackoverflow.com/a/20276991/15649230) – Ahmed AEK Dec 08 '22 at 21:24
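As a minimal sketch of the keyed-lookup idea the comments describe, assuming each distinct price_list string can be parsed once up front (ast.literal_eval stands in for eval here, and build_shop_to_price / lookup are illustrative names):

import ast
import numpy as np

def build_shop_to_price(price_list_str):
    # parse one price_list string once and invert it to {shop: price}
    if isinstance(price_list_str, str) and price_list_str.startswith('{'):
        prices = ast.literal_eval(price_list_str)
        return {shop: price for price, shops in prices.items() for shop in shops}
    return price_list_str  # scalar price such as '50', or 'NaN'

# parse each distinct string only once, then do a plain dict lookup per row
lookup = {s: build_shop_to_price(s) for s in df['price_list'].unique()}
df['price'] = [
    m.get(shop, np.nan) if isinstance(m, dict) else m
    for shop, m in zip(df['shop_id'], df['price_list'].map(lookup))
]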

1 Answer


Instead of splitting the input into num_of_processes chunks, consider splitting it into 10-20 times that number; that way only 5-10% of the data gets copied to the worker processes at any one time. Also consider fixing the chunksize parameter of map: by default it takes a value that depends on the number of inputs, which may not be 1, so you risk sending more data to the other processes than you intend.

def parallelize(data, get_price, num_of_processes):
    # split into many small chunks so only a small slice is in flight per worker
    data_split = np.array_split(data, num_of_processes * 20)
    pool = mp.Pool(num_of_processes)
    # chunksize=1 ensures each worker receives one chunk at a time
    data = pd.concat(pool.map(get_price, data_split, chunksize=1))
    pool.close()
    pool.join()
    return data
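For reference, this is called the same way as in the question, e.g.:

num_processes = 4  # one worker per core, as in the question
df = parallelize_on_rows(df, get_price, num_processes)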
Ahmed AEK