学習アルゴリズムの基本設計

機械学習の技法を実装する方法はいくらでもあるが、ここでは、3つのオブジェクトを中心とした方法を用いる。そのオブジェクトとは、データモデル、そしてアルゴリズムを表わしたものである。その特徴は次の通りである:

  • データ: これはクラスで、訓練および検証用の各種データを格納するために使う。
  • モデル: これもクラスで、データとアルゴリズムを引数とする関数を実装する。これらのメソッドは学習でも事後の評価でも使うような関数。
  • アルゴリズム: これはイテレータで(クラスでも)ある。モデルとデータを使いながら、学習の対象となるパラメータを逐次的に更新していく役割を担う。

この三者を使ったモジュラーな実装方法は、学習機の機能を直感的な形で分けてくれるだけでなく、多数の組み合わせが簡単に作れるため、多種多様の学習課題に対応できる、高い自由度も持つ。

具体例として、たとえば教師あり学習の文脈で線形回帰を実装したいとする。二乗誤差かHuber誤差かで、モデルが異なる。これに対して、勾配降下法というアルゴリズムのオブジェクトは、両方のモデルに使える。このようにしてアルゴリズムを一度実装しておけば、種々のモデル、学習課題への応用が容易にできる。プロトタイピングおよび教育の観点から、かなり有用な方法論であろうと考える。

自明なアルゴリズムを実装する

まずはきわめて単純なアルゴリズムを作る。特別な働きは何もないが、自身の「状態」と「かかった反復回数」を管理する能力、そして条件に応じて終了する能力は持つ。

In [1]:
class Algo_trivial:
    '''
    An iterator which does nothing special at all, but makes
    a trivial update to the initial parameters given, and
    keeps record of the number of iterations made.
    '''
    def __init__(self, w_init, t_max):
        
        self.w = np.copy(w_init)
        self.t_max = t_max


    def __iter__(self):

        self.t = 0
        print("(__iter__): t =", self.t)

        return self
    

    def __next__(self):
        
        # Condition for stopping.
        if self.t >= self.t_max:
            print("--- Condition reached! ---")
            raise StopIteration

        print("(__next__): t =", self.t)
        self.w += 5
        self.t += 1
        # Note: __next__ does not need to return anything.
        

    def __str__(self):

        out = "State of w:" + "\n" + "  " + str(self.w)
        return out

その振る舞いを見てみよう。

In [2]:
import numpy as np

al = Algo_trivial(w_init=np.array([1,2,3,4,5]), t_max=10)

print("Printing docstring:")
print(al.__doc__) # check the docstring

for mystep in al:
    pass # do nothing special
Printing docstring:

    An iterator which does nothing special at all, but makes
    a trivial update to the initial parameters given, and
    keeps record of the number of iterations made.
    
(__iter__): t = 0
(__next__): t = 0
(__next__): t = 1
(__next__): t = 2
(__next__): t = 3
(__next__): t = 4
(__next__): t = 5
(__next__): t = 6
(__next__): t = 7
(__next__): t = 8
(__next__): t = 9
--- Condition reached! ---

要点:

  • StopIterationという例外を挙げてから、ただちにループから脱出する。
  • 0番目のステップでは、iternextも呼び出される。

上記のforループと同じ働きは手動でもできる。

In [3]:
iter(al)
next(al)
next(al)
next(al)
next(al) # and so on...
(__iter__): t = 0
(__next__): t = 0
(__next__): t = 1
(__next__): t = 2
(__next__): t = 3
In [4]:
al = Algo_trivial(w_init=np.array([1,2,3,4,5]), t_max=10)

for mystep in al:
    print(al) # useful for monitoring state.

print("One last check after exiting:")
print(al) # ensure that no additional changes were made.
(__iter__): t = 0
(__next__): t = 0
State of w:
  [ 6  7  8  9 10]
(__next__): t = 1
State of w:
  [11 12 13 14 15]
(__next__): t = 2
State of w:
  [16 17 18 19 20]
(__next__): t = 3
State of w:
  [21 22 23 24 25]
(__next__): t = 4
State of w:
  [26 27 28 29 30]
(__next__): t = 5
State of w:
  [31 32 33 34 35]
(__next__): t = 6
State of w:
  [36 37 38 39 40]
(__next__): t = 7
State of w:
  [41 42 43 44 45]
(__next__): t = 8
State of w:
  [46 47 48 49 50]
(__next__): t = 9
State of w:
  [51 52 53 54 55]
--- Condition reached! ---
One last check after exiting:
State of w:
  [51 52 53 54 55]

Exercises (A):

  1. Algo_trivialを反復するごとに、wがどのように更新されているか説明すること。この算法を「自明」と呼んでいるのは、データやモデルたるものには一切依存しない更新則だからである。

  2. 反復するごとに、wの全要素が倍増されるようにAlgo_trivialを改造してみること。

  3. さらに、初期値が$w=(0,1,2,3,4)$で、何回か倍増を繰り返したあとの状態が$w=(0, 16, 32, 48, 64) = (0 \times 2^4, 1 \times 2^4, 2 \times 2^4, 3 \times 2^4, 4 \times 2^4)$となるように、alを初期化する際に使うパラメータを設定すること。

  4. alを初期化しなおすこと無く、forループを複数回走らせると、wの状態がどうなっていくか。

三者を結びつける

先ほどの事例は単純ではあったが、「学習」は到底できない。学習則を設計するには、データに応じる柔軟性が不可欠である。データとモデルのオブジェクトがここで登場する。線形回帰を例として、基本オブジェクトとなる三者を整えて、完全な学習機を作っていこう。

  • データ: 入力$x \in \mathbb{R}^{d}$と応答$y \in \mathbb{R}$。標本は$\{(x_{1},y_{1}),\ldots,(x_{n},y_{n})\}$である。

  • モデル:自乗誤差を使った線形回帰。つまり、データが$y = \langle w_{\ast}, x\rangle + \epsilon$という形で生成されるとする。ロス関数は$L(w;x,y) = (y - \langle w, x\rangle)^{2}/2$で、その勾配が$\nabla L(w;x,y) = -(y-\langle w, x\rangle)x$である。

  • アルゴリズム:最急降下法を用いる。ステップサイズは$\alpha > 0$。データを$z_{i}=(x_{i},y_{i})$と表記すると、更新式が下記の通りになる。

\begin{align} w_{(t+1)} \gets w_{(t)} - \alpha \frac{1}{n} \sum_{i=1}^{n}\nabla L(w_{(t)}; z_{i}). \end{align}

この学習機の要素は至って単純である。一つずつ実装していく。

In [1]:
# Initial preparation.
import math
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
In [2]:
# First, the data class.

class Data:
    '''
    General data class for supervised regression tasks.
    '''
    
    def __init__(self):
        self.X_tr = None
        self.X_te = None
        self.y_tr = None
        self.y_te = None
    
    def init_tr(self, X, y):
        self.X_tr = X
        self.y_tr = y
        
    def init_te(self, X, y):
        self.X_te = X
        self.y_te = y
    
In [3]:
# Next, the model class.

class LeastSqLinReg:
    '''
    Model for least-squared linear regression.
    Assuming X is (n x d), y is (n x 1), w is (d x 1) numpy array.
    '''
    
    def predict(self, w, X):
        return np.dot(X,w)
    
    # Loss function related.
    def l_imp(self, w, X, y):
        return (y - self.predict(w=w, X=X))**2 / 2
    
    def l_tr(self, w, data):
        return self.l_imp(w=w, X=data.X_tr, y=data.y_tr)
    
    def l_te(self, w, data):
        return self.l_imp(w=w, X=data.X_te, y=data.y_te)
    
    # Gradient related.
    def g_imp(self, w, X, y):
        return (y - self.predict(w=w, X=X)) * (-1) * X
    
    def g_tr(self, w, data):
        return self.g_imp(w=w, X=data.X_tr, y=data.y_tr)
    
    def g_te(self, w, data):
        return self.g_imp(w=w, X=data.X_te, y=data.y_te)
    
In [4]:
# Finally, the algorithm class.

class Algo_GD:
    '''
    Iterator which implements a line-search steepest descent method,
    using the sample mean estimate of the gradient.
    '''
    
    def __init__(self, w_init, t_max, step, store):
        self.w = w_init
        self.t = None
        self.t_max = t_max
        self.step = step
        self.store = store
        
        # Keep record of all updates (optional).
        if self.store:
            self.wstore = np.zeros((self.w.size,t_max+1), dtype=np.float32)
            self.wstore[:,0] = self.w.flatten()
        else:
            self.wstore = None
        
    def __iter__(self):
        self.t = 0
        return self
        
    def __next__(self):
        if self.t >= self.t_max:
            raise StopIteration
        
    def newdir(self, model, data):
        return (-1) * np.mean(model.g_tr(w=self.w, data=data), axis=0, keepdims=True)

    def update(self, model, data):
        
        # Parameter update.
        stepsize = self.step(self.t)
        newdir = self.newdir(model=model, data=data)
        self.w = self.w + stepsize * np.transpose(newdir)
        
        # Monitor update.
        self.t += 1
        
        # Keep record of all updates (optional).
        if self.store:
            self.wstore[:,self.t] = self.w.flatten()

指定したモデルに従うデータを乱数として発生させて、新しく作ったクラスを働かせていこう。

In [5]:
# Generate data.
data = Data()
n = 500
d = 2
wstar = math.pi * np.ones((d,1), dtype=np.float32)
X = np.random.standard_normal(size=(n*2,d))
epsilon = np.random.standard_t(df=3, size=(n*2,1))
y = np.dot(X,wstar) + epsilon
data.init_tr(X=X[0:n,:], y=y[0:n,:]) # former half for training
data.init_te(X=X[n:,:], y=y[n:,:]) # latter half for testing

print("X_tr:", data.X_tr.shape, "... sumcheck =", np.sum(data.X_tr))
print("y_tr:", data.y_tr.shape, "... sumcheck =", np.sum(data.y_tr))
print("X_te:", data.X_te.shape, "... sumcheck =", np.sum(data.X_te))
print("y_te:", data.y_te.shape, "... sumcheck =", np.sum(data.y_te))
X_tr: (500, 2) ... sumcheck = 13.626594497785826
y_tr: (500, 1) ... sumcheck = 69.82200961978066
X_te: (500, 2) ... sumcheck = 20.84262816352377
y_te: (500, 1) ... sumcheck = 28.099113573721638
In [6]:
# Initialize model.
mod = LeastSqLinReg()
In [7]:
# Initialize learning algorithm.
w_init = np.random.uniform(size=(d,1))
def alpha_fixed(t, val):
    return val
def make_step(u):
    def mystep(t):
        return alpha_fixed(t=t, val=u)
    return mystep
al = Algo_GD(w_init=w_init,
             t_max=15,
             step=make_step(0.05),
             store=True)
In [8]:
# Iterate the learning algorithm.
for mystep in al:
    al.update(model=mod, data=data)

print("State of algorithm after completion:")
print("t =", al.t)
print("w =", al.w)
State of algorithm after completion:
t = 15
w = [[2.1596699 ]
 [1.92001442]]
In [9]:
# Visualize the output (in 2D case).
mypath = al.wstore

if (d == 2):

    init_diff = np.linalg.norm(wstar-w_init) * 1.0
    
    myfig = plt.figure(figsize=(7,7))
    ax = myfig.add_subplot(1,1,1)
    
    ax.quiver(mypath[0,:-1], mypath[1,:-1],
              mypath[0,1:]-mypath[0,:-1],
              mypath[1,1:]-mypath[1,:-1],
              scale_units='xy', angles='xy', scale=1, color='k')
    
    ax.plot(*wstar, 'r*', markersize=12) # print true value.
    ax.plot(*mypath[:,-1], 'bo', markersize=6) # print our final estimate.
    plt.title('Trajectory of a simple GD routine')
    plt.xlim((wstar[0]-init_diff, wstar[0]+init_diff))
    plt.ylim((wstar[1]-init_diff, wstar[1]+init_diff))
    plt.show()

練習問題 (B):

  1. アルゴリズムの設定を一旦固定してから標本数、加法ノイズの分布など、データの生成方法を色々と変えてみること。アルゴリズムのたどる軌跡にどのような影響を及ぼすか。

  2. 今度はデータ分布を固定して、ステップサイズや反復回数、初期値などアルゴリズムのパラメータを色々と変えてみること。パラメータの如何によってアルゴリズムの振る舞いがどう変わっていくか説明すること。

パラメータの影響を調べる

本演習では、parameterの用法はかなりルーズであるが、簡単に大別すると、アルゴリズムを制御するためのパラメータ(例:勾配降下法ではステップサイズ$\alpha$)と、アルゴリズムを実行して反復的に求めていくパラメータがある。前者は設計者が決めるもので、後者は学習の対象そのものである。ベイズ統計の用語を借りるなら、前者とハイパーパラメータと呼んでもよかろう。

さて、ここで種々のパラメータをいじって、学習機の性能にどのような影響を及ぼすか、図示しながら調べていくことにする。

In [97]:
# Block including all the contents of the previous experiment.

# Initialize model.
mod = LeastSqLinReg()

# Initialize learning algorithm.
w_init = 10*np.random.uniform(size=(d,1))
def alpha_fixed(t, val):
    return val
def make_step(u):
    def mystep(t):
        return alpha_fixed(t=t, val=u)
    return mystep
t_max = 50
step = make_step(0.05)
al = Algo_GD(w_init=w_init,
             t_max=t_max,
             step=step,
             store=True)

# Set up for a loop over trials.
num_trials = 100
n = 50
d = 2
loss_tr = np.zeros((num_trials,t_max+1), dtype=np.float32)
loss_te = np.zeros((num_trials,t_max+1), dtype=np.float32)
truedist = np.zeros((num_trials,t_max+1), dtype=np.float32)
for tri in range(num_trials):
    
    #print("Running trial number", tri)
    data = Data()
    wstar = math.pi * np.ones((d,1), dtype=np.float32)
    X = np.random.standard_normal(size=(n*2,d))
    epsilon = np.random.standard_t(df=3, size=(n*2,1))
    y = np.dot(X,wstar) + epsilon
    data.init_tr(X=X[0:n,:], y=y[0:n,:]) # former half for training
    data.init_te(X=X[n:,:], y=y[n:,:]) # latter half for testing

    # Iterate the learning algorithm.
    idx = 1
    loss_tr[tri,0] = np.mean(mod.l_tr(w=w_init, data=data))
    loss_te[tri,0] = np.mean(mod.l_te(w=w_init, data=data))
    truedist[tri,0] = np.linalg.norm((w_init-wstar))
    for mystep in al:
        al.update(model=mod, data=data)
        # Record performance
        loss_tr[tri,idx] = np.mean(mod.l_tr(w=al.w, data=data))
        loss_te[tri,idx] = np.mean(mod.l_te(w=al.w, data=data))
        truedist[tri,idx] = np.linalg.norm((al.w-wstar))
        idx += 1
        
In [101]:
# Visualize the performance trajectories.

tvals = np.arange((t_max+1))

# Average over trials.
myfig = plt.figure(figsize=(14,7))

ax_tr = myfig.add_subplot(1, 3, 1)
loss_ave = np.mean(loss_tr, axis=0)
loss_sd = np.std(loss_tr, axis=0)
#plt.fill_between(tvals, loss_ave-loss_sd,
#                 loss_ave+loss_sd, color="pink")
ax_tr.semilogy(tvals, loss_ave, "-", color="red")
plt.ylabel("Squared error")
plt.xlabel("Iteration number")
plt.title("Training set performance")

ax_te = myfig.add_subplot(1, 3, 2, sharey=ax_tr)
loss_ave = np.mean(loss_te, axis=0)
loss_sd = np.std(loss_te, axis=0)
#plt.fill_between(tvals, loss_ave-loss_sd,
#                 loss_ave+loss_sd, color="lavender")
ax_te.semilogy(tvals, loss_ave, "-", color="blue")
plt.ylabel("Squared error")
plt.xlabel("Iteration number")
plt.title("Test set performance")

ax_dist = myfig.add_subplot(1, 3, 3)
dist_ave = np.mean(truedist, axis=0)
#dist_sd = np.std(truedist, axis=0)
#plt.fill_between(tvals, dist_ave-dist_sd,
#                 dist_ave+dist_sd, color="gray")
ax_dist.semilogy(tvals, dist_ave, "-", color="black")
plt.ylabel("l2 distance")
plt.xlabel("Iteration number")
plt.title("Distance from true model")

plt.show()

特に調整してみたいのは下記のパラメータである。

  • n: 標本数
  • d: 学習の対象となるパラメータの数
  • step: ステップサイズを決める手段
  • t_max: 最大の反復回数

練習問題 (C):

  1. アルゴリズムの設定を一旦固定してから標本数、加法ノイズの分布など、データの生成方法を色々と変えてみること。アルゴリズムの誤差の軌跡にどのような影響を及ぼすか説明すること。

  2. 同様に、データの設定を固定してからアルゴリズムのパラメータを色々と変えてみること。データの如何によって、「最良」のアルゴリズム設定がどのように変わっていくか。

最適化から学習へ

これまで見てきた内容から明らかなように、アルゴリズム(と呼んでいるオブジェクト)を使って、関数の最小化や最大化を行なう手法は容易に実装できる。その目的関数がモデルオブジェクトのメソッドを呼び出し、学習用のデータに依存するようなものであれば、学習アルゴリズムとして使えそうである。

しかし、ここで一つ注意すべきは、最適化を完璧にこなしたとしても、それが必ずしも高い汎化能力につながるとは限らないという事実である。その理由は簡単である。実装しうる目的関数というのは、真の学習課題においては、あくまで近似である(統計学の用語を使うと推定量である)からである。

この発想をもう少し詰めていくことにする。学習課題を定式化する際、リスク(期待損失)を理想の目的関数とすることが一般的:

\begin{align} \min_{w \in \mathbb{R}^{d}} R(w), \quad R(w) = \mathbf{E}_{Z} L(w;z). \end{align}

実際には、データの分布は未知で、持っているのは損失関数$L$と標本$z_{1},\ldots,z_{n}$だけである。

わかりやすい具体例を一つ取り上げて、この話をさらに深めていこう。$R$が二次関数として表せる場合をまず考える。表記としては、

\begin{align} R(w) = (w - w^{\ast})^{T}A(w - w^{\ast}) + b^2. \end{align}

条件$\mathbf{E}_{Z} L(w;z) = R(w)$が任意の$w \in \mathbb{R}^{d}$に対して成り立つためには、線形回帰モデルの下で自乗誤差をロス関数とすることが十分である。仮定として、加法ノイズが入力とは独立で、平均ゼロ、分散が有限であること。

明示するなら、データ点を$z = (x,y)$として、

\begin{align} y = \langle w^{\ast}, x \rangle + \varepsilon \end{align}

というモデルである。分散を$\mathbf{E}\varepsilon^2 = b^2$と置く。さらに、$\varepsilon$と$x$の独立性を使うと、$\mathbf{E}\varepsilon x = 0$となる。

自乗誤差を展開し、積分しておくと、$\mathbf{E}_{Z} L(w;z) = R(w)$が成り立つことがわかる。そのときの$b$は先程の分散の対数で、$A$は次の通り:

\begin{align} A = \mathbf{E}xx^{T}. \end{align}

もし$\mathbf{E}_{X}x = 0$ならば、この$A$が$A =\text{cov}\,x$、つまり入力の共分散行列そのものである。

このようにして、$L$も$R$わかるような学習課題は数値シミュレーションとして最高の舞台である。なぜなら、データ分布のさまざまな性質を人工的にコントロールすることで、訓練データでの振る舞いと汎化能力が同時に観察できるからである。したがって、どのようなデータに対して、どのようなアルゴリズム(あるいはアルゴリズムの設定)が最適かは、実験的に検証可能になる。

$R$を計算するための関数を用意して、前の実験を拡張していこう。

In [105]:
def riskMaker(w, A, b, wstar):
    diff = w - wstar
    quad = np.dot(np.transpose(diff), np.dot(A,diff).reshape(diff.shape))
    return np.float32(quad) + b**2
In [106]:
# Prepare the risk-related parameters.
epsilon_sd = 2.0
X_sd = 1.0
X_cov = X_sd * np.eye(d)
wstar = math.pi * np.ones((d,1), dtype=np.float32)
In [107]:
# Put together risk function.
def risk(w):
    return riskMaker(w=w, A=X_cov, b=epsilon_sd, wstar=wstar)
In [115]:
# Generate data.
data = Data()
n = 15
d = 2
X = np.random.normal(loc=0.0, scale=X_sd, size=(n,d))
epsilon = np.random.normal(loc=0.0, scale=epsilon_sd, size=(n,1))
y = np.dot(X,wstar) + epsilon
data.init_tr(X=X, y=y) # all for training
In [116]:
# Initialize model.
mod = LeastSqLinReg()
In [117]:
# Initialize learning algorithm.
w_init = np.random.uniform(size=(d,1))
def alpha_fixed(t, val):
    return val
def make_step(u):
    def mystep(t):
        return alpha_fixed(t=t, val=u)
    return mystep
al = Algo_GD(w_init=w_init,
             t_max=15,
             step=make_step(0.05),
             store=True)
In [118]:
# Iterate the learning algorithm.
for mystep in al:
    al.update(model=mod, data=data)
In [119]:
import matplotlib
import matplotlib.pyplot as plt

# Visualize the output (in 2D case).
mypath = al.wstore

def risk2D_helper(w1, w2):
    w2D = np.array([w1,w2]).reshape((2,1))
    return risk(w=w2D)

risk2D = np.vectorize(risk2D_helper)

if (d == 2):

    tmpdel = np.linalg.norm(wstar-w_init) * 1
    xvals = np.arange(wstar[0]-tmpdel,wstar[0]+tmpdel, 0.1)
    yvals = np.arange(wstar[1]-tmpdel,wstar[1]+tmpdel, 0.1)
    X, Y = np.meshgrid(xvals, yvals)
    Z = risk2D(w1=X, w2=Y)

    myfig = plt.figure(figsize=(7,7))
    ax = myfig.add_subplot(1,1,1)
    CS = ax.contour(X, Y, Z)
    ax.quiver(mypath[0,:-1], mypath[1,:-1],
              mypath[0,1:]-mypath[0,:-1],
              mypath[1,1:]-mypath[1,:-1],
              scale_units='xy', angles='xy', scale=1, color='k')
    CS.clabel(inline=1, fontsize=10)
    ax.plot(*wstar, 'r*', markersize=12) # print true value.
    ax.plot(*mypath[:,-1], 'bo', markersize=6) # print our final estimate.
    plt.title('Trajectory of GD routine')
    plt.show()

上記のようにリスク関数の等高線と、そのリスク関数の推定量を最小化するアルゴリズムの軌跡を合わせて可視化することで、機械学習を「近似的な最適化」と捉えることができる。

勾配降下法を使うような状況であれば、理想としてはリスクの勾配ベクトル$\nabla R(w_{(t)})$を知りたいところだが、これは$R$と同じく観測不可能なので、$\nabla L(w;z)$のサンプル平均で近似することになる。

Exercises (D):

  1. 「真の目的関数」の等高線を背景に、前と同様にアルゴリズムとデータのパラメータを色々と変えてみること。

  2. 「過学習」や「汎化能力」の概念に触れながら、アルゴリズムの振る舞いを説明してみること。

更なる事例:差分法を使った勾配降下法

基本的な技法と概念を押さえたので、学習アルゴリズムの理解をさらに深めていくために、勾配降下法の種々の改造を試してみることにする。

正真正銘のGDだと、当然ながら学習の対象となるすべてのパラメータについて偏微分を求める必要がある。自乗誤差ならば勾配は普通求まるのだが、ほかのロス関数では、勾配が定義されていても容易に計算できないケースがある(例:ロス自体が陽に表せないときなど)。

このような状況下では、ロスが十分滑らかであるなら、差分法で勾配の良い近似は簡単にできる。この方法は遅くてもオイラーの頃から知られている。座標軸を一つずつ動かしてみる:

\begin{align} \Delta_{j} & = (0,\ldots,\delta,\ldots,0), \text{ such that} \\ w + \Delta_{j} & = (w_{1},\ldots,(w_{j}+\delta),\ldots,w_{d}), \quad j = 1,\ldots,d \end{align}

このように差分の比を取ることで偏微分の近似が得られる:

\begin{align} \widehat{g}_{i,j}(w) & = \frac{L((w+\Delta_{j});z_{i})-L(w;z_{i})}{\delta} \approx \frac{\partial L(w;z_{i})}{\partial w_{j}}, \quad i=1,\ldots,n \\ \widehat{g}_{i}(w) & = \left(\widehat{g}_{i,1}(w),\ldots,\widehat{g}_{i,d}(w)\right), \quad i=1,\ldots,n. \end{align}

次に実装したいアルゴリズムは、偏微分が解析的に求められない状況下での近似的な勾配降下法である。勾配の代わりに、差分法による近似を使って、通常の更新式に代入するだけである:

\begin{align} w_{(t+1)} = w_{(t)} - \alpha \frac{1}{n}\sum_{i=1}^{n} \widehat{g}_{i}(w_{(t)}). \end{align}

具体的には、下記の通りになる。

In [165]:
class Algo_FDGD:
    '''
    Modification of gradient descent procedure to use
    an approximation of the gradient via finite-diffs.
    '''
    
    def __init__(self, w_init, t_max, step, delta, store):
        self.w = w_init
        self.t = None
        self.t_max = t_max
        self.step = step
        self.store = store
        self.delta = delta
        self.delmtx = np.eye(self.w.size) * delta
        
        # Keep record of all updates (optional).
        if self.store:
            self.wstore = np.zeros((self.w.size,t_max+1), dtype=np.float32)
            self.wstore[:,0] = self.w.flatten()
        else:
            self.wstore = None
        
    def __iter__(self):
        self.t = 0
        return self
        
    def __next__(self):
        if self.t >= self.t_max:
            raise StopIteration
        
    def newdir(self, model, data):
        
        out = np.zeros(self.w.shape, dtype=self.w.dtype)
        loss = model.l_tr(w=self.w, data=data)
        
        # Perturb one coordinate at a time, compute finite difference.
        for j in range(self.w.size):
            delj = np.take(self.delmtx,[j],axis=1)
            loss_delta = model.l_tr((self.w + delj), data=data)
            out[j,:] = np.mean(loss_delta-loss) / self.delta
            
        return out * (-1)

    def update(self, model, data):
        
        # Parameter update.
        stepsize = self.step(self.t)
        newdir = self.newdir(model=model, data=data)
        self.w = self.w + stepsize * newdir
        
        # Monitor update.
        self.t += 1
        
        # Keep record of all updates (optional).
        if self.store:
            self.wstore[:,self.t] = self.w.flatten()

前とまったく同様な数値実験も行なうことができる。通常の勾配降下法との唯一の違いは、精度を定めるパラメータdeltaを決めないといけないことである(これは引数の「差分」である$\delta$のこと)。

In [155]:
def riskMaker(w, A, b, wstar):
    diff = w - wstar
    quad = np.dot(np.transpose(diff), np.dot(A,diff).reshape(diff.shape))
    return np.float32(quad) + b**2
In [156]:
# Prepare the risk-related parameters.
epsilon_sd = 2.0
X_sd = 1.0
X_cov = X_sd * np.eye(d)
wstar = math.pi * np.ones((d,1), dtype=np.float32)
In [157]:
# Put together risk function.
def risk(w):
    return riskMaker(w=w, A=X_cov, b=epsilon_sd, wstar=wstar)
In [158]:
# Generate data.
data = Data()
n = 15
d = 2
X = np.random.normal(loc=0.0, scale=X_sd, size=(n,d))
epsilon = np.random.normal(loc=0.0, scale=epsilon_sd, size=(n,1))
y = np.dot(X,wstar) + epsilon
data.init_tr(X=X, y=y) # all for training
In [159]:
# Initialize model.
mod = LeastSqLinReg()
In [160]:
# Initialize learning algorithm.
w_init = np.random.uniform(size=(d,1))
def alpha_fixed(t, val):
    return val
def make_step(u):
    def mystep(t):
        return alpha_fixed(t=t, val=u)
    return mystep
al = Algo_FDGD(w_init=w_init,
               t_max=15,
               step=make_step(0.05),
               delta=0.01,
               store=True)
In [161]:
# Iterate the learning algorithm.
for mystep in al:
    al.update(model=mod, data=data)
In [163]:
import matplotlib
import matplotlib.pyplot as plt

# Visualize the output (in 2D case).
mypath = al.wstore

def risk2D_helper(w1, w2):
    w2D = np.array([w1,w2]).reshape((2,1))
    return risk(w=w2D)

risk2D = np.vectorize(risk2D_helper)

if (d == 2):

    tmpdel = np.linalg.norm(wstar-w_init) * 1
    xvals = np.arange(wstar[0]-tmpdel,wstar[0]+tmpdel, 0.1)
    yvals = np.arange(wstar[1]-tmpdel,wstar[1]+tmpdel, 0.1)
    X, Y = np.meshgrid(xvals, yvals)
    Z = risk2D(w1=X, w2=Y)

    myfig = plt.figure(figsize=(7,7))
    ax = myfig.add_subplot(1,1,1)
    CS = ax.contour(X, Y, Z)
    ax.quiver(mypath[0,:-1], mypath[1,:-1],
              mypath[0,1:]-mypath[0,:-1],
              mypath[1,1:]-mypath[1,:-1],
              scale_units='xy', angles='xy', scale=1, color='k')
    CS.clabel(inline=1, fontsize=10)
    ax.plot(*wstar, 'r*', markersize=12) # print true value.
    ax.plot(*mypath[:,-1], 'bo', markersize=6) # print our final estimate.
    plt.title('Trajectory of FDGD routine')
    plt.show()
    

余談にはなるが、上記のアルゴリズムは“forward difference”と呼ばれる差分法の一例である。正の値を足しているからforwardというが、当然、引き算でも問題ないし、点をまたぐような形でも良い。つまり、

\begin{align} \frac{l(w;z_{i})-l(w-\Delta_{j} ;z_{i})}{\delta} \quad \text{and} \quad \frac{l(w+\Delta_{j};z_{i})-l(w-\Delta_{j};z_{i})}{2\delta} \end{align}

という2種類の差分もある($j$番目の偏微分の“backward difference”および“central difference”と呼ぶ)。直感的にもわかるように、後者のほうが断然、精度としては優れている。

練習問題 (E):

  1. 先ほどのAlgo_FDGDを改造し、backward differenceとcentral differenceの差分法を生かした勾配降下法を実装してみること。ほかのアルゴリズムと比較して、軌跡がどう異なるか。収束する速度はどうか。$n$が小さいときの頑健性はどうか。

更なる事例:確率的勾配降下法

学習の対象となるパラメータが$w=(w_{1},\ldots,w_{d})$きわめて数多くある状況について考える。この場合、勾配降下法を使うには、物凄い数の偏微分を計算しないといけないが、データのサンプル数が多くなると計算量に圧迫されて、通常のGDは現実的な手法でなくなる。

この計算コストを大幅に削減する良い方法として、サンプルの全体ではなく、無作為に選ぶサブセット(別名:ミニバッチ)の分だけ勾配を求める確率的な最適化法がある。まずはインデックスをランダムに取る:

\begin{align} \mathcal{I} \subset \{1,2,\ldots,n\} \end{align}

このミニバッチの大きさは$B = |\mathcal{I}| \ll n$となって、その部分集合の勾配を求めて、経験平均ベクトルを通常の更新式に代入する:

\begin{align} w_{(t+1)} = w_{(t)} - \alpha_{(t)} \frac{1}{B}\sum_{i \in \mathcal{I}} \nabla L(w_{(t)};z_{i}). \end{align}

古典的には、$B=1$とするのが主流で、これを確率的勾配降下法(stochastic gradient descent, SGD)と呼ぶ。今では$B>1$のミニバッチが用いられることも多い。一般化すると$i \in \mathcal{I}$が独立かつ一様に$\{1,\ldots,n\}$からサンプルされるとすると、これらのランダム添え字について期待値を取ると:

\begin{align} \mathbf{E} \left( \frac{1}{B}\sum_{i \in \mathcal{I}} \nabla L(w;z_{i}) \right) & = \frac{1}{B}\sum_{i \in \mathcal{I}} \mathbf{E} \nabla L(w;z_{i})\\ & = \frac{1}{B}\sum_{i \in \mathcal{I}}\left(\frac{1}{n} \sum_{j=1}^{n} \nabla L(w;z_{j}) \right)\\ & = \frac{1}{n} \sum_{j=1}^{n} \nabla L(w;z_{j}). \end{align}

つまり、このミニバッチ平均の期待値というのは、全データ点を使った場合の平均ベクトルにほかならない。確実なことはいえないが、コストの小さい反復回数をたくさん投じることで、一回一回のばらつきが平滑化され、最終的には良い解の近辺にたどり着くだろう、という発想である。

実際の動きを自分の手で確かめてみることにしよう。実装するためには、例のnewdirメソッドのたったの数行を追加するだけで済む。

In [166]:
class Algo_SGD:
    '''
    Modification of gradient descent routine, which uses
    randomly selected mini-batches for fast (but noisy)
    iterations.
    '''
    
    def __init__(self, w_init, t_max, step, batchsize, store):
        self.w = w_init
        self.t = None
        self.t_max = t_max
        self.step = step
        self.batchsize = batchsize
        self.store = store
        
        # Keep record of all updates (optional).
        if self.store:
            self.wstore = np.zeros((self.w.size,t_max+1), dtype=np.float32)
            self.wstore[:,0] = self.w.flatten()
        else:
            self.wstore = None
        
    def __iter__(self):
        self.t = 0
        return self
        
    def __next__(self):
        if self.t >= self.t_max:
            raise StopIteration
        
    def newdir(self, model, data):
        idx = np.random.choice(a=data.X_tr.shape[0], size=self.batchsize, replace=True)
        data_sub = Data()
        data_sub.init_tr(X=data.X_tr[idx,:],y=data.y_tr[idx,:])
        return (-1) * np.mean(model.g_tr(w=self.w, data=data_sub), axis=0, keepdims=True)
    
    def update(self, model, data):
        
        # Parameter update.
        stepsize = self.step(self.t)
        newdir = self.newdir(model=model, data=data)
        self.w = self.w + stepsize * np.transpose(newdir)
        
        # Monitor update.
        self.t += 1
        
        # Keep record of all updates (optional).
        if self.store:
            self.wstore[:,self.t] = self.w.flatten()
In [167]:
def riskMaker(w, A, b, wstar):
    diff = w - wstar
    quad = np.dot(np.transpose(diff), np.dot(A,diff).reshape(diff.shape))
    return np.float32(quad) + b**2
In [168]:
# Prepare the risk-related parameters.
epsilon_sd = 2.0
X_sd = 1.0
X_cov = X_sd * np.eye(d)
wstar = math.pi * np.ones((d,1), dtype=np.float32)
In [169]:
# Put together risk function.
def risk(w):
    return riskMaker(w=w, A=X_cov, b=epsilon_sd, wstar=wstar)
In [170]:
# Generate data.
data = Data()
n = 15
d = 2
X = np.random.normal(loc=0.0, scale=X_sd, size=(n,d))
epsilon = np.random.normal(loc=0.0, scale=epsilon_sd, size=(n,1))
y = np.dot(X,wstar) + epsilon
data.init_tr(X=X, y=y) # all for training
In [171]:
# Initialize model.
mod = LeastSqLinReg()
In [178]:
# Initialize learning algorithm.
w_init = np.random.uniform(size=(d,1))
def alpha_fixed(t, val):
    return val
def make_step(u):
    def mystep(t):
        return alpha_fixed(t=t, val=u)
    return mystep
al = Algo_SGD(w_init=w_init,
              t_max=15,
              step=make_step(0.05),
              batchsize=1,
              store=True)
In [179]:
# Iterate the learning algorithm.
for mystep in al:
    al.update(model=mod, data=data)
In [181]:
import matplotlib
import matplotlib.pyplot as plt

# Visualize the output (in 2D case).
mypath = al.wstore

def risk2D_helper(w1, w2):
    w2D = np.array([w1,w2]).reshape((2,1))
    return risk(w=w2D)

risk2D = np.vectorize(risk2D_helper)

if (d == 2):

    tmpdel = np.linalg.norm(wstar-w_init) * 1
    xvals = np.arange(wstar[0]-tmpdel,wstar[0]+tmpdel, 0.1)
    yvals = np.arange(wstar[1]-tmpdel,wstar[1]+tmpdel, 0.1)
    X, Y = np.meshgrid(xvals, yvals)
    Z = risk2D(w1=X, w2=Y)

    myfig = plt.figure(figsize=(7,7))
    ax = myfig.add_subplot(1,1,1)
    CS = ax.contour(X, Y, Z)
    ax.quiver(mypath[0,:-1], mypath[1,:-1],
              mypath[0,1:]-mypath[0,:-1],
              mypath[1,1:]-mypath[1,:-1],
              scale_units='xy', angles='xy', scale=1, color='k')
    CS.clabel(inline=1, fontsize=10)
    ax.plot(*wstar, 'r*', markersize=12) # print true value.
    ax.plot(*mypath[:,-1], 'bo', markersize=6) # print our final estimate.
    plt.title('Trajectory of SGD routine')
    plt.show()
    

これまで通り、基本的なパラメータと調節することで、アルゴリズムの働きがどう変わるか、その傾向を見極めることで学ぶものが多い。

練習問題 (F):

  1. アルゴリズムの設定を一旦固定してから標本数、加法ノイズの分布など、データの生成方法を色々と変えてみること。アルゴリズムのたどる軌跡にどのような影響を及ぼすか。

  2. 今度はデータ分布を固定して、ステップサイズや反復回数、初期値などアルゴリズムのパラメータを色々と変えてみること。パラメータの如何によってアルゴリズムの振る舞いがどう変わっていくか説明すること。

更なる事例

作成中。

参考文献: