機械学習のプロトタイプづくり

機械学習の手法を実装するにあたって、アプローチはいくつもあり、その目的に依存する部分が多く、ただ一つの正確があるわけではない。我々の目的としては、機械学習の手法を効率よく「発想」の段階から「動くソフトウェア」の段階まで持っていくことができれば良い。なぜなら、色々な手法を試したり、調整したりする必要があるからである。実装の細かいところに神経を使わないといけないのであれば、バグは間違いなく続出するであろうし、本来の目的である「機械学習の理解」がおろそかにされてしまう可能性が高い。

そのため、簡明でわかりやすく、また多様な手法でも使い勝手の良い実装方法を心がける。ここで紹介するアプローチは、データモデル、そしてアルゴリズムという3つのオブジェクトを中心としている。その基本的な性質は下記の通りである。

  • データ(オブジェクト): 訓練、検証などに使うすべてのデータを格納するクラスオブジェクトである。

  • モデル(オブジェクト): アルゴリズムの状態とデータに依存する関数をメソッドとして持ち、訓練中および訓練後の評価に使うクラスオブジェクトである。

  • アルゴリズム(オブジェクト): 制御対象となるパラメータなどの「状態」を持つイテレータである。データとモデルに依存する関数をメソッドとして持ち、繰り返して更新するごとに呼び出す。

データとモデルを定義してみる

簡単な具体例を拠り所として、基本的なオブジェクトの定義を説明していく。古典的な線形モデル、つまり

\begin{align*} y = \langle w^{\ast}, x \rangle + \varepsilon, \quad \mathbf{E}\varepsilon = 0, \mathbf{E}\varepsilon^2 < \infty, x \in \mathbb{R}^d \end{align*}

を出発点とする。データ$(x_1, y_1),\ldots,(x_n, y_n)$を所与として、未知の$(x,y)$に対して$\langle \widehat{w}, x \rangle \approx y$という近似がなるべく正確であるように、$\widehat{w}$を決めたい。

段取りとして、まず必要なのは、これら$n$個のデータ点を格納すること。DataSetをベースクラスとして以下のように定義する。

In [1]:
import numpy as np
In [2]:
class DataSet:
    '''
    Base class for data objects.
    '''
    
    def __init__(self, paras=None):
        self.X_tr = None
        self.X_te = None
        self.y_tr = None
        self.y_te = None
        self.paras = paras
    
    def init_tr(self, X, y):
        self.X_tr = X
        self.y_tr = y
        self.n_tr = X.shape[0]
        
    def init_te(self, X, y):
        self.X_te = X
        self.y_te = y
        self.n_te = X.shape[0]

今の学習課題は線形モデルを前提とした回帰である。我々の使うモデルオブジェクトとして、Modelと呼ぶベースクラスを作りたいところだが、この「線形モデル」と「回帰」のどこがモデルオブジェクトの範囲内なのか。以下のように決めておこう。

  • Modelのサブクラスは、訓練および検証等の評価に使う各種の損失(ロス)関数(必要に応じてその勾配等の情報)を実装したメソッドを持つとする。

このように決めておくとあとは簡単である。モデルオブジェクトのベースクラスとなるModelを以下のように定義する。

In [3]:
class Model:
    '''
    Base class for model objects.
    '''

    def __init__(self, name=None):
        self.name = name

    def l_imp(self, w=None, X=None, y=None, lamreg=None):
        raise NotImplementedError
    
    def l_tr(self, w, data, n_idx=None, lamreg=None):
        if n_idx is None:
            return self.l_imp(w=w, X=data.X_tr,
                              y=data.y_tr,
                              lamreg=lamreg)
        else:
            return self.l_imp(w=w, X=data.X_tr[n_idx,:],
                              y=data.y_tr[n_idx,:],
                              lamreg=lamreg)
    
    def l_te(self, w, data, n_idx=None, lamreg=None):
        if n_idx is None:
            return self.l_imp(w=w, X=data.X_te,
                              y=data.y_te,
                              lamreg=lamreg)
        else:
            return self.l_imp(w=w, X=data.X_te[n_idx,:],
                              y=data.y_te[n_idx,:],
                              lamreg=lamreg)

    def g_imp(self, w=None, X=None, y=None, lamreg=None):
        raise NotImplementedError
    
    def g_tr(self, w, data, n_idx=None, lamreg=None):
        if n_idx is None:
            return self.g_imp(w=w, X=data.X_tr,
                              y=data.y_tr,
                              lamreg=lamreg)
        else:
            return self.g_imp(w=w, X=data.X_tr[n_idx,:],
                              y=data.y_tr[n_idx,:],
                              lamreg=lamreg)
    
    def g_te(self, w, data, n_idx=None, lamreg=None):
        if n_idx is None:
            return self.g_imp(w=w, X=data.X_te,
                              y=data.y_te,
                              lamreg=lamreg)
        else:
            return self.g_imp(w=w, X=data.X_te[n_idx,:],
                              y=data.y_te[n_idx,:],
                              lamreg=lamreg)

ベースクラスなのでModelでは何も実装していないが、ロス関数にかかわるメソッドはDataSetを前提としたdataと任意のパラメータwを引数とする。

回帰モデルを実装するためには、言うまでもなくロスや勾配の詳細を定めて、それらを実装する必要がある。どのロスを使うかは我々の自由である。典型例として、「最小二乗法」として古くから知られている回帰モデルを使うことにする。この場合、wが線形写像を定めるベクトル$w \in \mathbb{R}^d$に相当する。「最小」と呼ぶのは、最終的にこの誤差が小さくなるように最適化していくことを表わす。式で示すと以下のとおりである。

\begin{align*} \min_{w \in \mathbb{R}^d} \frac{1}{n} \sum_{i=1}^{n} (\langle w, x_i \rangle - y_i)^2 \to \hat{w}. \end{align*}

この最適化を行うにあたって、ロスの取る値そのもの、あるいはそのロス関数の勾配ベクトルを$w$について求める。具体的には、下記を実装する必要がある。

  • l_imp: 任意のペア$(x,y)$に対して、$(y - \langle w, x \rangle)^2 / 2$を計算する。

  • g_imp: 任意のペア$(x,y)$に対して、$(y - \langle w, x \rangle)(-1)x \in \mathbb{R}^d$を計算する。

これほど単純なモデルはほかにないので、実装することも大して苦労しない。LinRegをベースクラスとして、LinearL2をそのサブクラスとして定義する。

In [4]:
class LinReg(Model):
    '''
    General-purpose linear regression model.
    No losses are implemented, just a predict()
    method.
    '''
    
    def __init__(self, data=None, name=None):
        super(LinReg, self).__init__(name=name)
        pass

    def predict(self, w, X):
        '''
        Predict real-valued response.
        w is a (d x 1) array of weights.
        X is a (k x d) matrix of k observations.
        Returns array of shape (k x 1) of predicted values.
        '''
        return X.dot(w)


class LinearL2(LinReg):
    '''
    An orthodox linear regression model
    using the l2 error; typically this
    will be used for the classical least
    squares regression.
    '''
    
    def __init__(self, data=None):
        super(LinearL2, self).__init__(data=data)

        
    def l_imp(self, w, X, y, lamreg=None):
        '''
        Implementation of l2-loss under linear model
        for regression.

        Input:
        w is a (d x 1) matrix of weights.
        X is a (k x numfeat) matrix of k observations.
        y is a (k x 1) matrix of labels in {-1,1}.
        lamreg is a regularization parameter (l2 penalty).

        Output:
        A vector of length k with losses evaluated at k points.
        '''
        if lamreg is None:
            return (y-self.predict(w=w,X=X))**2/2
        else:
            penalty = lamreg * np.linalg.norm(w)**2
            return (y-self.predict(w=w,X=X))**2/2 + penalty
    
    
    def g_imp(self, w, X, y, lamreg=None):
        '''
        Implementation of the gradient of the l2-loss
        under a linear regression model.

        Input:
        w is a (d x 1) matrix of weights.
        X is a (k x numfeat) matrix of k observations.
        y is a (k x 1) matrix of labels in {-1,1}.

        Output:
        A (k x numfeat) matrix of gradients evaluated
        at k points.
        '''
        if lamreg is None:
            return (y-self.predict(w=w,X=X))*(-1)*X
        else:
            penalty = lamreg*2*w.T
            return (y-self.predict(w=w,X=X))*(-1)*X + penalty

上記のオブジェクトがあれば、データと評価の仕組みなどができているので、学習アルゴリズムに渡す情報の準備はできている。あとはアルゴリズムそのもののクラスを用意するだけだが、その前に簡単な使用例を示しておく。

In [5]:
# Prepare simulated data.
d = 5
n_tr = 100
n_te = 100
w_star = np.arange(d).reshape((d,1)) + 1.0

X_tr = np.random.normal(size=(n_tr,d))
noise_tr = np.random.normal(size=(n_tr,1))
y_tr = X_tr.dot(w_star) + noise_tr

X_te = np.random.normal(size=(n_te,d))
noise_te = np.random.normal(size=(n_te,1))
y_te = X_te.dot(w_star) + noise_te

# Prepare data object.
data = DataSet()
data.init_tr(X=X_tr, y=y_tr)
data.init_te(X=X_te, y=y_te)

# Prepare model object.
mod = LinearL2()

# Prepare random initial value, and take closer to solution.
w_init = np.random.uniform(low=-5.0, high=5.0, size=(d,1))
weight_balance = np.linspace(0.0, 1.0, 5)
print("===============")
for wt in weight_balance:
    w_est = wt * w_star + (1-wt) * w_init
    print("l_tr:", np.mean(mod.l_tr(w_est, data=data)),
          "l_te:", np.mean(mod.l_te(w_est, data=data)))
    print("g_tr:\n", np.mean(mod.g_tr(w_est, data=data), axis=0))
    print("g_te:\n", np.mean(mod.g_te(w_est, data=data), axis=0))
    print("===============")
===============
l_tr: 63.67480525704821 l_te: 72.52161001938249
g_tr:
 [-1.1240842  -6.437987   -1.79680855 -7.2479921  -4.13374471]
g_te:
 [ 1.23114245 -6.7552612  -1.8668131  -8.89835267 -4.93590447]
===============
l_tr: 35.945556539267216 l_te: 40.85252548386378
g_tr:
 [-0.84580072 -4.82831557 -1.36736611 -5.42541324 -3.08957585]
g_te:
 [ 0.94982208 -5.06642986 -1.37297361 -6.65188944 -3.6999843 ]
===============
l_tr: 16.167935313729377 l_te: 18.30555145320523
g_tr:
 [-0.56751724 -3.21864415 -0.93792366 -3.60283438 -2.045407  ]
g_te:
 [ 0.66850172 -3.37759852 -0.87913411 -4.40542621 -2.46406413]
===============
l_tr: 4.3419415804346695 l_te: 4.880687927406857
g_tr:
 [-0.28923376 -1.60897273 -0.50848121 -1.78025552 -1.00123814]
g_te:
 [ 0.38718136 -1.68876719 -0.38529462 -2.15896298 -1.22814396]
===============
l_tr: 0.46757533938310303 l_te: 0.5779349064686516
g_tr:
 [-0.01095028  0.00069869 -0.07903877  0.04232334  0.04293072]
g_te:
 [1.05860995e-01 6.41498784e-05 1.08544880e-01 8.75002561e-02
 7.77620457e-03]
===============

明らかなように、パラメータを$w^{\ast}$に近づけていくにつれて、ロスの値が小さくなり、その点での勾配も平坦になっていく。次は学習アルゴリズムについて考える。

自明なアルゴリズムを実装する

まずはきわめて単純なアルゴリズムを作る。特別な働きは何もないが、自身の「状態」と「かかった反復回数」を管理する能力、そして条件に応じて終了する能力は持つ。

In [6]:
class Algo_trivial:
    '''
    An iterator which does nothing special at all, but makes
    a trivial update to the initial parameters given, and
    keeps record of the number of iterations made.
    '''

    def __init__(self, w_init, t_max):
        
        self.w = np.copy(w_init)
        self.t_max = t_max


    def __iter__(self):

        self.t = 0
        print("(__iter__): t =", self.t)

        return self
    

    def __next__(self):
        
        # Condition for stopping.
        if self.t >= self.t_max:
            print("--- Condition reached! ---")
            raise StopIteration

        print("(__next__): t =", self.t)
        self.w += 5
        self.t += 1
        # Note: __next__ does not need to return anything.
        

    def __str__(self):

        out = "State of w:" + "\n" + "  " + str(self.w)
        return out

その振る舞いを見てみよう。

In [7]:
import numpy as np

al = Algo_trivial(w_init=np.array([1,2,3,4,5]), t_max=10)

print("Printing docstring:")
print(al.__doc__) # check the docstring

for onestep in al:
    pass # do nothing special
Printing docstring:

    An iterator which does nothing special at all, but makes
    a trivial update to the initial parameters given, and
    keeps record of the number of iterations made.
    
(__iter__): t = 0
(__next__): t = 0
(__next__): t = 1
(__next__): t = 2
(__next__): t = 3
(__next__): t = 4
(__next__): t = 5
(__next__): t = 6
(__next__): t = 7
(__next__): t = 8
(__next__): t = 9
--- Condition reached! ---

要点:

  • StopIterationという例外を挙げてから、ただちにループから脱出する。
  • 0番目のステップでは、iternextも呼び出される。

上記のforループと同じ働きは手動でもできる。

In [8]:
iter(al)
next(al)
next(al)
next(al)
next(al) # and so on...
(__iter__): t = 0
(__next__): t = 0
(__next__): t = 1
(__next__): t = 2
(__next__): t = 3
In [9]:
al = Algo_trivial(w_init=np.array([1,2,3,4,5]), t_max=10)

for onestep in al:
    print(al) # useful for monitoring state.

print("One last check after exiting:")
print(al) # ensure that no additional changes were made.
(__iter__): t = 0
(__next__): t = 0
State of w:
  [ 6  7  8  9 10]
(__next__): t = 1
State of w:
  [11 12 13 14 15]
(__next__): t = 2
State of w:
  [16 17 18 19 20]
(__next__): t = 3
State of w:
  [21 22 23 24 25]
(__next__): t = 4
State of w:
  [26 27 28 29 30]
(__next__): t = 5
State of w:
  [31 32 33 34 35]
(__next__): t = 6
State of w:
  [36 37 38 39 40]
(__next__): t = 7
State of w:
  [41 42 43 44 45]
(__next__): t = 8
State of w:
  [46 47 48 49 50]
(__next__): t = 9
State of w:
  [51 52 53 54 55]
--- Condition reached! ---
One last check after exiting:
State of w:
  [51 52 53 54 55]

Exercises:

  1. Algo_trivialを反復するごとに、wがどのように更新されているか説明すること。この算法を「自明」と呼んでいるのは、データやモデルたるものには一切依存しない更新則だからである。

  2. 反復するごとに、wの全要素が倍増されるようにAlgo_trivialを改造してみること。

  3. さらに、初期値が$w=(0,1,2,3,4)$で、何回か倍増を繰り返したあとの状態が$w=(0, 16, 32, 48, 64) = (0 \times 2^4, 1 \times 2^4, 2 \times 2^4, 3 \times 2^4, 4 \times 2^4)$となるように、alを初期化する際に使うパラメータを設定すること。

  4. alを初期化しなおすこと無く、forループを複数回走らせると、wの状態がどうなっていくか。

三者を結びつける

先ほどの線形回帰の具体例に戻って、その実装を完成させていこう。有用なアルゴリズムを作るには、上手に観測データに応じたパラメータ更新則が不可欠である。ここで、経験期待損失最小化(empirical risk minimization, ERM)を、最急降下法で実装した学習アルゴリズムにする。

要点は以下のとおりである。

  • データ:入力ベクトルと実数値の応答$x \in \mathbb{R}^{d}$, $y \in \mathbb{R}$.データセットの全体は$\{(x_{1},y_{1}),\ldots,(x_{n},y_{n})\}$とする。

  • モデル:二乗誤差を用いた線形回帰モデル。つまり、$y = \langle w^{\ast}, x\rangle + \varepsilon$と仮定している。ロス関数は$L(w;x,y) = (y - \langle w, x\rangle)^{2}/2$で、勾配が$\nabla L(w;x,y) = -(y-\langle w, x\rangle)x$である。

  • アルゴリズム:最急降下法を使って、固定したステップサイズ$\alpha > 0$を用いる。数式で表わすと、$z_{i}=(x_{i},y_{i})$として以下の通りである。

\begin{align*} w_{(t+1)} \gets w_{(t)} - \alpha \frac{1}{n} \sum_{i=1}^{n}\nabla L(w_{(t)}; z_{i}). \end{align*}

データもモデルもできているので、あとは先ほどのイテレータの準備を踏まえて、アルゴリズムのオブジェクトを定義することである。Algo_SimpleGDと名づけて、以下の通りである。

In [10]:
import math
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
In [11]:
class Algo_SimpleGD:
    '''
    Iterator which implements a line-search steepest descent method,
    using the sample mean estimate of the gradient.
    '''
    
    def __init__(self, w_init, t_max, step, store):
        self.w = w_init
        self.t = None
        self.t_max = t_max
        self.step = step
        self.store = store
        
        # Keep record of all updates (optional).
        if self.store:
            self.wstore = np.zeros((self.w.size,t_max+1), dtype=np.float32)
            self.wstore[:,0] = self.w.flatten()
        else:
            self.wstore = None
        
    def __iter__(self):
        self.t = 0
        return self
        
    def __next__(self):
        if self.t >= self.t_max:
            raise StopIteration
        
    def newdir(self, model, data):
        return (-1) * np.mean(model.g_tr(w=self.w, data=data), axis=0, keepdims=True)

    def update(self, model, data):
        
        # Parameter update.
        stepsize = self.step(self.t)
        newdir = self.newdir(model=model, data=data)
        self.w = self.w + stepsize * np.transpose(newdir)
        
        # Monitor update.
        self.t += 1
        
        # Keep record of all updates (optional).
        if self.store:
            self.wstore[:,self.t] = self.w.flatten()

念頭におくべき要点:

  • w_initが初期値の$w_{(0)}$で、アルゴリズムの初期化の際に必要である。

  • newdirは更新方向を返す。

  • stepはコールバック関数で、たとえばステップ数のtに応じてステップサイズを変えるときなどに活用できる。

  • updateはパラメータの更新を実際に行うメソッドである。そのほかに計算資源のコスト等を管理するメソッドを呼び出す。

まずは試しに、乱数を発生させてデータを生成し、自前の学習機を動かしてみる。

In [12]:
# Generate data.
data = DataSet()
n = 100
d = 2
w_star = math.pi * np.ones((d,1), dtype=np.float32)
X = np.random.standard_normal(size=(n*2,d))
epsilon = np.random.standard_t(df=3, size=(n*2,1))
y = X.dot(w_star) + epsilon
data.init_tr(X=X[0:n,:], y=y[0:n,:]) # former half for training
data.init_te(X=X[n:,:], y=y[n:,:]) # latter half for testing

print("X_tr:", data.X_tr.shape, "... sumcheck =", np.sum(data.X_tr))
print("y_tr:", data.y_tr.shape, "... sumcheck =", np.sum(data.y_tr))
print("X_te:", data.X_te.shape, "... sumcheck =", np.sum(data.X_te))
print("y_te:", data.y_te.shape, "... sumcheck =", np.sum(data.y_te))
X_tr: (100, 2) ... sumcheck = -9.94677583241537
y_tr: (100, 1) ... sumcheck = -40.20352170786684
X_te: (100, 2) ... sumcheck = -1.089890767972685
y_te: (100, 1) ... sumcheck = 16.95676631804479
In [13]:
# Initialize model.
mod = LinearL2()
In [14]:
# Initialize learning algorithm.

w_init = np.random.uniform(size=(d,1))

def alpha_fixed(t, val):
    return val

def make_step(u):
    def mystep(t):
        return alpha_fixed(t=t, val=u)
    return mystep

al = Algo_SimpleGD(w_init=w_init,
                   t_max=15,
                   step=make_step(0.15),
                   store=True)
In [15]:
# Iterate the learning algorithm.
for onestep in al:
    al.update(model=mod, data=data)

print("State of algorithm after completion:")
print("t =", al.t)
print("w =", al.w)
State of algorithm after completion:
t = 15
w = [[2.97596384]
 [2.74678402]]

比較対象として、線形モデルの下で二乗誤差の和を最小にする解が解析的に求まることはわかっているので、それも合わせて計算しておく(w_OLSと書く)。

In [16]:
w_OLS = np.linalg.lstsq(a=data.X_tr, b=data.y_tr, rcond=None)[0]
In [17]:
# Visualize the output (in 2D case).
mypath = al.wstore

if (d == 2):

    init_diff = np.linalg.norm(w_star-w_init) * 1.0
    
    myfig = plt.figure(figsize=(7,7))
    ax = myfig.add_subplot(1,1,1)
    
    ax.quiver(mypath[0,:-1], mypath[1,:-1],
              mypath[0,1:]-mypath[0,:-1],
              mypath[1,1:]-mypath[1,:-1],
              scale_units='xy', angles='xy', scale=1, color='k')
    
    ax.plot(*w_star, 'r*', markersize=12, label="w_star") # print true value.
    ax.plot(*w_OLS, 'gv', markersize=8, label="w_OLS") # print OLS estimate.
    ax.plot(*mypath[:,-1], 'bo', markersize=8, label="w") # print our final estimate.
    ax.legend(loc=1,ncol=1)
    plt.title('Trajectory of a simple GD routine')
    plt.xlim((w_star[0]-init_diff, w_star[0]+init_diff))
    plt.ylim((w_star[1]-init_diff, w_star[1]+init_diff))
    plt.show()

練習問題

  1. 学習アルゴリズムの設定を固定した上で、サンプル数やノイズの分布を変えてみること。その結果として、探索の軌道がどう変わるか。

  2. データ分布の設定を固定した上で、アルゴリズムの設定(ステップサイズ、反復回数の上限、初期値)を変えてみること。その結果として、探索の軌道がどう変わるか。

最適化と学習

素朴な疑問として、「学習と最適化の違いとは一体何か」ということが当然、気になる。端的にその違いを述べるならば、パフォーマンスの評価の仕方が違う。つまり、最適化に成功したとしても、学習に失敗すること(過学習)も、最適化に失敗しても学習に成功することもある。

「最適化」をするとき、何らかの制約の下で、関心のある目的関数の返す値がもっとも小さくなるように、その関数に渡すベクトルや集合などを選定する。工学の観点からは、目的関数が未知であれば、最適化を考える意味がない。$f(\cdot)$を最小にしたいときに、候補の$\widehat{x}$があっても、もし$f(\widehat{x})$すら計算できないのであれば、任意の$x$に対して$f(\widehat{x}) \leq f(x)$なのかどうか、知ることは到底できない。もちろん、$f$の取る値を見ずに、その勾配ベクトル$\nabla f$だけで最小化することもしばしばあるが、$f$について確かな情報があることは変わらない。ざっくりいえば、知っている関数を最小にすることが最適化である。

一方で、学習では「推論」が必要である。学習能力を評価する方法はいくらでもあるが、通常の考え方として換言すれば、知らない関数を最小にすることが学習の基本である。具体的に例示してみよう。古くからある統計的決定理論にも登場するのだが、確率モデルを前提とした学習モデルの典型例が「リスク(期待損失)最小化」である。

\begin{align*} \min_{w \in \mathbb{R}^{d}} R(w), \quad R(w) = \mathbf{E}_{Z} L(w;z). \end{align*}

この関数$R$を最小にすれば、学習に成功したといえる、という学習モデルである。したがって、厳密にいえば最適化問題でもあるが、データ$z$の分布は未知なので、期待値は知らない。よって$R$も計算できないし、その勾配も何もかもわからないという状況である。実際、知ることができるのは、$L$だけである。それと観測データ$z_{1},\ldots,z_{n}$を使って、$R$について推し量るしかない。

上記を踏まえて、最適化と学習をはっきりと区別することができる。我々としては、最適化は学習を行う上で必要不可欠な作業であって、学習そのものではない。先ほどの線形回帰の事例では、OLS解(最適化を完璧にこなした結果)が$w^{\ast}$とは一致しないことが一般的である。これは統計的誤差である。一方、学習機が「状態」として更新していく$w_{(t)}$とOLS解との距離は計算誤差である。うまく学習させるには、この2種の誤差を小さくすることが基本である。

事例の深化:リスク関数が二次関数の場合

さらに理解を促すため、簡単なシミュレーションを用いた学習課題を考える。ここで強調したいのは、同じリスク関数でも、データを生成する確率分布の違いによって、学習アルゴリズムの性能が大きく変わりうることである。

仮に、期待損失$R$が単純な二次関数の形式を取るとする。

\begin{align*} R(w) = (w - w^{\ast})^{T}A(w - w^{\ast}) + b^2. \end{align*}

現にあるロス関数$L$をとって、任意の$w \in \mathbb{R}^{d}$に対して$\mathbf{E}_{Z} L(w;z) = R(w)$という条件を満たすには、線形回帰モデルを仮定して、入力と加法ノイズが互いに独立であるならば、十分である。

つまり、$z = (x,y)$と書くと、

\begin{align*} y = \langle w^{\ast}, x \rangle + \varepsilon \end{align*}

というモデルで、$\varepsilon$が$x$と独立であるという仮定である。その結果として$\mathbf{E}\varepsilon x = 0$となる。したがって、

\begin{align*} b^2 & = \mathbf{E}\varepsilon^2\\ A & = \mathbf{E}xx^{T} \end{align*}

というときに$\mathbf{E}_{Z} L(w;z) = R(w)$が成り立つ。これらの仮定は先程の線形回帰とまったく同じである。さらに$\mathbf{E}_{X}x = 0$と置くと$A =\text{cov}\,x$と、入力の共分散行列がリスク関数に姿を表わす。

上記を踏まえて、シミュレーションをする場合のみ、確率分布がわかるので、$L$のみならず$R$もわかるということになる。そこで面白いのは、同じ$R$でも、データの確率分布の違いによって、機械学習の手法の優劣を調べることである。新しいアルゴリズムを開発する上で、大変便利なアプローチである。

さて、$R$を計算するためのコードを前の事例に加えて、$R$の等高線入りで学習機の軌道を表わすことにする。

In [18]:
import math
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
In [19]:
# Data distribution and sample parameters.

n = 500
d = 2
X_sd = 1.0
cov_X = X_sd * np.eye(d) # covariance matrix of the inputs.
w_star = math.pi * np.ones((d,1), dtype=np.float32) # true vec.
noise_dict = {}
risk_dict = {}


def vlnorm(meanlog, sdlog):
    '''
    Variance of the log-Normal distribution.
    '''
    return (math.exp(sdlog**2) - 1) * math.exp((2*meanlog + sdlog**2))

def mlnorm(meanlog, sdlog):
    '''
    Mean of log-Normal distribution.
    '''
    return math.exp((meanlog + sdlog**2/2))


def vnorm(shift, scale):
    '''
    Variance of the Normal distribution.
    '''
    return scale**2

def mnorm(shift, scale):
    '''
    Mean of Normal distribution.
    '''
    return shift


def riskMaker(w, A, b, w_star):
    diff = w - w_star
    quad = diff.T.dot(A.dot(diff).reshape(diff.shape))
    return np.float32(quad) + b**2


# Normal prep.
def risk_norm(w):
    return riskMaker(w=w,
                     A=cov_X,
                     b=math.sqrt(vnorm(shift=0, scale=20.0)),
                     w_star=w_star)
noise_dict["norm"] = np.random.normal(loc=0.0, scale=20.0, size=(n,1))-mnorm(shift=0, scale=20.0)

# log-Normal prep.
def risk_lnorm(w):
    return riskMaker(w=w,
                     A=cov_X,
                     b=math.sqrt(vlnorm(meanlog=0, sdlog=1.75)),
                     w_star=w_star)
noise_dict["lnorm"] = np.random.lognormal(mean=0.0, sigma=1.75, size=(n,1))-mlnorm(meanlog=0, sdlog=1.75)
In [20]:
# Generate the training data.
data_norm = DataSet()
data_lnorm = DataSet()
X = np.random.normal(loc=0.0, scale=X_sd, size=(n,d))
#epsilon = np.random.normal(loc=0.0, scale=epsilon_sd, size=(n,1))
#y = np.dot(X, w_star) + epsilon
data_norm.init_tr(X=X,
                  y=(X.dot(w_star)+noise_dict["norm"]))
data_lnorm.init_tr(X=X,
                   y=(X.dot(w_star)+noise_dict["lnorm"]))
In [21]:
# Initialize model.
mod = LinearL2()

# Initialize learning algorithm.
w_init = np.random.uniform(size=(d,1))
tostore = True
t_max = 15
alphaval = 0.1

def alpha_fixed(t, val):
    return val

def make_step(u):
    def mystep(t):
        return alpha_fixed(t=t, val=u)
    return mystep

al = Algo_SimpleGD(w_init=w_init,
                   t_max=t_max,
                   step=make_step(alphaval),
                   store=tostore)
In [22]:
# First, run it for the Normal case.
for onestep in al:
    al.update(model=mod, data=data_norm)
    
mypath_norm = al.wstore
In [23]:
# Then, re-initialize the algorithm and do it for log-Normal.
al = Algo_SimpleGD(w_init=w_init,
                   t_max=t_max,
                   step=make_step(alphaval),
                   store=tostore)
for onestep in al:
    al.update(model=mod, data=data_lnorm)

mypath_lnorm = al.wstore
In [24]:
# Get the OLS solutions.
w_OLS_norm = np.linalg.lstsq(a=data_norm.X_tr,
                             b=data_norm.y_tr, rcond=None)[0]
w_OLS_lnorm = np.linalg.lstsq(a=data_lnorm.X_tr,
                              b=data_lnorm.y_tr, rcond=None)[0]
In [25]:
print(risk_norm(w_star))
print(risk_lnorm(w_star))
[[400.]]
[[435.76376]]
In [26]:
import matplotlib
import matplotlib.pyplot as plt

# Visualize the output (in 2D case).
mypath = al.wstore

if (d == 2):
    
    myfig = plt.figure(figsize=(7,14))
    
    # Prep for contour lines.
    tmpdel = np.linalg.norm(w_star-w_init) * 1
    xvals = np.arange(w_star[0]-tmpdel,w_star[0]+tmpdel, 0.1)
    yvals = np.arange(w_star[1]-tmpdel,w_star[1]+tmpdel, 0.1)
    X, Y = np.meshgrid(xvals, yvals)
    
    # Trajectory for the normal case.
    def risk2D_helper(w1, w2):
        w2D = np.array([w1,w2]).reshape((2,1))
        return risk_norm(w=w2D)
    
    risk2D = np.vectorize(risk2D_helper)
    Z = risk2D(w1=X, w2=Y)
    ax_norm = myfig.add_subplot(2,1,1)
    CS = ax_norm.contour(X, Y, Z)
    ax_norm.quiver(mypath_norm[0,:-1], mypath_norm[1,:-1],
              mypath_norm[0,1:]-mypath_norm[0,:-1],
              mypath_norm[1,1:]-mypath_norm[1,:-1],
              scale_units='xy', angles='xy', scale=1, color='k')
    CS.clabel(inline=1, fontsize=10)
    ax_norm.plot(*w_star, 'r*', markersize=12, label="w_star") # print true value.
    ax_norm.plot(*w_OLS_norm, 'gv', markersize=8, label="w_OLS") # print OLS estimate.
    ax_norm.plot(*mypath_norm[:,-1], 'bo', markersize=8, label="w") # print our final estimate.
    ax_norm.legend(loc=1,ncol=1)
    plt.title('Trajectory of GD routine (Normal noise)')
    
    print("Distance:", np.linalg.norm(mypath_norm[:,-1]-w_star))
    
    # Then log-Normal case.
    def risk2D_helper(w1, w2):
        w2D = np.array([w1,w2]).reshape((2,1))
        return risk_lnorm(w=w2D)
    
    risk2D = np.vectorize(risk2D_helper)
    Z = risk2D(w1=X, w2=Y)

    ax_lnorm = myfig.add_subplot(2,1,2)
    CS = ax_lnorm.contour(X, Y, Z)
    ax_lnorm.quiver(mypath_lnorm[0,:-1], mypath_lnorm[1,:-1],
              mypath_lnorm[0,1:]-mypath_lnorm[0,:-1],
              mypath_lnorm[1,1:]-mypath_lnorm[1,:-1],
              scale_units='xy', angles='xy', scale=1, color='k')
    CS.clabel(inline=1, fontsize=10)
    ax_lnorm.plot(*w_star, 'r*', markersize=12, label="w_star") # print true value.
    ax_lnorm.plot(*w_OLS_lnorm, 'gv', markersize=8, label="w_OLS") # print OLS estimate.
    ax_lnorm.plot(*mypath_lnorm[:,-1], 'bo', markersize=8, label="w") # print our final estimate.
    ax_lnorm.legend(loc=1,ncol=1)
    plt.title('Trajectory of GD routine (log-Normal noise)')
    
    print("Distance:", np.linalg.norm(mypath_lnorm[:,-1]-w_star))
    
    plt.show()
    
    
Distance: 1.3772832
Distance: 2.4935608

このように、ロス関数による目的関数のほかに、潜在する「真の目的関数」があることがわかる(つまり$R$で、riskとして算出)。学習アルゴリズムの本来の目的関数は、前者ではなく、後者である。勾配を使った最急降下法などでは、理想として$\nabla R(w_{(t)})$を知りたいところだが、未知なので$\nabla L(w;z)$のサンプル平均を使って推定している。

練習問題

  1. 前の練習問題のように、真の目的関数の等高線が見えるなかで再びアルゴリズムと確率分布のパラメータを変更してみること。

  2. 上記の問題でパラメータを変更させた結果を、「汎化能力」や「過学習」といった言葉を使って説明すること。