!pip install --quiet --upgrade tensorflow_federated
%load_ext tensorboard

import collections

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

tf.compat.v1.enable_v2_behavior()

# TODO(b/148678573,b/148685415): must use the ReferenceExecutor because it
# supports unbounded references and tff.sequence_* intrinsics.
tff.framework.set_default_executor(tff.test.ReferenceExecutor())


@tff.federated_computation
def hello_world():
  return 'Hello, World!'


hello_world()

# Download the MNIST dataset.
mnist_train, mnist_test = tf.keras.datasets.mnist.load_data()
[(x.dtype, x.shape) for x in mnist_train]

NUM_EXAMPLES_PER_USER = 1000
BATCH_SIZE = 200
# TODO: decide how to handle the remainder when NUM_EXAMPLES_PER_USER is not
# a multiple of BATCH_SIZE. Here 1000 / 200 = 5, i.e. each device holds
# exactly 5 batches of data.
BATCHES = int(NUM_EXAMPLES_PER_USER / BATCH_SIZE)


# Select all images in `source` whose label equals `digit` and preprocess
# them.
# [FedAvg@C] split the device's samples into batches of size BATCH_SIZE.
def get_data_for_digit(source, digit):
  output_sequence = []
  # Record the indices of the labels that equal the given `digit`.
  # E.g. if the label list is [4, 5, 6, 5, 1] and digit=5, then
  # all_samples=[1, 3].
  all_samples = [i for i, d in enumerate(source[1]) if d == digit]
  # The loop below processes every image indexed by `all_samples`.
  for i in range(0, min(len(all_samples), NUM_EXAMPLES_PER_USER), BATCH_SIZE):
    # Take BATCH_SIZE images at a time, in order.
    batch_samples = all_samples[i:i + BATCH_SIZE]
    output_sequence.append({
        'x':  # Flatten each image and normalize pixel values to [0, 1].
            np.array([source[0][i].flatten() / 255.0 for i in batch_samples],
                     dtype=np.float32),
        'y':
            np.array([source[1][i] for i in batch_samples], dtype=np.int32)
    })
  # `output_sequence` is one user's dataset: a list of batches.
  return output_sequence


# Partition the data by digit label (0-9) into 10 datasets, representing the
# non-IID data held by 10 devices.
federated_train_data = [get_data_for_digit(mnist_train, d) for d in range(10)]
federated_test_data = [get_data_for_digit(mnist_test, d) for d in range(10)]

print(federated_train_data[5][-1]['y'])

from matplotlib import pyplot as plt

plt.imshow(federated_train_data[5][-1]['x'][-1].reshape(28, 28), cmap='gray')
plt.grid(False)
plt.show()

# Define abstract TFF type specifications.
BATCH_SPEC = collections.OrderedDict(
    x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
    y=tf.TensorSpec(shape=[None], dtype=tf.int32))
BATCH_TYPE = tff.to_type(BATCH_SPEC)

str(BATCH_TYPE)

MODEL_SPEC = collections.OrderedDict(
    weights=tf.TensorSpec(shape=[784, 10], dtype=tf.float32),
    bias=tf.TensorSpec(shape=[10], dtype=tf.float32))
MODEL_TYPE = tff.to_type(MODEL_SPEC)

str(MODEL_TYPE)
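# Optional sanity check on the partition above (a minimal sketch; the helper
# name `check_partition` is ours, not part of the tutorial). It confirms that
# each simulated client holds flattened 784-pixel images all carrying that
# client's single digit label: the pathologically non-IID setup FedAvg is
# meant to handle.
def check_partition(federated_data):
  for digit, client_batches in enumerate(federated_data):
    for batch in client_batches:
      assert batch['x'].shape[1] == 784
      assert (batch['y'] == digit).all()
  print('clients: {}, batches per client: {}'.format(
      len(federated_data), [len(b) for b in federated_data]))


check_partition(federated_train_data)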
# NOTE: `forward_pass` is defined separately from `batch_loss` so that it can
# be later called from within another tf.function. Necessary because a
# @tf.function decorated method cannot invoke a @tff.tf_computation.
@tf.function
def forward_pass(model, batch):
  # A single linear layer followed by softmax (multinomial logistic
  # regression).
  predicted_y = tf.nn.softmax(
      tf.matmul(batch['x'], model['weights']) + model['bias'])
  predictions = tf.cast(tf.argmax(predicted_y, 1), tf.int32)
  # For the zero-initialized model, print(predictions) == [0, ..., 0], while
  # print(batch['y']) == [5, ..., 5] on the client holding digit 5.
  loss = -tf.reduce_mean(
      tf.reduce_sum(
          tf.one_hot(batch['y'], 10) * tf.math.log(predicted_y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, batch['y']), tf.float32))
  return loss, accuracy


@tff.tf_computation(MODEL_TYPE, BATCH_TYPE)
def batch_loss(model, batch):
  return forward_pass(model, batch)


# [FedAvg@S] initialize w.
initial_model = collections.OrderedDict(
    weights=np.zeros([784, 10], dtype=np.float32),
    bias=np.zeros([10], dtype=np.float32))

# Use the last batch of client 5's data (images labeled 5) as a sample batch.
sample_batch = federated_train_data[5][-1]
batch_loss(initial_model, sample_batch)


@tff.tf_computation(MODEL_TYPE, BATCH_TYPE, tf.float32)
def batch_train(initial_model, batch, learning_rate):
  # Define a group of model variables and set them to `initial_model`. Must
  # be defined outside the @tf.function.
  model_vars = collections.OrderedDict([
      (name, tf.Variable(name=name, initial_value=value))
      for name, value in initial_model.items()
  ])
  optimizer = tf.keras.optimizers.SGD(learning_rate)

  @tf.function
  def _train_on_batch(model_vars, batch):
    # Perform one step of gradient descent using the loss from
    # `forward_pass`. Since `forward_pass` returns (loss, accuracy),
    # differentiate only the loss.
    with tf.GradientTape() as tape:
      loss, _ = forward_pass(model_vars, batch)
    # Compute the gradients.
    grads = tape.gradient(loss, model_vars)
    # Apply the update: model_vars <- model_vars - learning_rate * grads.
    optimizer.apply_gradients(
        zip(tf.nest.flatten(grads), tf.nest.flatten(model_vars)))
    # The updated variables are returned from `batch_train`, flow through
    # `batch_fn` and `local_train` into `locally_trained_model`, and are
    # finally combined with data in `local_eval` to compute the loss.
    return model_vars

  return _train_on_batch(model_vars, batch)


# Train for 5 steps to test the code above. Each entry of `losses` is a
# (loss, accuracy) pair, since `batch_loss` returns both.
model = initial_model
losses = []
for _ in range(5):
  model = batch_train(model, sample_batch, 0.1)
  losses.append(batch_loss(model, sample_batch))

print(losses)

EPOCH = 10
# Represent a device's full dataset as a sequence of batches, so training
# over all of a device's data decomposes into training over each batch.
LOCAL_DATA_TYPE = tff.SequenceType(BATCH_TYPE)
print(LOCAL_DATA_TYPE)


# For each batch in turn, update the parameters with `batch_train` until all
# samples have been covered.
@tff.federated_computation(MODEL_TYPE, tf.float32, LOCAL_DATA_TYPE)
def local_train(initial_model, learning_rate, all_batches):

  @tff.federated_computation(MODEL_TYPE, BATCH_TYPE)
  def batch_fn(model, batch):
    return batch_train(model, batch, learning_rate)

  model = tff.sequence_reduce(all_batches, initial_model, batch_fn)
  # [FedAvg@C] for each local epoch i from 1 to EPOCH do
  for _ in range(EPOCH - 1):
    # [FedAvg@C] for each batch b in B do: w <- w - learning_rate * g
    model = tff.sequence_reduce(all_batches, model, batch_fn)
  # [FedAvg@C] return w to the server.
  return model


locally_trained_model = local_train(initial_model, 0.1,
                                    federated_train_data[5])
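# For intuition, `tff.sequence_reduce` above is the federated analogue of a
# plain fold over the batch list. The sketch below (our illustration, not
# part of the tutorial; the name `local_train_eager` is ours) runs one local
# epoch eagerly by folding `batch_train` over the in-memory batches:
import functools


def local_train_eager(model, learning_rate, all_batches):
  # w <- w - learning_rate * g, applied once per batch, in order.
  return functools.reduce(
      lambda m, b: batch_train(m, b, learning_rate), all_batches, model)


# One eager epoch on client 5's data, starting from the zero model.
eager_model = local_train_eager(initial_model, 0.1, federated_train_data[5])
print(batch_loss(eager_model, sample_batch))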
@tff.federated_computation(MODEL_TYPE, LOCAL_DATA_TYPE)
def local_eval(model, all_batches):
  # `all_batches` is one client's dataset, e.g. all images labeled 5: at most
  # NUM_EXAMPLES_PER_USER images, BATCH_SIZE per batch, BATCHES batches.
  # TODO(b/120157713): Replace with `tff.sequence_average()` once implemented.
  return tff.sequence_sum(
      # Compute the loss and accuracy of each batch, then sum over batches.
      tff.sequence_map(
          tff.federated_computation(lambda b: batch_loss(model, b),
                                    BATCH_TYPE), all_batches))


# `local_eval` returns sums over batches, so divide the accuracy by BATCHES
# to recover the per-batch average.
initial_model_loss, initial_model_accuracy = local_eval(
    initial_model, federated_train_data[5])
local_model_loss, local_model_accuracy = local_eval(locally_trained_model,
                                                    federated_train_data[5])
print('initial_model loss={}, accuracy={}'.format(
    initial_model_loss, initial_model_accuracy / BATCHES))
print('locally_trained_model loss={}, accuracy={}'.format(
    local_model_loss, local_model_accuracy / BATCHES))

# Define the federated types.
# MODEL_TYPE placed at the server: MODEL_TYPE@SERVER
SERVER_MODEL_TYPE = tff.FederatedType(MODEL_TYPE, tff.SERVER)
# LOCAL_DATA_TYPE placed at the clients: {LOCAL_DATA_TYPE}@CLIENTS
CLIENT_DATA_TYPE = tff.FederatedType(LOCAL_DATA_TYPE, tff.CLIENTS)


@tff.federated_computation(SERVER_MODEL_TYPE, CLIENT_DATA_TYPE)
def federated_eval(model, data):
  return tff.federated_mean(
      # Have each client run `local_eval` on its own data.
      tff.federated_map(local_eval, [tff.federated_broadcast(model), data]))


print('initial_model loss =',
      federated_eval(initial_model, federated_train_data))
print('locally_trained_model loss =',
      federated_eval(locally_trained_model, federated_train_data))

SERVER_FLOAT_TYPE = tff.FederatedType(tf.float32, tff.SERVER)


@tff.federated_computation(SERVER_MODEL_TYPE, SERVER_FLOAT_TYPE,
                           CLIENT_DATA_TYPE)
def federated_train(model, learning_rate, data):
  return tff.federated_mean(
      tff.federated_map(local_train, [
          tff.federated_broadcast(model),
          tff.federated_broadcast(learning_rate), data
      ]))


logdir = "/tmp/logs/scalars/training/"
summary_writer = tf.summary.create_file_writer(logdir)

# The initial model has all-zero weights and bias.
model = initial_model
learning_rate = 0.1
with summary_writer.as_default():
  for round_num in range(50):
    model = federated_train(model, learning_rate, federated_train_data)
    learning_rate = learning_rate * 0.9
    loss, accuracy = federated_eval(model, federated_train_data)
    print('round {}, loss={}, accuracy={}'.format(round_num, loss,
                                                  accuracy / BATCHES))
    tf.summary.scalar('loss', loss, step=round_num)
    tf.summary.scalar('accuracy', accuracy / BATCHES, step=round_num)

%tensorboard --logdir /tmp/logs/scalars/ --port=0

# Run this cell to clear this directory of old output before writing future
# graphs to it.
!rm -R /tmp/logs/scalars/*

# Evaluate the initial model and the federally trained model on the test
# data. Note that `model` here is the result of `federated_train`, not
# `locally_trained_model`.
initial_model_loss, initial_model_accuracy = federated_eval(
    initial_model, federated_test_data)
trained_model_loss, trained_model_accuracy = federated_eval(
    model, federated_test_data)
print('initial_model test loss={}, accuracy={}'.format(
    initial_model_loss, initial_model_accuracy / BATCHES))
print('trained_model test loss={}, accuracy={}'.format(
    trained_model_loss, trained_model_accuracy / BATCHES))
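# Optional: per-client test metrics, to see how the averaged model fares on
# each digit (our sketch, not part of the tutorial). It calls the
# single-client `local_eval` directly and divides by the number of batches
# each test client actually holds, rather than the train-time BATCHES
# constant, since test clients may hold a short final batch.
for digit, client_data in enumerate(federated_test_data):
  client_loss, client_accuracy = local_eval(model, client_data)
  num_batches = len(client_data)
  print('digit {}: test loss={}, accuracy={}'.format(
      digit, client_loss / num_batches, client_accuracy / num_batches))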