I'm trying to test a compression technique in federated learning with a non-IID data split, using the API tff.simulation.datasets.build_single_label_dataset() and following these posts:
- TensorFlow Federated: How to tune non-IIDness in federated dataset?
- AttributeError: 'MapDataset' object has no attribute 'preprocess' in tensorflow_federated tff
But after defining the model and training it, I got this error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-16-b04459984716> in <module>()
10
11 train(federated_averaging_process=federated_averaging, num_rounds=10,
---> 12 num_clients_per_round=NUM_CLIENTS, summary_writer=summary_writer)
<ipython-input-15-7157bce2bb0f> in train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer)
11 # sample the clients participating in this round
12 sampled_clients = np.random.choice(
---> 13 fed_emnist_train.client_ids,
14 size=num_clients_per_round,
15 replace=False)
AttributeError: 'MapDataset' object has no attribute 'client_ids'
The code:
import functools
import random

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
    only_digits=False)

# for a non-IID split, restrict the data to a single label using
# tff.simulation.datasets.build_single_label_dataset()
fed_emnist_train = tff.simulation.datasets.build_single_label_dataset(
    emnist_train.create_tf_dataset_from_all_clients(),
    label_key='label',
    desired_label=1)
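To double-check what this API returns, I added a quick inspection (my understanding is that build_single_label_dataset filters the flattened dataset and gives back a plain tf.data.Dataset, not a ClientData):

# sanity check: the filtered object is a plain tf.data.Dataset
print(type(fed_emnist_train))         # a FilterDataset on my setup
print(fed_emnist_train.element_spec)  # OrderedDict with 'label' and 'pixels'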
MAX_CLIENT_DATASET_SIZE = 418
CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500
def reshape_emnist_element(element):
  return (tf.expand_dims(element['pixels'], axis=-1), element['label'])
def preprocess_train_dataset(dataset):
  return (dataset
          .shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
          .repeat(CLIENT_EPOCHS_PER_ROUND)
          .batch(CLIENT_BATCH_SIZE, drop_remainder=False)
          .map(reshape_emnist_element))
fed_emnist_train = preprocess_train_dataset(fed_emnist_train)
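As a sanity check (the spec below is what I expect, not verified output), the preprocessed dataset should now yield batched (image, label) pairs:

print(fed_emnist_train.element_spec)
# expecting something like:
# (TensorSpec(shape=(None, 28, 28, 1), dtype=tf.float32, name=None),
#  TensorSpec(shape=(None,), dtype=tf.int32, name=None))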
# for an unbalanced dataset: give each simulated client a random number of
# batches (note: the dataset is already batched, so take(n) yields n batches)
NUM_CLIENTS = 100
client_datasets = [
    fed_emnist_train.take(random.randint(1, CLIENT_BATCH_SIZE))
    for _ in range(NUM_CLIENTS)
]
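Since the dataset is already batched when take() is applied, each simulated client should end up with between 1 and CLIENT_BATCH_SIZE batches. A quick check (illustrative):

for i, ds in enumerate(client_datasets[:3]):
  print('client', i, 'num batches:', sum(1 for _ in ds))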
# defining a model
def create_original_fedavg_cnn_model(only_digits=False):
  data_format = 'channels_last'
  max_pool = functools.partial(
      tf.keras.layers.MaxPooling2D,
      pool_size=(2, 2),
      padding='same',
      data_format=data_format)
  conv2d = functools.partial(
      tf.keras.layers.Conv2D,
      kernel_size=5,
      padding='same',
      data_format=data_format,
      activation=tf.nn.relu)
  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
      conv2d(filters=32),
      max_pool(),
      conv2d(filters=64),
      max_pool(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(512, activation=tf.nn.relu),
      tf.keras.layers.Dense(10 if only_digits else 62),
      tf.keras.layers.Softmax(),
  ])
  return model
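To rule out model-definition issues, I checked the Keras model on its own (the output shape below is what I expect, since the last Dense layer has 62 units when only_digits=False):

model = create_original_fedavg_cnn_model()
print(model.output_shape)  # (None, 62)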
input_spec = client_datasets[0].element_spec
def tff_model_fn():
  keras_model = create_original_fedavg_cnn_model()
  return tff.learning.from_keras_model(
      keras_model=keras_model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
# build the federated averaging training process
federated_averaging = tff.learning.build_federated_averaging_process(
    model_fn=tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))
# utility function: pretty-print a bit count in binary units
def format_size(size):
  size = float(size)
  for unit in ['bit', 'Kibit', 'Mibit', 'Gibit']:
    if size < 1024.0:
      return "{size:3.2f}{unit}".format(size=size, unit=unit)
    size /= 1024.0
  return "{size:.2f}{unit}".format(size=size, unit='Tibit')
def set_sizing_environment():
  # creates an execution environment that tracks broadcasted/aggregated bits
  sizing_factory = tff.framework.sizing_executor_factory()
  context = tff.framework.ExecutionContext(executor_fn=sizing_factory)
  tff.framework.set_default_context(context)
  return sizing_factory
# trains the federated averaging process and outputs metrics
def train(federated_averaging_process, num_rounds, num_clients_per_round,
          summary_writer):
  # create an environment to measure communication cost
  environment = set_sizing_environment()

  # initialize the FedAvg algorithm to get the initial server state
  state = federated_averaging_process.initialize()

  with summary_writer.as_default():
    for round_num in range(num_rounds):
      # sample the clients participating in this round
      sampled_clients = np.random.choice(
          fed_emnist_train.client_ids,
          size=num_clients_per_round,
          replace=False)
      # create a list of `tf.data.Dataset` instances from the sampled clients' data
      sampled_train_data = [
          fed_emnist_train.create_tf_dataset_for_client(client)
          for client in sampled_clients
      ]
      state, metrics = federated_averaging_process.next(state, sampled_train_data)

      size_info = environment.get_size_info()
      broadcasted_bits = size_info.broadcast_bits[-1]
      aggregated_bits = size_info.aggregate_bits[-1]
      print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'
            .format(round_num, metrics, format_size(broadcasted_bits),
                    format_size(aggregated_bits)))

      # add metrics to TensorBoard
      for name, value in metrics['train'].items():
        tf.summary.scalar(name, value, step=round_num)
      tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits,
                        step=round_num)
      tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits,
                        step=round_num)
      summary_writer.flush()
# first, clean the log directory to avoid conflicts
try:
  tf.io.gfile.rmtree('/tmp/logs/scalars')
except tf.errors.OpError:
  pass

# set up the log directory and writer for TensorBoard
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)
train(federated_averaging_process=federated_averaging, num_rounds=10,
      num_clients_per_round=NUM_CLIENTS, summary_writer=summary_writer)
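Running this raises the AttributeError shown at the top. My current guess (untested) is that after preprocess_train_dataset, fed_emnist_train is a plain MapDataset, which has no client_ids or create_tf_dataset_for_client, so maybe I should sample from the client_datasets list I built instead, something like:

sampled_train_data = random.sample(client_datasets, num_clients_per_round)

but I'm not sure that's the intended way to do it.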
What does this error mean, and what's the right way to sample clients here? Appreciate any help!