技術は使ってなんぼ

自分が得たものを誰かの役に立てたい

PyTorchで作るRNN 後編

それではさっそく続きをやっていきます。


前回のソースコードも含めますが、「class RNNModel」というところから解説していきます。


RNN.py

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class RNNHardCell(nn.Module):
    def __init__(self, n_input:int, n_hidden:int, state=None) -> None:
        super(RNNHardCell, self).__init__()
        self.n_input = n_input
        self.n_hidden = n_hidden
        self.in_h = nn.Linear(self.n_input, self.n_hidden, bias=False)
        self.h_h = nn.Linear(self.n_hidden, self.n_hidden, bias=False)
        self.state = state
        self.register_parameter()

    def register_parameter(self) -> None:
        stdv = 1.0 / math.sqrt(self.n_hidden)
        for weight in self.parameters():
            nn.init.uniform_(weight, -stdv, stdv)
    
    def forward(self, x, state=None):
        self.state = state
        if self.state is None:
            self.state = F.hardtanh(self.in_h(x))
        else:
            self.state = F.hardtanh(self.in_h(x) + self.h_h(self.state))
        return self.state

class RNNModel(nn.Module):
    def __init__(self, n_input, n_hidden, n_output, num_layers=1):
        super(RNNModel, self).__init__()
        self.rnn = RNNHardCell(n_input, n_hidden)
        self.out = nn.Linear(n_hidden, n_output, bias=False)
        self.num_layers = num_layers
        
    def forward(self, xs, state=None):
        state = None
        h_seq = []
        
        for x in xs:
            x = torch.from_numpy(np.asarray(x)).float()
            x = x.unsqueeze(0)
            for _ in range(self.num_layers):
                state = self.rnn(x, state)
            h_seq.append(state)
        
        h_seq = torch.stack(h_seq)
        ys = self.out(h_seq)
        ys = torch.transpose(ys, 0, 1)

        return ys


上記ソースコードを「RNN.py」として保存します。


RNNHardCellに全結合層であるLinearモデルを接続します。


forwardで受け取るxsはデータ長毎に受け取るtrain・testデータを想定しています。


データ長とは、時系列データを取り扱う上で、どのぐらいの期間をひとかたまりとして学習させるかというものです。


pytorch.org


PyTorch標準のnn.RNNやnn.LSTMのinputの形状は(seq_len, batch, input_size)を想定しています。


このseq_lenがデータ長にあたります。


具体例を挙げると、0.01sec刻みのデータを100個学習させるとなると、1sec間のデータを学習させることになります。


forward内でfor文として回してるのは、この0.01secずつ推論を行っているというイメージです。


これでRNNモデルの構築は完了です。


次は学習に使うデータセットを用意する関数を作りましょう。

dataset.py


時系列データのチュートリアルではsin関数の周波数を予測するものが多く用いられます。


時系列データとしては一番シンプルでわかりやすいですよね。


というわけでsin関数のデータセットを作る関数を用意しました。

import numpy as np

def sin_dataset(n_data, n_test):
    x = np.linspace(0, 2 * np.pi, n_data + n_test)
    ram = np.random.permutation(n_data + n_test)
    x_train = np.sort(x[ram[:n_data]])
    x_test = np.sort(x[ram[n_data:]])

    y_train = np.sin(x_train)
    y_test = np.sin(x_test)

    return x_train, y_train, x_test, y_test

これを「dataset.py」として保存します。


x_train, y_train, x_test, y_testという一般的なデータセット形式で出力できるようにしておきます。


ではこのsin関数を使って、sin関数を予測させるための学習を実行してみたいと思います。

train.py

import numpy as np
import os
os.environ['KMP_DUPLICATE_LIB_OK']='TRUE'
import math
#import time
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim

from RNN import RNNHardCell, RNNModel
from dataset import sin_dataset

# parameter
epochs = 100
n_input = 1
n_hidden = 10
n_output = 1
num_layers = 2
n_batch = 20
n_data = 1000
n_test = 200

def main():
    # dataset
    x_train, y_train, x_test, y_test = sin_dataset(n_data, n_test)
    y_test_torch = torch.from_numpy(np.asarray(y_test))
    y_test_torch = y_test_torch.unsqueeze(0)

    # model, optimizer, loss function
    model = RNNModel(n_input, n_hidden, n_output, num_layers)
    optimizer = optim.Adam(model.parameters())
    MSE = nn.MSELoss()

    train_loss = []
    test_loss = []

    if os.path.exists("result/loss") == False:
        os.makedirs("result/loss")
    if os.path.exists("result/eval") == False:
        os.makedirs("result/eval")
    if os.path.exists("result/model") == False:
        os.makedirs("result/model")

    # train
    for epoch in range(1, epochs + 1):
        model.train()
        perm = np.random.permutation(n_data)
        sum_loss = 0
        for i in range(0, n_data, n_batch):
            x_batch = x_train[perm[i:i + n_batch]]
            ant_batch = y_train[perm[i:i + n_batch]]
            ant_batch = torch.from_numpy(ant_batch).double()
            ant_batch = ant_batch.unsqueeze(0)

            optimizer.zero_grad()
            y_batch_pred = model(x_batch, ant_batch).double()
            loss = MSE(y_batch_pred, ant_batch)
            loss.backward()
            optimizer.step()
            sum_loss += loss.data * n_batch

        # loss(train)
        ave_loss = sum_loss / n_data
        train_loss.append(ave_loss)

        # loss(test)
        model.eval()
        y_test_pred = model.forward(x_test)
        loss = MSE(y_test_pred, y_test_torch)
        test_loss.append(loss.data)

        # loss display
        if epoch % 100 == 1:
            print("Ep/MaxEp     train_loss     test_loss")

        if epoch % 10 == 0:
            print("{:4}/{}  {:10.5}   {:10.5}".format(epoch, epochs, ave_loss, float(loss.data)))

        if epoch % 20 == 0:
            plt.figure(figsize=(5, 4))
            y_pred = model.forward(x_test)
            # tensor → numpy
            y_pred = y_pred.to('cpu').detach().numpy().copy()
            plt.plot(x_test, y_test, label = "target")
            plt.plot(x_test, y_pred[0], label = "predict")
            plt.legend()
            plt.grid(True)
            plt.xlim(0, 2 * np.pi)
            plt.ylim(-1.2, 1.2)
            plt.xlabel("x")
            plt.ylabel("y")
            plt.savefig("result/eval/ep{}.png".format(epoch))
            plt.clf()
            plt.close()

    # save loss glaph
    plt.figure(figsize=(5, 4))
    plt.plot(train_loss, label = "training")
    plt.plot(test_loss, label = "test")
    plt.yscale('log')
    plt.legend()
    plt.grid(True)
    plt.xlabel("epoch")
    plt.ylabel("loss (MSE)")
    plt.savefig("result/loss/loss_history.png")
    plt.clf()
    plt.close()

    # save best_model
    torch.save(model, "result/model/best_model.pt")


if __name__ == "__main__":
    main()


これを「train.py」として保存します。


PyTorchのtrainソースコードは処理順序がとても分かりやすいかと思います。


ざっくり処理内容の概要を書くと
1.データセットを用意する
2.モデル、オプティマイザー、loss関数を用意する
3.エポック数分のfor文を用意し、その中でpredictとlossの計算をする
4.lossの表示、epoch毎のtestデータによる推論結果を保存する
5.最後にlossの推移とbest_modelを保存する


途中numpyをPyTorchのTensor型にしたり、次元数を増やすためにunsqueezeしたりしてます。


ディープラーニング系は扱うデータのshape合わせと格闘するのが日常です。


では最後に学習した結果を紹介しようと思います。

学習結果


それでは、上記3つのソースコードを同じ場所に置き、「python train.py」で実行してみましょう。


そうすると、resultフォルダが作成され、その中に「eval」「loss」「model」フォルダが作成されます。


まずlossフォルダの中には、lossの推移が表示されます。

f:id:yonesuke0716:20210117144817p:plain
loss推移


おぉ、良い感じに下がってますね。


実行中にも10epoch毎に数値として表示されるようにしてます。


続いてepoch回数が進むごとに、sinの予測結果がどうなったか見てみます。


今回はepochs=100で実施しましたので、20epochと100epochで比較してみます。

f:id:yonesuke0716:20210117145137p:plain
epoch20
f:id:yonesuke0716:20210117145156p:plain
epoch100


20epochは後半予測が上手くいってないのに対して、epoch100の方がよりsin関数を予測出来ていることが分かりますね。


なんとかRNNとして機能したモデルを自作することに成功しました。


時系列データは画像認識に比べてシンプルですし、モデルの構造もデータサイズも軽量なので試行錯誤しやすくていいですね。


今回はepoch数も100回と少ないので、200や300にすればより正確なsin関数を予測すると思います。


また隠れ層の数を10から20に増やしたりすると精度向上につながると思います。


num_layersという変数も用意しており、これを増やせばRNNモデルを複数連結することができるので、これも精度向上が実現できるかと思います。

まとめ

去年の年末からRNNについて勉強してきて約1か月ですが、ようやく実装までたどり着きました。


できるだけ自作するということをコンセプトにしてきましたが、やはり理解が良く深まり、あらためてPyTorch標準のRNNも安心して使えそうです。


フレームワークを使う前に、まずは自分でゴリゴリ書きながら苦戦しながら理解を深めるという勉強法はおススメです。


次回はLSTMに挑戦してみようと思います。


今回はここまで!