I want to use multiprocessing to analyze several images in parallel, with my class:
class SegmentationType(object):
DISPLAY_NAME = "invalid"
def __init__(self, filename, path):
self.filename = filename
self.path = path
self.input_data = None
self.output_data = None
def read_image(self):
self.input_data = cv2.imread(self.path + self.filename)[1]
def write_image(self):
cv2.imwrite(self.path + self.filename.split('.')[0] + '_' + self.DISPLAY_NAME + '.png', self.output_data)
def process(self):
# override in derived classes to perform an actual segmentation
pass
def start_pipeline(self):
self.read_image()
self.process()
self.write_image()
class HSV_Segmenter(SegmentationType):
DISPLAY_NAME = 'HSV'
def process(self):
source = rgb_to_hsv(self.input_data)
self.output_data = treshold_otsu(source)
class LabSegmenter(SegmentationType):
DISPLAY_NAME = 'LAB'
def process(self):
source = rgb_to_lab(self.input_data)
self.output_data = global_threshold(source)
segmenter_class = {
'hsv': HSV_Segmentation,
'lab': LAB_Segmenter
}.get(procedure)
if not segmenter_class:
raise ArgumentError("Invalid segmentation method '{}'".format(procedure))
for img in images:
os.chdir(img_dir)
processor = = segmenter_class(img, img_dir, procedure)
processor.start_pipeline()
However, I am not sure how to call the map funtion:
image_lst = os.listdir(my_image_path)
# We split the list into sublist with 5 elements because of 512 GB RAM limitation
if len(image_lst) > 4:
nr_of_sublists = int(len(image_lst)/2.5)
image_sub_lst =np.array_split(image_lst, nr_of_sublists)
else:
image_sub_lst = [image_lst]
# We do the analysis for each sublist
for sub_lst in image_sub_lst:
print (sub_lst)
pool = multiprocessing.Pool(8)
# Call the processor
processor = = segmenter_class(img, img_dir, procedure)
processor.start_pipeline()
# How to call map???
pool.map(?, sub_lst)
pool.terminate()
EDIT:
I tried to change the code to the comment but still getting an error:
import os
import multiprocessing
class SegmentationType(object):
DISPLAY_NAME = "invalid"
def __init__(self):
print ('init')
def read_image(self):
print ('read')
def write_image(self):
print ('write')
def process(self):
# override in derived classes to perform an actual segmentation
pass
def start_pipeline(self, args):
print ('ok starting')
filename, path = args
print(filename, path)
self.process()
class HSV_Segmenter(SegmentationType):
DISPLAY_NAME = 'HSV'
def process(self):
print ('ok HSV')
class LabSegmenter(SegmentationType):
DISPLAY_NAME = 'LAB'
def process(self):
print ('ok LAB')
procedure = 'hsv'
segmenter_class = {
'hsv': HSV_Segmenter,
'lab': LabSegmenter
}.get(procedure)
images = ['01.png', '02.png', '03.png']
img_dir = 'C:/'
if __name__ == '__main__':
pool = multiprocessing.Pool(3)
pool.map(segmenter_class.start_pipeline, [images, img_dir])
pool.terminate()
Error: The above exception was the direct cause of the following exception:
Traceback (most recent call last): File "C:/Users/lueck/PycharmProjects/hyphae_cmd/hyphae_cmd/multi.py", line 50, in pool.map(segmenter_class.start_pipeline, [images, img_dir]) File "C:\Users\lueck\AppData\Local\Continuum\anaconda3\envs\hyphae_env\lib\multiprocessing\pool.py", line 266, in map return self._map_async(func, iterable, mapstar, chunksize).get() File "C:\Users\lueck\AppData\Local\Continuum\anaconda3\envs\hyphae_env\lib\multiprocessing\pool.py", line 644, in get raise self._value TypeError: start_pipeline() missing 1 required positional argument: 'args'