DeepLerning らしいDeepLerning実装
ここに、Dropout 層とか色々積んでみたというもの。
尚、この記事初めて見た人は突然何のことやらだと思うので、この辺の 2021/2 月あたりから継続してるのでその辺見ると良いかも?
やってみてわかったことは、大量のデータセットでいきなり学習させるとかではなくて、そこそこ小さなデータセットで 1 層づつ増やしていった方が無難だと思った。
いきなり層を増やしてエラー起こすと、何層目で設定ミスしたのか本気で分からなくなるので…。
行列計算の問題なので、行列サイズ(input / output) の形状一つでもミスすると、即エラーです。
尚、im2col 変換や col2im は以前の記事参照で。
畳み込み層
画像の特徴抽出層。
内部でフィルタを持っていて、フィルタによって特徴を拾う。
尚、ニューロンからのリクエスト(backpropagation)を受けて、よりリクエストに合ったフィルタに形状変更していく。
class ILayer: def forward(self, x, is_test): pass def backward(self, grad_y): pass def update(self, eta): pass class ConvLayer(ILayer): def __init__(self, x_ch, x_h, x_w, n_flt, flt_h, flt_w, stride, pad, wb_width): # パラメータをまとめる self.params = (x_ch, x_h, x_w, n_flt, flt_h, flt_w, stride, pad) # フィルタとバイアスの初期値 self.w = wb_width * np.random.randn(n_flt, x_ch, flt_h, flt_w) self.b = wb_width * np.random.randn(1, n_flt) # 出力画像のサイズ self.y_ch = n_flt # 出力チャンネル数 self.y_h = (x_h - flt_h + 2*pad) // stride + 1 # 出力高さ self.y_w = (x_w - flt_w + 2*pad) // stride + 1 # 出力幅 # AdaGrad用 self.h_w = np.zeros((n_flt, x_ch, flt_h, flt_w)) + 1e-8 self.h_b = np.zeros((1, n_flt)) + 1e-8 def forward(self, x, is_test): n_bt = x.shape[0] x_ch, x_h, x_w, n_flt, flt_h, flt_w, stride, pad = self.params y_ch, y_h, y_w = self.y_ch, self.y_h, self.y_w # 入力画像とフィルタを行列に変換 self.cols = im2col(x, flt_h, flt_w, y_h, y_w, stride, pad) self.w_col = self.w.reshape(n_flt, x_ch*flt_h*flt_w) # 出力の計算: 行列積、バイアスの加算、活性化関数 u = np.dot(self.w_col, self.cols).T + self.b self.u = u.reshape(n_bt, y_h, y_w, y_ch).transpose(0, 3, 1, 2) self.y = np.where(self.u <= 0, 0, self.u) return self.y def backward(self, grad_y): n_bt = grad_y.shape[0] x_ch, x_h, x_w, n_flt, flt_h, flt_w, stride, pad = self.params y_ch, y_h, y_w = self.y_ch, self.y_h, self.y_w # delta delta = grad_y * np.where(self.u <= 0, 0, 1) delta = delta.transpose(0,2,3,1).reshape(n_bt*y_h*y_w, y_ch) # フィルタとバイアスの勾配 grad_w = np.dot(self.cols, delta) self.grad_w = grad_w.T.reshape(n_flt, x_ch, flt_h, flt_w) self.grad_b = np.sum(delta, axis=0) # 入力の勾配 grad_cols = np.dot(delta, self.w_col) x_shape = (n_bt, x_ch, x_h, x_w) self.grad_x = col2im(grad_cols.T, x_shape, flt_h, flt_w, y_h, y_w, stride, pad) return self.grad_x def update(self, eta): self.h_w += self.grad_w * self.grad_w self.w -= eta / np.sqrt(self.h_w) * self.grad_w self.h_b += self.grad_b * self.grad_b self.b -= eta / np.sqrt(self.h_b) * self.grad_b
プーリング層
画像を部分部分で区切って、その中で最も大きな値を返す層。
前の層で拾った特徴を、なるだけ損なわずにデータ圧縮してると思えばいい。
class PoolingLayer(ILayer): # batch_size:バッチサイズ, x_ch:入力チャンネル数, x_h:入力画像高さ, x_w:入力画像幅 # pool:プーリング領域のサイズ, pad:パディング幅 # y_ch:出力チャンネル数, y_h:出力高さ, y_w:出力幅 def __init__(self, x_ch, x_h, x_w, pool, pad): # パラメータをまとめる self.params = (x_ch, x_h, x_w, pool, pad) # 出力画像のサイズ self.y_ch = x_ch # 出力チャンネル数 self.y_h = x_h//pool if x_h%pool==0 else x_h//pool+1 # 出力高さ self.y_w = x_w//pool if x_w%pool==0 else x_w//pool+1 # 出力幅 def forward(self, x, is_test): n_bt = x.shape[0] x_ch, x_h, x_w, pool, pad = self.params y_ch, y_h, y_w = self.y_ch, self.y_h, self.y_w # 入力画像を行列に変換 cols = im2col(x, pool, pool, y_h, y_w, pool, pad) cols = cols.T.reshape(n_bt*y_h*y_w*x_ch, pool*pool) # 出力の計算: Maxプーリング y = np.max(cols, axis=1) self.y = y.reshape(n_bt, y_h, y_w, x_ch).transpose(0, 3, 1, 2) # 最大値のインデックスを保存 self.max_index = np.argmax(cols, axis=1) return self.y def backward(self, grad_y): n_bt = grad_y.shape[0] x_ch, x_h, x_w, pool, pad = self.params y_ch, y_h, y_w = self.y_ch, self.y_h, self.y_w # 出力の勾配の軸を入れ替え grad_y = grad_y.transpose(0, 2, 3, 1) # 行列を作成し、各列の最大値であった要素にのみ出力の勾配を入れる grad_cols = np.zeros((pool*pool, grad_y.size)) grad_cols[self.max_index.reshape(-1), np.arange(grad_y.size)] = grad_y.reshape(-1) grad_cols = grad_cols.reshape(pool, pool, n_bt, y_h, y_w, y_ch) grad_cols = grad_cols.transpose(5,0,1,2,3,4) grad_cols = grad_cols.reshape( y_ch*pool*pool, n_bt*y_h*y_w) # 入力の勾配 x_shape = (n_bt, x_ch, x_h, x_w) self.grad_x = col2im(grad_cols, x_shape, pool, pool, y_h, y_w, pool, pad) return self.grad_x
ニューロン/出力層
文字通りニューロン。
この辺の実装はニューラルネット作り始めた初期からほぼ変更なし
つまりこれはこれでほぼほぼ汎用化されてると言っていいハズ
class Neuron(ILayer): def __init__(self, n_upper, n, wb_width, activation_function, differential_function): self.w = wb_width * np.random.randn(n_upper, n) # ランダムなのが確立勾配法 self.b = wb_width * np.random.randn(n) self.grad_w = np.zeros((n_upper, n)) self.grad_b = np.zeros((n)) self.activation_function = activation_function self.differential_function = differential_function def update(self, eta): self.w -= eta * self.grad_w self.b -= eta * self.grad_b def forward(self, x, is_test): self.x = x u = x.dot(self.w) + self.b self.y = self.activation_function(u) return self.y def backward(self, t): delta = self.differential_function(self.y, t) self.grad_w = self.x.T.dot(delta) self.grad_b = np.sum(delta, axis=0) self.grad_x = delta.dot(self.w.T) return self.grad_x class AdaNeuron(Neuron): def __init__(self, n_upper, n, wb_width, activation_function, differential_function): super().__init__(n_upper, n, wb_width, activation_function, differential_function) self.h_w = np.zeros((n_upper, n)) + 1e-8 self.h_b = np.zeros((n)) + 1e-8 def update(self, eta): # ここだけ更新 self.h_w += (self.grad_w * self.grad_w) self.h_b += (self.grad_b * self.grad_b) self.w -= eta / np.sqrt(self.h_w) * self.grad_w self.b -= eta / np.sqrt(self.h_b) * self.grad_b class Output(AdaNeuron): pass class Middle(AdaNeuron): def backward(self, grad_y): delta = self.differential_function(grad_y, self.y) self.grad_w = self.x.T.dot(delta) self.grad_b = delta.sum(axis = 0) self.grad_x = delta.dot(self.w.T) return self.grad_x
ドロップアウト
過学習対策層。ニューロンの出力値を受け取って確率的に結果を無効化する動きをする。
言ってみればニューロンの学習妨害装置。
class Dropout(ILayer): def __init__(self, dropout_rate): self.dropout_rate = dropout_rate def forward(self, x, is_train): if is_train: rand = np.random.rand(*x.shape) self.dropout = np.where(rand > self.dropout_rate, 1, 0) self.y = x * self.dropout else: self.y = (1 - self.dropout_rate) * x return self.y def backward(self, grad_y): self.grad_x = grad_y * self.dropout return self.grad_x
活性関数群
ニューロン云々見れば説明不要っぽい気がする。
def relu_func(x): return np.where(x <= 0, 0, x) def relu_func_dash(y, t): return y * np.where(t <= 0, 0, 1) def soft_max(u): return np.exp(u) / np.sum(np.exp(u), axis=1).reshape(-1, 1) def soft_max_dash(y, t): return y - t
んじゃDeepLerning だ
今回の構成
- 畳み込み層 1 : 10 種類のフィルタ
- 畳み込み層 2 : 10 種類のフィルタ
- プーリング層
- ReLU ニューロン 200 個(ニューロン層 1)
- ドロップアウト層 1
- ReLU ニューロン 200 個(ニューロン層 2)
- 出力層
という構成。
因みに、このくらいの規模を原始的な微分での計算なんてしようものなら、何時間かかるか分かったものではない
# 各設定値 img_h = 8 # 入力画像の高さ img_w = 8 # 入力画像の幅 img_ch = 1 # 入力画像のチャンネル数 wb_width = 0.1 # 重みとバイアスの広がり具合 eta = 0.01 # 学習係数 epoch = 50 batch_size = 8 interval = 10 # 経過の表示間隔 n_sample = 200 # 誤差計測のサンプル数 # 畳み込み層 cl_1 = ConvLayer( x_ch=img_ch, x_h=img_h, x_w=img_w, n_flt=10, flt_h=3, flt_w=3, stride=1, pad=1, wb_width=wb_width ) cl_2 = ConvLayer( x_ch=cl_1.y_ch, x_h=cl_1.y_h, x_w=cl_1.y_w, n_flt=10, flt_h=3, flt_w=3, stride=1, pad=1, wb_width=wb_width ) # プーリング層 pl_1 = PoolingLayer( x_ch=cl_1.y_ch, x_h=cl_1.y_h, x_w=cl_1.y_w, pool=2, pad=0 ) # 中間層 n_fc_in = pl_1.y_ch * pl_1.y_h * pl_1.y_w ml_1 = Middle(n_fc_in, 200, wb_width, relu_func, relu_func_dash) dr_1 = Dropout(0.5) ml_2 = Middle(200, 200, wb_width, relu_func, relu_func_dash) dr_2 = Dropout(0.5) # 出力層 ol_1 = Output(200, 10, wb_width, soft_max, soft_max_dash) conv_layers = [cl_1, cl_2, pl_1] neuron_layers = [ml_1, dr_1, ml_2, dr_2, ol_1] def forward_propagation(x, is_train=True): """順伝播""" batch_size = x.shape[0] y = x.reshape(batch_size, img_ch, img_h, img_w) for cl in conv_layers: y = cl.forward(y, is_train) y = y.reshape(batch_size, -1) for n in neuron_layers: y = n.forward(y, is_train) def backpropagation(t): """逆伝播""" batch_size = t.shape[0] grad_x = t rev_nr = neuron_layers[::-1] for n in rev_nr: grad_x = n.backward(grad_x) grad_x = grad_x.reshape(batch_size, pl_1.y_ch, pl_1.y_h, pl_1.y_w) cl_rev = conv_layers[::-1] for cl in cl_rev: grad_x = cl.backward(grad_x) def uppdate_wb(): """重みとバイアスの更新""" for cl in conv_layers: cl.update(eta) for nr in neuron_layers: nr.update(eta) def get_error(t, batch_size): """誤差を計算""" return -np.sum(t * np.log(ol_1.y + 1e-7)) / batch_size # 交差エントロピー誤差 def forward_sample(inp, correct, n_sample): """サンプルを順伝播""" index_rand = np.arange(len(correct)) np.random.shuffle(index_rand) index_rand = index_rand[:n_sample] x = inp[index_rand, :] t = correct[index_rand, :] forward_propagation(x, False) return x, t
それでは食わせる学習データを用意して
from sklearn import datasets from sklearn.model_selection import train_test_split # 手書き文字データセットの読み込み digits_data = datasets.load_digits() input_data = digits_data.data correct = digits_data.target n_data = len(correct) # 入力データの標準化 def standardize(x): av = np.average(x) std = np.std(x) return (x - av) / std input_data = standardize(input_data) # -- 正解をone-hot表現に -- correct_data = np.zeros((n_data, 10)) for i in range(n_data): correct_data[i, correct[i]] = 1.0 # 学習データとテストデータの分離 X_train, X_test, y_train, y_test = train_test_split(input_data, correct_data, random_state=0) n_train = X_train.shape[0] # 訓練データのサンプル数 n_test = X_test.shape[0] # テストデータのサンプル数
実行開始
# 誤差の記録用 train_error_x = [] train_error_y = [] test_error_x = [] test_error_y = [] n_batch = n_train // batch_size for i in range(epoch): # 誤差計算 x, t = forward_sample(X_train, y_train, n_sample) error_train = get_error(t, n_sample) x, t = forward_sample(X_test, y_test, n_sample) error_test = get_error(t, n_sample) # -- 誤差の記録 -- train_error_x.append(i) train_error_y.append(error_train) test_error_x.append(i) test_error_y.append(error_test) # -- 経過の表示 -- if i % interval == 0: print(f"Epoch:{i}/{epoch}, Error_train:{error_train}, Error_test:{error_test}") # -- 学習 -- index_rand = np.arange(n_train) np.random.shuffle(index_rand) for j in range(n_batch): mb_index = index_rand[j*batch_size : (j+1)*batch_size] x = X_train[mb_index, :] t = y_train[mb_index, :] forward_propagation(x) backpropagation(t) uppdate_wb() # 誤差の記録をグラフ表示 plt.plot(train_error_x, train_error_y, label="Train") plt.plot(test_error_x, test_error_y, label="Test") plt.legend() plt.xlabel("Epochs") plt.ylabel("ErrorRate") plt.show() # 正解率の測定 x, t = forward_sample(X_train, y_train, n_train) count_train = np.sum(np.argmax(ol_1.y, axis=1) == np.argmax(t, axis=1)) x, t = forward_sample(X_test, y_test, n_test) count_test = np.sum(np.argmax(ol_1.y, axis=1) == np.argmax(t, axis=1)) print(f"Accuracy Train:{count_train/n_train*100}%, Accuracy Test:{count_test/n_test*100}%")
実行結果はこんな感じ
Accuracy Train:100.0%, Accuracy Test:98.66666666666667%
前回の時点で、大して誤差は出てなかったので、
Accuracy Train: 100.0 % Accuracy Test: 98.88888888888889 %
正直ここまでする必要があったかとは思わなくもなかった(汗