I am writing a pipeline that slices pictures into 256 × 256 tiles. Each tile is then processed with image operations such as right flipping, left flipping, elastic distortion, gamma correction, etc. The operations themselves are not implemented by me but by NumPy, scikit-image, or OpenCV, so the problem cannot be the operations themselves.
My idea is to create a thread pool of 24 threads. Each thread gets an initial batch of images that it processes independently of the others; after the processing, I collect the results and return them. However, my code doesn't seem to utilize the CPU very well.
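For reference, the tiling step and two of the operations could be sketched roughly as follows. This is an illustration only, not the pipeline returned by `get_pipeline()`: the names `slice_into_tiles` and `gamma_correct`, the random test image, and the choice to drop partial border tiles are all assumptions.

```python
import numpy as np

TILE = 256  # tile edge length in pixels


def slice_into_tiles(image, tile=TILE):
    """Cut an H x W (x C) array into non-overlapping tile x tile patches.

    Partial tiles at the right and bottom borders are dropped (an assumption).
    """
    h, w = image.shape[:2]
    return [
        image[y:y + tile, x:x + tile]
        for y in range(0, h - tile + 1, tile)
        for x in range(0, w - tile + 1, tile)
    ]


def gamma_correct(tile, gamma):
    """Gamma correction for uint8 data: scale to [0, 1], apply the power, scale back."""
    scaled = tile.astype(np.float32) / 255.0
    return (np.power(scaled, gamma) * 255.0).round().astype(np.uint8)


# Hypothetical input: a random 512 x 768 RGB image.
image = np.random.default_rng(0).integers(0, 256, size=(512, 768, 3), dtype=np.uint8)

tiles = slice_into_tiles(image)                    # 2 rows x 3 columns of tiles
flipped = [np.fliplr(t) for t in tiles]            # horizontal mirror of each tile
brighter = [gamma_correct(t, 0.5) for t in tiles]  # gamma < 1 brightens
```

Each tile is a view into the original array, so slicing itself is cheap; the per-tile operations are where the time goes.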
The implementation of a single thread:

    from threading import Thread


    class ImageWorker(Thread):
        def __init__(self):
            Thread.__init__(self)
            self.tasks = []      # images assigned to this worker
            self.result = []     # augmented images produced by this worker
            self.pipeline = get_pipeline()

        def add_task(self, task):
            self.tasks.append(task)

        def run(self):
            # Apply every pipeline step to every assigned task.
            for _ in range(len(self.tasks)):
                task = self.tasks.pop(0)
                for p in self.pipeline:
                    result = p.do(task)
                    self.result.append(result)
The implementation of the thread pool:

    class ImageWorkerPool:
        def __init__(self, num_threads):
            self.workers = []
            self.work_index = 0
            for _ in range(num_threads):
                self.workers.append(ImageWorker())

        def add_task(self, task):
            # Distribute tasks round-robin across the workers.
            self.workers[self.work_index].add_task(task)
            self.work_index += 1
            self.work_index = self.work_index % len(self.workers)
            assert self.work_index < len(self.workers)

        def start(self):
            for worker in self.workers:
                worker.start()

        def complete_and_return_result(self):
            # Wait for all workers, then gather their results in worker order.
            for worker in self.workers:
                worker.join()
            result = []
            for worker in self.workers:
                result.extend(worker.result)
            return result
And this is how I create and populate the thread pool:

    from tqdm import tqdm

    threadpool = ImageWorkerPool(num_threads=24)
    for _ in tqdm(range(len(images)), desc="Augmentation"):
        task = tasks.pop(0)
        threadpool.add_task(task)
    threadpool.start()
    result = threadpool.complete_and_return_result()
I have a very beefy CPU with 24 threads, but they are utilized at 10% at most. What is the problem?
Edit: After changing from multithreading to multiprocessing, the code finished in 20 seconds, compared to 15 minutes with multithreading. Thanks, @AMC and @quamrana.
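A minimal sketch of what a multiprocessing variant might look like, not the exact code behind the timing above. It uses the standard library's `multiprocessing.Pool`, where each worker process has its own interpreter and is therefore not limited by the global interpreter lock the way CPU-bound threads are. The flip functions, `PIPELINE`, `augment`, and `augment_all` are hypothetical stand-ins for `get_pipeline()` and its `.do()` steps, and tiles are plain nested lists to keep the example dependency-free.

```python
from multiprocessing import Pool


def flip_left_right(tile):
    """Mirror a tile (a list of rows) horizontally."""
    return [row[::-1] for row in tile]


def flip_up_down(tile):
    """Mirror a tile vertically."""
    return tile[::-1]


# Hypothetical stand-in for get_pipeline(): plain module-level functions,
# which can be pickled and sent to worker processes.
PIPELINE = [flip_left_right, flip_up_down]


def augment(tile):
    """Apply every pipeline step to one tile and return all results."""
    return [step(tile) for step in PIPELINE]


def augment_all(tiles, processes=None, chunksize=16):
    """Run augment() over all tiles in a pool of worker processes.

    processes=None lets Pool use os.cpu_count() workers.
    """
    with Pool(processes=processes) as pool:
        per_tile = pool.map(augment, tiles, chunksize=chunksize)
    # Flatten the per-tile result lists into a single list.
    return [result for results in per_tile for result in results]


if __name__ == "__main__":
    # The guard is required on platforms that start workers by re-importing
    # the main module (for example Windows and macOS).
    tiles = [[[i, i + 1], [i + 2, i + 3]] for i in range(100)]
    results = augment_all(tiles, processes=4)
    print(len(results))  # 100 tiles x 2 pipeline steps = 200
```

`pool.map` preserves input order and handles the distribution of work, so the manual round-robin bookkeeping of `ImageWorkerPool` is no longer needed. A larger `chunksize` reduces inter-process communication overhead when there are many small tasks.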