My problem is a continue to this question How to create federated dataset from a CSV file?
i manage to load a federated dataset from a given csv file and load both the train and the test data.
My question now is how to reproduce a working example to build an iterative process that performs a custom federated averaging on this data.
Here is my code but it's not working:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_federated as tff
from absl import app
from tensorflow.keras import layers
from src.main import Parameters
def main(args):
working_dir = "D:/User/Documents/GitHub/TriaBaseMLBackup/input/fakehdfs/nms/ystr=2016/ymstr=1/ymdstr=26"
client_id_colname = 'counter'
SHUFFLE_BUFFER = 1000
NUM_EPOCHS = 1
for root, dirs, files in os.walk(working_dir):
file_list = []
for filename in files:
if filename.endswith('.csv'):
file_list.append(os.path.join(root, filename))
df_list = []
for file in file_list:
df = pd.read_csv(file, delimiter="|", usecols=[1, 2, 6, 7], header=None, na_values=["NIL"],
na_filter=True, names=["meas_info", "counter", "value", "time"], index_col='time')
df_list.append(df[["value"]])
if df_list:
rawdata = pd.concat(df_list)
client_ids = df.get(client_id_colname)
train_client_ids = client_ids.sample(frac=0.5).tolist()
test_client_ids = [x for x in client_ids if x not in train_client_ids]
def create_tf_dataset_for_client_fn(client_id):
# a function which takes a client_id and returns a
# tf.data.Dataset for that client
client_data = df[df['value'] == client_id]
features = ['meas_info', 'counter']
LABEL_COLUMN = 'value'
dataset = tf.data.Dataset.from_tensor_slices(
(collections.OrderedDict(client_data[features].to_dict('list')),
client_data[LABEL_COLUMN].to_list())
)
global input_spec
input_spec = dataset.element_spec
dataset = dataset.shuffle(SHUFFLE_BUFFER).batch(1).repeat(NUM_EPOCHS)
return dataset
train_data = tff.simulation.ClientData.from_clients_and_fn(
client_ids=train_client_ids,
create_tf_dataset_for_client_fn=create_tf_dataset_for_client_fn
)
test_data = tff.simulation.ClientData.from_clients_and_fn(
client_ids=test_client_ids,
create_tf_dataset_for_client_fn=create_tf_dataset_for_client_fn
)
example_dataset = train_data.create_tf_dataset_for_client(
train_data.client_ids[0]
)
# split client id into train and test clients
loss_builder = tf.keras.losses.SparseCategoricalCrossentropy
metrics_builder = lambda: [tf.keras.metrics.SparseCategoricalAccuracy()]
tff_model = tf.keras.Sequential([
layers.Dense(64),
layers.Dense(1)
])
def retrieve_model():
model = tf.keras.models.Sequential([
tf.keras.layers.LSTM(2, input_shape=(1,2), return_sequences=True),
tf.keras.layers.Dense(256, activation=tf.nn.relu),
tf.keras.layers.Activation(tf.nn.softmax),
])
return model
def tff_model_fn() -> tff.learning.Model:
return tff.learning.from_keras_model(
keras_model=retrieve_model(),
input_spec=example_dataset.element_spec,
loss=loss_builder(),
metrics=metrics_builder())
iterative_process = tff.learning.build_federated_averaging_process(
tff_model_fn, Parameters.server_adam_optimizer_fn, Parameters.client_adam_optimizer_fn)
server_state = iterative_process.initialize()
for round_num in range(Parameters.FLAGS.total_rounds):
sampled_clients = np.random.choice(
train_data.client_ids,
size=Parameters.FLAGS.train_clients_per_round,
replace=False)
sampled_train_data = [
train_data.create_tf_dataset_for_client(client)
for client in sampled_clients
]
server_state, metrics = iterative_process.next(server_state, sampled_train_data)
train_metrics = metrics['train']
print(metrics)
if __name__ == '__main__':
app.run(main)
def start():
app.run(main)
This is the error that I got but I think my problem is more than this error. what I am doing wrong here ??
ValueError: The top-level structure in `input_spec` must contain exactly two top-level elements, as it must specify type information for both inputs to and predictions from the model. You passed input spec {'meas_info': TensorSpec(shape=(None,), dtype=tf.float32, name=None), 'counter': TensorSpec(shape=(None,), dtype=tf.float32, name=None), 'value': TensorSpec(shape=(None,), dtype=tf.float32, name=None)}.
thnx to @Zachary Garrett i solve the above error with his help by adding these line of code
client_data = df[df['value'] == client_id]
features = ['meas_info', 'counter']
LABEL_COLUMN = 'value'
dataset = tf.data.Dataset.from_tensor_slices(
(collections.OrderedDict(client_data[features].to_dict('list')),
client_data[LABEL_COLUMN].to_list())
)
global input_spec
input_spec = dataset.element_spec
dataset = dataset.shuffle(SHUFFLE_BUFFER).batch(1).repeat(NUM_EPOCHS)
return dataset
My problem now that is throwing in the tff.learning.build_federated_averaging_process
is this
ValueError: Layer sequential expects 1 inputs, but it received 2 input tensors. Inputs received: [<tf.Tensor 'batch_input:0' shape=() dtype=float32>, <tf.Tensor 'batch_input_1:0' shape=() dtype=float32>]
what i miss again? maybe something in the layer sequential here
def retrieve_model():
model = tf.keras.models.Sequential([
tf.keras.layers.LSTM(2, input_shape=(1,2), return_sequences=True),
tf.keras.layers.Dense(256, activation=tf.nn.relu),
tf.keras.layers.Activation(tf.nn.softmax),
])
return model