I am parallelizing the conversion of pictures to grayscale for a school assignment. Ever since I implemented multiprocessing.Array, I have been getting terrible results, and they get even worse as the number of processors increases.

The speed was fine and as expected when I was writing the results into a numpy array (but obviously I did not get a valid picture that way). Since I switched to the mp Array it has become this sluggish, although I do get nice grayscale pictures from it.

I also tried implementing this with ProcessPoolExecutor, but it froze when I passed the mp Array as an argument (it did not even reach the pid print on the first line of the called function).

The Code:

from PIL import Image
import numpy
import time
from multiprocessing import Process
from multiprocessing import Array

if __name__ == "__main__":
    main2()


def main2():
    for i in range(3):
        filename = "%d.jpg" % i
        cols, rows, mpx, source_picture, result_array_seq, result_array_par = load_picture(filename)
        print(filename, "%dx%d" % (cols, rows), "%f Mpx" % mpx)
        seq_time, new_image = grayscale_seq(cols, rows, source_picture, result_array_seq)
        new_image.save("%d_gray_seq.jpg" % i)
        print("Seq run:", time.strftime('%H:%M:%S', time.gmtime(seq_time)))
        for p in range(2, 7, 2): #run on 2, 4 and 6 cores
            par_time, new_image = grayscale_par(cols, rows, source_picture, result_array_par, p)
            new_image.save("%d_gray_par_%d.jpg" % (i, p))
            print("Par run with %d cores" % p, time.strftime('%H:%M:%S', time.gmtime(par_time)))


def grayscale_seq(cols, rows, source_pic, dest_pic):
    time_start = time.time()
    for row in range(rows):
        for col in range(cols):
            r, g, b = source_pic[row, col]
            dest_pic[row, col] = calculate_rgb(r, g, b)
    time_end = time.time()
    time_diff = (time_end - time_start)
    result_array = numpy.array(dest_pic).astype(numpy.uint8)
    new_image = Image.fromarray(result_array)

    return time_diff, new_image


def grayscale_par(cols, rows, source_pic, dest_pic, num_of_cpus):
    tuples = splitIndex(num_of_cpus, rows)
    process_array = []
    for i in range(len(tuples)):
        start_index, end_index = tuples[i]
        picture_slice = source_pic[start_index:end_index, :, :]
        p = Process(target=grayscaleSlice, args=(picture_slice, dest_pic, tuples[i][0], cols))
        process_array.append(p)

    time_start = time.time()
    for process in process_array:
        process.start()
        # process.join()
    for process in process_array:
        process.join()
    time_end = time.time()
    time_diff = (time_end - time_start)
    image = numpy.array(dest_pic).astype(numpy.uint8).reshape(rows, cols)
    new_image = Image.fromarray(image)
    return time_diff, new_image


def splitIndex(numprocessors, height):
    resultarray = [0]
    previousindex = 0
    for i in range(numprocessors - 1):
        result = previousindex + height // numprocessors
        resultarray.append(result)
        previousindex = result
    indextuples = []
    for i in range(numprocessors):
        if i == numprocessors - 1:
            indextuples.append((resultarray[i], height))
            break
        indextuples.append((resultarray[i], resultarray[i + 1]))
    return indextuples


def grayscaleSlice(picture_slice, array, start_index, cols):
    for row in range(len(picture_slice)):
        for col in range(cols):
            R = picture_slice[row, col, 0]
            G = picture_slice[row, col, 1]
            B = picture_slice[row, col, 2]
            result = int(round(0.299 * R + 0.587 * G + 0.114 * B))
            array[start_index * cols + row * cols + col] = result


def load_picture(filename):
    im = Image.open(filename)
    cols, rows = im.size
    mpx = (cols * rows) / 1000000
    source_picture = numpy.asarray(im)
    result_array_seq = numpy.zeros((rows, cols))
    result_array_par = Array('i', rows * cols)
    return cols, rows, mpx, source_picture, result_array_seq, result_array_par


def calculate_rgb(r, g, b):
    return int(round(0.299 * r + 0.587 * g + 0.114 * b))

The results:

0.jpg 500x375 0.187500 Mpx
Seq run: 00:00:06
Par run with 2 cores 00:00:01
Par run with 4 cores 00:00:01
Par run with 6 cores 00:00:02

1.jpg 1920x1200 2.304000 Mpx
Seq run: 00:00:16
Par run with 2 cores 00:00:12
Par run with 4 cores 00:00:18
Par run with 6 cores 00:00:21

2.jpg 3100x2074 6.429400 Mpx
Seq run: 00:00:47
Par run with 2 cores 00:00:32
Par run with 4 cores 00:00:45
Par run with 6 cores 00:01:03
Budbreaker
  • You didn't post implementations of `splitIndex` or `grayscaleSlice`, but as a guess, it appears you have each of the callers feeding data one pixel at a time to a shared array. Since sending data between processes is probably the most costly part of the process, it's going to be very slow. – Anon Coward Mar 02 '21 at 21:11
  • Yes, I just realized that and added them. I expected the synchronization to take some processing power away from the parallel computation, but I honestly did not expect it to have such a huge impact. How should this problem be approached to get the most out of multiple processors? Somehow create each slice in an individual process and then splice them together? – Budbreaker Mar 02 '21 at 21:17
  • You can't avoid the overhead of process creation (that's one of the reasons why threads were later added). A solution is to keep the processes alive and use as few shared resources as possible. –  Mar 02 '21 at 21:35

1 Answer

Every time you access a shared memory component like multiprocessing's Array, you necessarily engage some inter-process synchronization. This is slower than normal memory access, of course, and to make matters worse, each access tends to lock the array, meaning your workers spend most of their time waiting on each other instead of doing real work.
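
To make the locking cost concrete, here is a minimal sketch of writing a whole block of rows into the shared Array with a single lock acquisition instead of one per pixel. It assumes the Array was created with typecode 'i' and the default lock, as in the question; `gray_rows` and `write_rows` are hypothetical helper names, and the NumPy vectorisation is a swapped-in technique, not something the original code does.

```python
import numpy
from multiprocessing import Array

# Luma weights taken from the question's calculate_rgb().
WEIGHTS = numpy.array([0.299, 0.587, 0.114])


def gray_rows(rgb_rows):
    """Convert an (h, w, 3) RGB block to an (h, w) block of C ints."""
    return numpy.rint(rgb_rows[..., :3] @ WEIGHTS).astype(numpy.intc)


def write_rows(shared, rgb_rows, start_row, cols):
    """Hypothetical helper: store a whole block with one lock acquisition."""
    # Wrap the shared buffer without copying; typecode 'i' is a C int,
    # which corresponds to numpy.intc.
    view = numpy.frombuffer(shared.get_obj(), dtype=numpy.intc)
    gray = gray_rows(rgb_rows)
    begin = start_row * cols
    # Take the lock once for the whole block, not once per pixel.
    with shared.get_lock():
        view[begin:begin + gray.size] = gray.ravel()
```

A worker in the style of `grayscaleSlice` could call `write_rows(array, picture_slice, start_index, cols)` once in place of its nested loops. Since the workers write to disjoint row ranges, the lock is arguably not needed at all here, but that is a design decision rather than something the question settles.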

If you batch up the results instead of sending one pixel at a time, you can speed things up. For instance, you can change to a model where each worker builds a list of the pixels that need to be updated, and the main process does the actual work of updating the target array:

from multiprocessing import Process, Queue


def grayscale_par(cols, rows, source_pic, dest_pic, num_of_cpus):
    # dest_pic is now a numpy array, just like in grayscale_seq
    tuples = splitIndex(num_of_cpus, rows)
    process_array = []
    # A queue for the workers to store result sets into
    queue = Queue()
    for i in range(len(tuples)):
        start_index, end_index = tuples[i]
        picture_slice = source_pic[start_index:end_index, :, :]
        p = Process(target=grayscaleSlice, args=(queue, picture_slice, tuples[i][0], cols))
        process_array.append(p)

    time_start = time.time()
    for process in process_array:
        process.start()
    # Go ahead and apply the result set from each worker once it's ready
    # We want to do this once for each process we launched, so
    # loop through that array.  The value is ignored, so just use an
    # underscore here.
    for _ in process_array:
        # Now, for each process, call queue.get() to get its result
        # array.  This will block until a result is available.  Once it
        # returns, we iterate through that array, one item at a time.
        # Each item is just a tuple telling us which pixel to
        # update to which value
        for row, col, value in queue.get():
            # So, finally, we update each pixel in turn
            dest_pic[row, col] = value
    for process in process_array:
        process.join()
    time_end = time.time()
    time_diff = (time_end - time_start)
    image = numpy.array(dest_pic).astype(numpy.uint8)
    new_image = Image.fromarray(image)
    return time_diff, new_image


def grayscaleSlice(queue, picture_slice, start_index, cols):
    # Just store the result in an array
    result = []
    for row in range(len(picture_slice)):
        for col in range(cols):
            R = picture_slice[row, col, 0]
            G = picture_slice[row, col, 1]
            B = picture_slice[row, col, 2]
            value = int(round(0.299 * R + 0.587 * G + 0.114 * B))
            result.append((start_index + row, col, value))
    # All done, go ahead and send the result set along to the main process
    queue.put(result)

This results in the worker processes being used more efficiently. On my machine, with a sample image, it outputs the following, showing a clear speed improvement with more cores.

0.jpg 2272x1704 3.871488 Mpx
Seq run: 00:00:29
Par run with 2 cores 00:00:14
Par run with 4 cores 00:00:08
Par run with 6 cores 00:00:06
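
The batching idea can be pushed a step further by sending one NumPy block per worker instead of a list of per-pixel tuples. The following is only a sketch under assumptions: `split_rows`, `gray_block_worker` and `grayscale_par_blocks` are hypothetical names, the source picture is assumed to be an (rows, cols, 3) uint8 array as returned by `numpy.asarray(im)`, and the vectorised conversion replaces the per-pixel loop from the code above. I have not timed it.

```python
import numpy
from multiprocessing import Process, Queue

# Luma weights taken from the question's calculate_rgb().
WEIGHTS = numpy.array([0.299, 0.587, 0.114])


def split_rows(num_workers, rows):
    """Return (start, end) row ranges, one per worker."""
    step = rows // num_workers
    bounds = [i * step for i in range(num_workers)] + [rows]
    return list(zip(bounds[:-1], bounds[1:]))


def gray_block_worker(queue, picture_slice, start_row):
    """Convert one slice and send it back as a single message."""
    gray = numpy.rint(picture_slice[..., :3] @ WEIGHTS).astype(numpy.uint8)
    queue.put((start_row, gray))


def grayscale_par_blocks(source_pic, num_workers):
    """Fan the rows out to worker processes and reassemble the result."""
    rows, cols = source_pic.shape[:2]
    dest = numpy.zeros((rows, cols), dtype=numpy.uint8)
    queue = Queue()
    workers = [
        Process(target=gray_block_worker,
                args=(queue, source_pic[start:end], start))
        for start, end in split_rows(num_workers, rows)
    ]
    for worker in workers:
        worker.start()
    # Drain the queue before joining: a worker cannot exit until the
    # data it put on the queue has been consumed.
    for _ in workers:
        start_row, gray = queue.get()
        dest[start_row:start_row + len(gray)] = gray
    for worker in workers:
        worker.join()
    return dest
```

Call `grayscale_par_blocks(source_picture, p)` from under an `if __name__ == "__main__":` guard, which is required on platforms that start workers with the spawn method, and pass the returned array to `Image.fromarray`.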
Anon Coward
  • Awesome, thanks. I have a hard time wrapping my head around this: `for _ in process_array:` `for row, col, value in queue.get():` `dest_pic[row, col] = value`. I assume it waits on the queue until some values are available, like a stream listener in Dart? – Budbreaker Mar 02 '21 at 21:52
  • I think you basically got it, but it's a fair question. I fleshed out that part a bit with some more comments so hopefully you can understand each step in detail. – Anon Coward Mar 02 '21 at 21:57
  • Hi @Anon, I have a question about running `multiprocessing.Pool` on a Windows laptop [here](https://stackoverflow.com/questions/66445724/why-does-this-parallel-process-run-infinitely-on-windows). I hope that you can take some time to look at this question. Thank you so much for your help! – Akira Mar 03 '21 at 08:56