技術をかじる猫

適当に気になった技術や言語、思ったこと考えた事など。

LSTM 作ってみた

white-azalea.hatenablog.jp

これの続き。

逆伝播の説明を飲み込むのに超絶苦労したわ…数学力無い自分を恨むねぇ(汗

各種勾配は順伝播式を偏微分式計算する必要があり

  • 入力にかかる重み勾配 \frac{\partial E}{\partial W_g}
  • 前の出力にかける勾配  \frac{\partial E}{\partial V_g}
  • バイアス勾配  \frac{\partial E}{\partial B_g}

各時刻でも以下の勾配を計算する。

  • 入力の勾配  \frac{\partial E}{\partial X^{(t)}}
  • 前の時刻の出力の勾配  \frac{\partial E}{\partial Y^{(t-1)}}
  • 前の時刻の記憶セルの勾配  \frac{\partial E}{\partial C^{(t-1)}}

この微分をしなきゃいけない…うわぁ…
ってことで、導出過程を端折ってしまうと

忘却ゲートは


\delta^{(t)}_0 = r^{(t)} c^{(t-1)} a^{(t)}_0 (a - a^{(t)}_0)

入力ゲート


\delta^{(t)}_1 = r^{(t)} a^{(t)}_2 a^{(t)}_1 (1 - a^{(t)}_1)

新しい記憶


\delta^{(t)}_2 = r^{(t)} a^{(t)}_1 (1 - a^{(t)2}_2)

出力ゲート


\delta^{(t)}_3 = \frac{\partial E}{\partial y^{(t)}} tanh(c^{(t)}) a^{(t)}_3 (1 - a^{(t)}_3)

導出過程とか細かい論理とかはこれ参照。
これを見ながら頑張ればなんとか導出も理解できる。

実装してみた!(つってもだいたい写経になっちゃったけど…)

f:id:white-azalea:20211115213353p:plain

ギザギザしてる波形が学習データ。
黄色線がそこから予測した値。

実行結果はこんな感じ。
でもこれ株価に取り入れようとしたとき、実は誤差が埋まらなかった…

これがなぜかと言うと、株価を調べてみるとわかるのだけど、米国株 2020-2021(8月)まではトレンドラインに沿った上昇トレンドで、単純に保持してれば儲かったという状況。
しかし9月からテーパリングの話が持ち上がって乱高下…来年はスタグフレーションを原因としたボックス相場(短期的な上がり下がりだけして、平均株価が停滞する)と予測されてる。
正直このような状況ももともとあって、こうした局面が変わるタイミングはあくまで政治・経済イベントによるから、そのタイミングをAIで予測することが難しいという話。

要するに大きな転換期にやり始めると痛い目見そう…ってこと(泣)

実装は以下

import numpy as np
import matplotlib.pyplot as plt

n_time = 10  # 時系列の数
n_in = 1     # 入力層のニューロン数
n_mid = 30   # 中間層のニューロン数
n_out = 1    # 出力層のニューロン数

eta = 0.01     # 学習係数
epochs = 101   # 学習データの学習回数
batch_size = 8 # 1回の学習の処理数
interval = 10  # 経過の表示間隔

def sigmoid(x):
    return 1/(1+np.exp(-x))

# 訓練データの作成
# -2πから2πまでの波線作成
sin_x = np.linspace(-2*np.pi, 2*np.pi)
# 波線にノイズを加える
sin_y = np.sin(sin_x)  + 0.1*np.random.randn(len(sin_x))
# サンプル数
n_sample = len(sin_x)-n_time
# 入力
input_data = np.zeros((n_sample, n_time, n_in))
# 正解データ
correct_data = np.zeros((n_sample, n_out))
for i in range(0, n_sample):
    input_data[i] = sin_y[i : i+n_time].reshape(-1, 1)
    # 正解は入力よりも一つ後
    correct_data[i] = sin_y[i+n_time : i+n_time+1]

# LSTM層 
class LSTMLayer:
    def __init__(self, n_upper, n):
        self.w = np.random.randn(4, n_upper, n) / np.sqrt(n_upper)  # Xavierの初期値
        self.v = np.random.randn(4, n, n) / np.sqrt(n)
        self.b = np.zeros((4, n))

    def forward(self, x, y_prev, c_prev):
        # y_prev, c_prev: 前の時刻の出力と記憶セル
        u = np.matmul(x, self.w) + np.matmul(y_prev, self.v) + self.b.reshape(4, 1, -1)

        a0 = sigmoid(u[0])  # 忘却ゲート
        a1 = sigmoid(u[1])  # 入力ゲート
        a2 = np.tanh(u[2])  # 新しい記憶
        a3 = sigmoid(u[3])  # 出力ゲート
        self.gates = np.stack((a0, a1, a2, a3))
        self.c = a0 * c_prev + a1 * a2 # 記憶セル
        self.y = a3 * np.tanh(self.c)  # 出力
    
    def backward(self, x, y, c, y_prev, c_prev, gates, grad_y, grad_c):   
        a0, a1, a2, a3 = gates
        tanh_c = np.tanh(c)
        r = grad_c + (grad_y*a3) * (1-tanh_c**2)

        # 各delta値
        delta_a0 = r * c_prev * a0 * (1-a0)
        delta_a1 = r * a2 * a1 * (1-a1)
        delta_a2 = r * a1 * (1 - a2**2)
        delta_a3 = grad_y * tanh_c * a3 * (1 - a3)

        deltas = np.stack((delta_a0, delta_a1, delta_a2, delta_a3))

        # 各パラメータの勾配
        self.grad_w += np.matmul(x.T, deltas)
        self.grad_v += np.matmul(y_prev.T, deltas)
        self.grad_b += np.sum(deltas, axis=1)

        # x
        grad_x = np.matmul(deltas, self.w.transpose(0, 2, 1))
        self.grad_x = np.sum(grad_x, axis=0)

        # y_prev
        grad_y_prev = np.matmul(deltas, self.v.transpose(0, 2, 1))
        self.grad_y_prev = np.sum(grad_y_prev, axis=0)
        
        # c_prevの勾配
        self.grad_c_prev = r * a0

    def reset_sum_grad(self):
        self.grad_w = np.zeros_like(self.w)
        self.grad_v = np.zeros_like(self.v)
        self.grad_b = np.zeros_like(self.b)

    def update(self, eta):
        self.w -= eta * self.grad_w
        self.v -= eta * self.grad_v
        self.b -= eta * self.grad_b

# 全結合 出力層
class OutputLayer:
    def __init__(self, n_upper, n):
        self.w = np.random.randn(n_upper, n) / np.sqrt(n_upper)  # Xavierの初期値
        self.b = np.zeros(n)

    def forward(self, x):
        self.x = x
        u = np.dot(x, self.w) + self.b
        self.y = u  # 恒等関数

    def backward(self, t):
        delta = self.y - t
        
        self.grad_w = np.dot(self.x.T, delta)
        self.grad_b = np.sum(delta, axis=0)
        self.grad_x = np.dot(delta, self.w.T) 

    def update(self, eta):
        self.w -= eta * self.grad_w
        self.b -= eta * self.grad_b

# 各層の初期化
lstm_layer = LSTMLayer(n_in, n_mid)
output_layer = OutputLayer(n_mid, n_out)

# 訓練
def train(x_mb, t_mb):
    # 順伝播 LSTM層
    y_rnn = np.zeros((len(x_mb), n_time+1, n_mid))
    c_rnn = np.zeros((len(x_mb), n_time+1, n_mid))
    gates_rnn = np.zeros((4, len(x_mb), n_time, n_mid))
    y_prev = y_rnn[:, 0, :]
    c_prev = c_rnn[:, 0, :]
    for i in range(n_time):
        x = x_mb[:, i, :]
        lstm_layer.forward(x, y_prev, c_prev)

        y = lstm_layer.y
        y_rnn[:, i+1, :] = y
        y_prev = y

        c = lstm_layer.c
        c_rnn[:, i+1, :] = c
        c_prev = c

        gates = lstm_layer.gates
        gates_rnn[:, :, i, :] = gates

    # 順伝播 出力層
    output_layer.forward(y)

    # 逆伝播 出力層
    output_layer.backward(t_mb)
    grad_y = output_layer.grad_x
    grad_c = np.zeros_like(lstm_layer.c)

    # 逆伝播 LSTM層
    lstm_layer.reset_sum_grad()
    for i in reversed(range(n_time)):
        x = x_mb[:, i, :]
        y = y_rnn[:, i+1, :]
        c = c_rnn[:, i+1, :]
        y_prev = y_rnn[:, i, :]
        c_prev = c_rnn[:, i, :]
        gates = gates_rnn[:, :, i, :] 

        lstm_layer.backward(x, y, c, y_prev, c_prev, gates, grad_y, grad_c)
        grad_y = lstm_layer.grad_y_prev
        grad_c = lstm_layer.grad_c_prev

    # パラメータの更新
    lstm_layer.update(eta)
    output_layer.update(eta)

# 予測
def predict(x_mb):
    # 順伝播 LSTM層
    y_prev = np.zeros((len(x_mb), n_mid))
    c_prev = np.zeros((len(x_mb), n_mid))
    for i in range(n_time):
        x = x_mb[:, i, :]
        lstm_layer.forward(x, y_prev, c_prev)
        y = lstm_layer.y
        y_prev = y
        c = lstm_layer.c
        c_prev = c

    # 順伝播 出力層
    output_layer.forward(y)
    return output_layer.y

# 誤差を計算
def get_error(x, t):
    y = predict(x)
    return 1.0/2.0*np.sum(np.square(y - t))  # 二乗和誤差

error_record = []
n_batch = len(input_data) // batch_size  # 1エポックあたりのバッチ数
for i in range(epochs):
        
    # 学習 
    index_random = np.arange(len(input_data))
    np.random.shuffle(index_random)  # インデックスをシャッフルする
    for j in range(n_batch):
        
        # ミニバッチを取り出す
        mb_index = index_random[j*batch_size : (j+1)*batch_size]
        x_mb = input_data[mb_index, :]
        t_mb = correct_data[mb_index, :]
        train(x_mb, t_mb)

    # 誤差を求める
    error = get_error(input_data, correct_data)
    error_record.append(error)

    # 経過の表示 
    if i%interval == 0:
        print("Epoch:"+str(i+1)+"/"+str(epochs), "Error:"+str(error))

        predicted = input_data[0].reshape(-1).tolist() # 最初の入力
        for i in range(n_sample):
            x = np.array(predicted[-n_time:]).reshape(1, n_time, 1)
            y = predict(x)
            predicted.append(float(y[0, 0]))  # 出力をpredictedに追加する

        plt.plot(range(len(sin_y)), sin_y.tolist(), label="Correct")
        plt.plot(range(len(predicted)), predicted, label="Predicted")
        plt.legend()
        plt.show()

plt.plot(range(1, len(error_record)+1), error_record)
plt.xlabel("Epochs")
plt.ylabel("Error")
plt.show()