
I am new to TensorFlow and wrote the following distributed training code. The code runs fine.

import multiprocessing
import os
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow_hub as hub
import tensorflow.python.keras.backend as K
#1. Define Workers
def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, task_id=0, task_type="worker",rpc_layer="grpc")
  return cluster_resolver

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver, variable_partitioner=variable_partitioner)

word = "Elephant"
sentence = "I am a sentence for which I would like to get its embedding."
paragraph = (
    "Universal Sentence Encoder embeddings also support short paragraphs. "
    "There is no hard limit on how long the paragraph is. Roughly, the longer "
    "the more 'diluted' the embedding will be.")
messages = [word, sentence, paragraph]
#labels=["1","2","3"]
reviews = [[1,0,0],[0,1,0],[0,0,1]]


encoder=hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")

X_train=encoder(messages)

BUFFER_SIZE = len(X_train)
BATCH_SIZE_PER_REPLICA = 2
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 4


with strategy.scope():

    model = keras.Sequential()

    model.add(
        keras.layers.Dense(
            units=256,
            input_shape=(X_train.shape[1],),
            activation='relu'
        )
    )
    model.add(
        keras.layers.Dropout(rate=0.5)
    )

    model.add(
        keras.layers.Dense(
            units=128,
            activation='relu'
        )
    )
    model.add(
        keras.layers.Dropout(rate=0.5)
    )

    model.add(keras.layers.Dense(3, activation='softmax'))
    # model.compile(
    #     loss='categorical_crossentropy',
    #     optimizer=keras.optimizers.Adam(0.001),
    #     metrics=['accuracy']
    # )

    # history = model.fit(
    #     np.array(X_train), np.array(reviews),
    #     epochs=10,
    #     batch_size=16,
    #     verbose=1,
    #     shuffle=True
    # )
    optimizer=keras.optimizers.Adam(0.001)
    accuracy = keras.metrics.Accuracy()


def step_fn(x_train_slice):

    x_train, y_train = next(x_train_slice)
    with tf.GradientTape() as tape:
        pred=model(x_train,training=True)
        # tf.print(x_train)
        # tf.print(pred)
        # tf.print(y_train)

        per_example_loss = keras.losses.CategoricalCrossentropy(
            reduction=tf.keras.losses.Reduction.NONE)(y_train, pred)
        loss = tf.nn.compute_average_loss(per_example_loss)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    tf.print("train values are",x_train)
    tf.print(" pred Values are : ", pred)
    tf.print(" ArgMAx Values are ",tf.math.argmax(pred,axis=0)) #problem
    tf.print(" actual_pred Values are : ", actual_pred)
    tf.print(" Labels  are : ", y_train)
    tf.print(" Labels Max Values are : ", tf.argmax(y_train))
    accuracy.update_state(y_train, actual_pred)
    tf.print("Accuracy is : ",accuracy.result())
    return loss

@tf.function
def distributed_train_step(x_train_slice):
    losses = strategy.run(step_fn,args=(x_train_slice,))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)


@tf.function
def per_worker_dataset_fn():
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, reviews)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
    # test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
    train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
    # test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
    return train_dist_dataset


coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
num_epochs = 5
steps_per_epoch = 1
for i in range(num_epochs):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
    # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f.",(i,accuracy.result().numpy()))

The problem is: in `step_fn`, once I get the prediction values I would like to get the corresponding labels. For this I have used this line of code: `tf.print(" ArgMAx Values are ",tf.math.argmax(pred,axis=0)) #problem`

The argmax gives the array of indices of the max probabilities. I would like to extract this as a numpy array and use it to index into the `reviews` array (the one-hot encoded values) to get the confusion matrix.
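
For illustration: if eager execution were available at that point, what I want would be as simple as this sketch (np here is numpy, imported as usual):

idx = tf.math.argmax(pred, axis=0).numpy()  # works eagerly, not inside the distributed tf.function
selected = np.array(reviews)[idx]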

But I'm not able to convert the `tf.math.argmax(pred,axis=0)` tensor to a numpy array. I tried many approaches, like `eval()` with `K.get_session()`, and so on, but nothing worked. Any help is appreciated.

Thanks much

Raj
  • Possibly a duplicate of: https://stackoverflow.com/questions/34097281/convert-a-tensor-to-numpy-array-in-tensorflow#34097344 – Mark H Feb 07 '21 at 23:59
  • @MarkH The problem I have is the same, but none of the solutions described there work for this code, as it is distributed training code and eager evaluation is turned off. I also tried the `tf.math.argmax(pred,axis=0)` options described; none worked for me – Raj Feb 08 '21 at 00:10
  • Right, I see that complicates things. You might have to 'gather' those variables back to the box running the coordinator. See https://www.tensorflow.org/api_docs/python/tf/distribute/experimental/ParameterServerStrategy#gather – Mark H Feb 08 '21 at 01:18
  • Thanks @MarkH, I tried to implement gather but was not able to get the code working; it gives `RuntimeError: tf.distribute.Strategy.gather method requires cross-replica context, use get_replica_context().all_gather() instead`. Besides, I don't think it will give a numpy array. – Raj Feb 08 '21 at 03:00

1 Answer


OK, I found two solutions here.

Here's the way you probably should do it:

Add some more Keras metrics after accuracy that you can use in computing the confusion matrix:

accuracy = keras.metrics.Accuracy()
tp = keras.metrics.TruePositives()
tn = keras.metrics.TrueNegatives()
fp = keras.metrics.FalsePositives()
fn = keras.metrics.FalseNegatives()

Now update those as well in step_fn:

accuracy.update_state(y_train, actual_pred)
argmax_pred = tf.one_hot(tf.math.argmax(pred,axis=1),depth=pred.shape[1])
tp.update_state(y_train, argmax_pred)
tn.update_state(y_train, argmax_pred)
fp.update_state(y_train, argmax_pred)
fn.update_state(y_train, argmax_pred)
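
As far as I can tell, these four metrics threshold predictions at 0.5 by default and accumulate counts element-wise across all class positions, which is why feeding them the one-hot labels and the one-hot-encoded argmax predictions works here.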

Now you can access the result back where you were accessing the accuracy results:

coordinator.join()
print ("Finished epoch %d, accuracy is %f.",(i,accuracy.result().numpy()))
print ("TP=%f  TN=%f  FP=%f  FN=%f" % (tp.result().numpy(),tn.result().numpy(),fp.result().numpy(),fn.result().numpy()))

That should do the trick for you.
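
If you want the full 3x3 confusion matrix rather than the binary TP/TN/FP/FN counts, here's a minimal sketch of the same pattern (untested in the parameter-server setup; the confusion variable is a hypothetical addition, created under strategy.scope() like the rest of the training state):

with strategy.scope():
    # running accumulator for a 3-class confusion matrix (hypothetical addition)
    confusion = tf.Variable(tf.zeros((3, 3), dtype=tf.int32))

and then, inside step_fn after computing pred:

confusion.assign_add(
    tf.math.confusion_matrix(
        tf.math.argmax(y_train, axis=1),  # true class indices
        tf.math.argmax(pred, axis=1),     # predicted class indices
        num_classes=3))

You should then be able to read it back at epoch boundaries with confusion.numpy().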


Here's another way to do it:

The strategy is to just keep returning your argmax values until they're back in your main loop, where they'll appear as RemoteValue objects, and then fetch() their values.

For example, in step_fn, send back your argmax values to the calling function:

return (loss, tf.math.argmax(pred,axis=0))

Then, in distributed_train_step, adjust for the tuple being returned, and keep passing the argmaxes along to the next step, maybe like so:

def distributed_train_step(x_train_slice):
    (losses,argmaxes) = strategy.run(step_fn,args=(x_train_slice,))
    strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
    return argmaxes

Notice there that I moved your strategy.reduce from the return line to its own line. You weren't using the returned value anyway, because you had no assignment target on the coordinator.schedule line, but now you can add one to grab those returned argmaxes:

argmaxes = coordinator.schedule(distributed_train_step, args=(per_worker_iterator,))
print ("Back at home, argmaxes=",argmaxes.fetch())

Make sure you use the fetch() command, because argmaxes will come back as a RemoteValue rather than a Tensor. The RemoteValue class is documented here: https://www.tensorflow.org/api_docs/python/tf/distribute/experimental/coordinator/RemoteValue
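
If you schedule more than one step per epoch, you can hold on to each RemoteValue and fetch them after joining. A sketch, reusing the loop variables from your question:

step_results = []
for _ in range(steps_per_epoch):
    step_results.append(
        coordinator.schedule(distributed_train_step, args=(per_worker_iterator,)))
coordinator.join()
# fetch() blocks until the scheduled function has completed on a worker
epoch_argmaxes = [rv.fetch() for rv in step_results]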

You'd need to expand this solution by returning any other values you were going to use for calculating TP/FP/TN/FN on your own.
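
For example, a sketch (untested) that returns the label indices alongside the predicted ones, so each batch's labels stay paired with its argmaxes:

# in step_fn:
return (loss,
        tf.math.argmax(pred, axis=1),     # predicted class indices
        tf.math.argmax(y_train, axis=1))  # true class indices

distributed_train_step would then unpack three values and pass both argmax tensors back the same way as above.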

Mark H
  • Hi Mark, thank you so much for your effort! The first solution is good, but in the line `tp.update_state(y_train, actual_pred)`, instead of `actual_pred` it should be `tf.argmax(...)`, and then use those indices to fetch the `reviews` values; that is the problem I am trying to solve. `actual_pred` gives wrong values because I need to pick just the max probability, not everything greater than 0.5. The second solution did give the indices at the last step, but we don't have the slice of y_train that corresponds to the argmaxes – Raj Feb 08 '21 at 06:23
  • To obtain the slice I did `x_train,y_train=next(per_worker_iterator)` before `argmaxes = coordinator.schedule(...)`, but this threw `NotImplementedError: Iterating over an AsyncDistributedIterator is not supported right now.` Somehow, we should replace `actual_pred` with `tf.argmax(...)`; then the first solution will be very clean – Raj Feb 08 '21 at 06:27
  • You're right - that would be the right value to use! The second solution wasn't meant to be a full solution to your problem, just a fundamental way to return values, so I added a note on returning more values than just argmax. That begins to get really messy, so I agree that the keras.metric.* approach is really the best. – Mark H Feb 08 '21 at 07:04
  • Just fixed the missing one hot encoding as well. – Mark H Feb 08 '21 at 07:40
  • Thank you so much for the help Mark! This relieved a lot of pressure on me. It works as expected now, but the accuracy is very low; in a few slices it is 0.33. That is a different issue, though; let me see how I can fix it. – Raj Feb 08 '21 at 16:49
  • I think you're supposed to be calling model.fit(x_train, y_train) to train the model and then use model.evaluate() to check performance. – Mark H Feb 08 '21 at 17:36
  • Yes, I wrote the code that way initially, but it threw an exception saying that you should be calling model(...) and not fit, so I changed the call to what it is now. In standalone mode I have used fit and evaluate on the same dataset and it produces 0.99 accuracy, but with the distributed setup it gives 0.33 – Raj Feb 08 '21 at 18:12
  • Besides, I replaced the Universal Sentence Encoder with an sBERT model and the accuracy is still around 0.33; not sure what to do next – Raj Feb 08 '21 at 18:15
  • I got this error `RuntimeError: Detected a call to Model.fit inside a tf.function. Model.fit is a high-level endpoint that manages its own tf.function. Please move the call to Model.fit outside of all enclosing tf.functions. Note that you can call a `Model` directly on Tensors inside a tf.function like: model(x)`. – Raj Feb 08 '21 at 18:23
  • I can't test code now, but maybe try: model((x_train,y_train),training=True) – Mark H Feb 08 '21 at 20:49
  • You're correct, we need to pass labels while training. I tried and got the following error: `ValueError: Layer sequential expects 1 input(s), but it received 2 input tensors. Inputs received: [, ]`. – Raj Feb 08 '21 at 21:55
  • Besides, I also tried passing model(x_train,y_train,training=True) and got this error: `TypeError: call() got multiple values for argument 'training'`. There should be a way to pass the labels during the training phase. – Raj Feb 08 '21 at 21:56