LSTM 作ってみた
これの続き。
逆伝播の説明を飲み込むのに超絶苦労したわ…数学力無い自分を恨むねぇ(汗
各種勾配は順伝播式を偏微分式計算する必要があり
- 入力にかかる重み勾配
- 前の出力にかける勾配
- バイアス勾配
各時刻でも以下の勾配を計算する。
- 入力の勾配
- 前の時刻の出力の勾配
- 前の時刻の記憶セルの勾配
この微分をしなきゃいけない…うわぁ…
ってことで、導出過程を端折ってしまうと
忘却ゲートは
入力ゲート
新しい記憶
出力ゲート
導出過程とか細かい論理とかはこれ参照。
これを見ながら頑張ればなんとか導出も理解できる。
実装してみた!(つってもだいたい写経になっちゃったけど…)
ギザギザしてる波形が学習データ。
黄色線がそこから予測した値。
実行結果はこんな感じ。
でもこれ株価に取り入れようとしたとき、実は誤差が埋まらなかった…
これがなぜかと言うと、株価を調べてみるとわかるのだけど、米国株 2020-2021(8月)まではトレンドラインに沿った上昇トレンドで、単純に保持してれば儲かったという状況。
しかし9月からテーパリングの話が持ち上がって乱高下…来年はスタグフレーションを原因としたボックス相場(短期的な上がり下がりだけして、平均株価が停滞する)と予測されてる。
正直このような状況ももともとあって、こうした局面が変わるタイミングはあくまで政治・経済イベントによるから、そのタイミングをAIで予測することが難しいという話。
要するに大きな転換期にやり始めると痛い目見そう…ってこと(泣)
実装は以下
import numpy as np import matplotlib.pyplot as plt n_time = 10 # 時系列の数 n_in = 1 # 入力層のニューロン数 n_mid = 30 # 中間層のニューロン数 n_out = 1 # 出力層のニューロン数 eta = 0.01 # 学習係数 epochs = 101 # 学習データの学習回数 batch_size = 8 # 1回の学習の処理数 interval = 10 # 経過の表示間隔 def sigmoid(x): return 1/(1+np.exp(-x)) # 訓練データの作成 # -2πから2πまでの波線作成 sin_x = np.linspace(-2*np.pi, 2*np.pi) # 波線にノイズを加える sin_y = np.sin(sin_x) + 0.1*np.random.randn(len(sin_x)) # サンプル数 n_sample = len(sin_x)-n_time # 入力 input_data = np.zeros((n_sample, n_time, n_in)) # 正解データ correct_data = np.zeros((n_sample, n_out)) for i in range(0, n_sample): input_data[i] = sin_y[i : i+n_time].reshape(-1, 1) # 正解は入力よりも一つ後 correct_data[i] = sin_y[i+n_time : i+n_time+1] # LSTM層 class LSTMLayer: def __init__(self, n_upper, n): self.w = np.random.randn(4, n_upper, n) / np.sqrt(n_upper) # Xavierの初期値 self.v = np.random.randn(4, n, n) / np.sqrt(n) self.b = np.zeros((4, n)) def forward(self, x, y_prev, c_prev): # y_prev, c_prev: 前の時刻の出力と記憶セル u = np.matmul(x, self.w) + np.matmul(y_prev, self.v) + self.b.reshape(4, 1, -1) a0 = sigmoid(u[0]) # 忘却ゲート a1 = sigmoid(u[1]) # 入力ゲート a2 = np.tanh(u[2]) # 新しい記憶 a3 = sigmoid(u[3]) # 出力ゲート self.gates = np.stack((a0, a1, a2, a3)) self.c = a0 * c_prev + a1 * a2 # 記憶セル self.y = a3 * np.tanh(self.c) # 出力 def backward(self, x, y, c, y_prev, c_prev, gates, grad_y, grad_c): a0, a1, a2, a3 = gates tanh_c = np.tanh(c) r = grad_c + (grad_y*a3) * (1-tanh_c**2) # 各delta値 delta_a0 = r * c_prev * a0 * (1-a0) delta_a1 = r * a2 * a1 * (1-a1) delta_a2 = r * a1 * (1 - a2**2) delta_a3 = grad_y * tanh_c * a3 * (1 - a3) deltas = np.stack((delta_a0, delta_a1, delta_a2, delta_a3)) # 各パラメータの勾配 self.grad_w += np.matmul(x.T, deltas) self.grad_v += np.matmul(y_prev.T, deltas) self.grad_b += np.sum(deltas, axis=1) # x grad_x = np.matmul(deltas, self.w.transpose(0, 2, 1)) self.grad_x = np.sum(grad_x, axis=0) # y_prev grad_y_prev = np.matmul(deltas, self.v.transpose(0, 2, 1)) self.grad_y_prev = np.sum(grad_y_prev, axis=0) # c_prevの勾配 self.grad_c_prev = r * a0 def reset_sum_grad(self): self.grad_w = np.zeros_like(self.w) self.grad_v = np.zeros_like(self.v) self.grad_b = np.zeros_like(self.b) def update(self, eta): self.w -= eta * self.grad_w self.v -= eta * self.grad_v self.b -= eta * self.grad_b # 全結合 出力層 class OutputLayer: def __init__(self, n_upper, n): self.w = np.random.randn(n_upper, n) / np.sqrt(n_upper) # Xavierの初期値 self.b = np.zeros(n) def forward(self, x): self.x = x u = np.dot(x, self.w) + self.b self.y = u # 恒等関数 def backward(self, t): delta = self.y - t self.grad_w = np.dot(self.x.T, delta) self.grad_b = np.sum(delta, axis=0) self.grad_x = np.dot(delta, self.w.T) def update(self, eta): self.w -= eta * self.grad_w self.b -= eta * self.grad_b # 各層の初期化 lstm_layer = LSTMLayer(n_in, n_mid) output_layer = OutputLayer(n_mid, n_out) # 訓練 def train(x_mb, t_mb): # 順伝播 LSTM層 y_rnn = np.zeros((len(x_mb), n_time+1, n_mid)) c_rnn = np.zeros((len(x_mb), n_time+1, n_mid)) gates_rnn = np.zeros((4, len(x_mb), n_time, n_mid)) y_prev = y_rnn[:, 0, :] c_prev = c_rnn[:, 0, :] for i in range(n_time): x = x_mb[:, i, :] lstm_layer.forward(x, y_prev, c_prev) y = lstm_layer.y y_rnn[:, i+1, :] = y y_prev = y c = lstm_layer.c c_rnn[:, i+1, :] = c c_prev = c gates = lstm_layer.gates gates_rnn[:, :, i, :] = gates # 順伝播 出力層 output_layer.forward(y) # 逆伝播 出力層 output_layer.backward(t_mb) grad_y = output_layer.grad_x grad_c = np.zeros_like(lstm_layer.c) # 逆伝播 LSTM層 lstm_layer.reset_sum_grad() for i in reversed(range(n_time)): x = x_mb[:, i, :] y = y_rnn[:, i+1, :] c = c_rnn[:, i+1, :] y_prev = y_rnn[:, i, :] c_prev = c_rnn[:, i, :] gates = gates_rnn[:, :, i, :] lstm_layer.backward(x, y, c, y_prev, c_prev, gates, grad_y, grad_c) grad_y = lstm_layer.grad_y_prev grad_c = lstm_layer.grad_c_prev # パラメータの更新 lstm_layer.update(eta) output_layer.update(eta) # 予測 def predict(x_mb): # 順伝播 LSTM層 y_prev = np.zeros((len(x_mb), n_mid)) c_prev = np.zeros((len(x_mb), n_mid)) for i in range(n_time): x = x_mb[:, i, :] lstm_layer.forward(x, y_prev, c_prev) y = lstm_layer.y y_prev = y c = lstm_layer.c c_prev = c # 順伝播 出力層 output_layer.forward(y) return output_layer.y # 誤差を計算 def get_error(x, t): y = predict(x) return 1.0/2.0*np.sum(np.square(y - t)) # 二乗和誤差 error_record = [] n_batch = len(input_data) // batch_size # 1エポックあたりのバッチ数 for i in range(epochs): # 学習 index_random = np.arange(len(input_data)) np.random.shuffle(index_random) # インデックスをシャッフルする for j in range(n_batch): # ミニバッチを取り出す mb_index = index_random[j*batch_size : (j+1)*batch_size] x_mb = input_data[mb_index, :] t_mb = correct_data[mb_index, :] train(x_mb, t_mb) # 誤差を求める error = get_error(input_data, correct_data) error_record.append(error) # 経過の表示 if i%interval == 0: print("Epoch:"+str(i+1)+"/"+str(epochs), "Error:"+str(error)) predicted = input_data[0].reshape(-1).tolist() # 最初の入力 for i in range(n_sample): x = np.array(predicted[-n_time:]).reshape(1, n_time, 1) y = predict(x) predicted.append(float(y[0, 0])) # 出力をpredictedに追加する plt.plot(range(len(sin_y)), sin_y.tolist(), label="Correct") plt.plot(range(len(predicted)), predicted, label="Predicted") plt.legend() plt.show() plt.plot(range(1, len(error_record)+1), error_record) plt.xlabel("Epochs") plt.ylabel("Error") plt.show()