I am new to TensorFlow and wrote the following distributed training code (parameter server strategy with a ClusterCoordinator). The code itself runs fine.
import multiprocessing
import os
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow_hub as hub
import tensorflow.python.keras.backend as K
#1. Define Workers
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec, job_name="worker", task_index=i, config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec, job_name="ps", task_index=i, protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, task_id=0, task_type="worker",rpc_layer="grpc")
return cluster_resolver
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

# Pass the partitioner to the strategy so variables are actually sharded across the parameter servers.
strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver, variable_partitioner=variable_partitioner)
word = "Elephant"
sentence = "I am a sentence for which I would like to get its embedding."
paragraph = (
"Universal Sentence Encoder embeddings also support short paragraphs. "
"There is no hard limit on how long the paragraph is. Roughly, the longer "
"the more 'diluted' the embedding will be.")
messages = [word, sentence, paragraph]
#labels=["1","2","3"]
reviews = [[1,0,0],[0,1,0],[0,0,1]]
encoder = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
X_train = encoder(messages)
BUFFER_SIZE = len(X_train)
BATCH_SIZE_PER_REPLICA = 2
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 4
with strategy.scope():
    model = keras.Sequential()
    model.add(
        keras.layers.Dense(
            units=256,
            input_shape=(X_train.shape[1],),
            activation='relu'
        )
    )
    model.add(
        keras.layers.Dropout(rate=0.5)
    )
    model.add(
        keras.layers.Dense(
            units=128,
            activation='relu'
        )
    )
    model.add(
        keras.layers.Dropout(rate=0.5)
    )
    model.add(keras.layers.Dense(3, activation='softmax'))

    # model.compile(
    #     loss='categorical_crossentropy',
    #     optimizer=keras.optimizers.Adam(0.001),
    #     metrics=['accuracy']
    # )
    # history = model.fit(
    #     np.array(X_train), np.array(reviews),
    #     epochs=10,
    #     batch_size=16,
    #     verbose=1,
    #     shuffle=True
    # )

    optimizer = keras.optimizers.Adam(0.001)
    accuracy = keras.metrics.Accuracy()
def step_fn(x_train_slice):
    x_train, y_train = next(x_train_slice)
    with tf.GradientTape() as tape:
        pred = model(x_train, training=True)
        # tf.print(x_train)
        # tf.print(pred)
        # tf.print(y_train)
        per_example_loss = keras.losses.CategoricalCrossentropy(
            reduction=tf.keras.losses.Reduction.NONE)(y_train, pred)
        loss = tf.nn.compute_average_loss(per_example_loss)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    tf.print("train values are", x_train)
    tf.print(" pred Values are : ", pred)
    tf.print(" ArgMAx Values are ",tf.math.argmax(pred,axis=0)) #problem
    tf.print(" actual_pred Values are : ", actual_pred)
    tf.print(" Labels are : ", y_train)
    tf.print(" Labels Max Values are : ", tf.argmax(y_train))
    accuracy.update_state(y_train, actual_pred)
    tf.print("Accuracy is : ", accuracy.result())
    return loss
@tf.function
def distributed_train_step(x_train_slice):
    losses = strategy.run(step_fn, args=(x_train_slice,))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

@tf.function
def per_worker_dataset_fn():
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (X_train, reviews)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
    # test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
    train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
    # test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
    return train_dist_dataset
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
num_epoches = 5
steps_per_epoch = 1
for i in range(num_epoches):
    accuracy.reset_states()
    for _ in range(steps_per_epoch):
        coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
    # Wait at epoch boundaries.
    coordinator.join()
    print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
The problem is this: in step_fn, once I have the prediction values I would like to get the corresponding labels. For that I use the following line:
tf.print(" ArgMAx Values are ",tf.math.argmax(pred,axis=0)) #problem
argmax gives the array of indices of the maximum probabilities. I would like to extract this as a NumPy array and use it to index into the reviews array (the one-hot encoded values) so that I can build a confusion matrix. But I am not able to convert the tf.math.argmax(pred,axis=0) tensor to a NumPy array. I tried several approaches, such as eval(K.get_session()), but nothing worked; a sketch of what I am trying to do is below. Any help is appreciated.
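To make the intent concrete, this is roughly the kind of thing I want to do inside step_fn (a hypothetical sketch of the goal, not working code, since under @tf.function the tensors are symbolic and .numpy() is not available):

# Hypothetical sketch of the intent -- does NOT work inside step_fn, because
# tensors inside a tf.function are symbolic and have no .numpy() method.
label_indices = tf.math.argmax(pred, axis=0).numpy()      # fails in graph mode
predicted_one_hot = [reviews[i] for i in label_indices]   # index back into the one-hot labels
# ...then compare predicted_one_hot with y_train to build the confusion matrix.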
Thanks much