I have tried using this question to answer my problem, but I haven't had any success.
I'm using Python 3.10.
My dictionary is structured like this (where each list of strings is a review of the product):
{storeNameA: {productA:  0    [string, string, ..., string]
                         1    [string, string, ..., string]
                         2    [string, string, ..., string]
                         ...
                         n    [string, string, ..., string],
              productB:  0    [string, string, ..., string]
                         1    [string, string, ..., string]
                         2    [string, string, ..., string]
                         ...
                         n    [string, string, ..., string],
              ...,
              product_n: 0    [string, string, ..., string]
                         1    [string, string, ..., string]
                         2    [string, string, ..., string]
                         ...
                         n    [string, string, ..., string]},
 storeNameB: {productA:  0    [string, string, ..., string]
                         1    [string, string, ..., string]
                         2    [string, string, ..., string]
                         ...
                         n    [string, string, ..., string],
              productB:  0    [string, string, ..., string]
                         1    [string, string, ..., string]
                         2    [string, string, ..., string]
                         ...
                         n    [string, string, ..., string],
              ...,
              product_n: 0    [string, string, ..., string]
                         1    [string, string, ..., string]
                         2    [string, string, ..., string]
                         ...
                         n    [string, string, ..., string]}}
So I would access a single 'review' like dictionary['storeNameA']['productB'][0] or dictionary['storeNameB']['productB'][2]. Every store contains the same set of products.
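In case it helps, here's a tiny runnable mock-up of that structure with made-up data (the real dictionary is far larger; each product holds a pandas Series of reviews, which is why I use .apply further down):

```python
import pandas as pd

def make_reviews():
    # Each product maps to a pandas Series indexed 0..n;
    # each element is one review, i.e. a list of strings.
    return pd.Series([["great", "product"], ["fast", "shipping"]])

dictionary = {
    "storeNameA": {"productA": make_reviews(), "productB": make_reviews()},
    "storeNameB": {"productA": make_reviews(), "productB": make_reviews()},
}

# Accessing a single review:
print(dictionary["storeNameA"]["productB"][0])  # ['great', 'product']
```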
I am trying to perform a process on each review across the entire dictionary. I can do this successfully in an iterative manner with this code:
def mapAllValues(nestedDict, func):
    return {storeName: {product: func(prodFile) for product, prodFile in storeDict.items()}
            for storeName, storeDict in nestedDict.items()}

new_dictionary = mapAllValues(dictionary, lambda reviews: reviews.apply(processFunction))
# processFunction takes a list of strings and returns a list of tuples,
# so I end up with a new dictionary that has a list of tuples wherever there was a list of strings:
# {storeName : {product : 0 [(str, str), (str, str), ..., (str, str)] and so on...
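To make that concrete, here's a self-contained toy version with a stand-in processFunction (the name is the same but the body is just an example; my real function does something different):

```python
import pandas as pd

def mapAllValues(nestedDict, func):
    # Apply func to every product's Series in every store.
    return {storeName: {product: func(prodFile) for product, prodFile in storeDict.items()}
            for storeName, storeDict in nestedDict.items()}

# Stand-in: takes a list of strings, returns a list of tuples.
def processFunction(review):
    return [(word, word.upper()) for word in review]

dictionary = {
    "storeNameA": {"productA": pd.Series([["a", "b"], ["c"]])},
}

new_dictionary = mapAllValues(dictionary, lambda reviews: reviews.apply(processFunction))
print(new_dictionary["storeNameA"]["productA"][0])  # [('a', 'A'), ('b', 'B')]
```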
It's a pretty long dictionary, and this takes ~606 seconds to complete.
So I have tried to implement a way to run it in parallel, but it's obviously not working as I expect, because that version runs in ~2170 seconds. I do get the right output, though.
My question is: what am I doing wrong in the following code? Can anyone suggest a solution to this problem?
import multiprocessing

manager = multiprocessing.Manager()
d = manager.dict(dictionary)
container = manager.dict()
for key in d:
    container[key] = manager.dict()
for key in d['storeNameA']:
    container['storeNameA'][key] = manager.dict()
for key in d['storeNameB']:
    container['storeNameB'][key] = manager.dict()

with multiprocessing.Pool() as pool:
    pool.starmap(processFunction, [('storeNameA', product, d, container) for product in d['storeNameA']],
                 chunksize=round(42739 / multiprocessing.cpu_count()))
    pool.starmap(processFunction, [('storeNameB', product, d, container) for product in d['storeNameB']],
                 chunksize=round(198560 / multiprocessing.cpu_count()))

new_dictionary = dict(container)
I'm sure I'm misunderstanding how this actually works, but as I see it, it should be chunking each product from each store and parallelising those?
Anyway, I think I've explained it as well as I can. If I need to clarify anything, please let me know.
Thank you in advance!