PyTorchで作るRNN 後編
それではさっそく続きをやっていきます。
前回のソースコードも含めますが、「class RNNModel」というところから解説していきます。
RNN.py
import numpy as np import torch import torch.nn as nn import torch.nn.functional as F import math class RNNHardCell(nn.Module): def __init__(self, n_input:int, n_hidden:int, state=None) -> None: super(RNNHardCell, self).__init__() self.n_input = n_input self.n_hidden = n_hidden self.in_h = nn.Linear(self.n_input, self.n_hidden, bias=False) self.h_h = nn.Linear(self.n_hidden, self.n_hidden, bias=False) self.state = state self.register_parameter() def register_parameter(self) -> None: stdv = 1.0 / math.sqrt(self.n_hidden) for weight in self.parameters(): nn.init.uniform_(weight, -stdv, stdv) def forward(self, x, state=None): self.state = state if self.state is None: self.state = F.hardtanh(self.in_h(x)) else: self.state = F.hardtanh(self.in_h(x) + self.h_h(self.state)) return self.state class RNNModel(nn.Module): def __init__(self, n_input, n_hidden, n_output, num_layers=1): super(RNNModel, self).__init__() self.rnn = RNNHardCell(n_input, n_hidden) self.out = nn.Linear(n_hidden, n_output, bias=False) self.num_layers = num_layers def forward(self, xs, state=None): state = None h_seq = [] for x in xs: x = torch.from_numpy(np.asarray(x)).float() x = x.unsqueeze(0) for _ in range(self.num_layers): state = self.rnn(x, state) h_seq.append(state) h_seq = torch.stack(h_seq) ys = self.out(h_seq) ys = torch.transpose(ys, 0, 1) return ys
上記ソースコードを「RNN.py」として保存します。
RNNHardCellに全結合層であるLinearモデルを接続します。
forwardで受け取るxsはデータ長毎に受け取るtrain・testデータを想定しています。
データ長とは、時系列データを取り扱う上で、どのぐらいの期間をひとかたまりとして学習させるかというものです。
PyTorch標準のnn.RNNやnn.LSTMのinputの形状は(seq_len, batch, input_size)を想定しています。
このseq_lenがデータ長にあたります。
具体例を挙げると、0.01sec刻みのデータを100個学習させるとなると、1sec間のデータを学習させることになります。
forward内でfor文として回してるのは、この0.01secずつ推論を行っているというイメージです。
これでRNNモデルの構築は完了です。
次は学習に使うデータセットを用意する関数を作りましょう。
dataset.py
時系列データのチュートリアルではsin関数の周波数を予測するものが多く用いられます。
時系列データとしては一番シンプルでわかりやすいですよね。
というわけでsin関数のデータセットを作る関数を用意しました。
import numpy as np def sin_dataset(n_data, n_test): x = np.linspace(0, 2 * np.pi, n_data + n_test) ram = np.random.permutation(n_data + n_test) x_train = np.sort(x[ram[:n_data]]) x_test = np.sort(x[ram[n_data:]]) y_train = np.sin(x_train) y_test = np.sin(x_test) return x_train, y_train, x_test, y_test
これを「dataset.py」として保存します。
x_train, y_train, x_test, y_testという一般的なデータセット形式で出力できるようにしておきます。
ではこのsin関数を使って、sin関数を予測させるための学習を実行してみたいと思います。
train.py
import numpy as np import os os.environ['KMP_DUPLICATE_LIB_OK']='TRUE' import math #import time import matplotlib.pyplot as plt import torch import torch.nn as nn import torch.nn.functional as F from torch import optim from RNN import RNNHardCell, RNNModel from dataset import sin_dataset # parameter epochs = 100 n_input = 1 n_hidden = 10 n_output = 1 num_layers = 2 n_batch = 20 n_data = 1000 n_test = 200 def main(): # dataset x_train, y_train, x_test, y_test = sin_dataset(n_data, n_test) y_test_torch = torch.from_numpy(np.asarray(y_test)) y_test_torch = y_test_torch.unsqueeze(0) # model, optimizer, loss function model = RNNModel(n_input, n_hidden, n_output, num_layers) optimizer = optim.Adam(model.parameters()) MSE = nn.MSELoss() train_loss = [] test_loss = [] if os.path.exists("result/loss") == False: os.makedirs("result/loss") if os.path.exists("result/eval") == False: os.makedirs("result/eval") if os.path.exists("result/model") == False: os.makedirs("result/model") # train for epoch in range(1, epochs + 1): model.train() perm = np.random.permutation(n_data) sum_loss = 0 for i in range(0, n_data, n_batch): x_batch = x_train[perm[i:i + n_batch]] ant_batch = y_train[perm[i:i + n_batch]] ant_batch = torch.from_numpy(ant_batch).double() ant_batch = ant_batch.unsqueeze(0) optimizer.zero_grad() y_batch_pred = model(x_batch, ant_batch).double() loss = MSE(y_batch_pred, ant_batch) loss.backward() optimizer.step() sum_loss += loss.data * n_batch # loss(train) ave_loss = sum_loss / n_data train_loss.append(ave_loss) # loss(test) model.eval() y_test_pred = model.forward(x_test) loss = MSE(y_test_pred, y_test_torch) test_loss.append(loss.data) # loss display if epoch % 100 == 1: print("Ep/MaxEp train_loss test_loss") if epoch % 10 == 0: print("{:4}/{} {:10.5} {:10.5}".format(epoch, epochs, ave_loss, float(loss.data))) if epoch % 20 == 0: plt.figure(figsize=(5, 4)) y_pred = model.forward(x_test) # tensor → numpy y_pred = y_pred.to('cpu').detach().numpy().copy() plt.plot(x_test, y_test, label = "target") plt.plot(x_test, y_pred[0], label = "predict") plt.legend() plt.grid(True) plt.xlim(0, 2 * np.pi) plt.ylim(-1.2, 1.2) plt.xlabel("x") plt.ylabel("y") plt.savefig("result/eval/ep{}.png".format(epoch)) plt.clf() plt.close() # save loss glaph plt.figure(figsize=(5, 4)) plt.plot(train_loss, label = "training") plt.plot(test_loss, label = "test") plt.yscale('log') plt.legend() plt.grid(True) plt.xlabel("epoch") plt.ylabel("loss (MSE)") plt.savefig("result/loss/loss_history.png") plt.clf() plt.close() # save best_model torch.save(model, "result/model/best_model.pt") if __name__ == "__main__": main()
これを「train.py」として保存します。
PyTorchのtrainソースコードは処理順序がとても分かりやすいかと思います。
ざっくり処理内容の概要を書くと
1.データセットを用意する
2.モデル、オプティマイザー、loss関数を用意する
3.エポック数分のfor文を用意し、その中でpredictとlossの計算をする
4.lossの表示、epoch毎のtestデータによる推論結果を保存する
5.最後にlossの推移とbest_modelを保存する
途中numpyをPyTorchのTensor型にしたり、次元数を増やすためにunsqueezeしたりしてます。
ディープラーニング系は扱うデータのshape合わせと格闘するのが日常です。
では最後に学習した結果を紹介しようと思います。
学習結果
それでは、上記3つのソースコードを同じ場所に置き、「python train.py」で実行してみましょう。
そうすると、resultフォルダが作成され、その中に「eval」「loss」「model」フォルダが作成されます。
まずlossフォルダの中には、lossの推移が表示されます。
おぉ、良い感じに下がってますね。
実行中にも10epoch毎に数値として表示されるようにしてます。
続いてepoch回数が進むごとに、sinの予測結果がどうなったか見てみます。
今回はepochs=100で実施しましたので、20epochと100epochで比較してみます。
20epochは後半予測が上手くいってないのに対して、epoch100の方がよりsin関数を予測出来ていることが分かりますね。
なんとかRNNとして機能したモデルを自作することに成功しました。
時系列データは画像認識に比べてシンプルですし、モデルの構造もデータサイズも軽量なので試行錯誤しやすくていいですね。
今回はepoch数も100回と少ないので、200や300にすればより正確なsin関数を予測すると思います。
また隠れ層の数を10から20に増やしたりすると精度向上につながると思います。
num_layersという変数も用意しており、これを増やせばRNNモデルを複数連結することができるので、これも精度向上が実現できるかと思います。
まとめ
去年の年末からRNNについて勉強してきて約1か月ですが、ようやく実装までたどり着きました。
できるだけ自作するということをコンセプトにしてきましたが、やはり理解が良く深まり、あらためてPyTorch標準のRNNも安心して使えそうです。
フレームワークを使う前に、まずは自分でゴリゴリ書きながら苦戦しながら理解を深めるという勉強法はおススメです。
次回はLSTMに挑戦してみようと思います。
今回はここまで!