Update: solved by passing x, y, and sess into the worker's arguments.
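Presumably the fix binds each iteration's values as default arguments of the worker lambda; a sketch (the exact code is not shown above):

# Sketch of the fix: default arguments are evaluated once per iteration,
# so each worker keeps its own session and tensors instead of sharing
# the loop variables' final values.
workers = [lambda x_val, sess=sess, x=x, y=y: sess.run(y, feed_dict={x: x_val})
           for sess, x, y in zip(sessions, xs, ys)]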
Original question:
I have two GPUs and plan to create one thread per GPU to achieve (data) parallelism. My attempt:
import tensorflow as tf

gpus = ['/device:GPU:0', '/device:GPU:1']
n_devices = len(gpus)

# Load the frozen graph once per GPU, pinning every node to that device.
graphs = []
for gpu in gpus:
    with tf.device(gpu):
        with tf.gfile.GFile(graph_def_pb_path, "rb") as f:
            graph_def = tf.GraphDef()
            graph_def.ParseFromString(f.read())
            for node in graph_def.node:
                node.device = gpu
        with tf.Graph().as_default() as g:
            tf.import_graph_def(graph_def)
        graphs.append(g)

xs = [g.get_tensor_by_name(<input_tensor_name>) for g in graphs]
ys = [g.get_tensor_by_name(<output_tensor_name>) for g in graphs]

sessions = [tf.Session(graph=g,
                       config=tf.ConfigProto(log_device_placement=True))
            for g in graphs]

workers = [lambda x_val: sess.run(y, feed_dict={x: x_val})
           for sess, x, y in zip(sessions, xs, ys)]

n_devices = len(graphs)
results = []
threads = [None] * n_devices
for i, image in enumerate(data):
    t_idx = i % n_devices
    thread = threads[t_idx]
    if thread is not None:
        output = thread.join()
        results.append(output)
    # ThreadWithReturnValue is a Thread whose join() returns the target's
    # return value, see https://stackoverflow.com/q/6893968/688080
    thread = ThreadWithReturnValue(target=workers[t_idx], args=(image,))
    thread.start()
    threads[t_idx] = thread

for thread in threads:
    output = thread.join()
    results.append(output)

for sess in sessions:
    sess.close()
However, it seems only the last device is doing any work. And if I change the code to the single-threaded version below, I can see the GPUs being occupied alternately, which indicates the devices are assigned correctly:
# after importing the graph into the devices
# x: input tensor, y: output tensor of the current graph
sessions = [tf.Session(graph=g,
                       config=tf.ConfigProto(log_device_placement=True))
            for g in graphs]

result = [None] * len(data)
for i, image in enumerate(data):
    sess = sessions[i % n_devices]
    result[i] = sess.run(y, feed_dict={x: image})
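(Here x and y stand for the per-graph tensors; spelled out with the xs/ys lists built earlier, the loop would presumably read:)

# Hypothetical per-device indexing matching the xs/ys lists from above
for i, image in enumerate(data):
    d = i % n_devices
    result[i] = sessions[d].run(ys[d], feed_dict={xs[d]: image})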
So how can I correct the code, or what is the right way to do data parallelism at the inference stage for a frozen graph?
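For reference, the behavior in the threaded attempt is consistent with Python's late-binding closures: every lambda in the workers list closes over the loop variables sess, x, and y themselves, not their values, so once the comprehension finishes all workers see the last session. A minimal, TensorFlow-free demonstration:

fns = [lambda: i for i in range(3)]
print([f() for f in fns])   # [2, 2, 2] -- every closure sees the final i
fns = [lambda i=i: i for i in range(3)]
print([f() for f in fns])   # [0, 1, 2] -- default argument binds i per iteration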