
I have a function which requests a server, retrieves some data, processes it and saves a CSV file. This function should be launched 20k times. Each execution takes a different amount of time: sometimes it lasts more than 20 minutes and other times less than a second. I decided to go with multiprocessing.Pool.map to parallelize the execution. My code looks like:

def get_data_and_process_it(filename):
    print('getting', filename)
    ...
    print(filename, 'has been processed')

with Pool(8) as p:
    p.map(get_data_and_process_it, long_list_of_filenames)

Looking at how the prints are generated, it seems that long_list_of_filenames is being split into 8 parts and assigned to each CPU, because sometimes the pool just gets blocked in one 20-minute execution with no other element of long_list_of_filenames being processed during those 20 minutes. What I was expecting was for map to schedule each element on a CPU core in a FIFO style.

Is there a better approach for my case?

Darkonaut
lsmor
  • You should set the `chunksize`-parameter for `Pool.map()` to `1` in this case. You can calculate the otherwise generated chunksizes with `calc_chunksize_info()` from my answer [here](https://stackoverflow.com/a/54032744/9059420). – Darkonaut Aug 01 '19 at 17:17
  • 1
    `map` works similar to the built-in `map` for iterables. That means that order is *ensured*. In other words, a slow process will block the faster processes. If order doesn't matter for you, I suggest looking into `map_async` instead. – Bram Vanroy Aug 01 '19 at 17:39
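
A minimal sketch of the chunksize suggestion from the first comment, reusing the worker and file list from the question:

from multiprocessing import Pool

if __name__ == '__main__':
    with Pool(8) as p:
        # chunksize=1 hands out one filename at a time instead of pre-splitting
        # the list into larger chunks, so a slow file only ties up one worker.
        p.map(get_data_and_process_it, long_list_of_filenames, chunksize=1)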

2 Answers


The map method only returns when all operations have finished.

And printing from a pool worker is not ideal. For one thing, files like stdout use buffering, so there might be a variable amount of time between printing a message and it actually appearing. Furthermore, since all workers inherit the same stdout, their output would become intermeshed and possibly even garbled.

So I would suggest using imap_unordered instead. That returns an iterator that will begin yielding results as soon as they are available. The only catch is that this returns results in the order they finish, not in the order they started.

Your worker function (get_data_and_process_it) should return some kind of status indicator, for example a tuple of the filename and the result.

def get_data_and_process_it(filename):
    ...
    if error:
        return (filename, f'has *failed* because of {reason}')
    return (filename, 'has been processed')

You could then do:

with Pool(8) as p:
    for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
        print(fn, res)

That gives accurate information about when a job finishes, and since only the parent process writes to stdout, there is no chance of the output becoming garbled.

Additionally, I would suggest calling sys.stdout.reconfigure(line_buffering=True) somewhere near the beginning of your program. That ensures that the stdout stream is flushed after every line of output.
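
A minimal sketch of that, assuming Python 3.7+ (where sys.stdout.reconfigure() is available) and the same names as above:

import sys
from multiprocessing import Pool

if __name__ == '__main__':
    # Flush stdout after every line so progress messages appear immediately.
    sys.stdout.reconfigure(line_buffering=True)
    with Pool(8) as p:
        for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
            print(fn, res)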

Roland Smith

map is blocking; instead of p.map you can use p.map_async. map waits for all of those function calls to finish, so we see all the results in a row. map_async returns immediately with an AsyncResult and does not block the caller while the tasks run, so the main process can keep working while the pool does its job. This is a more flexible approach (For more). There is also a SO thread which discusses map and map_async in detail.
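
A minimal sketch of the map_async variant, reusing get_data_and_process_it and long_list_of_filenames from the question:

from multiprocessing import Pool

if __name__ == '__main__':
    with Pool(8) as p:
        async_result = p.map_async(get_data_and_process_it, long_list_of_filenames)
        # The main process is free to do other work here while the pool runs.
        results = async_result.get()  # blocks until every task has finished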

The multiprocessing Pool class handles the queuing logic for us. It's perfect for running web scraping jobs in parallel (example), or really any job that can be broken up and distributed independently. If you need more control over the queue or need to share data between multiple processes, you may want to look at the Queue class (For more).
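
If you do need that level of control, a minimal sketch of a Queue-based setup could look like the following; the worker body and the file list are placeholders, not the code from the question:

from multiprocessing import Process, Queue

def worker(task_queue, result_queue):
    # Keep pulling filenames until the sentinel value None arrives.
    for filename in iter(task_queue.get, None):
        result_queue.put((filename, 'has been processed'))  # real work would go here

if __name__ == '__main__':
    filenames = ['file_0.csv', 'file_1.csv', 'file_2.csv']  # placeholder list
    task_queue, result_queue = Queue(), Queue()
    workers = [Process(target=worker, args=(task_queue, result_queue)) for _ in range(4)]
    for w in workers:
        w.start()
    for name in filenames:
        task_queue.put(name)
    for _ in workers:
        task_queue.put(None)  # one sentinel per worker so each one exits
    for name, status in (result_queue.get() for _ in filenames):
        print(name, status)
    for w in workers:
        w.join()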

j23