0

I have lots of images that I want to process in parallel.

By default Tensorflow can use multiple cores, here is some info on this https://stackoverflow.com/a/41233901/1179925

"Currently, this means that each thread pool will have one thread per CPU core in your machine."

By looking at htop I can see that not all cores are utilized at 100% in this default setting, so I want to set intra_op_parallelism_threads=1 and inter_op_parallelism_threads=1 and run n_cpu models in parallel, howewer it performs even worser.

On my notebook with 8 cores:

Single core sequential processing:

Model init time: 0.77 sec
Processing time: 37.58 sec

Multi CPU default Tensorflow settings:

Model init time: 0.76 sec
Processing time: 20.16 sec

This code using multiprocessing:

Model init time: 0.78 sec
Processing time: 39.14 sec

Here is my code using multiprocessing, I'm missing something?:

import os
import glob
import time
import argparse
from multiprocessing.pool import ThreadPool
import multiprocessing
import itertools

import tensorflow as tf
import numpy as np
from tqdm import tqdm
import cv2

MODEL_FILEPATH = './tensorflow_example/inception_v3_2016_08_28_frozen.pb'

def get_image_filepaths(dataset_dir):
    if not os.path.isdir(dataset_dir):
        raise Exception(dataset_dir, 'not dir!')

    img_filepaths = []
    extensions = ['**/*.jpg', '**/*.png', '**/*.JPG', '**/*.PNG']
    for ext in extensions:
        img_filepaths.extend(glob.iglob(os.path.join(dataset_dir, ext), recursive=True))

    return img_filepaths


class ModelWrapper():
    def __init__(self, model_filepath):
        # TODO: estimate this from graph itself
        # Hardcoded for inception_v3_2016_08_28_frozen.pb
        self.input_node_names = ['input']
        self.output_node_names = ['InceptionV3/Predictions/Reshape_1']
        self.input_img_w = 299
        self.input_img_h = 299

        input_tensor_names = [name + ":0" for name in self.input_node_names]
        output_tensor_names = [name + ":0" for name in self.output_node_names]

        self.graph = self.load_graph(model_filepath)

        self.inputs = []
        for input_tensor_name in input_tensor_names:
            self.inputs.append(self.graph.get_tensor_by_name(input_tensor_name))

        self.outputs = []
        for output_tensor_name in output_tensor_names:
            self.outputs.append(self.graph.get_tensor_by_name(output_tensor_name))

        config_proto = tf.ConfigProto(device_count={'GPU': 0},
                                      intra_op_parallelism_threads=1,
                                      inter_op_parallelism_threads=1)
        self.sess = tf.Session(graph=self.graph, config=config_proto)

    def load_graph(self, model_filepath):
        # Expects frozen graph in .pb format
        with tf.gfile.GFile(model_filepath, "rb") as f:
            graph_def = tf.GraphDef()
            graph_def.ParseFromString(f.read())
        with tf.Graph().as_default() as graph:
            tf.import_graph_def(graph_def, name="")
        return graph

    def predict(self, img):
        h, w, c = img.shape
        if h != self.input_img_h or w != self.input_img_w:
            img = cv2.resize(img, (self.input_img_w, self.input_img_h))

        batch = img[np.newaxis, ...]
        feed_dict = {self.inputs[0] : batch}
        outputs = self.sess.run(self.outputs, feed_dict=feed_dict) # (1, 1001)

        return outputs


def process_single_file(args):
    model, img_filepath = args

    img = cv2.imread(img_filepath)
    output = model.predict(img)


def process_dataset(dataset_dir):
    img_filepaths = get_image_filepaths(dataset_dir)

    start = time.time()
    model = ModelWrapper(MODEL_FILEPATH)
    print('Model init time:', round(time.time() - start, 2), 'sec')

    start = time.time()
    n_cpu = multiprocessing.cpu_count()
    for _ in tqdm(ThreadPool(n_cpu).imap_unordered(process_single_file,
                                                   zip(itertools.repeat(model), img_filepaths)),
                                                   total=len(img_filepaths)):
        pass
    print('Processing time:', round(time.time() - start, 2), 'sec')


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(dest='dataset_dir')
    args = parser.parse_args()

    process_dataset(args.dataset_dir)

Update:

After replacing multiprocessing.pool.ThreadPool with multiprocessing.Pool:

def process_dataset(dataset_dir):
    img_filepaths = get_image_filepaths(dataset_dir)

    start = time.time()
    model = ModelWrapper(MODEL_FILEPATH)
    print('Model init time:', round(time.time() - start, 2), 'sec')

    start = time.time()
    n_cpu = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(n_cpu)

    it = pool.imap_unordered(process_single_file, zip(itertools.repeat(model), img_filepaths))
    for _ in tqdm(it, total=len(img_filepaths)):
        pass

    print('Processing time:', round(time.time() - start, 2), 'sec')

I get an error:

Traceback (most recent call last):
  File "tensorflow_example/multi_core_cpu_inference_multiprocessing.py", line 110, in <module>
    process_dataset(args.dataset_dir)
  File "tensorflow_example/multi_core_cpu_inference_multiprocessing.py", line 99, in process_dataset
    for _ in tqdm(it, total=len(img_filepaths)):
  File "/usr/local/lib/python3.6/site-packages/tqdm/_tqdm.py", line 979, in __iter__
    for obj in iterable:
  File "/usr/local/Cellar/python/3.6.5_1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 735, in next
    raise value
  File "/usr/local/Cellar/python/3.6.5_1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/usr/local/Cellar/python/3.6.5_1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/Cellar/python/3.6.5_1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.RLock objects
mrgloom
  • 20,061
  • 36
  • 171
  • 301
  • As its name suggests, `ThreadPool` uses multiple threads, not processes https://stackoverflow.com/questions/46045956/whats-the-difference-between-threadpool-vs-pool-in-python-multiprocessing-modul – Yohanes Gultom May 26 '19 at 21:48
  • @YohanesGultom What is the problem with threads? – mrgloom May 26 '19 at 22:46
  • Threads run only on a single core https://stackoverflow.com/questions/4496680/python-threads-all-executing-on-a-single-core – Yohanes Gultom May 27 '19 at 00:32
  • I think you want `multiprocessing.pool()` instead https://sebastianraschka.com/Articles/2014_multiprocessing.html#the-pool-class – Yohanes Gultom May 27 '19 at 00:48
  • @YohanesGultom What is typical use case of `ThreadPool` then? – mrgloom Jun 02 '19 at 00:44
  • Whenever you want to limit utilization to a single core or when most of the tasks are parallel but don't require much execution time, I guess – Yohanes Gultom Jun 02 '19 at 03:09

1 Answers1

0

Based on this unswer: https://stackoverflow.com/a/46779776/1179925

It works but not much faster than default paralellism offered by tensorflow itself.

import os
import glob
import time
import argparse
import multiprocessing

import tensorflow as tf
import numpy as np
from tqdm import tqdm
import cv2

# Running N_PROCESSES processes using multiprocessing pool

N_PROCESSES = 2
N_CPU = multiprocessing.cpu_count()
INTRA_N_THREADS = max(1, N_CPU // N_PROCESSES)
INTER_N_THREADS = max(1, N_CPU // N_PROCESSES)

print('N_PROCESSES', N_PROCESSES)
print('N_CPU', N_CPU)
print('INTRA_N_THREADS', INTRA_N_THREADS)
print('INTER_N_THREADS', INTER_N_THREADS)

MODEL_FILEPATH = './tensorflow_example/inception_v3_2016_08_28_frozen.pb'

def get_image_filepaths(dataset_dir):
    if not os.path.isdir(dataset_dir):
        raise Exception(dataset_dir, 'not dir!')

    img_filepaths = []
    extensions = ['**/*.jpg', '**/*.png', '**/*.JPG', '**/*.PNG']
    for ext in extensions:
        img_filepaths.extend(glob.iglob(os.path.join(dataset_dir, ext), recursive=True))

    return img_filepaths


class ModelWrapper():
    def __init__(self, model_filepath):
        # TODO: estimate this from graph itself
        # Hardcoded for inception_v3_2016_08_28_frozen.pb
        self.input_node_names = ['input']
        self.output_node_names = ['InceptionV3/Predictions/Reshape_1']
        self.input_img_w = 299
        self.input_img_h = 299

        input_tensor_names = [name + ":0" for name in self.input_node_names]
        output_tensor_names = [name + ":0" for name in self.output_node_names]

        self.graph = self.load_graph(model_filepath)

        self.inputs = []
        for input_tensor_name in input_tensor_names:
            self.inputs.append(self.graph.get_tensor_by_name(input_tensor_name))

        self.outputs = []
        for output_tensor_name in output_tensor_names:
            self.outputs.append(self.graph.get_tensor_by_name(output_tensor_name))

        config_proto = tf.ConfigProto(device_count={'GPU': 0},
                                      intra_op_parallelism_threads=INTRA_N_THREADS,
                                      inter_op_parallelism_threads=INTER_N_THREADS)
        self.sess = tf.Session(graph=self.graph, config=config_proto)

    def load_graph(self, model_filepath):
        # Expects frozen graph in .pb format
        with tf.gfile.GFile(model_filepath, "rb") as f:
            graph_def = tf.GraphDef()
            graph_def.ParseFromString(f.read())
        with tf.Graph().as_default() as graph:
            tf.import_graph_def(graph_def, name="")
        return graph

    def predict(self, img):
        h, w, c = img.shape
        if h != self.input_img_h or w != self.input_img_w:
            img = cv2.resize(img, (self.input_img_w, self.input_img_h))

        batch = img[np.newaxis, ...]
        feed_dict = {self.inputs[0] : batch}
        outputs = self.sess.run(self.outputs, feed_dict=feed_dict) # (1, 1001)

        return outputs


def process_chunk(img_filepaths):
    start = time.time()
    model = ModelWrapper(MODEL_FILEPATH)
    print('Model init time:', round(time.time() - start, 2), 'sec')

    for img_filepath in img_filepaths:
        img = cv2.imread(img_filepath)
        output = model.predict(img)


def process_dataset(dataset_dir):
    img_filepaths = get_image_filepaths(dataset_dir)

    start = time.time()
    pool = multiprocessing.Pool(N_PROCESSES)

    chunks = []
    n = len(img_filepaths) // N_PROCESSES
    for i in range(0, len(img_filepaths), n):
        chunk = img_filepaths[i:i+n]
        chunks.append(chunk)

    it = pool.imap_unordered(process_chunk, chunks)
    for _ in tqdm(it, total=len(img_filepaths)):
        pass

    print('Processing time:', round(time.time() - start, 2), 'sec')


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(dest='dataset_dir')
    args = parser.parse_args()

    process_dataset(args.dataset_dir)
mrgloom
  • 20,061
  • 36
  • 171
  • 301