はじめに
PFNがChainerの開発を終了したことに伴ってPytorch (https://pytorch.org/) へ移行。
その際のLSTMの実装に関するメモ。
(2020年時点のメモです)
PytorchのLSTM実装
参考になったサイト【PyTorchのRNNとRNNCell】
(https://takoroy-ai.hatenadiary.jp/entry/2018/06/10/203531)
PytorchにはLSTMとLSTMCellの2種類ある。
ChainerのLSTMに相当するのはLSTMCellの方である。
簡単に言うと、LSTMは学習したい系列データの全時系列を一度にモデルに入れる。すなわち、Chainerで実装していたように系列データをfor文で1ステップずつ入力する必要はなく、実装は楽。また、LSTMではClassの定義の際にも多層LSTMを1行で書くことができるため、すっきりしたコードが書ける。
一方でLSTMCellはChainerのLSTMと同様にfor文で系列データを1ステップずつ入出力する必要がある。Chainerと違うポイントは、LSMTの隠れ状態を自分で定義して次ステップの入力に与えなければいけないところ。Chainerではそこが自動化されていたが、Pytorchでは明記すること。
実装はおそらくLSTMの方が楽だが、用途的にLSTMCellを使わなければいけないため実装してみた。
実装例(正弦波の学習・推論)
コードはhttps://github.com/norimaki-senbei/Pytorch-LSTMandLSTMCell
扱う問題は下図のように正弦波を学習(1周期・100プロット)
学習時と同じ波形を推論
共通部分
import関係
import numpy as np
import matplotlib.pylab as plt
# pytorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.tensor as tensor
# module
from module.param import *
from module import Class
moduleというディレクトリ内にparam.py(各パラメータを記載)とClass.py(LSTMのクラス用プログラム)を保存。
sin波の生成
# データの取得
def get_dataset():
x = np.linspace(0, 2*np.pi, N)
t = np.sin(x)
x = np.array(x,dtype="float32").reshape(N,BATCH_SIZE)
t = np.array(t,dtype="float32").reshape(N,BATCH_SIZE)
return x, t
各パラメータの定義
# パラメータの定義
#データ何プロットを学習するか
N = 100
#データの数
DataNumber = 1
#入出力の次元
in_size = 1
out_size = 1
#中間ノードの次元
Node = 50
#学習回数
EPOCH_NUM = 500
#バッチサイズ
BATCH_SIZE = 1
モデル・オプティマイザー・ロス関数の定義
#モデルの定義
LSTM_MODEL = Class.LSTM(in_size, Node, out_size)
#オプティマイザーの定義
optimizer = optim.Adam(LSTM_MODEL.parameters())
#ロス関数の定義
LossFunction = nn.MSELoss()
torch.nn.LSTM
- 入力:x (0~2π)
- 出力:y = sin(x)
クラスの定義
# モデルクラス定義
class LSTM(nn.Module):
def __init__(self, in_size, hidden_size, out_size):
super(LSTM, self).__init__()
self.hidden_size = hidden_size
self.num_layers = 2 #LSTMの層数定義
self.lstm = nn.LSTM(in_size, hidden_size, num_layers=self.num_layers)
self.linear = nn.Linear(hidden_size, out_size)
def forward(self, x):
#(隠れ状態の初期化・中間2章の場合は[ [h1,c1], [h2,c2] ])ここ自動化しないと
hiddens = [ torch.zeros(self.num_layers, 1, self.hidden_size), torch.zeros(self.num_layers, 1, self.hidden_size)]
output, [h,c] = self.lstm(x,hiddens)#隠れ状態の入出力を忘れないように
output = self.linear(output)
return output
学習ループ
def train():
print("/**********************\n* Train *\n**********************/")
train_x, train_t = get_dataset()
# 学習する関数の表示
plt.plot(train_x, train_t, label="training data", marker=".")
plt.show()
#pytorchのLSTMの入力の形(sequence length, batch size, input size)に変更
train_x = train_x[:,:,np.newaxis]
train_t = train_t[:,:,np.newaxis]
# 学習
for epoch in range(EPOCH_NUM):
# 誤差の初期化
loss = 0
# 勾配の初期化
LSTM_MODEL.zero_grad()
X = train_x
T = train_t
#numpyからtensor型へ変換
X = torch.from_numpy(X)
T = torch.from_numpy(T)
#モデルに入力
Y = LSTM_MODEL(x=X)
#誤差の計算
loss = LossFunction(Y, T)
#バックプロパゲーション
loss.backward()
#パラメータ更新
optimizer.step()
if (epoch+1) % 10 == 0:
print("epoch:\t{}\tloss:\t{}".format(epoch+1, loss))
推論
def test():
test_x, test_t = get_dataset()
test_x = test_x[:,:,np.newaxis]
test_t = test_t[:,:,np.newaxis]
LSTM_MODEL.zero_grad()
#推論
testdata = test_x
testdata = torch.from_numpy(testdata)
predict = LSTM_MODEL(testdata)
# tensorからnumpyに変換
predict = predict.to('cpu').detach().numpy().copy()
# これ以降はグラフ表示のためのいろいろ
plt.scatter(test_x.reshape(N), predict.reshape(N), marker=".",label="predicted", color="red")
plt.plot(test_x.reshape(N), test_t.reshape(N), label="true")
plt.show()
torch.nn.LSTMCell
- 入力:y = sin(x) (ステップk)
- 出力:y = sin(x) (ステップk+1)
前の出力y(k)から次の出力y(k+1)を予測するモデルを学習
推論時は最初の1点y(0)を入力し、その後は随時推論値を次のステップの入力とする
クラスの定義
# モデルクラス定義
class LSTM(nn.Module):
def __init__(self, in_size, hidden_size, out_size):
super(LSTM, self).__init__()
self.hidden_size = hidden_size
self.lstm1 = nn.LSTMCell(in_size, hidden_size)
self.lstm2 = nn.LSTMCell(hidden_size, hidden_size)
self.linear = nn.Linear(hidden_size, out_size)
def forward(self, x, hiddens):
h1, c1 = self.lstm1(x,hiddens[0])
h2, c2 = self.lstm2(h1,hiddens[1])
output = self.linear(h2)
return output, [ [h1, c1], [h2, c2] ]
def initHidden(self):
#LSTM2層の場合は[ [h1, c1], [h2, c2] ]の形でhidden初期化
return [ [torch.zeros(BATCH_SIZE, self.hidden_size), torch.zeros(BATCH_SIZE, self.hidden_size)],[torch.zeros(BATCH_SIZE, self.hidden_size), torch.zeros(BATCH_SIZE, self.hidden_size)] ]
学習ループ
def train():
print("/**********************\n* Train *\n**********************/")
train_x, train_t = get_dataset()
# 学習する関数の表示
plt.plot(train_x, train_t, label="training data", marker=".")
plt.show()
# 学習
for epoch in range(EPOCH_NUM):
# 誤差の初期化
loss = 0
# 勾配の初期化
hidden = LSTM_MODEL.initHidden()
LSTM_MODEL.zero_grad()
for i in range(N-1):
X = train_t[i].reshape(1,1)
T = train_t[i+1].reshape(1,1)
X = torch.from_numpy(X)
T = torch.from_numpy(T)
Y, hidden = LSTM_MODEL(x=X, hiddens=hidden)
loss += LossFunction(Y, T)
loss = loss / N
loss.backward()
optimizer.step()
if (epoch+1) % 10 == 0:
print("epoch:\t{}\tloss:\t{}".format(epoch+1, loss))
推論
def test():
test_x, test_y = get_dataset()
LSTM_MODEL.zero_grad()
predict_all = []
#推論
for i in range(N-1):
if i == 0:
testdata = test_y[i].reshape(1,1)
predict_all.append(testdata)
testdata = torch.from_numpy(testdata)
hidden = LSTM_MODEL.initHidden()
else:
testdata = predict.reshape(1,1)
predict, hidden = LSTM_MODEL(testdata, hiddens=hidden)
predict_all.append(predict.to('cpu').detach().numpy().copy())
# これ以降はグラフ表示のためのいろいろ
predict_all = np.array(predict_all,dtype="float32").reshape(N)
plt.scatter(test_x, predict_all, marker=".",label="predicted", color="red")
plt.plot(test_x, test_y, label="true")
plt.show()
結果
ちゃんと推論できました。(左:LSTM、右:LSTMCell 、赤プロット:推論値)
おわりに
まだGPUに対応してない