!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio
import nest_asyncio
nest_asyncio.apply()
import collections
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
@tff.federated_computation
def hello_world():
return 'Hello, World!'
hello_world()
联邦数据抽象地指代分布在不同设备上的本地数据。
例如,一组温度传感器中的数据认为是一个federated value,有数据类型、数据存放位置:
federated_float_on_clients = tff.type_at_clients(tf.float32)
>>> str(federated_float_on_clients.member)
'float32'
>>> str(federated_float_on_clients.placement)
'CLIENTS'
定义一个federated type由T和G组成,记作{T}@G
。这里由于每个设备中的值不相同,我们需要用花括号将T括起来
>>> str(federated_float_on_clients)
'{float32}@CLIENTS'
all_equal
属性表示它们是否相同,默认为false>>> federated_float_on_clients.all_equal
False
此时,由于每个设备中的值相同,我们就去掉花括号,记作T@G
>>> str(tff.type_at_clients(tf.float32, all_equal=True))
'float32@CLIENTS'
例如一个线性回归模型有a和b两个参数:
simple_regression_model_type = (
tff.StructType([('a', tf.float32), ('b', tf.float32)]))
>>> str(simple_regression_model_type)
'<a=float32,b=float32>@CLIENTS'
注意,这里的`tf.float32`是`tff.TensorType(dtype=tf.float32, shape=[])`的缩写。tff.TensorType方法创建一个TFF中的tensor类型。(?)
str(tff.type_at_clients(
simple_regression_model_type, all_equal=True))
'<a=float32,b=float32>@CLIENTS'
表示所有设备中都有a和b这两个参数,且都相等。
our goal is for TFF to enable writing code that you could deploy for execution on groups of physical devices in a distributed system, potentially including mobile or embedded devices running Android. Each of of those devices would receive a separate set of instructions to execute locally, depending on the role it plays in the system (an end-user device, a centralized coordinator, an intermediate layer in a multi-tier architecture, etc.). It is important to be able to reason about which subsets of devices execute what code, and where different portions of the data might physically materialize.
以上原文说明了数据为什么要搞的这么复杂。TFF代码写好后,是可以生成一整套前后端代码的,此时就会用到数据存放位置这个概念。比如服务器端的参数和设备端的参数,在TFF中写在一起,但分割后就会存放在不同的设备上。
大多数设备不能运行python,因此,TFF不关心操作符,而关心数据。前者因编程语言而异,在不同的编程环境下有不同的实现方式(安卓、ios、web);后者则不变。TFF中很多函数是抽象的,是跨网络、跨设备的,比如broadcast
函数,将参数分发给部分设备。
Within the body of TFF code, by design, there's no way to enumerate the devices that constitute the group represented by tff.CLIENTS, or to probe for the existence of a specific device in the group. There's no concept of a device or client identity anywhere in the Federated Core API, the underlying set of architectural abstractions, or the core runtime infrastructure we provide to support simulations. All the computation logic you write will be expressed as operations on the entire client group.
以上原文说明了所有设备只能被看作整体,无法从中探测到某个具体的设备。因为TFF是以联邦的视角来设计的,如果能针对某一个具体设备来操作,就无法顾全大局。
事实上,联邦数据集足以代表所有设备(如果不考虑设备异构)。
接收federated value,输出federated value。
@tff.federated_computation(tff.type_at_clients(tf.float32))
def get_average_temperature(sensor_readings):
return tff.federated_mean(sensor_readings)
看到这里,你可能有疑问,用tf的现成的方法不是一步就能做出来吗?但是我们这里写的get_average_temperature
不是tf代码,也不是python代码,是一种分布式系统的语言(it's a specification of a distributed system in an internal platform-independent glue language)。
>>> str(get_average_temperature.type_signature)
'({float32}@CLIENTS -> float32@SERVER)'
这个输出说明,此函数的参数是各个设备上的浮点数据,输出服务器上的一个浮点数据。这告诉我们,不应该将一个联邦计算过程想象是在服务器或者某个机器上执行的过程,而应该想,它完成了一个多方协作的任务。
使用python语言即可调用
>>> get_average_temperature([68.5, 70.3, 69.8])
69.53334
在执行以上计算时,你就像一个外部观察者,带着全局视野,完成分布式任务的一步操作。
另外,在函数体内的语句一定会被执行:
@tff.federated_computation(tff.type_at_clients(tf.float32))
def get_average_temperature(sensor_readings):
print ('Getting traced, the argument is "{}".'.format(
type(sensor_readings).__name__))
return tff.federated_mean(sensor_readings)
#以下是输出
Getting traced, the argument is "ValueImpl".
@tff.tf_computation(tf.float32)
def add_half(x):
return tf.add(x, 0.5)
@tff.federated_computation(tff.type_at_clients(tf.float32))
def add_half_on_clients(x):
return tff.federated_map(add_half, x)
add_half_on_clients([1.0, 3.0, 2.0])
#以下是输出
[<tf.Tensor: shape=(), dtype=float32, numpy=1.5>,
<tf.Tensor: shape=(), dtype=float32, numpy=3.5>,
<tf.Tensor: shape=(), dtype=float32, numpy=2.5>]
如下代码会出错,因为tf.constant()函数是在@tff.federated_computation包装外使用的。可以理解为,外部环境是tff环境,包装后是tf环境。
try:
# Eager mode
constant_10 = tf.constant(10.)
@tff.tf_computation(tf.float32)
def add_ten(x):
return x + constant_10
except Exception as err:
print (err)
注意,从tf环境中调用的函数,也还是运行在tf环境中,于是如下代码是正确的:
def get_constant_10():
return tf.constant(10.)
@tff.tf_computation(tf.float32)
def add_ten(x):
return x + get_constant_10()
add_ten(5.0)
#输出 15.0
@tff.tf_computation(tff.SequenceType(tf.float32))
def get_local_temperature_average(local_temperatures):
sum_and_count = (
local_temperatures.reduce((0.0, 0), lambda x, y: (x[0] + y, x[1] + 1)))
return sum_and_count[0] / tf.cast(sum_and_count[1], tf.float32)
get_local_temperature_average([68.5, 70.3, 69.8])
下面实现传感器内数据平均,再在服务器上实现数据平均的功能
@tff.federated_computation(
tff.type_at_clients(tff.SequenceType(tf.float32)))
def get_global_temperature_average(sensor_readings):
return tff.federated_mean(
tff.federated_map(get_local_temperature_average, sensor_readings))
get_global_temperature_average([[68.0, 70.0], [71.0], [68.0, 72.0, 70.0]])
#输出70.0