
I am trying to process items in parallel, and the code works as expected for 4k-5k elements. But as soon as the number of elements to be processed increases, the program processes a few listings and then stops abruptly without throwing any error.

I checked and the program is not hung: RAM is available (I have 16 GB of RAM) and CPU utilization is not even 30%. I can't figure out what is happening. I have 1 million elements to process.

import multiprocessing

def get_items_to_download():
    # Iterator that fetches all items to be downloaded
    yield download_item

def start_download_process():
    multiproc_pool = multiprocessing.Pool(processes=10)
    for download_item in get_items_to_download():
        multiproc_pool.apply_async(start_processing, args = (download_item, ), callback = results_callback)
    
    multiproc_pool.close()
    multiproc_pool.join()

def start_processing(download_item):
    try:
        # Code to download item from web API
        # Code to perform some processing on the data
        # Code to update data into database
        return True
    except Exception as e:
        return False

def results_callback(result):
    print(result)

if __name__ == "__main__":
    start_download_process()

UPDATE -

Found the error: BrokenPipeError: [Errno 32] Broken pipe

Trace -

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 125, in worker
    put((job, i, result))
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 347, in put
    self._writer.send_bytes(obj)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
Rajat Suneja
  • I hope you've checked their rate-limit for requests before you tried downloading 1 million items in parallel from the same api. – Darkonaut Oct 05 '20 at 18:11
  • Yes, I have. It suffices, and if I simply download the required items it works fine, but as soon as I bring the processing element into the multiprocessing, it starts to fail. – Rajat Suneja Oct 07 '20 at 03:27
  • Try with `processes=1` to see if it still fails. 16GB is not much for 10 processes; your OS might have killed a process. Try with [`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3.8/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor) instead of `multiprocessing.Pool`; it will break immediately if this happens (see the sketch after these comments). Also test if commenting out the database part helps. `print()` is your friend; use it to zero in on the location where it starts hanging. – Darkonaut Oct 07 '20 at 08:07
  • Doesn't happen with a single process: when `processes=1`, things run smoothly till the very end. – Rajat Suneja Oct 09 '20 at 06:56
  • Do you have any feedback on my response below? If you have tried my suggestion and let me know what happened, I might have further suggestions. For example, did you find that you did have tasks that were hanging and have now timed out? Or did it run to completion by submitting in batch sizes of 1000 (which, I understand, is problematic -- it should work with larger batch sizes if that were the case)? – Booboo Oct 09 '20 at 11:43
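
A minimal sketch of the `concurrent.futures.ProcessPoolExecutor` variant suggested in the comment above, reusing `start_processing` and `get_items_to_download` from the question (`max_workers=10` mirrors the original pool size; with 1 million items you would still want to submit in batches, as the answer below does):

import concurrent.futures

def start_download_process():
    # Unlike multiprocessing.Pool, ProcessPoolExecutor raises a
    # BrokenProcessPool error if a worker process dies (e.g. is killed
    # by the OS), instead of silently stalling.
    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(start_processing, item)
                   for item in get_items_to_download()]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())  # raises BrokenProcessPool if the worker died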

3 Answers

1

The code looks correct. The only thing I can think of is that all of your processes are hanging while waiting for completion. Here is a suggestion: rather than using the callback mechanism provided by apply_async, use the AsyncResult object it returns to get the return value from the process. You can call get on this object with a timeout value (30 seconds is arbitrarily specified below, and is possibly not long enough). If the task has not completed within that duration, a timeout exception is thrown (you can catch it if you wish). This will test the hypothesis that the processes are hanging. Just be sure to specify a timeout value large enough that the task should be able to complete within it. I have also broken the task submissions up into batches of 1000, not because I think a size of 1,000,000 is a problem per se, but so that you don't end up holding a list of 1,000,000 result objects. If you find that you no longer hang as a result, then try increasing the batch size and see if it makes a difference.

import multiprocessing

def get_items_to_download():
    #iterator to fetch all items that are to be downloaded
    yield download_item

BATCH_SIZE = 1000

def start_download_process():
    with multiprocessing.Pool(processes=10) as multiproc_pool:
        results = []
        for download_item in get_items_to_download():
            results.append(multiproc_pool.apply_async(start_processing, args = (download_item, )))
            if len(results) == BATCH_SIZE:
                process_results(results)
                results = []
        if len(results):
            process_results(results)
    

def start_processing(download_item):
    try:
        # Code to download item from web API
        # Code to perform some processing on the data
        # Code to update data into database
        return True
    except Exception as e:
        return False

TIMEOUT_VALUE = 30 # or some suitable value

def process_results(results):
    for result in results:
        return_value = result.get(TIMEOUT_VALUE) # will cause an exception if process is hanging
        print(return_value)

if __name__ == "__main__":
    start_download_process()

Update

Based on Googling several pages about your broken pipe error, it appears the error could be the result of exhausting memory. See Python Multiprocessing: Broken Pipe exception after increasing Pool size, for example. The following reworking attempts to use less memory. If it works, you can then try to increase the batch size:

import multiprocessing


BATCH_SIZE = 1000
POOL_SIZE = 10


def get_items_to_download():
    #iterator to fetch all items that are to be downloaded
    yield download_item


def start_download_process():
    with multiprocessing.Pool(processes=POOL_SIZE) as multiproc_pool:
        items = []
        for download_item in get_items_to_download():
            items.append(download_item)
            if len(items) == BATCH_SIZE:
                process_items(multiproc_pool, items)
                items = []
        if len(items):
            process_items(multiproc_pool, items)


def start_processing(download_item):
    try:
        # Code to download item from web API
        # Code to perform some processing on the data
        # Code to update data into database
        return True
    except Exception as e:
        return False


def compute_chunksize(iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, POOL_SIZE * 4)
    if extra:
        chunksize += 1
    return chunksize


def process_items(multiproc_pool, items):
    chunksize = compute_chunksize(len(items))
    # you must iterate the iterable returned:
    for return_value in multiproc_pool.imap(start_processing, items, chunksize):
        print(return_value)


if __name__ == "__main__":
    start_download_process()
Booboo
  • Tried this method and specified a timeout of 30 seconds (the individual processing usually takes about 2 seconds). After about 5k iterations, one of the tasks (at random) causes a timeout exception. – Rajat Suneja Oct 11 '20 at 11:20
  • @RajatSuneja That suggests it was hanging on something. If you put a `try/except` around the call to `result.get(TIMEOUT_VALUE)` (see the sketch after these comments), you will continue processing and see all the hanging tasks. You will have to figure out what they are hanging on. But just to be sure, this was with batch sizes of 1000? You could also reduce `TIMEOUT_VALUE` to a smaller value so you are not waiting overly long for the timeout exception. If a task should complete in 2 seconds, then if it hasn't within, for example, 5 or perhaps 10 seconds, you can be sure it is already hanging. – Booboo Oct 11 '20 at 11:29
  • Yes, it was with a batch size of 1k, as suggested. – Rajat Suneja Oct 11 '20 at 11:31
  • You may have to modify `start_processing` to log its progress as it proceeds through its logic to determine where it is hanging. See [How should I log while using multiprocessing in Python?](https://stackoverflow.com/questions/641420/how-should-i-log-while-using-multiprocessing-in-python) and [Logging to a single file from multiple processes](https://docs.python.org/dev/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes). – Booboo Oct 11 '20 at 11:45
  • You could also have each process open a uniquely named output file in append mode in a known directory and write debug information to that file, closing it after each write. When the process completely finishes, it then deletes the file. Hanging processes will have undeleted files, which you can inspect -- a very low-tech solution. – Booboo Oct 11 '20 at 11:50
  • The process is breaking and giving an error - BrokenPipeError: [Errno 32] Broken pipe – Rajat Suneja Oct 21 '20 at 06:51
  • Still the same error, BrokenPipeError: [Errno 32] Broken pipe. Traceback is added to the original question. – Rajat Suneja Oct 22 '20 at 07:01
  • I am running out of suggestions. Are any of the batches successfully processed? If so, perhaps there is a memory leak somewhere in code not shown. If not, then you will have to try a much smaller BATCH_SIZE value and work your way up from there if you wish, although it may not significantly improve performance if the amount of CPU time required by each task is significant. You could even start with a BATCH_SIZE of 10 (the number of processors in the pool) just to see what happens. – Booboo Oct 22 '20 at 11:07
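
A minimal sketch of the `try/except` suggested in the comments above, applied to the `process_results` function from the first code block in this answer (the 10-second timeout is an assumption based on the ~2-second task time mentioned in the comments):

import multiprocessing

TIMEOUT_VALUE = 10  # assumed: individual tasks normally finish in ~2 seconds

def process_results(results):
    hanging = 0
    for result in results:
        try:
            # AsyncResult.get raises multiprocessing.TimeoutError if the task
            # has not completed within TIMEOUT_VALUE seconds.
            return_value = result.get(TIMEOUT_VALUE)
        except multiprocessing.TimeoutError:
            hanging += 1  # keep going so every hanging task in the batch is seen
        else:
            print(return_value)
    if hanging:
        print(f"{hanging} task(s) in this batch timed out")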
0
import multiprocessing

def get_items_to_download():
    # Instead of yielding, return the complete generator object to avoid
    # iterating over this function.
    # Return type - generator: (download_item1, download_item2, ...)
    return download_item


def start_download_process():
    download_item = get_items_to_download()
    with multiprocessing.Pool(processes=10) as pool:
        # Specify the chunksize to get faster results.
        # map_async() is also available, if that's your use case.
        results = pool.map(start_processing, download_item, chunksize=XX)
    print(results)
    return results

def start_processing(download_item):
    try:
        # Code to download item from web API
        # Code to perform some processing on the data
        # Code to update data into database
        return True
    except Exception as e:
        return False

def results_callback(result):
    print(result)

if __name__ == "__main__":
    start_download_process()
Simplecode
  • Firstly, chunksize is an arg for map/imap and not for Pool. Anyway, I got the suggestion you made, but it doesn't seem to have any effect. The same issue still pops up. – Rajat Suneja Oct 11 '20 at 11:48
  • My bad! I will edit that. I think it could be because you are trying to insert into the db. Maybe the database can't serve many simultaneous connections. Can you remove the db insertion/update step and check if the code works? It will help narrow down the issue, as this multiprocessing code works fine for my use case. – Simplecode Oct 11 '20 at 11:57
0

I had the same experience with Python 3.8 on Linux. I set up a new environment with Python 3.7 and multiprocessing.Pool() works now without any issue.

kiasari