0

I have a big list of images list_img, say 20k that I need to process multiple times with changing arguments out of a list params = [arg1, arg2, ...]. Ideally, I want to use multiple processes to do so. But I need all processes to first use arg1 and then arg2 on chunks of my list list_img. The processing time for each arg in params varies greatly. So if I would distribute the list params over my processes instead of the list of images (core 1: arg1, core 2: arg2, ...) it happens that after a while most of the processes are idle (finished) while very few are still crunching data.

My current (working) solution looks like that:

from multiprocessing import Pool
import numpy as np

def calc_image(argument, image):
    val = argument * image    # not the real process, just demo
    return val

if __name__ == "__main__":
    pool = Pool(processes=8)
    list_img = [np.ones((100, 100))] * 20000    # for demo only
    params = list(range(100))    # for demo only
    for par in params:
        par_list = [par] * len(list_img)
        return_vals = pool.starmap(calc_image, zip(par_list, list_img))
    pool.close()

How can I avoid to copy the list list_img every time the variable par changes in the for-loop? I also would like to avoid using global variables, if possible.

RaJa
  • 1,471
  • 13
  • 17
  • Pickling 200MB of images to pass to each of 100 processes doesn't seem ideal. – Mark Setchell Jul 14 '23 at 06:32
  • That's exactly the problem that I'm trying to solve. – RaJa Jul 14 '23 at 07:20
  • How about going for fewer processes, say 4 if you have 4 CPU cores, and then letting each process do 1/4 of the images, or 1/4 of the parameter list? Or how about putting the images into Redis or shared memory where the processes can help themselves? – Mark Setchell Jul 14 '23 at 08:15
  • "If I would distribute the list params ... most of the processes are idle (finished) while very few are still crunching data.", Does this mean that the processing time varies greatly from image to image? Can you also provide the code you tried when this happened? – ken Jul 14 '23 at 08:55
  • @ken That's exactly the case. Each `arg` is actually a list of image processing kernels. Say `arg1 = Sobel` and `arg2 = Normalize + Hough_circles` than applying `arg2` takes much longer to process. In this case the process previously applying `arg1` falls idle and it's computing time is wasted. I've put that info in the initial question. – RaJa Jul 14 '23 at 09:23
  • Ah, I see. Then I have a concern: you may be able to reduce the number of copies for input, but you will not be able to reduce the number of copies for output because there are 100x20000 different output images. So you may want to consider using memmap or shared_memory to improve the performance of each copy first. – ken Jul 14 '23 at 10:06
  • This code has two errors: `np.ones((100, 100)` (missing `)`) and `len(images)` (`images` undefined). – Booboo Jul 15 '23 at 11:51
  • By my calculation you are creating 20_000 * 100 = 2_000_000 tasks and each task is being passed a 100 by 100 (10_000 elements) array. – Booboo Jul 15 '23 at 12:18

2 Answers2

0

Try using "Partial Functions" from functools module. https://docs.python.org/3/library/functools.html#functools.partial

Create a new function for example, "partial_calc_image" and pass partial() function on *args which in this case is "calc_image" function and **keywords arg which is image list "list_img"

refer to below answer: https://stackoverflow.com/a/72524256/22072290

Furthermore, starmap() can be passed on "partial_calc_image".

return_vals = pool.starmap(partial_calc_image, zip(par_list, list_img))

VeeyesPlus
  • 57
  • 6
  • I'm not sure that this solves the problem. Your solution still copies the list `list_img` every time. – RaJa Jul 14 '23 at 07:21
0

This is my current workaround for the problem. I'm still interested in a better solution - maybe more elegant.

I've switched from using Poolto a collection of Process:

from multiprocessing import Queue, Process
import numpy as np

def process_image(list_images, queue_in, queue_out):
    for arg in iter(queue_in.get, "STOP"):
        processed_images = []
        for img in list_images:
            result = arg * img
            processed_images.append(result)
        queue_out.put(processed_images)

if __name__ == "__main__":
    list_img = [np.ones((100, 100))] * 20000    # for demo only
    splits = np.split(list_img, 4)   # split into 4 chunks
    my_pool = []
    queue_in = Queue()
    queue_out = Queue()
    # starting a bunch of process, each owning a part of the list of images
    # so list is only copied once
    for n in range(4):
        proc = Process(target=process_image, args=(splits[n], queue_in, queue_out))
        proc.start()
        my_pool.append(proc)
    params = list(range(100))    # for demo only
    for par in params:
        for n in my_pool:
            queue_in.put(par)    # each process gets the same element and starts crunching
        return_vals = []
        for n in my_pool:
            return_vals.append(queue_out.get(block=True)) # wait for results
    for element in my_pool:
        creature_tasks.put("STOP")   # indicate processes to close
    for element in pool:
        element.join()

The trick is that I only copy the list of images only once during the creation of the processes. Each worker gets its own sub-list of the total list during initialization which has been split before. Later I provide the argument that should be used to process the images in a small loop. As the processes are blocking until queue_in contains elements, I just have to provide the respective argument exactly the same times as I have processes. This way the images are not copied again.

Copying the results back (from the processes to the main-process) can't be avoided.

RaJa
  • 1,471
  • 13
  • 17
  • How could you have possibly tested this code since it will not compile? See my comment to your question. – Booboo Jul 15 '23 at 11:52
  • Well, the code you see is not the actual code I'm using in my project. Obviously. And the mistakes likely occurred while writing the post. – RaJa Jul 17 '23 at 04:55