はじめに
pytorchの記述方法と対応付けながらpytorch lightningを学びたかったので、自分用のメモとして書きました。
実験条件
設定は以下の通りとします。pytorch_lightning==1.6.4
- データセットはMNIST (入力次元は28×28)
- ネットワークは以下の構成
- Linear層(784→512), ReLU層
- Linear層(512→512), ReLU層
- Linear層(512→10), Softmax層
- バッチサイズは64, エポック数は5
- 損失関数は交差エントロピー誤差
- optimizerはAdamとし, 学習係数(learning rate)は0.001とする
まずはPyTorchで書く
以下のコードはPyTorchチュートリアル(日本語翻訳版)を参考にしております。
注)いらないモジュールも含まれています!
# 必要なモジュールをimport (足りないものがあれば適宜追加してください)
import torch
import torchvision
from torch import nn
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor, Lambda
import numpy as np
import matplotlib.pyplot as plt
# エポック数, バッチサイズ, 学習係数を設定
epochs = 5
batch_size = 64
learning_rate = 1e-3
# データセットのダウンロード
train_data = MNIST(root='data', train=True, download=True,transform=ToTensor())
test_data = MNIST(root='data', train=False, download=True,transform=ToTensor())
# DatasetオブジェクトをDataloaderを用いてミニバッチ化する
train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=False)
test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=False, drop_last=False)
# モデルの定義
class NeuralNetwork(nn.Module):
def __init__(self, hidden_size=512):
super(NeuralNetwork, self).__init__()
self.flatten = nn.Flatten() # 1次元配列化する関数
self.layer_stack = nn.Sequential( # nn.Sequentialで層(layer)を積み上げる
nn.Linear(28*28, hidden_size), nn.ReLU(),
nn.Linear(hidden_size, hidden_size), nn.ReLU(),
nn.Linear(hidden_size, 10), nn.Softmax(dim=1) # 行方向で正規化するようにするのでdim=1
)
def forward(self, x): # 順伝搬を計算する. forwardメソッドを呼び出す際は hoge.forward(x)としないでhoge(x)とすることに注意!!!!
x = self.flatten(x) # xを[28, 28]->[784]に1次元配列化する(flatten)
logits = self.layer_stack(x) # 順伝搬して予測値を出力する logits.size() = [10]
return logits
# gpuが使えるならgpuを使う
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# モデルを定義
model = NeuralNetwork().to(device)
# 損失関数の初期化、定義
loss_fn = nn.CrossEntropyLoss()
# optimizerの定義
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# 学習時に最適化を行うためのループを行う
def train_loop(dataloader, model, loss_fn, optimizer):
size = len(dataloader.dataset) # 学習用データの総数
for batch, (X, y) in enumerate(dataloader): # ミニバッチごとに学習を行う
# 予測と損失の計算
X, y = X.to(device), y.to(device) # gpuが使用可能ならばgpuを使う
pred = model(X) # 予測値を算出
loss = loss_fn(pred, y) # 予測値と正解値をもとに損失を計算
# バックプロパゲーション
optimizer.zero_grad() # 勾配を初期化する(とりあえずやる)
loss.backward() # 損失関数の偏微分値を計算する
optimizer.step() # 最適化する(パラメータ値の更新)
if batch % 100 == 0: # バッチサイズ64なので100で割ってあまり0, すなわち6400の倍数分学習データ使った時に損失の値等を出力
loss, current = loss.item(), batch * len(X)
print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
def test_loop(dataloader, model, loss_fn):
size = len(dataloader.dataset)
test_loss, correct = 0, 0
with torch.no_grad(): # テストセット推論時は特に微分とかする必要ないのでtorch.no_grad()をする!
for X, y in dataloader:
X, y = X.to(device), y.to(device) # gpuが使用可能ならばgpuを使う
pred = model(X)
test_loss += loss_fn(pred, y).item()
correct += (pred.argmax(1) == y).type(torch.float).sum().item()
test_loss /= size # lossの平均値を求める
correct /= size # accuracyを求める
print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
# 訓練&テスト
for t in range(epochs):
print(f"Epoch {t+1}\n-------------------------------")
train_loop(train_dataloader, model, loss_fn, optimizer)
test_loop(test_dataloader, model, loss_fn)
print("Done!")
PyTorch Lightningで書く
PyTorchで記述された上記のコードをPyTorch Lightningを使って書いてみました!上記コードではシード値は設定していませんが、下記コードでは設定しました。
注)pytorchで記述したものと完全に対応している訳でないのでご了承ください。
# 必要なモジュールをimport
import torch
import torchvision
from torch import nn
from torch.utils.data import DataLoader, random_split
import torch.nn.functional as F
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor, Lambda
import numpy as np
import matplotlib.pyplot as plt
import pytorch_lightning as pl
# エポック数, バッチサイズ, 学習係数を設定
epochs = 5
batch_size = 64
learning_rate = 1e-3
# LightningModule (nn.Module のサブクラス) の定義
class Net(pl.LightningModule):
# モデルの定義
def __init__(self, batch_size=batch_size, hidden_size=512):
super().__init__()
# self.flatten = nn.Flatten()
self.layer_stack = nn.Sequential(
nn.Linear(28*28, hidden_size), nn.ReLU(),
nn.Linear(hidden_size, hidden_size), nn.ReLU(),
nn.Linear(hidden_size, 10), nn.Softmax(dim=1)
)
self.batch_size = batch_size
# 予測/推論アクションを定義 (required)
def forward(self, x):
# x = self.flatten(x) # 以下のx.viewですでにflattenしている
logits = self.layer_stack(x)
return logits
# 損失関数を定義
def lossfun(self, pred, y):
return F.cross_entropy(pred, y)
# optimizerの設定 (required)
def configure_optimizers(self):
optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate)
return optimizer
# train dataを定義
def train_dataloader(self):
train_data = MNIST(root='data', train=True, download=True,transform=ToTensor())
return DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=False)
# train ループを定義 (required)
def training_step(self, batch, batch_idx):
x, y = batch
x = x.view(x.size(0), -1) # flatten
pred = self.forward(x) # 順伝搬
loss = self.lossfun(pred, y) # lossを計算
self.log("train_loss", loss)
return loss
# val dataを定義 (今回はtest dataと同じ)
def val_dataloader(self):
test_data = MNIST(root='data', train=False, download=True,transform=ToTensor())
return DataLoader(test_data, batch_size=batch_size, shuffle=False, drop_last=False)
# val dataに対するイテレーションごとの動き
def validation_step(self, batch, batch_idx):
x, y = batch
x = x.view(x.size(0), -1) # flatten
pred = self.forward(x)
loss = self.lossfun(pred, y)
pred_labels = torch.argmax(pred, dim=1) # [b, d] -> [b, 1]
acc = torch.sum(y == pred_labels) * 1.0 / len(y) # accuracyを計算
results = {'val_loss': loss, 'val_acc': acc} # lossとaccuracyを保存
return results
# val dataに対するエポックごとの動き
def validation_epoch_end(self, outputs):
# 各イテレーションごとに得られたlossとacccuracyをまとめて、エポックでの平均値の取得
avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
avg_acc =torch.stack([x['val_acc'] for x in outputs]).mean()
results = {'val_loss': avg_loss, 'val_acc': avg_acc}
print(results)
return results
# test dataを定義
def test_dataloader(self):
test_data = MNIST(root='data', train=False, download=True,transform=ToTensor())
return DataLoader(test_data, batch_size=batch_size, shuffle=False, drop_last=False)
# test dataに対するイテレーションごとの動き (val_stepと同じ)
def test_step(self, batch, batch_idx):
x, y = batch
x = x.view(x.size(0), -1) # flatten
pred = self.forward(x)
loss = self.lossfun(pred, y)
pred_labels = torch.argmax(pred, dim=1) # [b, d] -> [b, 1]
acc = torch.sum(y == pred_labels) * 1.0 / len(y) # accuracyを計算
results = {'test_loss': loss, 'test_acc': acc} # lossとaccuracyを保存
return results
# test dataに対するエポックごとの動き (val_endと同じ)
def test_epoch_end(self, outputs):
# 各イテレーションごとに得られたlossとacccuracyをまとめて、エポックでの平均値の取得
avg_loss = torch.stack([x['test_loss'] for x in outputs]).mean()
avg_acc =torch.stack([x['test_acc'] for x in outputs]).mean()
self.test_results = {'test_loss': avg_loss, 'test_acc': avg_acc}
print(self.test_results)
return self.test_results
# gpuが使えるならgpuを使う
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = Net().to(device) # モデルを宣言
# シード値を設定 (plでのみ行っています)
pl.seed_everything(42, workers=True)
# 訓練&テスト
trainer = pl.Trainer(max_epochs=epochs, gpus=1 if torch.cuda.is_available() else 0, deterministic=True) # trainerの設定(epoch, gpu使うか, シード値固定するか)
trainer.fit(model) # 訓練を行う
trainer.test()
誰かのお役に立てれば幸いです。Tensorboardの使い方とかも追記したいです。