This lab follows the official TensorFlow Federated (TFF) tutorials and the paper that proposed the Federated Averaging (FedAvg) algorithm, "Communication-Efficient Learning of Deep Networks from Decentralized Data", to implement FedAvg in simulation.
!pip install --quiet --upgrade tensorflow_federated
%load_ext tensorboard
import collections
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
tf.compat.v1.enable_v2_behavior()
# TODO(b/148678573,b/148685415): must use the ReferenceExecutor because it
# supports unbounded references and tff.sequence_* intrinsics.
tff.framework.set_default_executor(tff.test.ReferenceExecutor())
@tff.federated_computation
def hello_world():
  return 'Hello, World!'

hello_world()
'Hello, World!'
Note: code annotated with
@tff.tf_computation(MODEL_TYPE, BATCH_TYPE)
covers both server-side and client-side operations and is an abstraction; in a real deployment these operations would have to be implemented yourself.
# Download the MNIST dataset
mnist_train, mnist_test = tf.keras.datasets.mnist.load_data()
[(x.dtype, x.shape) for x in mnist_train]
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
[(dtype('uint8'), (60000, 28, 28)), (dtype('uint8'), (60000,))]
Each user's samples are split into fixed-size batches according to BATCH_SIZE, so BATCH_SIZE × number of batches = total samples per user.
To simulate non-IID data, the images for each digit in the MNIST dataset (e.g. all handwritten images labeled 5) stand in for one device's local data.
NUM_EXAMPLES_PER_USER = 1000
BATCH_SIZE = 200
# TODO: how should a non-integer NUM_EXAMPLES_PER_USER / BATCH_SIZE be handled?
BATCHES = int(NUM_EXAMPLES_PER_USER / BATCH_SIZE)
# NUM_EXAMPLES_PER_USER / BATCH_SIZE = 5, i.e. each device holds 5 batches of data
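As a side note on the TODO above: with hypothetical sizes where the batch count is not an integer, Python's slicing already handles the remainder, because slicing past the end of a list simply truncates. A minimal sketch (toy numbers, not the experiment's settings):

```python
import math

num_examples = 1000
batch_size = 300  # hypothetical size that does not divide evenly

# Rounding up gives the true batch count; the final batch holds the remainder.
num_batches = math.ceil(num_examples / batch_size)

# Slicing past the end of a list truncates, so the slicing loop used by
# get_data_for_digit below needs no special case for the last batch.
samples = list(range(num_examples))
batches = [samples[i:i + batch_size] for i in range(0, num_examples, batch_size)]

print(len(batches))       # 4
print(len(batches[-1]))   # 100
```

This is also why the batch dimension of the TFF types later in the notebook is declared as None.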
# This function selects all images in `source` with label == digit and preprocesses them
# [FedAvg@C] split the device's samples into batches of size BATCH_SIZE
def get_data_for_digit(source, digit):
  output_sequence = []
  # Record the indices in the label list whose value equals the given digit,
  # e.g. for labels [4,5,6,5,1] and digit=5, all_samples = [1, 3]
  all_samples = [i for i, d in enumerate(source[1]) if d == digit]
  # The loop below processes every image indexed by all_samples
  for i in range(0, min(len(all_samples), NUM_EXAMPLES_PER_USER), BATCH_SIZE):
    # take the next BATCH_SIZE images in order
    batch_samples = all_samples[i:i + BATCH_SIZE]
    output_sequence.append({
        # normalize pixel values to [0, 1] and flatten each image
        'x':
            np.array([source[0][i].flatten() / 255.0 for i in batch_samples],
                     dtype=np.float32),
        'y':
            np.array([source[1][i] for i in batch_samples], dtype=np.int32)
    })
  # output_sequence is one user's data: a list of batches
  return output_sequence
# Split by the digit labels 0-9 into 10 datasets, representing non-IID data on 10 devices
federated_train_data = [get_data_for_digit(mnist_train, d) for d in range(10)]
federated_test_data = [get_data_for_digit(mnist_test, d) for d in range(10)]
To verify the preprocessing, inspect the last batch of the dataset for digit 5 (i.e. the 5th device): its x holds the pixel images and its y the labels.
print(federated_train_data[5][-1]['y'])
from matplotlib import pyplot as plt
plt.imshow(federated_train_data[5][-1]['x'][-1].reshape(28, 28), cmap='gray')
plt.grid(False)
plt.show()
[5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5]
TFF is strongly typed, so argument types must be declared up front. Because the last batch is not guaranteed to contain exactly BATCH_SIZE samples, the batch dimension is declared as None.
# Define an abstract TFF type specification
BATCH_SPEC = collections.OrderedDict(
x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
y=tf.TensorSpec(shape=[None], dtype=tf.int32))
BATCH_TYPE = tff.to_type(BATCH_SPEC)
str(BATCH_TYPE)
MODEL_SPEC = collections.OrderedDict(
weights=tf.TensorSpec(shape=[784, 10], dtype=tf.float32),
bias=tf.TensorSpec(shape=[10], dtype=tf.float32))
MODEL_TYPE = tff.to_type(MODEL_SPEC)
str(MODEL_TYPE)
'<weights=float32[784,10],bias=float32[10]>'
Next we define the forward-pass function.
# NOTE: `forward_pass` is defined separately from `batch_loss` so that it can
# be later called from within another tf.function. Necessary because a
# @tf.function decorated method cannot invoke a @tff.tf_computation.
@tf.function
def forward_pass(model, batch):
  # A single dense layer with softmax output (multinomial logistic regression)
  predicted_y = tf.nn.softmax(
      tf.matmul(batch['x'], model['weights']) + model['bias'])
  predictions = tf.cast(tf.argmax(predicted_y, 1), tf.int32)
  # mean cross-entropy loss over the batch
  loss = -tf.reduce_mean(
      tf.reduce_sum(
          tf.one_hot(batch['y'], 10) * tf.math.log(predicted_y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, batch['y']), tf.float32))
  return loss, accuracy
@tff.tf_computation(MODEL_TYPE, BATCH_TYPE)
def batch_loss(model, batch):
  return forward_pass(model, batch)
To sanity-check the code above, set the weights w and bias b to zero; this zero model, initial_model, also serves as the starting point for the later federated training.
# [FedAvg@S] initialize w
initial_model = collections.OrderedDict(
weights=np.zeros([784, 10], dtype=np.float32),
bias=np.zeros([10], dtype=np.float32))
# Use the last batch from device 5 (the digit-5 images) as a sample batch
sample_batch = federated_train_data[5][-1]
batch_loss(initial_model, sample_batch)
(2.3025854, 0.0)
Analysis: batch_loss (i.e. forward_pass) takes two arguments, the model and a batch of samples. From the model's w and b together with the batch's x it computes predictions and the loss, here 2.3025854. With zero weights the softmax output is uniform over the 10 classes, so the loss equals ln(10) ≈ 2.3026, confirming the pipeline works.
The loss is a function of the model (w and b). To find the values of w and b that minimize it, $$w^*, b^* = \operatorname*{argmin}_{w,b} L(w,b),$$ we use gradient descent.
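As a minimal NumPy illustration of this update rule (toy random data, not the MNIST model), one step of w ← w − η∇L on the same softmax cross-entropy loss lowers it:

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.normal(size=(8, 4)).astype(np.float32)  # 8 toy samples, 4 features
y = rng.integers(0, 3, size=8)                  # 3 classes

w = np.zeros((4, 3), dtype=np.float32)          # start from zeros, as above
b = np.zeros(3, dtype=np.float32)

def loss_and_grads(w, b):
    logits = x @ w + b
    p = np.exp(logits - logits.max(axis=1, keepdims=True))
    p /= p.sum(axis=1, keepdims=True)           # softmax probabilities
    onehot = np.eye(3)[y]
    loss = -np.mean(np.sum(onehot * np.log(p), axis=1))
    g = (p - onehot) / len(y)                   # dL/dlogits for mean cross-entropy
    return loss, x.T @ g, g.sum(axis=0)         # grads w.r.t. w and b

lr = 0.1
loss0, gw, gb = loss_and_grads(w, b)
w, b = w - lr * gw, b - lr * gb                 # w <- w - lr * grad(L)
loss1, _, _ = loss_and_grads(w, b)
print(loss1 < loss0)  # True: one gradient step reduces the loss
```

At zero weights the initial loss is ln(3) here, the 3-class analogue of the ln(10) observed above.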
@tff.tf_computation(MODEL_TYPE, BATCH_TYPE, tf.float32)
def batch_train(initial_model, batch, learning_rate):
  # Define a group of model variables and set them to `initial_model`. Must
  # be defined outside the @tf.function.
  model_vars = collections.OrderedDict([
      (name, tf.Variable(name=name, initial_value=value))
      for name, value in initial_model.items()
  ])
  optimizer = tf.keras.optimizers.SGD(learning_rate)

  @tf.function
  def _train_on_batch(model_vars, batch):
    # Perform one step of gradient descent using loss from `batch_loss`.
    with tf.GradientTape() as tape:
      loss, _ = forward_pass(model_vars, batch)  # keep only the loss for the gradient
    # compute the gradients of the loss w.r.t. the model variables
    grads = tape.gradient(loss, model_vars)
    # gradient step: model_vars = model_vars - learning_rate * grads
    optimizer.apply_gradients(
        zip(tf.nest.flatten(grads), tf.nest.flatten(model_vars)))
    # The updated variables flow back through batch_train's callers (batch_fn,
    # local_train, ...) and are eventually evaluated by local_eval.
    return model_vars

  return _train_on_batch(model_vars, batch)
# Train on the sample batch 5 times to test the code above
model = initial_model
losses = []
for _ in range(5):
  model = batch_train(model, sample_batch, 0.1)
  losses.append(batch_loss(model, sample_batch))
print(losses)
[(0.21017574, 1.0), (0.13879976, 1.0), (0.105693415, 1.0), (0.085996725, 1.0), (0.07277652, 1.0)]
The output above shows that repeated gradient-descent steps on a single batch steadily reduce the loss. Next, gradient descent must run over several batches, i.e. over all of a device's local data.
EPOCH = 10
# Decompose training on all of a device's data into training on each of its batches
LOCAL_DATA_TYPE = tff.SequenceType(BATCH_TYPE)
print(LOCAL_DATA_TYPE)
@tff.federated_computation(MODEL_TYPE, tf.float32, LOCAL_DATA_TYPE)
def local_train(initial_model, learning_rate, all_batches):
  # For each batch, update the parameters with batch_train until every
  # batch has been visited.

  @tff.federated_computation(MODEL_TYPE, BATCH_TYPE)
  def batch_fn(model, batch):
    return batch_train(model, batch, learning_rate)

  model = tff.sequence_reduce(all_batches, initial_model, batch_fn)
  # [FedAvg@C] for each local epoch i from 1 to EPOCH do
  for _ in range(EPOCH - 1):
    # [FedAvg@C] for each batch b in B do: w <- w - learning_rate * g
    model = tff.sequence_reduce(all_batches, model, batch_fn)
  # [FedAvg@C] return w to the server
  return model
<x=float32[?,784],y=int32[?]>*
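tff.sequence_reduce behaves like a left fold over the batch sequence, threading the model through batch_fn. A plain-Python analogue (the function name mirrors the TFF intrinsic, but this body is only illustrative):

```python
from functools import reduce

# sequence_reduce analogue: fold batch_fn over all batches, threading the model.
def sequence_reduce(batches, initial_model, batch_fn):
    return reduce(batch_fn, batches, initial_model)

# Toy example: the "model" is a running sample count and each "batch"
# contributes its length, mimicking one pass over a device's batches.
batches = [[1, 2], [3, 4, 5], [6]]
count = sequence_reduce(batches, 0, lambda model, batch: model + len(batch))
print(count)  # 6
```

Running the fold EPOCH times, as local_train does, corresponds to EPOCH local epochs over the same batches.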
Next, run local training:
locally_trained_model = local_train(initial_model, 0.1, federated_train_data[5])
Local evaluation works as follows: for every batch, batch_loss computes the loss and accuracy, and these are summed (not averaged) across batches to give the local totals. The TODO comment indicates that TFF had not yet implemented tff.sequence_average(), so sequence_sum is used instead; dividing the summed accuracy by the number of batches then recovers the mean.
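The sum-then-divide workaround can be sketched with hypothetical per-batch metrics (the numbers below are illustrative, not from the experiment):

```python
# Hypothetical per-batch (loss, accuracy) pairs from 5 equal-size batches.
batch_metrics = [(0.30, 0.90), (0.25, 0.95), (0.20, 1.00),
                 (0.35, 0.85), (0.40, 0.80)]

# sequence_sum analogue: elementwise sum over the metric tuples.
total_loss = sum(m[0] for m in batch_metrics)
total_accuracy = sum(m[1] for m in batch_metrics)

# Dividing by the batch count recovers the mean. This is exact only because
# every batch has the same size; a ragged last batch would need weighting.
num_batches = len(batch_metrics)
print(round(total_accuracy / num_batches, 2))  # 0.9
```

In this experiment the division is exact, since NUM_EXAMPLES_PER_USER is a multiple of BATCH_SIZE.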
@tff.federated_computation(MODEL_TYPE, LOCAL_DATA_TYPE)
def local_eval(model, all_batches):
  # all_batches: all images of one digit, at most 1000, BATCH_SIZE per batch
  # TODO(b/120157713): Replace with `tff.sequence_average()` once implemented.
  return tff.sequence_sum(
      # compute loss and accuracy for each batch, then sum over batches
      tff.sequence_map(
          tff.federated_computation(lambda b: batch_loss(model, b), BATCH_TYPE),
          all_batches))
initial_model_loss,initial_model_accuracy=local_eval(initial_model,federated_train_data[5])
local_model_loss,local_model_accuracy=local_eval(locally_trained_model, federated_train_data[5])
print('initial_model loss ={} ,accuracy={}'.format(
initial_model_loss,initial_model_accuracy/BATCHES))
print('locally_trained_model loss ={},accuracy={}'.format(
local_model_loss,local_model_accuracy/BATCHES))
initial_model loss =11.512927055358887 ,accuracy=0.0
locally_trained_model loss =0.05232632905244827,accuracy=1.0
Analysis:
The more local training epochs, the better the local result. However, evaluating the locally trained model on another device's data, e.g.
federated_train_data[6]
shows very low accuracy. This is because, to simulate non-IID data, we assumed 10 users each holding only one of the digits 0-9. A model trained on user 5's data recognizes the digit 5 but fails on 6. We should therefore take a global view and use federated evaluation to measure the loss and accuracy of the whole system's model.
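The effect can be seen with a hypothetical degenerate classifier standing in for a model trained only on digit 5 (it predicts 5 regardless of input):

```python
# Hypothetical stand-in for a model trained only on digit 5.
predict = lambda image: 5

# Evaluate it on 10 simulated clients, client d holding only digit d.
per_client_accuracy = []
for d in range(10):
    labels = [d] * 100                      # client d's labels
    correct = sum(predict(None) == label for label in labels)
    per_client_accuracy.append(correct / len(labels))

print(per_client_accuracy[5])               # 1.0 on its own digit
print(sum(per_client_accuracy) / 10)        # 0.1 averaged over all clients
```

A real model trained on one digit is not quite this extreme, but the global accuracy is similarly poor, which is why federated evaluation is needed.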
# Define the federated types
#MODEL_TYPE: <weights=float32[784,10],bias=float32[10]>
SERVER_MODEL_TYPE = tff.FederatedType(MODEL_TYPE, tff.SERVER)
#LOCAL_DATA_TYPE: <x=float32[?,784],y=int32[?]>*
CLIENT_DATA_TYPE = tff.FederatedType(LOCAL_DATA_TYPE, tff.CLIENTS)
Federated evaluation proceeds as follows: broadcast the model to every device, have each device compute the loss over all of its local batches, send the results back, and average them.
@tff.federated_computation(SERVER_MODEL_TYPE, CLIENT_DATA_TYPE)
def federated_eval(model, data):
  # every client runs local_eval on its own data
  return tff.federated_mean(
      tff.federated_map(local_eval, [tff.federated_broadcast(model), data]))
tff.federated_broadcast(model)
distributes the model parameters to the participating devices. In this simulation it is an abstract representation; in a real distributed system this step would have to be implemented explicitly.
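The three federated primitives used above can be sketched in plain Python; the names mirror the TFF intrinsics, but these bodies are only illustrative, not the TFF implementation:

```python
def federated_broadcast(server_value, num_clients):
    # server -> clients: every client receives a copy of the server value
    return [server_value] * num_clients

def federated_map(fn, client_values):
    # run the same computation independently on each client's value
    return [fn(v) for v in client_values]

def federated_mean(client_values):
    # clients -> server: average the per-client results
    return sum(client_values) / len(client_values)

models = federated_broadcast(0.5, 4)                 # broadcast to 4 clients
local_results = federated_map(lambda m: m + 1.0, models)
print(federated_mean(local_results))                 # 1.5
```

In TFF these primitives additionally carry placement information (tff.SERVER vs tff.CLIENTS), which is what the federated types above encode.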
print('initial_model loss =', federated_eval(initial_model,
federated_train_data))
print('locally_trained_model loss =',
federated_eval(locally_trained_model, federated_train_data))
initial_model loss = <11.512926,0.5>
locally_trained_model loss = <35.812584,0.5>
As expected, a model trained on a single device scores poorly when evaluated on the data of every device in the system. We therefore need federated training, in which all devices train.
Federated training works as follows: using federated_map(), the server broadcasts the parameters and learning rate to each participant's device, and each participant trains locally with local_train(). After local training, each device returns its trained model (w and b) to the server, which applies federated_mean() to average them (in FedAvg the average is weighted by each client's data size; here every client holds the same amount), yielding the model at the end of the round.
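One such round can be sketched with a NumPy toy problem (hypothetical quadratic client losses, not the MNIST model): broadcast the server model, run local gradient descent on each client, then average the client models.

```python
import numpy as np

# Toy FedAvg: client k minimises (w - t_k)^2, whose gradient is 2 * (w - t_k).
targets = np.array([1.0, 2.0, 3.0])     # one scalar "dataset" per client
lr, local_steps = 0.1, 10

def local_train(w, target):
    for _ in range(local_steps):        # local epochs of gradient descent
        w = w - lr * 2.0 * (w - target)
    return w

server_w = 0.0                          # server initialises w
for _ in range(5):                      # communication rounds
    # broadcast server_w, train locally on each client, average the results
    client_ws = [local_train(server_w, t) for t in targets]
    server_w = float(np.mean(client_ws))

print(round(server_w, 3))  # 2.0: converges to the mean of the client optima
```

With equal client weights, the fixed point is the average of the per-client optima; the MNIST experiment below follows the same broadcast / local-train / average loop with the softmax model.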
SERVER_FLOAT_TYPE = tff.FederatedType(tf.float32, tff.SERVER)

@tff.federated_computation(SERVER_MODEL_TYPE, SERVER_FLOAT_TYPE, CLIENT_DATA_TYPE)
def federated_train(model, learning_rate, data):
  return tff.federated_mean(
      tff.federated_map(local_train, [
          tff.federated_broadcast(model),
          tff.federated_broadcast(learning_rate), data
      ]))
A recap of the parameter settings for this experiment:
- learning rate: 0.1 initially, multiplied by 0.9 after each round
- communication rounds: 50
- batches per client (BATCHES): 5
- local epochs (EPOCH): 10
- clients: 10 (all participate each round, C = 1)
- batch size (BATCH_SIZE): 200
- samples per client (NUM_EXAMPLES_PER_USER): 1000
- data distribution: non-IID
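The decayed learning-rate schedule, η_t = 0.1 · 0.9^t, can be tabulated directly:

```python
# Learning-rate schedule used below: start at 0.1, multiply by 0.9 each round.
lr = 0.1
schedule = []
for round_num in range(50):
    schedule.append(lr)
    lr *= 0.9

print(schedule[0])               # 0.1 in round 0
print(round(schedule[10], 4))    # ~0.0349 by round 10
```

By the final rounds the step size is tiny, which matches the flattening loss curve in the output below.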
Now run 50 rounds of federated training:
logdir = "/tmp/logs/scalars/training/"
summary_writer = tf.summary.create_file_writer(logdir)
# the initial model's w and b are all zeros
model = initial_model
learning_rate = 0.1
with summary_writer.as_default():
  for round_num in range(50):
    model = federated_train(model, learning_rate, federated_train_data)
    learning_rate = learning_rate * 0.9
    loss, accuracy = federated_eval(model, federated_train_data)
    print('round {}, loss={}, accuracy={}'.format(round_num, loss, accuracy / BATCHES))
    tf.summary.scalar('loss', loss, step=round_num)
    tf.summary.scalar('accuracy', accuracy / BATCHES, step=round_num)
round 0, loss=10.643875122070312, accuracy=0.6812000274658203
round 1, loss=9.886220932006836, accuracy=0.7333000183105469
round 2, loss=9.223746299743652, accuracy=0.7595000267028809
round 3, loss=8.645051956176758, accuracy=0.7732000827789307
round 4, loss=8.139508247375488, accuracy=0.7823999881744385
round 5, loss=7.697207450866699, accuracy=0.7903000831604003
round 6, loss=7.309234619140625, accuracy=0.79660005569458
round 7, loss=6.967775821685791, accuracy=0.8027999877929688
round 8, loss=6.666112422943115, accuracy=0.8083000183105469
round 9, loss=6.398534297943115, accuracy=0.8115999221801757
round 10, loss=6.160220146179199, accuracy=0.8152999877929688
round 11, loss=5.947114944458008, accuracy=0.8187000274658203
round 12, loss=5.755807876586914, accuracy=0.8229000091552734
round 13, loss=5.583432197570801, accuracy=0.8253999710083008
round 14, loss=5.427569389343262, accuracy=0.828700065612793
round 15, loss=5.286177635192871, accuracy=0.8323999404907226
round 16, loss=5.157526969909668, accuracy=0.8357000350952148
round 17, loss=5.040145397186279, accuracy=0.8386999130249023
round 18, loss=4.93277645111084, accuracy=0.8414999961853027
round 19, loss=4.834345817565918, accuracy=0.8426000595092773
round 20, loss=4.743927001953125, accuracy=0.8446001052856446
round 21, loss=4.660722732543945, accuracy=0.8460999488830566
round 22, loss=4.584041118621826, accuracy=0.8483999252319336
round 23, loss=4.513279438018799, accuracy=0.850200080871582
round 24, loss=4.447912693023682, accuracy=0.851400089263916
round 25, loss=4.387477397918701, accuracy=0.8524999618530273
round 26, loss=4.331568717956543, accuracy=0.8539999008178711
round 27, loss=4.279825687408447, accuracy=0.8550000190734863
round 28, loss=4.231926918029785, accuracy=0.8563999176025391
round 29, loss=4.187587261199951, accuracy=0.857699966430664
round 30, loss=4.146548271179199, accuracy=0.8585999488830567
round 31, loss=4.108576774597168, accuracy=0.8592000961303711
round 32, loss=4.073460102081299, accuracy=0.8602999687194824
round 33, loss=4.041004657745361, accuracy=0.8604000091552735
round 34, loss=4.011031150817871, accuracy=0.8612000465393066
round 35, loss=3.9833743572235107, accuracy=0.8616999626159668
round 36, loss=3.9578800201416016, accuracy=0.8621000289916992
round 37, loss=3.934403896331787, accuracy=0.862600040435791
round 38, loss=3.9128100872039795, accuracy=0.8628999710083007
round 39, loss=3.8929710388183594, accuracy=0.8627999305725098
round 40, loss=3.8747658729553223, accuracy=0.8628000259399414
round 41, loss=3.858079433441162, accuracy=0.8625999450683594
round 42, loss=3.8428051471710205, accuracy=0.8630999565124512
round 43, loss=3.828838348388672, accuracy=0.8631000518798828
round 44, loss=3.816082715988159, accuracy=0.8632000923156739
round 45, loss=3.8044466972351074, accuracy=0.8634000778198242
round 46, loss=3.793842315673828, accuracy=0.8632000923156739
round 47, loss=3.784188747406006, accuracy=0.8635999679565429
round 48, loss=3.77540922164917, accuracy=0.8637999534606934
round 49, loss=3.7674317359924316, accuracy=0.8637999534606934
%tensorboard --logdir /tmp/logs/scalars/ --port=0
# Run this cell to clean old output from this directory before generating future graphs.
!rm -R /tmp/logs/scalars/*
Evaluate on the test set:
initial_model_loss, initial_model_accuracy = federated_eval(initial_model, federated_test_data)
trained_model_loss, trained_model_accuracy = federated_eval(model, federated_test_data)
print('initial_model test loss ={}, accuracy={}'.format(
    initial_model_loss, initial_model_accuracy / BATCHES))
print('trained_model test loss ={}, accuracy={}'.format(
    trained_model_loss, trained_model_accuracy / BATCHES))
initial_model test loss = <11.512926,0.5>
trained_model test loss = <5.610701,4.1564684>
This experiment was a simulation: federated learning was simulated on a single machine, implementing the Federated Averaging algorithm. Areas that still need improvement: