#!/usr/bin/env python # coding: utf-8 # # SVRGと数値実験 # # この章では、NIPS 2013で発表されたstochastic variance reduced gradient (SVRG)という学習アルゴリズムを実装し、それを線形モデルおよび非線形モデルに適用し、そのパフォーマンスをほかのアルゴリズムと比較しつつ、著名なベンチマークデータを用いて検証することを主な目標とする。 # # 性能評価を行う数値実験は我流ではなく、SVRGを提案した元の論文(*Accelerating Stochastic Gradient Descent using Predictive Variance Reduction*, NIPS 2013, link)での数値実験を模倣するという方針である。 # # __目次__ # # - アルゴリズムの定式化 # # - SVRGの実装 # # - 汎用的なSVRG # # - Chainer向けのSVRG # # - 数値実験の概要 # # - 学習と性能検証 # # - 線形モデル向けの検証(目的関数が凸である) # # - 線形モデル向けの検証(目的関数が非凸である) # # - パフォーマンスの評価 # # ___ # # ## アルゴリズムの定式化 # # 基本的な問題設定をまず述べておく。目的関数は実数値を返す関数の和である。定義域は$\mathbb{R}^d$である。我々の損失関数の表記を前の章から継承させると、以下のようになる。 # # \begin{align*} # \min_{w \in \mathbb{R}^{d}} F(w), \quad F(w) = \frac{1}{n} \sum_{i=1}^{n} L(w;z_{i}). # \end{align*} # # この関数を高速に最小化する方法として、確率的勾配降下法(SGD)が有名であることは前の章で述べた通りである。SGDでは、パラメータ空間を探索しながら、全サンプルを使った場合と同じ方向へと平均的に動くが、コストは圧倒的に安いというメリットがある。 # # 一方、各ステップにおいてSGDがデータセットのごく小さな部分集合しか使わないため、ステップサイズが極限$t \to \infty$において$\alpha_{(t)} \to 0$でなければ収束しない。サブサンプルを取ることで、SGDの各ステップの更新が、GDを毎回別の目的関数に対して実行することに等しい。永遠に相異なる方向へ引っ張られてしまうから収束しないのである。 # # そこで、SVRGを提案した二人の基本的なアイディアは、以下のような折衷案である。 # # - ほとんどすべての更新では、ミニバッチの大きさが$B \ll n$と小さくて、安価である。 # # - ごく一部の更新では、全サンプル($n$個)を使って、正確な勾配ベクトルを計算し、それを以後の更新を安定化させるために何度も再利用する。 # # 簡明な発想で、容易に実装できる強い学習アルゴリズムである。詳細を見ていこう。 # SVRGの更新則は基本的に2つのループから成る。まずは$\tilde{w}_{0}$を初期化する。これを「基準ベクトル」として使う。外側のループでは、バッチの全体を使って、この基準ベクトルでの勾配を計算し、内側のループの演算に使う。具体的な算法として、各$s = 1,2,\ldots$に対して、以下を行う。 # # 0. $\tilde{w} = \tilde{w}_{(s-1)}$ # # 0. $\displaystyle \tilde{g} = \frac{1}{n} \sum_{i=1}^{n} \nabla L(\tilde{w};z_{i})$ # # 0. $w_{(0)} = \tilde{w}$ # # 0. 内側のループでは$w_{(0)}$を$T$回だけ繰り返して更新し、$w_{(T)}$を返す。 # # 0. $\tilde{w}_{s} = w_{(T)}$と置く。 # # もっとも重要な計算は$\tilde{g}$である。最小化したい$F$が$n$個の関数の和であることを考えると、この$\tilde{g}$はまさに$\tilde{w}$での理想の更新方向である。$w_{(0)}$の設定は単に内側のループに先立つ初期化である。次は、内側のループの中身を見ていく。各$t=0,1,\ldots,T$に対して、以下を行う。 # # 0. 無作為に$I_{t} \in \{1,\ldots,n\}$をサブサンプル。 # 0. $\Delta_{t} = \nabla L(\tilde{w};z_{I_{t}}) - \tilde{g}$ # 0. 重みベクトルを更新:$\displaystyle w_{(t+1)} = w_{(t)} - \alpha \left(\nabla L(w_{(t)};z_{I_{t}}) - \Delta_{t} \right)$ # # このアルゴリズムの設計は実に明瞭である。この$\Delta_{t}$は、基準ベクトル$\tilde{w}$に着目し、$n$個のデータ点ではなく、たったの1点を使うことによる「誤差」を次元ごとにまとめたベクトルである。 # 言うまでもなく、このアルゴリズムが良い解に至るためには、いくつかの仮定が必要である。特に重要なのは、基準ベクトルを定期的に更新しておけば、$\tilde{w}$で測った誤差と$w_{(t)}$での本当の誤差がさほど変わらない、ということである。 # # あとは、SGDと同様に、$w_{(t)}$を条件として、無作為に選んでいる$I_{t}$について期待値を取ると、以下のようにバッチ全体を使った場合と一致する。 # # \begin{align*} # \mathbf{E} \left( \nabla L(w_{(t)};z_{I_{t}}) - \Delta_{t} \right) & = \frac{1}{n} \sum_{i=1}^{n} L(w_{(t)};z_{i}) - \mathbf{E} \left( \nabla L(\tilde{w};z_{I_{t}}) - \tilde{g}\right)\\ # & = \frac{1}{n} \sum_{i=1}^{n} L(w_{(t)};z_{i}). # \end{align*} # # ## SVRGの実装 # # ここで、SVRGを2回実装する。最初は、前の章で紹介した方法論に則って、`Algo_SVRG`という完全なる自作で、`Algo_LineSearch`のサブクラスとして汎用的なものである。その後は、Chainerを前提としたニューラルネットワーク向け実装である。 # In[1]: import math import numpy as np import chainer as ch import matplotlib import matplotlib.pyplot as plt import algorithms import models import models_ch import dataclass import helpers as hlp # # ### 汎用的なSVRG # In[2]: class Algo_SVRG(algorithms.Algo_LineSearch): ''' Stochastic variance reduced gradient descent. ''' def __init__(self, w_init, step, batchsize, replace, out_max, in_max, thres, store, lamreg): super(Algo_SVRG, self).__init__(w_init=w_init, step=step, t_max=(out_max*in_max), thres=thres, store=store, lamreg=lamreg) self.out_max = out_max self.in_max = in_max self.batchsize = batchsize self.replace = replace # Computed internally. self.nseen = 0 self.npasses = 0 self.idx_inner = 0 self.torecord = True def newdir(self, model, data): ''' Determine the direction of the update. ''' if self.idx_inner == 0: self.w_snap = np.copy(self.w) self.g_snap = np.mean(model.g_tr(w=self.w_snap, data=data, lamreg=self.lamreg), axis=0, keepdims=True) shufidx = np.random.choice(data.n_tr, size=self.batchsize, replace=self.replace) g_sgd = np.mean(model.g_tr(w=self.w, n_idx=shufidx, data=data, lamreg=self.lamreg), axis=0, keepdims=True) correction = np.mean(model.g_tr(w=self.w_snap, n_idx=shufidx, data=data, lamreg=self.lamreg), axis=0, keepdims=True) - self.g_snap return (-1) * (g_sgd-correction) def monitor_update(self, model, data): ''' Update the counters and convergence monitors used by the algorithm. This is executed once every step. ''' self.t += 1 self.idx_inner += 1 if self.idx_inner == self.in_max: self.idx_inner = 0 # Check differences every "epoch" over data. self.nseen += self.batchsize if self.nseen >= data.n_tr: self.torecord = True self.npasses += 1 self.diff = np.linalg.norm((self.w-self.w_old)) self.w_old = np.copy(self.w) self.nseen = self.nseen % data.n_tr def cost_update(self, model, data): ''' Update the amount of computational resources used by the routine. Cost computation based on number of gradients computed: - Each inner loop step requires mini-batch gradients for two vectors, w and w_snap. - Each outer loop step additionally requires a full batch of gradients. ''' if self.idx_inner == self.in_max: self.stepcost = 2 * self.batchsize + data.n_tr else: self.stepcost = 2 * self.batchsize self.cumcost += self.stepcost # # ### Chainer向けのSVRG # # In[3]: ## Experiment setup. # Data-related. data = dataclass.DataSet() # Initialize one data object; will be re-populated at each trial. n = 500 # sample size d = 2 # number of parameters init_delta = 5.0 # controls support of random initialization num_trials = 250 # number of independent random trials to conduct cov_X = np.eye(d) # covariance matrix of the inputs. w_star = np.ones(d).reshape((d,1)) # vector specifying true model # Algorithm-related. m_idx_todo = [0,1] # let's us manually pick and choose which methods to evaluate. out_max = 5 batchsize = 1 in_max = 50 t_max = out_max*in_max # termination condition; maximum number of iterations. thres = -1.0 # termination condition; if negative, runs for max iterations. def alpha_fixed(t, val): # step-size callback function. return val def make_step(u): def mystep(t, model=None, data=None, newdir=None): return alpha_fixed(t=t, val=u) return mystep alphaval = 0.05 # Clerical. mth_names = ["svrg", "svrg-ch"] num_mths = len(mth_names) mth_colours = ["black", "blue"] # In[4]: # Make choice of additive noise distribution (un-comment your choice). #paras = {"name": "norm", "shift": 0.0, "scale": 20.0} paras = {"name": "lnorm", "meanlog": 0.0, "sdlog": 1.75} # Put together risk function. def risk(w): mean_noise, var_noise = hlp.noise_risk(paras=paras) return hlp.riskMaker(w=w, A=cov_X, b=math.sqrt(var_noise), w_star=w_star) risk_star = risk(w=w_star) # optimal risk value. # In[5]: ## Running the algorithms. # Prepare storage for performance metrics. riskvals = np.zeros((num_trials,t_max,num_mths), dtype=np.float32) loss_tr = np.zeros((num_trials,t_max,num_mths), dtype=np.float32) truedist = np.zeros((num_trials,t_max,num_mths), dtype=np.float32) # Loop over trials. for tri in range(num_trials): # Generate new data (with *centered* noise). X = np.random.normal(loc=0.0, scale=1.0, size=(n,d)) noise = hlp.noise_data(n=n, paras=paras) y = np.dot(X, w_star) + noise data.init_tr(X=X, y=y) # Data for Chainer model. Z = ch.datasets.TupleDataset(np.float32(X), np.float32(y)) # Initial weight settings. w_init = w_star + np.random.uniform(low=-init_delta, high=init_delta, size=d).reshape((d,1)) w_init = np.float32(w_init) # Initialize models (hand-built). mod_learner = models.LinearL2(data=data) risk_star = risk(w=w_star) # optimal risk value. loss_star = np.mean(mod_learner.l_tr(w=w_star, data=data)) # Initialize models (Chainer-based). mod_chainer = models_ch.MyChain(out_l0=d, out_l1=1, init_W=w_init.T, init_b=None, init_delta=init_delta, nobias=True) mod_snap = models_ch.MyChain(out_l0=d, out_l1=1, init_W=w_init.T, init_b=None, init_delta=init_delta, nobias=True) # Initialize algorithms (hand-built). al_svrg = Algo_SVRG(w_init=w_init, step=make_step(alphaval), batchsize=batchsize, replace=False, out_max=out_max, in_max=in_max, thres=thres, store=True, lamreg=None) # Run all algorithms and save their performance. ## ERM-SVRG. mthidx = 0 if mthidx in m_idx_todo: idx = 0 for onestep in al_svrg: al_svrg.update(model=mod_learner, data=data) # Record performance loss_tr[tri,idx,mthidx] = np.mean(mod_learner.l_tr(w=al_svrg.w, data=data))-loss_star riskvals[tri,idx,mthidx] = risk(w=al_svrg.w)-risk_star truedist[tri,idx,mthidx] = np.linalg.norm(w_star-al_svrg.w)-0 idx += 1 ## Replication of ERM-SVRG using Chainer. mthidx = 1 if mthidx in m_idx_todo: idx = 0 t = 0 iter_train = ch.iterators.SerialIterator(dataset=Z, batch_size=batchsize, repeat=True, shuffle=False) while t < t_max: # If condition holds, run "outer loop" update. if t % in_max == 0: # Use the output of "inner loop" as new snapshot. mod_snap = mod_chainer.copy(mode="copy") # Use the full data set here. X_batch, y_batch = ch.dataset.concat_examples(Z) prediction_tr_snap = mod_snap(X_batch) # Compute loss and full-batch gradient. loss_snap = ch.functions.mean_squared_error(prediction_tr_snap, y_batch) / 2.0 mod_snap.cleargrads() loss_snap.backward() # Store the gradient list for use in inner loop. grad_list = [np.copy(p.grad) for p in mod_snap.params()] # Mini-batch computations for inner loop. Z_batch = iter_train.next() X_batch, y_batch = ch.dataset.concat_examples(Z_batch) t += 1 # manage steps ourselves. # Predictions. prediction_tr = mod_chainer(X_batch) prediction_tr_snap = mod_snap(X_batch) # Loss computations (will feed the grad computations). loss = ch.functions.mean_squared_error(prediction_tr, y_batch) / 2.0 loss_snap = ch.functions.mean_squared_error(prediction_tr_snap, y_batch) / 2.0 #old_loss = np.mean(mod_learner.l_tr(w=mod_chainer.l1.W.data.T, data=data)) # Gradient computations. mod_chainer.cleargrads() mod_snap.cleargrads() loss.backward() loss_snap.backward() # Parameter updates. zipped = zip(mod_chainer.params(), mod_snap.params(), grad_list) for p, p_snap, mu in zipped: grad = p.grad grad_snap = p_snap.grad if grad is None: continue else: adjust = grad_snap - mu p.data -= alphaval * (grad-adjust) # Record performance loss_tr[tri,idx,mthidx] = np.mean(mod_learner.l_tr(w=mod_chainer.l1.W.data.T, data=data))-np.mean(mod_learner.l_tr(w=w_star, data=data)) riskvals[tri,idx,mthidx] = risk(w=mod_chainer.l1.W.data.T)-risk_star truedist[tri,idx,mthidx] = np.linalg.norm(w_star-mod_chainer.l1.W.data.T)-0 idx += 1 # Finally, take statistics of the performance metrics over all trials. ave_loss_tr = np.mean(loss_tr, axis=0) ave_riskvals = np.mean(riskvals, axis=0) ave_truedist = np.mean(truedist, axis=0) sd_loss_tr = np.std(loss_tr, axis=0) sd_riskvals = np.std(riskvals, axis=0) sd_truedist = np.std(truedist, axis=0) # __練習問題__ # # 0. `mod_snap`と`mod_chainer`の役割の違いを説明すること。 # # 0. `zipped`に基づくイテレーションでは、何が行われているか説明すること。 # # 0. Chainer向けのSVRGと自作のSVRGがまったく同じ性能を叩き出すと思われるか。その理由も併せて説明すること。 # In[6]: ## Visualization of performance. tvals = np.arange(t_max)+1 # better to start from the first update. # Average over trials. myfig = plt.figure(figsize=(15,5)) ax_loss_tr = myfig.add_subplot(1, 3, 1) for m in m_idx_todo: vals = ave_loss_tr[:,m] err = sd_loss_tr[:,m] ax_loss_tr.plot(tvals, vals, color=mth_colours[m], label=mth_names[m]) #ax_loss_tr.errorbar(tvals, vals, yerr=err, fmt='-o', col=mth_colours[m], label=mth_names[m]) ax_loss_tr.legend(loc=1,ncol=1) plt.axhline(y=0.0, color="black") plt.title("Excess empirical risk") ax_riskvals = myfig.add_subplot(1, 3, 2) for m in m_idx_todo: vals = ave_riskvals[:,m] err = sd_riskvals[:,m] err_log = err / vals ax_riskvals.semilogy(tvals, vals, "-o", color=mth_colours[m], label=mth_names[m]) ax_riskvals.legend(loc=1,ncol=1) plt.axhline(y=0.0, color="black") plt.title("Excess risk") ax_variation = myfig.add_subplot(1, 3, 3) for m in m_idx_todo: vals = sd_riskvals[:,m] ax_variation.plot(tvals, vals, "-o", color=mth_colours[m], label=mth_names[m]) ax_variation.legend(loc=1,ncol=1) plt.axhline(y=0.0, color="black") plt.title("Variation of risk across samples") plt.show() # 嬉しいことに、ほぼ同じ振る舞いを実現していることが確認できた。ランダムに取っているサブサンプルは使うたびに変わるので、まったく同じ動作はほぼ考えられないが、全体的な傾向が一致しており、Chainer版が誠実に自作版を再現していることが窺える。 # ___ # # ## 数値実験の概要 # # この節では、Johnson and Zhang (2013)の数値実験の全体像を眺めてから、重要な実験条件を整理しておくことにする。実際にその数値実験を実装するのは、次の節にする。 # # 彼らの実験では、複数の実世界からのデータセットが使われており、いずれも機械学習の世界で有名なベンチマークデータである。パフォーマンスの評価指標として、主に以下の3つを見ている。 # # - 訓練データでの損失 # - 検証データでの損失 # - 重みベクトルを更新するたびの分散("update variance"と呼んでいる) # # これらの数量は基本的に、__計算コスト__の関数として、そのグラフがプロットされている。彼らがいう計算コストとは、$d$次元勾配ベクトルを算出した累計回数である。代表的な結果を参考として下図で示しておく。 # # Example figures from Johnson and Zhang (NIPS 2013) # Johnsonらの実験では、識別課題のみ扱っている。基本的には、同じデータセットに対して、2種類の実験を行なっている。 # # - 凸: ロジスティック回帰を使う場合を指す。ロス関数が重みベクトルの凸関数である。 # # - 非凸: フィードフォワードニューラルネットワークを使う場合を指す。ネットワークの詳細として、隠れそうが一つだけで、接続はfully-connectedで、活性化関数が何らかのsigmoidである。全クラス分の実数値出力を最後にsoftmax cross-entropyに渡している。ミニバッチの大きさは10と決まっている。モデルが非線形のため、一般には目的関数が重みベクトルの凸関数ではない。 # # 例のupdate varianceの計算について、以下を引用する。 # # > In Fig. 2 (c), we show the variance of SVRG update $-\eta (\nabla \psi_{i}(w)-\psi_{i}(\tilde{w})+\tilde{\mu})$ in comparison with the variance of SGD update $-\eta(\nabla \psi_{i}(w))$ and SDCA. As expected, the variance (including multiplication with the learning rate) of both SVRG and SDCA decreases as optimization proceeds, and the variance of SGD with a fixed learning rate (‘SGD:0.001’) stays high. # # 彼らの記号は我々の記号と異なるので、論文中の定式化の部分を読み、正しい計算を確認すること。 # そのほか、彼らの実験を再現するにあたって必要な情報をまとめている。 # # - 内側ループの反復回数: $2n$(凸)、$5n$(非凸)。 # # - SVRGの初期化: SGDをone epoch(凸)かten epochs(非凸)走らせた結果を初期値とする。 # # - すべての実験では、$\ell_{2}$ノルムによる正則化が行われている。その罰則項の係数はデータセットによる。MNISTでは1e-4、CIFAR-10では1e-3、など(凸も非凸も同じ設定)。 # # - 学習率("learning rates"、つまりステップサイズのこと)はFigureの中に明記されている。 # ___ # # ## 学習と性能検証 # # いよいよ数値実験を実装し、ベンチマークデータを相手に、実行していく。以下のコードでは、SVRGを検証する実験を、凸(多クラスのロジスティック回帰)と非凸(ニューラルネットワーク+softmax)の両ケースで実装している。この原始的なコードを頼りに、下記の__練習問題__に取り組むこと。 # # 0. ステップサイズや正則化の係数、外側と内側の反復回数の上限などは恣意的に決めている。これらを、前節の内容や元論文の中身を踏まえて、論文どおりに正しく設定すること。 # # 0. 非凸のケースでは、SVRGは実装しているが、SGDは実装していない。前の章のChainer基礎を頼りに、非線形モデル向けにSGDを実装すること。また、SGDとSVRGの実験結果を比較すること。 # # 0. 以下のコードでは、非線形モデルのとき、$\ell_{2}$ノルムによる正則化をどのように実装しているか説明すること。 # # 0. 以下のコードでは、非線形モデルのとき、例のupdate varianceがどのように算出されているか説明すること。 # # 0. 以下のコードでは、ステップサイズは固定値である(いわゆるlearning rate schedulingは行っていない)。元論文では、固定値に加えて、2種類のschedulingもカバーされている。元論文を読み、この2種類を突き止めて、SGD+schedulingを実装すること。最終的に目指したいのは、Johnsonらがいう"SGD-best"の実装である。 # In[10]: import os import math import tables import numpy as np import multiprocessing as mltp import algorithms import models import models_ch import dataclass import helpers as hlp # In[11]: # Data set choice. paths = { "cifar10": os.path.join("data", "cifar10", "data.h5"), "mnist": os.path.join("data", "mnist", "data.h5"), "iris": os.path.join("data", "iris", "data.h5") } task_name = "iris" # SET BY HAND. # In[12]: # General-purpose performance evaluation. def perf_fn(model=None, algo=None, data=None): ''' Performance measurement routine. Various metrics are computed, relevant descriptions are given as names, and both are returned together in a dictionary. ''' names = ["Misclass rate (train)", "Misclass rate (test)", "Primal objective (train)", "Primal objective (test)", "Norm (l2)"] # In the case of a dry run, leave perf Null. if ((model is None) or (algo is None)): return {"names": names, "perf": None} # Else, compute the performance metrics of interest. y_est_tr = model.classify(w=algo.w, X=data.X_tr) y_est_te = model.classify(w=algo.w, X=data.X_te) perf_tr = model.class_perf(y_est=y_est_tr, y_true=data.y_tr) perf_te = model.class_perf(y_est=y_est_te, y_true=data.y_te) misclass_tr = perf_tr["rate"] misclass_te = perf_te["rate"] primal_tr = np.mean(model.l_tr(w=algo.w, data=data)) primal_te = np.mean(model.l_te(w=algo.w, data=data)) w_norm = np.linalg.norm(algo.w) perf = np.array([misclass_tr, misclass_te, primal_tr, primal_te, w_norm]) out = {"names": names, "perf": perf} return out # Performance evaluation for Chainer-based implementation. def perf_fn_ch(model=None, data=None): ''' Performance measurement routine; Chainer version. Various metrics are computed, relevant descriptions are given as names, and both are returned together in a dictionary. NOTE: the "model" here contains all parameters. ''' names = ["Misclass rate (train)", "Misclass rate (test)", "Primal objective (train)", "Primal objective (test)", "Norm (l2)"] # In the case of a dry run, leave perf Null. if (model is None or data is None): return {"names": names, "perf": None} # Else, compute the performance metrics of interest. ## Get predictions (not tuple notation). prediction_tr = model(data.X_tr) prediction_te = model(data.X_te) ## Compute misclassification rate. accuracy_tr = ch.functions.accuracy(y=prediction_tr, t=np.int8(data.y_tr).flatten()).data accuracy_te = ch.functions.accuracy(y=prediction_te, t=np.int8(data.y_te).flatten()).data misclass_tr = 1-accuracy_tr misclass_te = 1-accuracy_te ## Compute loss function values. primal_tr = ch.functions.softmax_cross_entropy(x=prediction_tr, t=np.int8(data.y_tr).flatten(), normalize=True, reduce="mean").data primal_te = ch.functions.softmax_cross_entropy(x=prediction_te, t=np.int8(data.y_te).flatten(), normalize=True, reduce="mean").data ## Finally compute the norm. w_norm = 0 for p in model.params(): w_norm += np.linalg.norm(p.data)**2 w_norm = math.sqrt(w_norm) perf = np.array([misclass_tr, misclass_te, primal_tr, primal_te, w_norm]) out = {"names": names, "perf": perf} return out # # ### 線形モデル向けの検証(目的関数が凸である) # # In[13]: # Basically a "driver script" for our hand-built guys. num_trials = 5 # Prepare a results folder, if doesn't already exist. hlp.makedir_safe(os.path.join("results", task_name)) # Establish file connection. toread = paths[task_name] f = tables.open_file(toread, mode="r") # Prepare data object. data = dataclass.DataSet() data.init_tr(X=f.root.train.inputs.read(), y=f.root.train.labels.read()) data.init_te(X=f.root.test.inputs.read(), y=f.root.test.labels.read()) # In[14]: # Prepare model object. mod = models.LogisticReg(data=data) # Initialize all competing hand-built algorithms. n, numfeats = data.X_tr.shape nc = mod.nc d = numfeats * (nc-1) # In[15]: perf_names = perf_fn()["names"] # fire a blank to get metric names. num_metrics = len(perf_names) # number of perf metrics used. # In[16]: # Algorithm preparation. num_epochs = 10 replace = False thres = 1e-03 store = False lamreg = 1e-03 # should adjust depending on data set. def alpha_fixed(t, val): return val def make_step(u): def mystep(t, model=None, data=None, newdir=None): return alpha_fixed(t=t, val=u) return mystep def algoFn_sgd(w_init): al = algorithms.Algo_SGD(w_init=w_init, step=make_step(u=0.01), batchsize=1, replace=replace, t_max=num_epochs*n, thres=thres, store=store, lamreg=lamreg) return al def algoFn_svrg(w_init): al = algorithms.Algo_SVRG(w_init=w_init, step=make_step(u=0.1), batchsize=1, replace=replace, out_max=num_epochs/2, in_max=2*n, thres=thres, store=store, lamreg=lamreg) return al algoFn_dict = {"sgd": algoFn_sgd, "svrg": algoFn_svrg} # In[17]: # Max number of perf records. max_records = num_epochs + 1 # extra one for initial values. # In[18]: # Run the algorithms, and write performance to disk. def worker(algorithm_name): print("At work:", algorithm_name) # Array used for recording cumulative costs. perf_cumcost = np.zeros(max_records, dtype=np.uint32) perf_cumcost.fill(np.nan) # Array used for recording update variance. perf_updatevar = np.zeros(max_records, dtype=np.float32) perf_updatevar.fill(np.nan) # Array used for housing performance over trials. perf_array = np.zeros((max_records, num_metrics, num_trials), dtype=np.float32) perf_array.fill(np.nan) for tri in range(num_trials): # Initialize weights and algorithm here. w_init = np.random.uniform(low=0.0, high=0.1, size=(d,1)) al = algoFn_dict[algorithm_name](w_init=w_init) cntr = 0 for onestep in al: if al.t % 5000 == 0: print("Update: t =", al.t) al.update(model=mod, data=data) # update parameters. if al.torecord: al.torecord = False # switch off. perf_updatevar[cntr] = al.diff perf_array[cntr,:,tri] = perf_fn(model=mod, algo=al, data=data)["perf"] perf_cumcost[cntr] = al.cumcost cntr += 1 # Finally, if cntr isn't already maxed out, be sure # to record performance at the final step. if cntr < max_records: perf_updatevar[cntr] = np.linalg.norm(al.w-al.w_old) perf_array[cntr,:,tri] = perf_fn(model=mod, algo=al, data=data)["perf"] perf_cumcost[cntr] = al.cumcost # Having run over all trials, compute statistics. # note: ndarray shape is (max_records, num_metrics). perf_ave = np.mean(perf_array, axis=2) perf_sd = np.std(perf_array, axis=2) # Trim the unfilled results (as needed). tocheck = np.where(np.isnan(perf_ave[:,0]))[0] if tocheck.size > 0: badidx = tocheck[0] perf_ave = perf_ave[0:badidx,:] tocheck = np.where(np.isnan(perf_sd[:,0]))[0] if tocheck.size > 0: badidx = tocheck[0] perf_sd = perf_sd[0:badidx,:] tocheck = np.where(np.isnan(perf_cumcost))[0] if tocheck.size > 0: badidx = tocheck[0] perf_cumcost = perf_cumcost[0:badidx,:] tocheck = np.where(np.isnan(perf_updatevar))[0] if tocheck.size > 0: badidx = tocheck[0] perf_updatevar = perf_updatevar[0:badidx] # Write to disk. towrite = os.path.join("results", task_name) np.savetxt(fname=os.path.join(towrite, (algorithm_name+".cost")), X=perf_cumcost, fmt="%d", delimiter=",") np.savetxt(fname=os.path.join(towrite, (algorithm_name+".upvar")), X=perf_updatevar, fmt="%.7e", delimiter=",") np.savetxt(fname=os.path.join(towrite, (algorithm_name+".ave")), X=perf_ave, fmt="%.7e", delimiter=",") np.savetxt(fname=os.path.join(towrite, (algorithm_name+".sd")), X=perf_sd, fmt="%.7e", delimiter=",") # In[19]: if __name__ == "__main__": cpu_count = mltp.cpu_count() print("Our machine has", cpu_count, "CPUs.") print("Of these,", len(os.sched_getaffinity(0)), "are available.") # Put all processors to work (at an upper limit). mypool = mltp.Pool(cpu_count) # Pass the "worker" the name of the algorithm to run. mypool.map(func=worker, iterable=algoFn_dict.keys()) # Memory management. mypool.close() # important for stopping memory leaks. mypool.join() # wait for all workers to exit. # END of multiproc procedure. # In[20]: # Close file connection. f.close() # # ### 線形モデル向けの検証(目的関数が非凸である) # # ここでは、線形モデルのときとほぼ同じ段取りではあるが、今回はニューラルネットワークによる非線形モデルで、自作のものに代わって、Chainerで実装したものを使う。 # # 彼らの非線形モデルを`Chain_JZ`というサブクラスで実装する。 # In[21]: class Chain_JZ(ch.Chain): ''' The neural network used by Johnson and Zhang (2013). ''' def __init__(self, out_l0, out_l1, out_l2, init_delta=1.0, nobias=False): super(Chain_JZ, self).__init__() with self.init_scope(): self.l1 = models_ch.Linear(in_size=out_l0, out_size=out_l1, init_W=None, init_b=None, init_delta=init_delta, nobias=True) self.l2 = models_ch.Linear(in_size=out_l1, out_size=out_l2, init_W=None, init_b=None, init_delta=init_delta, nobias=True) def __call__(self, x): out = ch.functions.sigmoid(self.l1(x)) # logistic sigmoid. out = self.l2(out) # return the un-normalized log-probabilities. return out # In[22]: # Basically a "driver script" for our CHAINER-built guys. num_trials = 5 # Prepare a results folder, if doesn't already exist. hlp.makedir_safe(os.path.join("results", task_name)) # Establish file connection. toread = paths[task_name] f = tables.open_file(toread, mode="r") # Prepare data object for Chainer. Z_tr = ch.datasets.TupleDataset(np.float32(f.root.train.inputs.read()), np.int8(f.root.train.labels.read())) data = dataclass.DataSet() data.init_tr(X=f.root.train.inputs.read(), y=f.root.train.labels.read()) data.init_te(X=f.root.test.inputs.read(), y=f.root.test.labels.read()) # In[23]: n, numfeats = data.X_tr.shape nc = len(np.unique(data.y_tr)) d = numfeats * (nc-1) # In[24]: perf_names = perf_fn_ch()["names"] # fire a blank to get metric names. num_metrics = len(perf_names) # number of perf metrics used. # In[25]: # Algorithm preparation. num_epochs = 10 replace = False thres = 1e-03 store = False lamreg = 1e-03 # should adjust depending on data set. alphaval = 0.05 in_max = n out_max = num_epochs t_max = out_max * in_max al_name = "svrg-ch" # In[26]: # Max number of perf records. max_records = num_epochs + 1 # extra one for initial values. # In[27]: # Run the algorithms, and write performance to disk. # Array used for recording cumulative costs. perf_cumcost = np.empty(max_records) perf_cumcost.fill(np.nan) # Array used for recording update variance. perf_updatevar = np.empty(max_records) perf_updatevar.fill(np.nan) # Array used for housing performance over trials. perf_array = np.zeros((max_records, num_metrics, num_trials)) perf_array.fill(np.nan) for tri in range(num_trials): # Initialize models (Chainer-based). mod_svrg = Chain_JZ(out_l0=numfeats, out_l1=100, out_l2=nc, nobias=True) mod_snap = Chain_JZ(out_l0=numfeats, out_l1=100, out_l2=nc, nobias=True) # Execution of SVRG. idx = 0 cntr = 0 cumcost = 0 t = 0 iter_train = ch.iterators.SerialIterator(dataset=Z_tr, batch_size=1, repeat=True, shuffle=True) while t < t_max: # If condition holds, run "outer loop" update. if t % in_max == 0: # Turn on the "to record" flag. torecord = True # Use the output of "inner loop" as new snapshot. mod_snap = mod_svrg.copy(mode="copy") # Use the full data set here. prediction_tr_snap = mod_snap(data.X_tr) # Compute loss and full-batch gradient. loss_snap = ch.functions.softmax_cross_entropy(x=prediction_tr_snap, t=np.int8(data.y_tr).flatten(), normalize=True, reduce="mean") mod_snap.cleargrads() loss_snap.backward() # Store the gradient list for use in inner loop. grad_list = [np.copy(p.grad) for p in mod_snap.params()] # Update cumulative cost. cumcost += data.X_tr.shape[0] # In any case, run typical "inner loop" update. ## Mini-batch computations for inner loop. Z = iter_train.next() X, y = ch.dataset.concat_examples(Z) t += 1 # manage steps ourselves. ## Predictions. prediction_tr = mod_svrg(X) prediction_tr_snap = mod_snap(X) ## Loss computations (will feed the grad computations). loss = ch.functions.softmax_cross_entropy(x=prediction_tr, t=y.flatten(), normalize=True, reduce="mean") loss_snap = ch.functions.softmax_cross_entropy(x=prediction_tr_snap, t=y.flatten(), normalize=True, reduce="mean") ## Gradient computations. mod_svrg.cleargrads() mod_snap.cleargrads() loss.backward() loss_snap.backward() ## Update cumulative cost. cumcost += 2 # Prepare fresh update variance (squared). upvar_sq = 0 ## Parameter updates. zipped = zip(mod_svrg.params(), mod_snap.params(), grad_list) for p, p_snap, mu in zipped: grad = p.grad grad_snap = p_snap.grad if grad is None: continue else: adjust = grad_snap - mu p.data *= (1-2*lamreg*alphaval) # this is l2-squared regularization. p.data -= alphaval * (grad-adjust) upvar_sq += np.linalg.norm(alphaval*(grad-adjust))**2 ## Record performance. if torecord: torecord = False # switch off. perf_updatevar[cntr] = math.sqrt(upvar_sq) perf_array[cntr,:,tri] = perf_fn_ch( model=mod_svrg, data=data )["perf"] perf_cumcost[cntr] = cumcost cntr += 1 # Having run over all trials, compute statistics. # note: ndarray shape is (max_records, num_metrics). perf_ave = np.mean(perf_array, axis=2) perf_sd = np.std(perf_array, axis=2) # Trim the unfilled results (as needed). tocheck = np.where(np.isnan(perf_ave[:,0]))[0] if tocheck.size > 0: badidx = tocheck[0] perf_ave = np.float32(perf_ave[0:badidx,:]) tocheck = np.where(np.isnan(perf_sd[:,0]))[0] if tocheck.size > 0: badidx = tocheck[0] perf_sd = np.float32(perf_sd[0:badidx,:]) tocheck = np.where(np.isnan(perf_cumcost))[0] if tocheck.size > 0: badidx = tocheck[0] perf_cumcost = np.uint32(perf_cumcost[0:badidx]) tocheck = np.where(np.isnan(perf_updatevar))[0] if tocheck.size > 0: badidx = tocheck[0] perf_updatevar = np.float32(perf_updatevar[0:badidx]) # Write to disk. towrite = os.path.join("results", task_name) np.savetxt(fname=os.path.join(towrite, (al_name+".cost")), X=perf_cumcost, fmt="%d", delimiter=",") np.savetxt(fname=os.path.join(towrite, (al_name+".upvar")), X=perf_updatevar, fmt="%.7e", delimiter=",") np.savetxt(fname=os.path.join(towrite, (al_name+".ave")), X=perf_ave, fmt="%.7e", delimiter=",") np.savetxt(fname=os.path.join(towrite, (al_name+".sd")), X=perf_sd, fmt="%.7e", delimiter=",") # In[28]: # Close file connection. f.close() # # ### パフォーマンスの評価 # # 上の2つの節を経て、実験結果がディスクに保存されているはずである。この結果を簡単に可視化すると、傾向が一目瞭然である。 # In[29]: import matplotlib import matplotlib.pyplot as plt import math import numpy as np import os # In[30]: perf_names = perf_fn()["names"] # fire a blank to get metric names. num_metrics = len(perf_names) # number of perf metrics used. # In[31]: # Task name is fixed throughout all the tests below. Make sure we're consistent. task_name = "iris" perf_names = perf_fn()["names"] # fire a blank to get metric names. results_dir = os.path.join("results", task_name) # Parent dir of all results. # A few lines of code to extract all the method names. names_results = os.listdir(results_dir) names_methods = [] for s in names_results: tmp_split = s.split(".")[0:-1] names_methods += [".".join(tmp_split)] names_methods = np.array(names_methods) names_methods = np.unique(names_methods) # Clean out duplicates. print("Names of methods to select from:") print(names_methods) # In[32]: myfig = plt.figure(figsize=(15,10)) ax = myfig.add_subplot(2,3,1) perf_idx = 1 for al_name in names_methods: # First get the costs. toread = os.path.join(results_dir, (al_name+".cost")) with open(toread, mode="r", encoding="ascii") as f: toplot_cost = np.loadtxt(fname=f, dtype=np.uint32, delimiter=",") toread = os.path.join(results_dir, (al_name+".ave")) with open(toread, mode="r", encoding="ascii") as f: toplot_ave = np.loadtxt(fname=f, dtype=np.float32, usecols=perf_idx, delimiter=",") ax.plot(toplot_cost, toplot_ave, "o-", label=al_name) ax.legend(loc=1,ncol=1) plt.title("Performance: "+perf_names[perf_idx]) ax = myfig.add_subplot(2,3,2) perf_idx = 3 for al_name in names_methods: # First get the costs. toread = os.path.join(results_dir, (al_name+".cost")) with open(toread, mode="r", encoding="ascii") as f: toplot_cost = np.loadtxt(fname=f, dtype=np.uint32, delimiter=",") toread = os.path.join(results_dir, (al_name+".ave")) with open(toread, mode="r", encoding="ascii") as f: toplot_ave = np.loadtxt(fname=f, dtype=np.float32, usecols=perf_idx, delimiter=",") ax.plot(toplot_cost, toplot_ave, "o-", label=al_name) ax.legend(loc=1,ncol=1) plt.title("Performance: "+perf_names[perf_idx]) ax = myfig.add_subplot(2,3,3) perf_idx = None for al_name in names_methods: # First get the costs. toread = os.path.join(results_dir, (al_name+".cost")) with open(toread, mode="r", encoding="ascii") as f: toplot_cost = np.loadtxt(fname=f, dtype=np.uint32, delimiter=",") toread = os.path.join(results_dir, (al_name+".upvar")) with open(toread, mode="r", encoding="ascii") as f: toplot_upvar = np.loadtxt(fname=f, dtype=np.float32, usecols=perf_idx, delimiter=",") ax.plot(toplot_cost, toplot_upvar, "o-", label=al_name) ax.legend(loc=1,ncol=1) plt.title("Performance: update variance") ax = myfig.add_subplot(2,3,4) perf_idx = 0 for al_name in names_methods: # First get the costs. toread = os.path.join(results_dir, (al_name+".cost")) with open(toread, mode="r", encoding="ascii") as f: toplot_cost = np.loadtxt(fname=f, dtype=np.uint32, delimiter=",") toread = os.path.join(results_dir, (al_name+".ave")) with open(toread, mode="r", encoding="ascii") as f: toplot_ave = np.loadtxt(fname=f, dtype=np.float32, usecols=perf_idx, delimiter=",") ax.plot(toplot_cost, toplot_ave, "o-", label=al_name) ax.legend(loc=1,ncol=1) plt.title("Performance: "+perf_names[perf_idx]) ax = myfig.add_subplot(2,3,5) perf_idx = 2 for al_name in names_methods: # First get the costs. toread = os.path.join(results_dir, (al_name+".cost")) with open(toread, mode="r", encoding="ascii") as f: toplot_cost = np.loadtxt(fname=f, dtype=np.uint32, delimiter=",") toread = os.path.join(results_dir, (al_name+".ave")) with open(toread, mode="r", encoding="ascii") as f: toplot_ave = np.loadtxt(fname=f, dtype=np.float32, usecols=perf_idx, delimiter=",") ax.plot(toplot_cost, toplot_ave, "o-", label=al_name) ax.legend(loc=1,ncol=1) plt.title("Performance: "+perf_names[perf_idx]) ax = myfig.add_subplot(2,3,6) perf_idx = 4 for al_name in names_methods: # First get the costs. toread = os.path.join(results_dir, (al_name+".cost")) with open(toread, mode="r", encoding="ascii") as f: toplot_cost = np.loadtxt(fname=f, dtype=np.uint32, delimiter=",") toread = os.path.join(results_dir, (al_name+".ave")) with open(toread, mode="r", encoding="ascii") as f: toplot_ave = np.loadtxt(fname=f, dtype=np.float32, usecols=perf_idx, delimiter=",") ax.plot(toplot_cost, toplot_ave, "o-", label=al_name) ax.legend(loc=1,ncol=1) plt.title("Performance: "+perf_names[perf_idx]) plt.show() # ___ #