2

I have tried using this question to answer my problem, but I haven't had any success.

I'm using Python 3.10.

My dictionary is structured like this (where each list of string is a review of the product):

{storeNameA : {productA : 0 [string, string, ..., string]
                          1 [string, string, ..., string]
                          2 [string, string, ..., string]
                          ...
                          n [string, string, ..., string], 
               productB : 0 [string, string, ..., string]
                          1 [string, string, ..., string]
                          2 [string, string, ..., string]
                          ...
                          n [string, string, ..., string],
               ...,
               product_n : 0 [string, string, ..., string]
                           1 [string, string, ..., string]
                           2 [string, string, ..., string]
                           ...
                           n [string, string, ..., string]},
 storeNameB : {productA : 0 [string, string, ..., string]
                          1 [string, string, ..., string]
                          2 [string, string, ..., string]
                          ...
                          n [string, string, ..., string], 
               productB : 0 [string, string, ..., string]
                          1 [string, string, ..., string]
                          2 [string, string, ..., string]
                          ...
                          n [string, string, ..., string],
               ...,
               product_n : 0 [string, string, ..., string]
                           1 [string, string, ..., string]
                           2 [string, string, ..., string]
                           ...
                           n [string, string, ..., string]}}

So I would access a single 'review' like dictionary['storeNameA']['productB'][0]
or dictionary['storeNameB']['productB'][2]. Each product is the same in each store.

I am trying to perform a process on each review across the entire dictionary. I can perform this sucessfuly in an iterative manner with this code:

def mapAllValues(nestedDict, func):
    return {storeName: {product: func(prodFile) for product, prodFile in storeDict.items()} for storeName, storeDict in nestedDict.items()}

new_dictionary = mapAllValues(dictionary, lambda reviews: reviews.apply(processFunction))
# processFunction takes a list of string and returns a list of tuples.
# So I end up with a new dictionary where there is now a list of tuples, where there was a list of string.
# {storeName : {product : 0 [(str, str), (str, str), ..., (str, str)]    and so on...

It's a pretty long dictionary, and takes ~606 seconds to complete.
So, I have tried to implement a way to run this in parallel, but it's obviously not working as I expect it to because that runs in ~2170 seconds. I do get the right output though.

My question is, what am I doing wrong in the following code please? Can anyone provide me a solution to this problem?

manager = multiprocessing.Manager()
container = manager.dict()
    d = manager.dict(dictionary)
    container = manager.dict()
    for key in d:
        container[key] = manager.dict()
    for key in d['storeNameA']:
        container['storeNameA'][key] = manager.dict()
    for key in d['storeNameB']:
        container['storeNameB'][key] = manager.dict()
    
    with multiprocessing.Pool() as pool:
        pool.starmap(processFunction, [('storeNameA', product, d, container) for product in d['storeNameA']], chunksize=round(42739 / multiprocessing.cpu_count()))
        pool.starmap(processFunction, [('storeNameB', product, d, container) for product in d['storeNameB']], chunksize=round(198560 / multiprocessing.cpu_count()))
    
new_dictionary = dict(container)

I'm sure I'm misunderstanding how this is actually working, but as I see it it should be chunking each product from each store and parellising those?

Anyway, I think I've explained it as well as I can. If I need to clarify anything, please let me know.
Thank you in advance!

Kav
  • 25
  • 4
  • 1
    chunksize is a parameter on how many "tasks" are sent to each worker at a time. If you have a chunk size too large, a worker may have a large amount of work to do while another may not have any, because the one worker "took all the tasks". If the chunksize is too small, you spend more of the time sending tasks back and forth (more overhead). If you omit the chunksize parameter, python will attempt to ensure the work is split up such that each worker ends up getting 4-5 chunks total. This is usually a good starting point. Your given chunksizes look suspiciously specific, and suspiciously large. – Aaron Jul 29 '22 at 20:21
  • I will also point out that `manager` objects are not particularly fast, and compared to a single threaded program using a normal `dict`, you're loosing a lot due to the overhead of worker processes communicating with the manager process to read / write to your dictionaries... When using multiprocessing you should be very judicious with what data is shared / synchronized between multiple processes. `shared_memory` is much more restricted than managed objects, but it is much faster for example. – Aaron Jul 29 '22 at 20:25
  • @Aaron Thanks for your comments. The chunksizes are based on how many iterations each `storeName` will roughly have. It was a starting point to take that and divide it by the amount of cpu. The use of the `manager` was taken from the linked question I tried using. Would you be able to suggest some code, or even pseudocode as to your suggestion in your second comment please? – Kav Jul 29 '22 at 20:50
  • Note that managers use threading internally to execute commands passed to the managed object. So if you attempt to access a value from the managed dictionary from two different processes at the same time, they will be executed sequentially, not parallelly. – Charchit Agarwal Jul 29 '22 at 21:33
  • Also, you should add a **complete** reproducible sample here. So, include a sample `dictionary`, with fake data if you must, in the code where you tried to use multiprocessing. – Charchit Agarwal Jul 29 '22 at 21:37

2 Answers2

1

First of all, while creating managers is relatively cheap, accessing them can become quite expensive if you don't know how they work. Long story short, they spawn a separate process, and allow other processes to execute commands on any object stored inside the process. These commands are read sequentially (execution can be somewhat parallel since they use threading internally).

Therefore, if two or more processes attempt to access a managed object (a dictionary in this case) at the same time, one will block until the other process's request is read. Therefore, managers are non-ideal when using multiprocessing (although very useful nonetheless), and definitely something to be reconsidered when the parallel processes need to regularly access the managed object (which I assume is the case here with processFunction).

With that said, here, you do not even need to use managers. From the looks of it, processFunction seems like a localized function which does not care about the state of the dictionary as a whole. Therefore, you should only concern yourself with concatenating the return values from the pool into your main dictionary from within the main process itself, rather then worrying about trying to create shared memory for the pool to have access to (remember that a pool automatically passes the return value of the tasks it is assigned to the main process upon completion).

Here's a way you can do that, with a sample dictionary and processFunction, along with a benchmark comparing the speed if you were to do the same task serially.

from multiprocessing import Pool
import string, random, time

def review_generator(size=10):
    chars = string.ascii_uppercase + string.digits
    return ''.join(random.choice(chars) for _ in range(size))

def processFunc(product, prodFile):
    # Return a tuple of the product name and the altered value (a list of tuples)
    return product, [[(element, review_generator()) for element in review] for review in prodFile]


if __name__ == "__main__":

    # Generate example dictionary
    dictionary = {'storeNameA': {}, 'storeNameB': {}}
    for key, _ in dictionary.items():
        for prod_i in range(1000):
            prod = f'product{prod_i}'
            dictionary[key][prod] = [[review_generator() for _ in range(50)] for _ in range(5)]

    # Time the parallel approach
    t = time.time()
    with Pool() as pool:
        a = pool.starmap(processFunc, [(product, prodFile) for product, prodFile in dictionary['storeNameA'].items()])
        b = pool.starmap(processFunc, [(product, prodFile) for product, prodFile in dictionary['storeNameB'].items()])

    print(f"Parallel Approach took {time.time() - t}")

    # Time the serial approach
    t = time.time()

    a = [processFunc(product, prodFile) for product, prodFile in dictionary['storeNameA'].items()]
    b = [processFunc(product, prodFile) for product, prodFile in dictionary['storeNameB'].items()]

    print(f"Serial approach took {time.time() - t}")

Output

Parallel Approach took 1.5318272113800049
Serial approach took 5.765411615371704

Once you have the results from the sample processFunction for each store inside a and b, you can then create your new dictionary in the main process itself:

new_dictionary = {'storeNameA': {}, 'storeNameB': {}}
for product, prodFile in a:
    new_dictionary['storeNameA'][product] = prodFile
for product, prodFile in b:
    new_dictionary['storeNameB'][product] = prodFile

I would also encourage you to try different variants of assigning tasks to workers a pool offers, (like imap) to see if they fit your use-case better and are more efficient.

Charchit Agarwal
  • 2,829
  • 2
  • 8
  • 20
  • I would say this is very close to what I need. `def processFunction(prodFile): # Does some things with prodFile return [(str, str), (str, str), ...]` I had a different amount of inputs to the function in my original question because I was trying to update the new dictionary in the parallel stage as you said. But, when stripped down, it only needs the actual list of string to process, and returns a list of tuples (where each string has been turned into a tuple. – Kav Jul 30 '22 at 12:58
0

With massive thanks for @Charchit and their answer, I have got this working. And it is now running my huge dataset in ~154 seconds, compared to the ~606 seconds it was taking iteratively.

Here's the final code, which is very similar to @Charchit's answer above, but with some small changes.

def processFunction(product, listOfReviews):
    # This function handles every review for each product
    toReturn = []
    for review in listOfReviews:
        X = # Do something here...
        toReturn.append(X)
        # X is now a list of tuples [(str, str), (str, str), ...]

    # toReturn is now a list of list
    return product, toReturn

if __name__ == "__main__":

original_dictionary = dict() 
# Where this would be the VERY large dictionary I have. See the structure in my original question.

    new_dictionary = dict()
    for key in original_dictionary:
        new_dictionary[key] = dict()
    for key in original_dictionary['storeNameA']:
        new_dictionary['storeNameA'][key] = list()
    for key in original_dictionary['storeNameB']:
        new_dictionary['storeNameB'][key] = list()

    with multiprocessing.Pool() as pool:
        a = pool.starmap(processFunction, [(product, reviews) for product, reviews in original_dictionary['storeNameA'].items()])
        b = pool.starmap(processFunction, [(product, reviews) for product, reviews in original_dictionary['storeNameB'].items()])

    for product, reviews in a:
        new_dictionary['storeNameA'][product] = reviews
    for product, reviews in b:
        new_dictionary['storeNameB'][product] = reviews

Thanks again, @Charchit!

Kav
  • 25
  • 4