
Suppose I have a list of strings like so:

a = ['string1', 'string2', ..., 'stringN']

What is the most efficient way to use 8 processes to handle the N strings in this list, passing each string as the argument to a single function?

To make this concrete, consider a function like this:

def some_function(a_string: str):
    # Append the string to the shared output file.
    with open("my_enemy_list!.txt", "a") as f:
        f.write(a_string)

Rest assured, I'll use a mutex lock to keep the processes from stepping on each other's writes.

(This is on Linux, and the list is neither very large nor very small.)

John
  • You probably can't for your use case. If everything is IO-bound, parallelization usually doesn't help much. Are you doing any computation with the string? – Carbon Jul 03 '23 at 17:15
  • @Carbon My actual workload is entirely CPU-bound, I can tell you that! What comes to mind is assigning each string to a process, from the first to the last, until all of them are processed. – John Jul 03 '23 at 17:18

3 Answers


This should be enough to get you started. If the processes need to interact, it becomes a bit more interesting, but for an embarrassingly parallel problem this is all you need.

from concurrent.futures import ProcessPoolExecutor


def load_function(item):
    print(item)


if __name__ == '__main__':
    SAMPLE_DATA = list(range(25))
    with ProcessPoolExecutor(8) as ppe:
        ppe.map(load_function, SAMPLE_DATA)
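
The snippet above discards whatever the workers return. If the results are needed back in the parent, a minimal sketch along the same lines might look like the following. The names `shout` and `run_all` are hypothetical, and `shout` only stands in for the real CPU-bound function. `Executor.map` yields results in the same order as the inputs, so wrapping it in `list()` collects them in order.

```python
from concurrent.futures import ProcessPoolExecutor


def shout(item: str) -> str:
    # Hypothetical stand-in for the real CPU-bound work.
    return item.upper()


def run_all(items: list[str], workers: int = 8) -> list[str]:
    # Executor.map yields results in the same order as the inputs,
    # regardless of which worker finishes first.
    with ProcessPoolExecutor(max_workers=workers) as ppe:
        return list(ppe.map(shout, items))


if __name__ == '__main__':
    print(run_all([f'string{i}' for i in range(1, 6)]))
```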
Carbon
  • Helpful as your sample code is in action, it also gave me a pretty good understanding of what's going on. I'll dig into it, thanks. – John Jul 04 '23 at 04:23

The simplest way is to pass that list directly to Pool.map(), and let it split up the list into chunks and distribute them to workers.
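
As a rough sketch of that approach, assuming the 8 processes from the question and a placeholder body for `some_function` (the character-code sum is only an illustration, not the asker's real workload):

```python
from multiprocessing import Pool


def some_function(a_string: str) -> int:
    # Placeholder for CPU-bound work: sum of the character codes.
    return sum(ord(ch) for ch in a_string)


if __name__ == '__main__':
    a = [f'string{i}' for i in range(1, 101)]
    # Pool.map splits `a` into chunks and hands them to idle workers.
    with Pool(processes=8) as pool:
        results = pool.map(some_function, a)
    print(len(results))
```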

This doesn't assign the chunks to specific workers all at once, but that's usually undesirable anyway. You want the workers to be flexible, and to request more work if they finish their work earlier than expected. The more constraints you put on which process can do which work, the slower your code will be.

But why break the list into chunks? Why not give each worker one item, then give them more when they finish? The reason for this is IPC (inter-process communication) overhead. Every time a worker needs to request more work, and wake up the parent process, that takes some time, and the worker is idle during that.

By default, chunksize is the number of items divided by four times the number of processes, rounded up. With 8 processes, that splits your list into at most 32 chunks. This is normally a pretty good default, but you can do better in some cases by tuning it. I usually run my program with different chunksize values, and pick the one which is fastest.
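
A minimal sketch of that tuning loop, under some assumptions: `default_chunksize` and `work` are hypothetical names, `work` is a stand-in for the real CPU-bound function, and the candidate chunksize values are arbitrary. The helper mirrors the default heuristic described above, and the loop times `Pool.map` for each candidate on an already-started pool so that process startup is not part of the measurement.

```python
import time
from multiprocessing import Pool


def default_chunksize(n_items: int, n_procs: int) -> int:
    # Items divided by (4 * processes), rounded up.
    chunksize, extra = divmod(n_items, 4 * n_procs)
    return chunksize + 1 if extra else chunksize


def work(a_string: str) -> int:
    # Hypothetical CPU-bound placeholder.
    return sum(ord(ch) for ch in a_string * 1000)


if __name__ == '__main__':
    a = [f'string{i}' for i in range(1, 1001)]
    n_procs = 8
    with Pool(n_procs) as pool:
        # Try a few chunk sizes, including the default heuristic.
        for cs in (1, 8, default_chunksize(len(a), n_procs), 125):
            start = time.perf_counter()
            pool.map(work, a, chunksize=cs)
            print(f'chunksize={cs}: {time.perf_counter() - start:.3f}s')
```

Timings from a single run are noisy, so it is worth repeating each measurement a few times on the real workload before settling on a value.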

Nick ODell
  • Hands down the best detailed, understandable explanation for a rookie like me. Thanks a galaxy. – John Jul 04 '23 at 04:26

You did not specify what platform you are running under or how large the data string being written will be. If you are running under Linux and the data is not too large, explicit locking is not necessary. See this post. But whether the writing is atomic or you have to do explicit locking, since all processes would be writing to the same file, there is no real parallelization being accomplished in outputting the data. For that reason I would find it simpler to have a single writer.

If the order in which the strings are written does not matter, I would use the following code:

from multiprocessing import Pool, Queue, cpu_count

def init_pool_processes(q: Queue) -> None:
    global queue

    queue = q

def some_function(a_string: str) -> None:
    ... # Perform some CPU-intensive operations yielding result
    result = a_string.upper() + '\n' # for demo purposes
    queue.put(result)


def writer() -> None:
    with open("my_enemy_list!.txt", "w") as f:
        for result in iter(queue.get, None):
            f.write(result)

def main():
    a = [f'string{i}' for i in range(1, 101)]
    queue = Queue()
    # One extra process, because the writer occupies a pool worker for the whole run:
    with Pool(cpu_count() + 1, initializer=init_pool_processes, initargs=(queue,)) as pool:
        async_result = pool.apply_async(writer)
        pool.map(some_function, a)
        # Tell writer there is no more data coming:
        queue.put(None)
        # Wait for writer to complete:
        async_result.get()

if __name__ == '__main__':
    main()

If the order does matter, then:

from multiprocessing import Pool, cpu_count


def some_function(a_string: str) -> str:
    ... # Perform some CPU-intensive operations yielding result
    result = a_string.upper() + '\n' # for demo purposes
    return result

def compute_chunksize(iterable_size: int, pool_size: int) -> int:
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

def main():
    a = [f'string{i}' for i in range(1, 101)]
    iterable_size = len(a)
    pool_size = cpu_count()
    chunksize = compute_chunksize(iterable_size, pool_size)

    with Pool(pool_size) as pool, \
    open("my_enemy_list!.txt", "w") as f:
        # imap (unlike imap_unordered) yields results in input order:
        for result in pool.imap(some_function, a, chunksize=chunksize):
            f.write(result)

if __name__ == '__main__':
    main()
Booboo
  • Yeah, my bad, I should've mentioned the platform and the size, but you guessed just right. As for a single writer, that seems like the greatest idea! It's even better than what I had in mind, which was having each process create its own file and merging them into a single one at the end. The idea of using a queue was awesome and pretty straightforward in practice, and in the end I must say this solution worked for me. Brilliant. Thanks – John Jul 04 '23 at 05:01