
I want to use multiprocessing to analyze several images in parallel, with my class:

import cv2

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self, filename, path):
        self.filename = filename
        self.path = path
        self.input_data = None
        self.output_data = None


    def read_image(self):
        self.input_data = cv2.imread(self.path + self.filename)

    def write_image(self):
        cv2.imwrite(self.path + self.filename.split('.')[0] + '_' + self.DISPLAY_NAME + '.png', self.output_data)

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self):
        self.read_image()
        self.process()
        self.write_image()

class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        source = rgb_to_hsv(self.input_data)
        self.output_data = threshold_otsu(source)


class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        source = rgb_to_lab(self.input_data)
        self.output_data = global_threshold(source)


segmenter_class = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter
}.get(procedure)

if not segmenter_class:
    raise ValueError("Invalid segmentation method '{}'".format(procedure))

for img in images:
    os.chdir(img_dir)
    processor = segmenter_class(img, img_dir)
    processor.start_pipeline()

However, I am not sure how to call the map function:

import os
import multiprocessing
import numpy as np

image_lst = os.listdir(my_image_path)

# We split the list into sublists with 5 elements because of a 512 GB RAM limitation
if len(image_lst) > 4:
    nr_of_sublists = int(len(image_lst) / 2.5)
    image_sub_lst = np.array_split(image_lst, nr_of_sublists)
else:
    image_sub_lst = [image_lst]

# We do the analysis for each sublist
for sub_lst in image_sub_lst:
    print (sub_lst)
    pool = multiprocessing.Pool(8)
    
    # Call the processor 
    processor = segmenter_class(img, img_dir)
    processor.start_pipeline()
    # How to call map???
    pool.map(?, sub_lst)
    pool.terminate()
    

EDIT:

I tried to change the code according to the comment but I am still getting an error:

import os
import multiprocessing

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):

        print ('init')

    def read_image(self):
        print ('read')

    def write_image(self):
        print ('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print ('ok starting')
        filename, path = args
        print(filename, path)
        self.process()

class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print ('ok HSV')

class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print ('ok LAB')

procedure = 'hsv'
segmenter_class = {
'hsv': HSV_Segmenter,
'lab': LabSegmenter
}.get(procedure)

images = ['01.png', '02.png', '03.png']
img_dir = 'C:/'

if __name__ == '__main__':
    pool = multiprocessing.Pool(3)
    pool.map(segmenter_class.start_pipeline, [images, img_dir])
    pool.terminate()

Error:

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:/Users/lueck/PycharmProjects/hyphae_cmd/hyphae_cmd/multi.py", line 50, in <module>
    pool.map(segmenter_class.start_pipeline, [images, img_dir])
  File "C:\Users\lueck\AppData\Local\Continuum\anaconda3\envs\hyphae_env\lib\multiprocessing\pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\lueck\AppData\Local\Continuum\anaconda3\envs\hyphae_env\lib\multiprocessing\pool.py", line 644, in get
    raise self._value
TypeError: start_pipeline() missing 1 required positional argument: 'args'

honeymoon
  • `map` needs a function's name, and that function has to take one argument - if you need more arguments, send them as a list/tuple of arguments. It will run this function many times using `Pool`, and every call will get one element from `sub_lst`. I don't know if there is a need to split the list if it can run only 8 processes at the same time - maybe use `Pool(5)` instead of splitting the list (see the sketch after these comments). If you create sublists with 5 items and use `Pool(8)` then 3 processes will never be used. – furas Jul 08 '20 at 10:00
  • Thanks, I tried to call the start_pipeline function with map but I am getting an AttributeError. Also print(filename, path) is not called. Any help would be very much appreciated. – honeymoon Jul 14 '20 at 06:57
  • always put the full error message (starting at the word "Traceback") in the question (not a comment) as text (not a screenshot). There is other useful information in it. – furas Jul 14 '20 at 07:24
  • `start_pipeline` doesn't take arguments, so using it with `map()` is useless. When you run `map(start_pipeline, [images, img_dir])` it tries to run `start_pipeline(images)` and `start_pipeline(img_dir)` in separate processes. – furas Jul 14 '20 at 07:27
  • Ok, so I moved args into the start_pipeline function and added the full traceback. – honeymoon Jul 14 '20 at 07:37
  • you have to create an instance of the class - you need `()` in `segmenter_class().start_pipeline` – furas Jul 14 '20 at 07:43
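
A minimal sketch of what the first comment suggests (not the OP's code): one Pool over the full image list instead of manual sublists, so the pool size itself caps how many images are processed at once. It reuses the simplified HSV_Segmenter and start_pipeline(self, args) from the EDIT above; treating 5 workers as the memory-driven limit is an assumption.

import os
import multiprocessing

# HSV_Segmenter with start_pipeline(self, args) as defined in the EDIT above

if __name__ == '__main__':
    img_dir = 'C:/'
    image_lst = os.listdir(img_dir)
    data = [(img, img_dir) for img in image_lst]

    # Pool(5) runs at most 5 images at a time, so there is
    # no need to split image_lst into sublists by hand.
    pool = multiprocessing.Pool(5)
    results = pool.map(HSV_Segmenter().start_pipeline, data)
    print(results)

    pool.close()
    pool.join()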

1 Answer


You have to create a list with pairs (filename, path)

data = [(img, img_dir) for img in images]

and then map will run every pair in a separate process.

But you have to receive args in start_pipeline

    def start_pipeline(self, args):
        print('ok starting')
        
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        
        return self.process()

And you have to use () to create an instance of the class segmenter_class in order to use start_pipeline

pool.map(segmenter_class().start_pipeline, data)

BTW: In the example code I also return the result from process.


import os
import multiprocessing

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')
        
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        
        return self.process()

class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"
    
class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"

if __name__ == '__main__':

    procedure = 'hsv'
    
    segmenter_class = {
        'hsv': HSV_Segmenter,
        'lab': LabSegmenter,
    }.get(procedure)
    
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'
    
    data = [(img, img_dir) for img in images]
    
    pool = multiprocessing.Pool(3)

    # example 1

    results = pool.map(segmenter_class().start_pipeline, data)
    print('Results:', results)

    # example 2

    for result in pool.map(segmenter_class().start_pipeline, data):
        print('result:', result)

    pool.terminate()

EDIT:

You can also create a function which gets the procedure and the data and then use it in map - this way every process will create its own instance for the procedure, or you can send different procedures to different processes.

import os
import multiprocessing

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')
        
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        
        return self.process()

class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"
    
class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"

segmenters = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}

def start_process(args):

    procedure = args[0]
    data = args[1:]

    segmenter_class = segmenters.get(procedure)
    result = segmenter_class().start_pipeline(data)

    return result
    
if __name__ == '__main__':

    procedure = 'hsv'
    
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'
    
    data = [(procedure, img, img_dir) for img in images]
    
    pool = multiprocessing.Pool(3)

    # example 1

    results = pool.map(start_process, data)
    print('Results:', results)

    # example 2

    for result in pool.map(start_process, data):
        print('result:', result)

    pool.terminate()

Example with different procedures

if __name__ == '__main__':

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'
    
    pool = multiprocessing.Pool(3)

    data = [('hsv', img, img_dir) for img in images]
    results = pool.map(start_process, data)
    print('Results HSV:', results)

    data = [('lab', img, img_dir) for img in images]
    results = pool.map(start_process, data)
    print('Results LAB:', results)

    pool.terminate()

And the same with a single map(). There are 6 tasks to run and Pool(3), so it will run only 3 processes at the same time; when a process becomes free, map will take the next value from the list and run it in that process.

if __name__ == '__main__':

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'
    
    data_hsv = [('hsv', img, img_dir) for img in images]
    data_lab = [('lab', img, img_dir) for img in images]
    
    data = data_hsv + data_lab

    pool = multiprocessing.Pool(3)

    # example 1

    results = pool.map(start_process, data)
    print('Results:', results)

    # example 2

    for result in pool.map(start_process, data):
        print('results:', result)

    pool.terminate()

EDIT:

It also works with Ray.

It needs only

from ray.util import multiprocessing

instead of

import multiprocessing

I didn't test it with Dask, PySpark or Joblib.
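
A minimal sketch of that swap, assuming Ray is installed and that start_process and data are defined as in the examples above (Ray's Pool mirrors the standard multiprocessing.Pool API):

from ray.util import multiprocessing   # drop-in replacement for the stdlib module

if __name__ == '__main__':
    # same calls as before, but the workers are Ray actors
    pool = multiprocessing.Pool(3)
    results = pool.map(start_process, data)
    print('Results:', results)
    pool.terminate()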


EDIT:

Example with Joblib

from joblib import Parallel, delayed

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')
        
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        
        return self.process()

class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"

class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"

segmenters = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}

def start_process(args):
    
    procedure = args[0]
    data = args[1:]
    
    segmenter_class = segmenters.get(procedure)
    result = segmenter_class().start_pipeline(data)
    
    return result

if __name__ == '__main__':

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'
    
    data_hsv = [('hsv', img, img_dir) for img in images]
    data_lab = [('lab', img, img_dir) for img in images]
    
    data = data_hsv + data_lab

    # --- version 1 ---

    #pool = Parallel(n_jobs=3, backend='threading')
    #pool = Parallel(n_jobs=3, backend='multiprocessing')
    pool = Parallel(n_jobs=3)
    
    # example 1
    
    results = pool( delayed(start_process)(args) for args in data )
    print('Results:', results)

    # example 2
    
    for result in pool( delayed(start_process)(args) for args in data ):
        print('result:', result)

    # --- version 2 ---
    
    #with Parallel(n_jobs=3, backend='threading') as pool:
    #with Parallel(n_jobs=3, backend='multiprocessing') as pool:
    with Parallel(n_jobs=3) as pool:

        # example 1
        
        results = pool( delayed(start_process)(args) for args in data )
        print('Results:', results)

        # example 2

        for result in pool( delayed(start_process)(args) for args in data ):
            print('result:', result)
        
furas
  • Thanks a lot, now it's more clear to me how it works. The second option is interesting. Thanks again! – honeymoon Jul 14 '20 at 08:24
  • BTW: I tested it with the module [Ray](https://docs.ray.io/en/master/index.html) instead of `multiprocessing` and it needs only `from ray.util import multiprocessing`; the rest is the same. I didn't try with the module [Dask](https://dask.org/) or [PySpark](https://spark.apache.org/) – furas Jul 14 '20 at 09:37
  • Thanks for the hint. I did some comparisons; both modules perform quite similarly: single: 5 minutes per image; multiprocessing: 2 minutes per image; Ray: 2 minutes per image. Image processing on large images (~23880 x 30400). – honeymoon Jul 14 '20 at 18:05
  • I added an example with `Joblib` - it can use `multiprocessing` or `threading`. – furas Jul 14 '20 at 19:31
  • Thanks, I am also going to try this option. I have one more question. My understanding of multiprocessing.Pool() is that the tasks (reading, writing, image processing) needed to analyze all images are distributed over all available cores. Is it possible to dedicate one core to one image for the entire process? Or did I misunderstand something? The point is that in case of network or other hardware problems, I cannot tell which images were processed and which were not. The analysis takes several hours to days. – honeymoon Jul 15 '20 at 06:39
  • `Pool()` only creates the processes once and reuses them without deleting and creating them again - so it should work a little faster. And with `Pool()` you need less code. As far as I know, Python doesn't control cores because the system does it - it has to distribute all running processes, not only Python processes. But I saw some questions about binding a process to one core, and at this moment found only [How to limit number of cores with threading](https://stackoverflow.com/questions/41401490/how-to-limit-number-of-cores-with-threading) which mentions `affinity` in `multiprocessing.Process()`, but I never used it. – furas Jul 15 '20 at 07:02
  • as for problems with network/hardware, you should save information about processed images in a log, a normal file or a database. In the examples I use `result` to send back information about the finished process - you can use it to send back information which can be saved in a log/file - but in this situation I would run it with `for result in pool.map(...): save_result` to save every result when it is ready and not wait for all results (see the sketch after these comments). – furas Jul 15 '20 at 07:06
  • other questions: [How to pin different processes to individual cpu cores in Python](https://stackoverflow.com/questions/43538141/how-to-pin-different-processes-to-individual-cpu-cores-in-python/43538335) and [Designate specific CPU for a process - python multiprocessing](https://stackoverflow.com/questions/36172101/designate-specific-cpu-for-a-process-python-multiprocessing) – furas Jul 15 '20 at 07:17
  • Thanks, I see the point. That is what I do now: I log when an image gets processed. – honeymoon Jul 15 '20 at 07:25
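
A minimal sketch of the logging idea from these comments, assuming start_process and data are defined as in the answer above. It swaps pool.map for pool.imap, which yields each result as soon as it is ready, so each finished image is logged as it completes instead of only after the whole batch; the log file name processed.log is made up for the example.

import multiprocessing

if __name__ == '__main__':
    pool = multiprocessing.Pool(3)

    with open('processed.log', 'a') as log:
        # imap yields results one by one instead of waiting for the whole list
        for result in pool.imap(start_process, data):
            log.write('{}\n'.format(result))
            log.flush()  # make sure the entry survives a crash

    pool.close()
    pool.join()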