2

My problem is a continue to this question How to create federated dataset from a CSV file?

i manage to load a federated dataset from a given csv file and load both the train and the test data.

My question now is how to reproduce a working example to build an iterative process that performs a custom federated averaging on this data.

Here is my code but it's not working:

import os

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_federated as tff
from absl import app
from tensorflow.keras import layers

from src.main import Parameters


def main(args):
    working_dir = "D:/User/Documents/GitHub/TriaBaseMLBackup/input/fakehdfs/nms/ystr=2016/ymstr=1/ymdstr=26"
    client_id_colname = 'counter'
    SHUFFLE_BUFFER = 1000
    NUM_EPOCHS = 1

    for root, dirs, files in os.walk(working_dir):
        file_list = []

        for filename in files:
            if filename.endswith('.csv'):
                file_list.append(os.path.join(root, filename))
        df_list = []
        for file in file_list:
            df = pd.read_csv(file, delimiter="|", usecols=[1, 2, 6, 7], header=None, na_values=["NIL"],
                             na_filter=True, names=["meas_info", "counter", "value", "time"], index_col='time')
            df_list.append(df[["value"]])

        if df_list:
            rawdata = pd.concat(df_list)

    client_ids = df.get(client_id_colname)
    train_client_ids = client_ids.sample(frac=0.5).tolist()
    test_client_ids = [x for x in client_ids if x not in train_client_ids]

    def create_tf_dataset_for_client_fn(client_id):
        # a function which takes a client_id and returns a
        # tf.data.Dataset for that client
        client_data = df[df['value'] == client_id]
    features = ['meas_info', 'counter']
    LABEL_COLUMN = 'value'
    dataset = tf.data.Dataset.from_tensor_slices(
        (collections.OrderedDict(client_data[features].to_dict('list')),
         client_data[LABEL_COLUMN].to_list())
    )
    global input_spec
    input_spec = dataset.element_spec
    dataset = dataset.shuffle(SHUFFLE_BUFFER).batch(1).repeat(NUM_EPOCHS)
    return dataset

    train_data = tff.simulation.ClientData.from_clients_and_fn(
        client_ids=train_client_ids,
        create_tf_dataset_for_client_fn=create_tf_dataset_for_client_fn
    )
    test_data = tff.simulation.ClientData.from_clients_and_fn(
        client_ids=test_client_ids,
        create_tf_dataset_for_client_fn=create_tf_dataset_for_client_fn
    )
    example_dataset = train_data.create_tf_dataset_for_client(
        train_data.client_ids[0]
    )
    # split client id into train and test clients
    loss_builder = tf.keras.losses.SparseCategoricalCrossentropy
    metrics_builder = lambda: [tf.keras.metrics.SparseCategoricalAccuracy()]
    tff_model = tf.keras.Sequential([
        layers.Dense(64),
        layers.Dense(1)
    ])

    def retrieve_model():
    model = tf.keras.models.Sequential([
        tf.keras.layers.LSTM(2, input_shape=(1,2), return_sequences=True),
        tf.keras.layers.Dense(256, activation=tf.nn.relu),
        tf.keras.layers.Activation(tf.nn.softmax),
    ])

    return model

    def tff_model_fn() -> tff.learning.Model:
        return tff.learning.from_keras_model(
            keras_model=retrieve_model(),
            input_spec=example_dataset.element_spec,
            loss=loss_builder(),
            metrics=metrics_builder())

    iterative_process = tff.learning.build_federated_averaging_process(
        tff_model_fn, Parameters.server_adam_optimizer_fn, Parameters.client_adam_optimizer_fn)
    server_state = iterative_process.initialize()

    for round_num in range(Parameters.FLAGS.total_rounds):
        sampled_clients = np.random.choice(
            train_data.client_ids,
            size=Parameters.FLAGS.train_clients_per_round,
            replace=False)
        sampled_train_data = [
            train_data.create_tf_dataset_for_client(client)
            for client in sampled_clients
        ]
        server_state, metrics = iterative_process.next(server_state, sampled_train_data)
        train_metrics = metrics['train']
        print(metrics)


if __name__ == '__main__':
    app.run(main)


def start():
    app.run(main)

This is the error that I got but I think my problem is more than this error. what I am doing wrong here ??

ValueError: The top-level structure in `input_spec` must contain exactly two top-level elements, as it must specify type information for both inputs to and predictions from the model. You passed input spec {'meas_info': TensorSpec(shape=(None,), dtype=tf.float32, name=None), 'counter': TensorSpec(shape=(None,), dtype=tf.float32, name=None), 'value': TensorSpec(shape=(None,), dtype=tf.float32, name=None)}.

enter image description here

thnx to @Zachary Garrett i solve the above error with his help by adding these line of code

 client_data = df[df['value'] == client_id]
        features = ['meas_info', 'counter']
        LABEL_COLUMN = 'value'
        dataset = tf.data.Dataset.from_tensor_slices(
            (collections.OrderedDict(client_data[features].to_dict('list')),
             client_data[LABEL_COLUMN].to_list())
        )
        global input_spec
        input_spec = dataset.element_spec
        dataset = dataset.shuffle(SHUFFLE_BUFFER).batch(1).repeat(NUM_EPOCHS)
        return dataset

My problem now that is throwing in the tff.learning.build_federated_averaging_process is this

ValueError: Layer sequential expects 1 inputs, but it received 2 input tensors. Inputs received: [<tf.Tensor 'batch_input:0' shape=() dtype=float32>, <tf.Tensor 'batch_input_1:0' shape=() dtype=float32>]

what i miss again? maybe something in the layer sequential here

def retrieve_model():
        model = tf.keras.models.Sequential([
            tf.keras.layers.LSTM(2, input_shape=(1,2), return_sequences=True),
            tf.keras.layers.Dense(256, activation=tf.nn.relu),
            tf.keras.layers.Activation(tf.nn.softmax),
        ])

        return model
Panagiotis Drakatos
  • 2,851
  • 4
  • 31
  • 43

1 Answers1

1

The processes in the tff.learning package generally want datasets that yield sequences (tuples or lists) in the form (x, y). x and y can be a single tensor, or a nested structure (dict, list, etc) of tensors.

An easy way to see what the format of the dataset is to print the .element_spec attribute.

From the code above, I suspect the dataset is only yielding a single dict, because of this line:

dataset = tf.data.Dataset.from_tensor_slices(client_data.to_dict('list'))

This doesn't separate the x (features) and y (labels) in the way expected by TFF. Something like the following maybe work:

FEATURE_COLUMNS = [...]
LABEL_COLUMN = '...'
dataset = tf.data.Dataset.from_tensor_slices(
  (client_data[FEATURE_COLUMNS].to_dict('list'),
   client_data[LABEL_COLUMN].to_list())
)
Zachary Garrett
  • 2,911
  • 15
  • 23
  • i edit my question with your answer, also I mentioned you for the help, but i still have one error that don't let me build the federated averaging process. Would you mind taking a quick look at the question again – Panagiotis Drakatos Jan 09 '21 at 11:35
  • @Zachary Garrett i have the same problem can you take a look here https://stackoverflow.com/questions/68412517/build-custom-federated-averaging-process-with-valueerror-layer-sequential-expec – Mixalis Navridis Jul 16 '21 at 19:59
  • Could you mind take a quick also look here a similar problem https://stackoverflow.com/questions/68467540/build-custom-federated-learning-valueerror-input-0-of-layer-lstm-is-incompatibl – Panagiotis Drakatos Jul 21 '21 at 15:51