
I am trying to update a shared dictionary synchronously using the mpire package in Python on a multi-core machine (i.e., parallel processing to update a dict). The environment is a Linux machine with 8 vCPUs and 16 GB memory on Amazon SageMaker. Below is a sample/dummy code snippet I am using for this, but I am unable to make it work. I know I could use Process or map from the multiprocessing package to accomplish this task; I am just checking whether there is any way to do it with the mpire package. Any help would be greatly appreciated. Thanks!

def myFunc(shared_objects, id_val):
    
    indata, output = shared_objects
    
    # Temporary store for model output for an input ID
    temp: Dict[str, float] = dict()

    # Filter data for input ID and store output in temp variable
    indata2 = indata.loc[indata['ID']==id_val]
    temp = indata2.groupby(['M_CODE'])['VALUE'].sum().to_dict()
    
    # store the result .. I want this to happen synchronously
    output[id_val] = temp

#*******************************************************************
if __name__ == '__main__':

    import pandas as pd
    from typing import Dict
    from datetime import datetime
    from mpire import WorkerPool
    from multiprocessing import Manager

    # This is just a sample data
    inputData = pd.DataFrame({'ID': ['A', 'B', 'A', 'C', 'A'],
                              'M_CODE': ['AKQ1', 'ALM3', 'BLC4', 'ALM4', 'BLC4'],
                              'VALUE': [0.75, 1, 1.75, 0.67, 3]})

    start_time = datetime.now()
    print(start_time, '>> Process started.')

    # Use a shared dict to store results from various workers
    manager = Manager()

    # dict on Manager has no lock at all!
    # https://stackoverflow.com/questions/2936626/how-to-share-a-dictionary-between-multiple-processes-in-python-without-locking
    output: Dict[str, Dict[str, float]] = manager.dict()

    shared_objects = inputData, output

    with WorkerPool(n_jobs=7, shared_objects=shared_objects) as pool:
        results = pool.map_unordered(myFunc, inputData['ID'].unique(), progress_bar=True)

    print(datetime.now(), '>> Process completed -> total time taken:', datetime.now()-start_time)

Below is the error I'm stuck with:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-10-df7d847398a1> in <module>
     37 
     38     with WorkerPool(n_jobs=7, shared_objects=shared_objects) as pool:
---> 39         results = pool.map_unordered(myFunc, inputData['ID'].unique(), progress_bar=True)
     40 
     41     print(datetime.now(), '>> Process completed -> total time taken:', datetime.now()-start_time)

/opt/conda/lib/python3.7/site-packages/mpire/pool.py in map_unordered(self, func, iterable_of_args, iterable_len, max_tasks_active, chunk_size, n_splits, worker_lifespan, progress_bar, progress_bar_position, enable_insights, worker_init, worker_exit, task_timeout, worker_init_timeout, worker_exit_timeout)
    418                                         n_splits, worker_lifespan, progress_bar, progress_bar_position,
    419                                         enable_insights, worker_init, worker_exit, task_timeout, worker_init_timeout,
--> 420                                         worker_exit_timeout))
    421 
    422     def imap(self, func: Callable, iterable_of_args: Union[Sized, Iterable], iterable_len: Optional[int] = None,

/opt/conda/lib/python3.7/site-packages/mpire/pool.py in imap_unordered(self, func, iterable_of_args, iterable_len, max_tasks_active, chunk_size, n_splits, worker_lifespan, progress_bar, progress_bar_position, enable_insights, worker_init, worker_exit, task_timeout, worker_init_timeout, worker_exit_timeout)
    664                     # Terminate if exception has been thrown at this point
    665                     if self._worker_comms.exception_thrown():
--> 666                         self._handle_exception(progress_bar_handler)
    667 
    668                     # All results are in: it's clean up time

/opt/conda/lib/python3.7/site-packages/mpire/pool.py in _handle_exception(self, progress_bar_handler)
    729         # Raise
    730         logger.debug("Re-raising obtained exception")
--> 731         raise err(traceback_str)
    732 
    733     def stop_and_join(self, progress_bar_handler: Optional[ProgressBarHandler] = None,

ValueError: 

Exception occurred in Worker-0 with the following arguments:
Arg 0: 'A'
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/mpire/worker.py", line 352, in _run_safely
    results = func()
  File "/opt/conda/lib/python3.7/site-packages/mpire/worker.py", line 288, in _func
    _results = func(args)
  File "/opt/conda/lib/python3.7/site-packages/mpire/worker.py", line 455, in _helper_func
    return self._call_func(func, args)
  File "/opt/conda/lib/python3.7/site-packages/mpire/worker.py", line 472, in _call_func
    return func(args)
  File "<ipython-input-10-df7d847398a1>", line 9, in myFunc
    indata2 = indata.loc[indata['ID']==id_val]
  File "/opt/conda/lib/python3.7/site-packages/pandas/core/ops/common.py", line 69, in new_method
    return method(self, other)
  File "/opt/conda/lib/python3.7/site-packages/pandas/core/arraylike.py", line 32, in __eq__
    return self._cmp_method(other, operator.eq)
  File "/opt/conda/lib/python3.7/site-packages/pandas/core/series.py", line 5502, in _cmp_method
    res_values = ops.comparison_op(lvalues, rvalues, op)
  File "/opt/conda/lib/python3.7/site-packages/pandas/core/ops/array_ops.py", line 262, in comparison_op
    "Lengths must match to compare", lvalues.shape, rvalues.shape
ValueError: ('Lengths must match to compare', (5,), (1,))

[Update]: Here is the code I found to work fine using only the multiprocessing package.

def myFunc(id_val, output, indata):
    
    # Temporary store for model output for an input ID
    temp: Dict[str, float] = dict()

    # Filter data for input ID and store output in temp variable
    indata2 = indata.loc[indata['ID']==id_val]
    temp = indata2.groupby(['M_CODE'])['VALUE'].sum().to_dict()
    
    # store the result .. I want this to happen synchronously
    output[id_val] = temp



#*******************************************************************
if __name__ == '__main__':

    import pandas as pd
    from typing import Dict
    from itertools import repeat
    from multiprocessing import Manager
    from datetime import datetime

    # This is just a sample data
    inputData = pd.DataFrame({'ID': ['A', 'B', 'A', 'C', 'A'],
                              'M_CODE': ['AKQ1', 'ALM3', 'BLC4', 'ALM4', 'BLC4'],
                              'VALUE': [0.75, 1, 1.75, 0.67, 3]})

    start_time = datetime.now()
    print(start_time, '>> Process started.')

    # Use a shared dict to store results from various workers
    with Manager() as manager:
        # dict on Manager has no lock at all!
        # https://stackoverflow.com/questions/2936626/how-to-share-a-dictionary-between-multiple-processes-in-python-without-locking
        output: Dict[str, Dict[str, float]] = manager.dict()

        # Start a pool of n workers.
        # Set chunksize to distribute tasks efficiently across workers so none stays idle for long;
        # with only 3 unique IDs here, max(3 // (7*4), 1) evaluates to a chunksize of 1.
        with manager.Pool(processes=7) as pool:
            pool.starmap(myFunc,
                         zip(inputData['ID'].unique(), repeat(output), repeat(inputData)),
                         chunksize=max(inputData['ID'].nunique() // (7*4), 1))

        output = dict(output)
            
    print(datetime.now(), '>> Process completed -> total time taken:', datetime.now()-start_time)
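
For reference, with the sample data above, the final output dict should come out as the per-ID grouped sums:

output
# {'A': {'AKQ1': 0.75, 'BLC4': 4.75},
#  'B': {'ALM3': 1.0},
#  'C': {'ALM4': 0.67}}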
Sauvik De

1 Answer


UPDATE:

Now that I better understand the specific issue, I can say it lies in how the chunking procedure of mpire.WorkerPool.map_unordered interacts with the inputs expected by the pandas .loc filter. Specifically, myFunc receives id_val as a single-element NumPy array such as array(['A'], dtype=object), as detailed in the chunking explanation and the source code, while indata['ID'] inside the .loc call is a pandas Series. One of the two has to change for the comparison to work; given what your code is trying to do, id_val can be unwrapped to its scalar item:

id_val = id_val.item()
indata2 = indata.loc[indata['ID']==id_val]

This makes the new myFunc (which, on my machine, gets your script running):

def myFunc(shared_objects, id_val):

    indata, output = shared_objects

    # Keep just the scalar value of id_val (it arrives as a 1-element array)
    id_val = id_val.item()

    # Temporary store for model output for an input ID
    temp: Dict[str, float] = dict()

    # Filter data for input ID and store output in temp variable
    indata2 = indata.loc[indata['ID']==id_val]
    temp = indata2.groupby(['M_CODE'])['VALUE'].sum().to_dict()

    # store the result .. I want this to happen synchronously
    output[id_val] = temp
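
Alternatively (an untested sketch, assuming mpire passes elements of a plain Python list through unwrapped), you could convert the IDs at the call site and keep the original myFunc without the .item() line:

# Untested sketch: convert the NumPy array of unique IDs to a plain list,
# so each worker should receive a bare string instead of a 1-element array.
id_vals = inputData['ID'].unique().tolist()

with WorkerPool(n_jobs=7, shared_objects=shared_objects) as pool:
    results = pool.map_unordered(myFunc, id_vals, progress_bar=True)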

The reason this isn't an issue in your multiprocessing-only solution is that zip iterates over inputData['ID'].unique() one element at a time, so each call receives the bare value rather than the value wrapped in an array object. Nice job finding an alternative solution, though!


ORIGINAL ANSWER:

The error occurs in this line of the function:

indata2 = indata.loc[indata['ID']==id_val]

Per the main error:

File "/opt/conda/lib/python3.7/site-packages/pandas/core/ops/array_ops.py", line 262, in comparison_op "Lengths must match to compare", lvalues.shape, rvalues.shape ValueError: ('Lengths must match to compare', (5,), (1,))

This is an element-wise equality comparison between Series(['A', 'B', 'A', 'C', 'A']).unique() and Series(['A', 'B', 'A', 'C', 'A']), which can never work unless there are no repeated values in 'ID'. I'm not sure exactly what you are trying to do with this statement, but it is certainly the cause of your error.
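
A minimal way to reproduce the mismatch outside of mpire, mirroring the shapes from the traceback (this sketch assumes only pandas and NumPy):

import numpy as np
import pandas as pd

s = pd.Series(['A', 'B', 'A', 'C', 'A'])

s == 'A'                             # fine: a scalar is broadcast element-wise
s == np.array(['A'], dtype=object)   # ValueError: ('Lengths must match to compare', (5,), (1,))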

physincubus
  • Yes - I know that's the line where the code errors out. The process somehow isn't able to do the element-wise match because the comparison vectors on the two sides have unequal lengths. That's where I need pointers: why doesn't it work when using the `mpire` package coupled with `multiprocessing`? To me this sounds like a compatibility issue. The code works just fine using Manager and Pool (from Manager) from the `multiprocessing` package, though. – Sauvik De Aug 19 '22 at 06:11
  • Interesting. To help me update my answer to be more helpful: what do you expect to happen when you do an element-wise match between two vectors of unequal length? In my experience, this always causes an error. – physincubus Sep 06 '22 at 20:25
  • I am not matching two vectors of unequal length; it is more of a filtering operation, where I am trying to filter the dataframe by one ID value. Pretty simple! Please refer to the line in my original post `indata2 = indata.loc[indata['ID']==id_val]` where the code fails. I have also edited my post to include the equivalent working code using _only_ the `multiprocessing` package, as mentioned in my comment above. – Sauvik De Sep 12 '22 at 07:35
  • I dug more into this and I realized what is going on: the issue is a combination of how `mpire.WorkerPool.map_unordered` chunks your `iterable_of_args` and how your `pandas.loc` call expects to receive those chunks. Specifically, `myFunc` gets `id_val` as an array such as `array(['A'], dtype=object)`, while `indata['ID']` in the `loc` call produces a Series, and the two aren't comparable. For this comparison to work, you must either change `id_val` to get its `item`, or change `indata['ID']` to get the underlying array via `values` (see my edit for more). – physincubus Sep 17 '22 at 22:32