15

I built a custom architecture with Keras (a convnet). The network has 4 heads, each outputting a tensor of a different size. I am trying to write a custom loss function as a function of these 4 outputs. I have implemented custom losses before, but it was either a different loss for each head or the same loss for each head. In this case, I need to combine the 4 outputs to calculate the loss.

I am used to the following:

def custom_loss(y_true, y_pred):
    return something
model.compile(optimizer, loss=custom_loss)

but in my case, I would need y_pred to be a list of the 4 outputs. I could pad the outputs with zeros and add a concatenate layer to my model, but I was wondering if there is an easier way around it.
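For reference, the concatenation workaround would look roughly like this, with made-up flat heads out_a (width 4) and out_b (width 1); the targets would be concatenated in the same order:

import keras.backend as K
from keras.layers import Input, Dense, Concatenate

# made-up stand-ins for two heads of different widths
inp = Input((8,))
out_a = Dense(4)(inp)   # head A, width 4
out_b = Dense(1)(inp)   # head B, width 1
merged = Concatenate(axis=-1)([out_a, out_b])  # single output, shape (batch, 5)

def combined_loss(y_true, y_pred):
    # slice the merged tensor back into the per-head pieces
    pred_a, pred_b = y_pred[:, :4], y_pred[:, 4:]
    true_a, true_b = y_true[:, :4], y_true[:, 4:]
    return K.mean(K.square(pred_a - true_a)) + K.mean(K.abs(pred_b - true_b))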

Edit

My loss function is rather complex. Can I write something like:

model.add_loss(custom_loss(input1, input2, output1, output2))

where custom_loss is defined as:

def custom_loss(input1, input2, output1, output2):
    return loss
– defqoon

3 Answers

20

You could try the model.add_loss() function. The idea is to construct your custom loss as a tensor instead of a function, add it to the model, and compile the model without further specifying a loss. See also this implementation of a variational autoencoder where a similar idea is used.

Example:

import keras.backend as K
from keras.layers import Input, Dense
from keras.models import Model
from keras.losses import mse
import numpy as np

# Some random training data
features = np.random.rand(100,20)
labels_1 = np.random.rand(100,4)
labels_2 = np.random.rand(100,1)

# Input layer, one hidden layer
input_layer = Input((20,))
dense_1 = Dense(128)(input_layer)

# Two outputs
output_1 = Dense(4)(dense_1)
output_2 = Dense(1)(dense_1)

# Two additional 'inputs' for the labels
label_layer_1 = Input((4,))
label_layer_2 = Input((1,))

# Instantiate model, pass label layers as inputs
model = Model(inputs=[input_layer, label_layer_1, label_layer_2], outputs=[output_1, output_2])

# Construct your custom loss as a tensor; the product of the two MSEs is
# arbitrary and just demonstrates that both outputs can be combined freely
loss = K.mean(mse(label_layer_1, output_1) * mse(label_layer_2, output_2))

# Add loss to model
model.add_loss(loss)

# Compile without specifying a loss
model.compile(optimizer='sgd')

# Dummy targets: the actual loss is already attached to the model via add_loss
dummy = np.zeros((100,))
model.fit([features, labels_1, labels_2], dummy, epochs=2)
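
If you also need validation data with this pattern, the validation labels go into the input list as well, while the target stays a dummy. A sketch, assuming hypothetical held-out arrays features_val, labels_1_val and labels_2_val shaped like the training data:

dummy_val = np.zeros((len(features_val),))
model.fit([features, labels_1, labels_2], dummy,
          validation_data=([features_val, labels_1_val, labels_2_val], dummy_val),
          epochs=2)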
– sdcbr (edited by Pedro)
  • how do you pass validation data using this method? The bracketing does not follow the normal definition of val_data = (Xval, [Yval1, Yval2, ...]) – Corse Dec 09 '19 at 08:32
  • This solution is not valid for tensorflow==2.1.0 and not valid for Keras==2.3.1. Can you please help revise this answer to reflect the updated packages? And is it possible to recall what version of the packages was used to generate this answer? – curious_dan May 06 '20 at 11:12
  • Hi, this was for sure with tensorflow 1.x. I don't really have time to figure it out right now, but I'll add a disclaimer to my answer – sdcbr May 06 '20 at 15:45
  • This worked for me! I'm using `tensorflow==2.2.0` and `tf.keras`. Perhaps vanilla keras does not have this functionality? – quartzsaber Jun 27 '20 at 05:28
  • I don't see a reason why this should not work. And in fact it does; just tested with the latest nightly from today (2.5.0.dev20201028). There was just a typo in the loss function and the fit call was not correct, the latter leading people to think this no longer works. I have edited the answer and hope it gets accepted soon. – Pedro Oct 28 '20 at 15:54
  • Thanks for that! :) – sdcbr Oct 28 '20 at 16:03
  • what if your target data is in the form of a tf.data.Dataset object? – JohnVS Nov 23 '20 at 07:33
1

You could pack your outputs together in a tf.experimental.ExtensionType and unpack them again in the loss function.

I made a Colab notebook that demonstrates how to do this in TensorFlow 2.8.0: https://colab.research.google.com/drive/1MjlddizqFlezAUu5SOOW8svlnKQH4rog#scrollTo=pDMskk-86wFY
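
In essence, the approach boils down to this condensed sketch (the full, runnable version follows below):

import tensorflow as tf

# An extension type bundling both heads into a single "output" object
class PackedTensor(tf.experimental.BatchableExtensionType):
    output_0: tf.Tensor
    output_1: tf.Tensor

# The loss receives the packed object as y_pred and unpacks it freely
def combined_loss(y_true, y_pred):
    scce = tf.keras.losses.sparse_categorical_crossentropy(
        y_true, y_pred.output_0, from_logits=True)
    return scce + tf.reduce_mean(tf.abs(y_pred.output_1))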

Pros of using this approach vs add_loss():

  • No need to define "dummy" labels at inference time.
  • No need to define the loss within the model.
  • The training loop stays standard: a plain model.fit(x, y) with real labels.

Cons:

  • Your model now outputs an object with the outputs as fields instead of the tensors directly (which may even be a pro for your use case).
  • At the time of writing this answer, tf.experimental.ExtensionTypes don't work with TensorFlow Serving.

I'm adding the full code here, just in case I accidentally delete the Colab Notebook:

import tensorflow as tf
import tensorflow_datasets as tfds
# tf.__version__ should be >= 2.8.0
print(tf.__version__)


class PackedTensor(tf.experimental.BatchableExtensionType):
    __name__ = 'extension_type_colab.PackedTensor'

    output_0: tf.Tensor
    output_1: tf.Tensor

    # shape and dtype hold no meaning in this context, so we use a dummy
    # to stop Keras from complaining

    shape = property(lambda self: self.output_0.shape)
    dtype = property(lambda self: self.output_0.dtype)

    class Spec:

        def __init__(self, shape, dtype=tf.float32):
            self.output_0 = tf.TensorSpec(shape, dtype)
            self.output_1 = tf.TensorSpec(shape, dtype)

        # shape and dtype hold no meaning in this context, so we use a dummy
        # to stop Keras from complaining
        shape: tf.TensorShape = tf.constant(1.).shape 
        dtype: tf.DType = tf.constant(1.).dtype


# these two functions have no meaning, but need dummy implementations
# to stop Keras from complaining
@tf.experimental.dispatch_for_api(tf.shape)
def packed_shape(input: PackedTensor, out_type=tf.int32, name=None):
    return tf.shape(input.output_0)

@tf.experimental.dispatch_for_api(tf.cast)
def packed_cast(x: PackedTensor, dtype: str, name=None):
    return x


class SCCEWithExtraOutput(tf.keras.losses.Loss):
    """ This custom loss function is designed for models with an PackedTensor as
    a single output, so with attributes outputs_0 and outputs_1. This loss will 
    train a model so that outputs_0 represent the predicted class of the input
    image, and outputs_1 will be trained to always be zero (as a dummy). 
    """
    def __init__(self, *args, **kwargs):
        super(SCCEWithExtraOutput, self).__init__(*args, **kwargs)
        self.loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    def call(self, y_true, y_pred):
        output_0, output_1 = y_pred.output_0, y_pred.output_1
        scce = self.loss_fn(y_true, output_0)
        return scce + tf.abs(output_1)



# load the datasets
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)
def normalize_img(image, label):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label

ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(128)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(128)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)


# create a layer that packs the two outputs into a PackedTensor
class PackingLayer(tf.keras.layers.Layer):
  def call(self, inputs, training=None):
    first_output, second_output = inputs
    packed_output = PackedTensor(first_output, second_output)
    return packed_output

# define the model
#
# inputs -> flatten -> hidden -> Dense(10) -> PackingLayer() -> outputs
#                           |--> Dense(1)  ----^ 
inputs = tf.keras.Input(shape=(28, 28, 1), dtype=tf.float32)
flatten_layer = tf.keras.layers.Flatten()
hidden_layer = tf.keras.layers.Dense(128, activation='relu')
first_output_layer = tf.keras.layers.Dense(10)
second_output_layer = tf.keras.layers.Dense(1)
packing_layer = PackingLayer()

hidden = flatten_layer(inputs)
hidden = hidden_layer(hidden)
first_output = first_output_layer(hidden)
second_output = second_output_layer(hidden)
outputs = packing_layer((first_output, second_output))
model = tf.keras.Model(inputs=inputs, outputs=outputs)

model.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss=SCCEWithExtraOutput(),
    # metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
)

model.fit(
    ds_train,
    epochs=1,
    # validation_data=ds_test,
)
model.save("savedmodel")

for index, sample in enumerate(ds_train):
  predicted_packed_tensor = model(sample[0])
  print(predicted_packed_tensor.output_0.shape, predicted_packed_tensor.output_1.shape)
  print(type(predicted_packed_tensor))
  if index > 10:
    break



# prove we can also load and run the model in a completely new process.
# Since the PackedTensor class does not exist in that process, the model
# returns a tensorflow.python.framework.extension_type.AnonymousExtensionType
# with attributes "output_0" and "output_1".

import subprocess

script = """
import tensorflow as tf
import tensorflow_datasets as tfds
model = tf.saved_model.load("savedmodel")
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)
def normalize_img(image, label):
  return tf.cast(image, tf.float32) / 255., label

ds_train = ds_train.map(normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.batch(20)

for index, sample in enumerate(ds_train):
  predicted = model(sample[0])
  print(predicted.output_0.shape, predicted.output_1.shape)
  print(type(predicted))
  if index > 5:
    break
"""
pipes = subprocess.Popen(["python3", "-c", script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
std_out, std_err = pipes.communicate()
for line in std_out.decode().split("\n"):
  print(line)

– Frederik Bode
0

Dummy variables are not needed when fitting the model,

so you can use model.fit([features, labels_1, labels_2], epochs=2).

This works well under

tensorflow version '1.14.0', keras version '2.3.1'