I have a dataframe in which one column holds a dictionary mapping prices to lists of shop IDs. However, this is not consistent: a cell could also hold a single element or NaN.
import pandas as pd

df = pd.DataFrame({
    'item_id': [1, 1, 1, 2, 3, 4, 4],
    'shop_id': ['S1', 'S2', 'S3', 'S2', 'S3', 'S1', 'S2'],
    'price_list': [
        "{'10':['S1','S2'], '20':['S3'], '30':['S4']}",
        "{'10':['S1','S2'], '20':['S3'], '30':['S4']}",
        "{'10':['S1','S2'], '20':['S3'], '30':['S4']}",
        '50',
        'NaN',
        "{'10':['S1','S2','S3'],'25':['S4']}",
        "{'10':['S1','S2','S3'],'25':['S4']}",
    ],
})
+---------+---------+--------------------------------------------------+
| item_id | shop_id | price_list |
+---------+---------+--------------------------------------------------+
| 1 | S1 | {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']} |
| 1 | S2 | {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']} |
| 1 | S3 | {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']} |
| 2 | S2 | 50 |
| 3 | S3 | NaN |
| 4 | S1 | {'10': ['S1', 'S2', 'S3'], '25': ['S4']} |
| 4 | S2 | {'10': ['S1', 'S2', 'S3'], '25': ['S4']} |
+---------+---------+--------------------------------------------------+
I would like this to be expanded into the following:
+---------+---------+-------+
| item_id | shop_id | price |
+---------+---------+-------+
| 1 | S1 | 10 |
| 1 | S2 | 10 |
| 1 | S3 | 20 |
| 2 | S2 | 50 |
| 3 | S3 | NaN |
| 4 | S1 | 10 |
| 4 | S2 | 10 |
+---------+---------+-------+
I have tried it with apply:
import numpy as np

def get_price(row):
    # Dict-like cells start with '{'; parse them and look up this row's shop.
    if row['price_list'][0] == '{':
        prices = eval(row['price_list'])
        for key, value in prices.items():
            if str(row['shop_id']) in value:
                return key  # first price bucket that lists this shop
        return np.nan       # shop_id not found in any bucket
    # Scalar cells ('50', 'NaN') already hold the price.
    return row['price_list']

df['price'] = df.apply(get_price, axis=1)
(The dictionary elements in the price_list column are actually strings, so I need them to be evaluated as dicts first.)
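For illustration, this is what that parsing step does to a single cell; ast.literal_eval is shown here as a safer stand-in for eval, since the cells are plain Python literals:

import ast

cell = "{'10':['S1','S2'], '20':['S3'], '30':['S4']}"
prices = ast.literal_eval(cell)  # parses the string into a real dict
print(prices['10'])              # ['S1', 'S2']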
But the above approach takes a lot of time, since my dataframe is pretty large.
Hence I tried using multiprocessing, reusing the same get_price function as above:
import multiprocessing as mp
from functools import partial

def parallelize(data, func, num_of_processes):
    # Split the dataframe into one chunk per process and map func over the chunks.
    data_split = np.array_split(data, num_of_processes)
    pool = mp.Pool(num_of_processes)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def run_on_subset(func, data_subset):
    # Apply the row-wise function to a single chunk.
    data_subset['price'] = data_subset.apply(func, axis=1)
    return data_subset

def parallelize_on_rows(data, func, num_of_processes):
    return parallelize(data, partial(run_on_subset, func), num_of_processes)

num_of_processes = 4  # one worker per core
df = parallelize_on_rows(df, get_price, num_of_processes)
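(For reference, the row-wise function itself can be sanity-checked single-process on the small sample above:)

# Quick single-process check on the 7-row sample above.
check = df.head(7).copy()
check['price'] = check.apply(get_price, axis=1)
print(check[['item_id', 'shop_id', 'price']])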
Running apply on a single core used to go all the way through. But after switching to multiprocessing with all 4 cores, I get an out-of-memory error and the kernel dies.
I have 16 GB of RAM and 4 cores, and 8 GB are already in use when the script starts. I'm running 64-bit Python 3.6 on Linux, using only Pool from multiprocessing.
How can I run the script all the way through with multiprocessing, without having to use fewer cores?
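For context, one variant I am considering is batching the splits, so that only one batch of chunks is pickled to the workers at a time instead of materializing every split up front. This is a rough sketch, not code I have run; parallelize_in_batches is a hypothetical helper and rows_per_chunk is a made-up tuning knob:

def parallelize_in_batches(data, func, num_of_processes, rows_per_chunk=100000):
    # Hypothetical variant: only one batch of input splits is alive at a time;
    # results still accumulate for the final concat.
    results = []
    batch_rows = rows_per_chunk * num_of_processes
    with mp.Pool(num_of_processes) as pool:
        for start in range(0, len(data), batch_rows):
            batch = data.iloc[start:start + batch_rows]
            splits = np.array_split(batch, num_of_processes)
            results.extend(pool.map(partial(run_on_subset, func), splits))
    return pd.concat(results)

Would something along these lines keep the memory usage bounded?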