
I am currently experimenting with generative adversarial networks in Keras. As proposed in this paper, I want to use the historical averaging loss, i.e. I want to penalize the change of the network weights over time. I am not sure how to implement it in a clever way.
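For reference, the historical averaging term from the paper, as I understand it, penalizes the distance of the current parameters from their running average over the past training steps:

\left\lVert \theta - \frac{1}{t}\sum_{i=1}^{t}\theta[i] \right\rVert^{2}

where \theta[i] denotes the parameter values at past step i.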

I implemented a custom loss function following the answer to this post:

import numpy as np
import keras.backend as K

def historical_averaging_wrapper(current_weights, prev_weights):
    def historical_averaging(y_true, y_pred):
        # penalty: how much the summed weights moved since the previous batch
        diff = 0
        for i in range(len(current_weights)):
            diff += abs(np.sum(current_weights[i]) - np.sum(prev_weights[i]))
        return K.binary_crossentropy(y_true, y_pred) + diff
    return historical_averaging

The difficulty is that the penalty depends on the network weights, and the weights change after each batch of data.

My first idea was to update the loss function after each batch. Roughly like this:

prev_weights = model.get_weights()
for i in range(len(training_data) // batch_size):
    current_weights = model.get_weights()
    # recompile with a loss that knows the current and the previous weights
    model.compile(loss=historical_averaging_wrapper(current_weights, prev_weights), optimizer='adam')
    model.fit(training_data[i*batch_size:(i+1)*batch_size], training_labels[i*batch_size:(i+1)*batch_size], epochs=1, batch_size=batch_size)
    prev_weights = current_weights

Is this reasonable? That approach seems a bit "messy" to me. Is there a "smarter" way to do this, e.g. updating the loss function in a data generator and using fit_generator()? Thanks in advance.

MMikkk

1 Answer


Loss functions are operations on the graph that work on tensors. You can define additional tensor variables in the loss function to hold the previous values. Here is an example:

import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
keras = tf.keras

class HistoricalAvgLoss(object):
  def __init__(self, model):
    # create variables (initialized to zero) to hold the previous values of
    # the weights
    self.prev_weights = []
    for w in model.get_weights():
      self.prev_weights.append(K.variable(np.zeros(w.shape)))

  def loss(self, y_true, y_pred):
    err = keras.losses.mean_squared_error(y_true, y_pred)
    # mean absolute change of each weight tensor since the last stored value
    # (note: `model` is a free variable here, see the comment below the answer)
    werr = [K.mean(K.abs(c - p)) for c, p in zip(model.get_weights(), self.prev_weights)]
    # while training, copy the current weights into prev_weights after the
    # weight errors have been computed; at test time leave them untouched
    self.prev_weights = K.in_train_phase(
        [K.update(p, c) for c, p in zip(model.get_weights(), self.prev_weights)],
        self.prev_weights
    )
    # the weight penalty is only added to the loss in the training phase
    return K.in_train_phase(err + K.sum(werr), err)

The variable prev_weights holds the previous values. Note that we added a K.update operation after the weight errors are calculated.
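To illustrate K.update on its own, here is a small toy snippet (not part of the model above; the variable names are only for illustration):

import numpy as np
import tensorflow.keras.backend as K

prev = K.variable(np.zeros(3))       # plays the role of one entry of prev_weights
new = K.constant([1.0, 2.0, 3.0])    # stands in for the current weights

op = K.update(prev, new)             # assignment op: prev <- new
# in eager mode (TF 2.x) the assignment happens immediately; in graph mode the
# op only takes effect when it is executed, e.g. as part of a training step
print(K.get_value(prev))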

A sample model for testing:

model = keras.models.Sequential([
    keras.layers.Input(shape=(4,)),
    keras.layers.Dense(8),
    keras.layers.Dense(4),
    keras.layers.Dense(1),
])

loss_obj = HistoricalAvgLoss(model)

model.compile('adam', loss_obj.loss)
model.summary()

Some test data and objective function:

import numpy as np

def test_fn(x):
  return x[0]*x[1] + 2.0 * x[1]**2 + x[2]/x[3] + 3.0 * x[3]

X = np.random.rand(1000, 4)
y = np.apply_along_axis(test_fn, 1, X)

hist = model.fit(X, y, validation_split=0.25, epochs=10)

In my test, the model loss decreases over time.
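If you want to check this yourself, the per-epoch losses are available in the History object returned by fit():

print(hist.history['loss'])      # training loss per epoch
print(hist.history['val_loss'])  # validation loss per epoch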

Pedro Marques
  • Thanks for your answer, Pedro. Great idea using a class to keep track of the previous weights. I still had a minor problem: the `model` variable in the loss function is not defined. I implemented a wrapper function, like in the problem statement, passing in the model. That seems to work. – MMikkk Jul 22 '19 at 07:12
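For anyone hitting the same undefined `model` issue, here is a sketch of the wrapper the comment describes (a reconstruction under my assumptions, not the exact code that was used): the model is passed in explicitly and captured by the closure, so the loss no longer relies on a global variable.

import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
keras = tf.keras

def historical_avg_loss_wrapper(model):
    # hypothetical reconstruction of the wrapper mentioned in the comment above;
    # prev_weights is created once, when the wrapper is called
    prev_weights = [K.variable(np.zeros(w.shape)) for w in model.get_weights()]

    def loss(y_true, y_pred):
        err = keras.losses.mean_squared_error(y_true, y_pred)
        werr = [K.mean(K.abs(c - p)) for c, p in zip(model.get_weights(), prev_weights)]
        # same update pattern as in the answer: refresh the stored weights in
        # the training phase only
        updates = K.in_train_phase(
            [K.update(p, c) for c, p in zip(model.get_weights(), prev_weights)],
            prev_weights)
        return K.in_train_phase(err + K.sum(werr), err)

    return loss

# usage, with the same model as above:
# model.compile('adam', historical_avg_loss_wrapper(model))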