はじめに
前回の記事でKerasを使ってCIFAR-10の画像分類に取り組んだので、今回はPyTorchを用いて同じ構造のCNNを実装してみる。
また、今回からcolab Proにアップグレードしてみた。これで煩わしいランタイム問題とお𠮟り時間から(ある程度は)解放されるはず.......
環境
google colaboratory Pro
Python 3.7.13
PyTorch 1.11.0+cu113
実装
1.ライブラリインポート
必要なライブラリを読み込む。
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.autograd import Variable
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torchsummary import summary
2.クラス継承/作成
PyTorchではまずネットワークを構成するクラスをtorch.nnから継承して作っていく。
モデルのレイヤーを一つずつ設定していくが、PyTorchでは入力の数も引数に渡さなければならないため、畳み込みやプーリングで画素数がどうなったか計算しながら作る必要がある。今回はカラー画像なので入力チャンネル数は3となる。畳み込み層ではすべてパディングを行っているため、32×32で入力した画像はプーリング層を3回通り、出力される時には4×4になっている。これが平滑化され、全結合層に渡される。パディングのサイズとモードは畳み込み層の引数で設定でき、"replicate"は0で画像の回りを埋めるモード。
class CNN(nn.Module):
def __init__(self, num_class):
super().__init__()
self.feature = nn.Sequential(
# ブロック1
# (チャンネル数, フィルタ枚数, カーネルサイズ)
nn.Conv2d(3, 128, kernel_size=3, padding=(1,1), padding_mode="replicate"),
nn.ReLU(),
nn.Conv2d(128, 128, kernel_size=3, padding=(1,1), padding_mode="replicate"),
nn.ReLU(),
nn.MaxPool2d((2,2)),
nn.Dropout(0.25),
# ブロック2
nn.Conv2d(128, 64, kernel_size=3, padding=(1,1), padding_mode="replicate"),
nn.ReLU(),
nn.Conv2d(64, 64, kernel_size=3, padding=(1,1), padding_mode="replicate"),
nn.ReLU(),
nn.MaxPool2d((2,2)),
nn.Dropout(0.25),
# ブロック3
nn.Conv2d(64, 32, kernel_size=3, padding=(1,1), padding_mode="replicate"),
nn.ReLU(),
nn.Conv2d(32, 32, kernel_size=3, padding=(1,1), padding_mode="replicate"),
nn.ReLU(),
nn.MaxPool2d((2,2)),
nn.Dropout(0.25)
)
# 平滑化
self.flatten = nn.Flatten()
# 全結合
self.classifier = nn.Sequential(
nn.Linear(4*4*32, 512),
nn.Dropout(0.6),
nn.Linear(512, num_class)
)
def forward(self, input_data):
input_data = self.feature(input_data)
input_data = self.flatten(input_data)
input_data = self.classifier(input_data)
return input_data
3.データの読み込み
load_dataではデータを読み込む。
データを読み込んだあと、データセットからバッチサイズごとにデータを取り出したdataloaderを作る。深層学習で画像を扱う際には正則化を行う必要があるため、ここで読み込みと一緒に正規化する。Normalizeがなんなのか探してみたが、特に決まってないそう→「pytorchのNormalizationで平均と標準偏差を0.5にする理由がわかりません」
あとで色々と使うので、正解クラスのラベルもここで作っておく。
def load_data(BATCH_SIZE):
# transform定義
totensor = transforms.ToTensor() # (高さ,幅,チャネル)で画素値[0,255]を(チャネル,幅,高さ)で画素値[0,1]のTensorに変換
normalize = transforms.Normalize(mean=[0.5,0.5,0.5], std=[0.5,0.5,0.5]) # [0,1]の範囲の値を[-1,1]の範囲の値に変換
transform = transforms.Compose([totensor, normalize])
# データ読み込み
data_folder = "./data2"
train_data = CIFAR10(root=data_folder, train=True, download=True, transform=transform)
train_data_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=2) # multi-process data loadingとなり、処理が高速化
test_data = CIFAR10(root=data_folder, train=False, download=True, transform=transform)
test_data_loader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
# クラス名
class_name = ("Plane", "Car", "Bird", "Cat", "Deer", "Dog", "Frog", "Horse", "Ship", "Truck")
return train_data_loader, test_data_loader, class_name
4.学習
trainでは、学習を行う。
諸々の学習の設定をした後、train_dataからバッチサイズずつデータセットを取り出してmodelに学習させていく。kerasだとこのあたりの実装がcompileやfit関数だけで済むが、PyTorchではそのあたりを自分で実装する必要がある(その分中身をいじりやすいという利点はある)。あとで学習の経過を描画したいので、lossやaccuracyを格納するlistを格納(ややこしい)したhistory(dict型)を作って、その中に経過を入れている。
PyTorchには自動微分機能があるため、学習の途中や学習後に検証などを行う際にはwith torch.no_grad():で勾配の更新が行われないようにする必要があるので注意。
def train(train_data_loader, test_data_loader, class_name):
# GPU設定
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
num_class = len(class_name)
# モデルとGPUセット
model = CNN(num_class)
model.to(device)
# 損失関数
criterion = nn.CrossEntropyLoss()
# 最適化手法
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 学習経過を格納するdict
history = {"loss":[], "accuracy":[], "val_loss":[], "val_accuracy":[]}
MAX_EPOCH = 50
for epoch in range(MAX_EPOCH):
train_loss = 0.
train_acc = 0.
valid_loss = 0.
valid_acc = 0.
train_total = 0
valid_total = 0
# 学習
for data in train_data_loader:
images, labels = data # データ取り出し
images, labels = Variable(images), Variable(labels) # torch.autograd.Variableに変換
images, labels = images.to(device), labels.to(device) # GPUに転送
optimizer.zero_grad() # 勾配情報をリセット
pred = model(images) # モデルから予測を計算(順伝播計算):tensor(BATCH_SIZE, 確率×10)
loss = criterion(pred, labels) # 誤差逆伝播の微分計算
train_loss += loss.item() # 誤差(train)を格納
loss.backward()
optimizer.step() # 勾配を計算
values, indices = torch.max(pred.data, axis=1) # 最も確率が高いラベルの確率と引数をbatch_sizeの数だけ取り出す
train_acc += (indices==labels).sum().item() # labelsと一致した個数
train_total += labels.size(0) # データ数(=batch_size)
history["loss"].append(train_loss) # 1epochあたりの誤差の平均を格納
history["accuracy"].append(train_acc/train_total) # 正解数/使ったtrainデータの数
# 学習ごとの検証
with torch.no_grad():
for data in test_data_loader:
images, labels = data # データ取り出し
images, labels = Variable(images), Variable(labels) # torch.autograd.Variableに変換
images, labels = images.to(device), labels.to(device) # GPUに転送
pred = model(images) # モデルから予測を計算(順伝播計算):tensor(BATCH_SIZE, num_class)
loss = criterion(pred, labels) # 誤差の計算
valid_loss += loss.item() # 誤差(valid)を格納
values, indices = torch.max(pred.data, axis=1) # 最も確率が高い引数をbatch_sizeの数だけ取り出す
valid_acc += (indices==labels).sum().item()
valid_total += labels.size(0) # データ数(=batch_size)
history["val_loss"].append(valid_loss) # 1epochあたりの検証誤差の平均を格納
history["val_accuracy"].append(valid_acc/valid_total) # 正解数/使ったtestデータの数
# 5の倍数回で結果表示
if (epoch+1)%5==0:
# f文字列では``と""は共存できないので変える
print(f'Epoch:{epoch+1:d} loss:{history["loss"][-1]:.3f} accuracy{history["accuracy"][-1]:.3f} val_loss{history["val_loss"][-1]:.3f} val_accuracy{history["val_accuracy"][-1]:.3f}')
return model, history
5.正解率算出
cal_accuracyでは、全体の正解率とクラスごとの正解率を計算している。
ここでは学習が終わったモデルにtestデータをバッチサイズずつ渡し、出てきたバッチサイズ個のデータをfor文で1個ずつ正解or不正解を判定してlistに格納している。
def cal_accuracy(train_data_loader, test_data_loader, class_name, model):
# GPU設定
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# 個数を格納するlist
total = 0
sum_correct = 0
num_class = len(class_name)
class_total = [0 for i in range(num_class)]
class_correct = [0 for i in range(num_class)]
# 学習後のモデルで正解率算出
with torch.no_grad():
for data in test_data_loader:
images, labels = data # データ取り出し
images, labels = Variable(images), Variable(labels) # torch.autograd.Variableに変換
images, labels = images.to(device), labels.to(device) # GPUに転送
pred = model(images) # モデルから予測を計算(順伝播計算):tensor(BATCH_SIZE, num_class)
values, indices = torch.max(pred.data, axis=1) # 最も確率が高い引数をbatch_sizeの数だけ取り出す
total += labels.size(0) # データ数(=batch_size)
for i in range(len(labels)):
label = labels[i] # 0~9のいずれか
sum_correct += (1 if indices[i]==labels[i] else 0) # 正解した個数を累積
class_correct[label] += (1 if indices[i]==labels[i] else 0) # 正解ならlabel番目の正解数を+1
class_total[label] += 1 # label番目の個数を+1
print("-"*100)
print("正解数:", sum_correct)
print("データ数:", total)
print("正解率:", sum_correct/total*100)
# クラスごとの正解率
print("-"*100)
print("testデータの分類結果")
for i in range(num_class):
print(f"{class_name[i]:5s} クラスの正解率:{class_correct[i]/class_total[i]*100:.1f}%")
6.学習経過の描画
plot_figでは、学習の経過を描画している。
ここはkerasの時とほぼ同じで、historyの中身をmatplotlibを使って描画している。名前をhistoryとしたが、kerasで学習経過を入れていたhistoryとは違うのでhistory.history["Key"]ではない点に注意。
def plot_fig(history):
# lossとaccuracyを可視化
print("-"*100)
print("BatchNormalizationあり")
plt.figure(1, figsize=(13,4))
plt.subplots_adjust(wspace=0.5)
# 学習曲線
plt.subplot(1, 2, 1)
plt.plot(history["loss"], label="train")
plt.plot(history["val_loss"], label="valid")
plt.title("train and valid loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.legend()
plt.grid()
# 精度表示
plt.subplot(1, 2, 2)
plt.plot(history["accuracy"], label="train")
plt.plot(history["val_accuracy"], label="valid")
plt.title("train and valid accuracy")
plt.xlabel("epoch")
plt.ylabel("accuracy")
plt.legend()
plt.grid()
plt.show()
7.実行
最後に、これまで定義した関数を実行するmain関数を定義して終わり。
# メイン関数
def main():
train_data_loader, test_data_loader, class_name = load_data(1024)
model, history = train(train_data_loader, test_data_loader, class_name)
cal_accuracy(train_data_loader, test_data_loader, class_name, model)
plot_fig(history)
# 実行
main()
実行すると以下のような出力になった。
load_data()
train()
cal_accuracy()
plot_fig()
正解率は78%ほどでkerasの学習結果↓とほぼ同じだが、こちらの方が精度がいい(学習誤差っぽいのでほぼ一緒)。ただし、今回lossはepochごとに累積したものをそのままアウトプットしているので値が全然違う。
Keras
ちなみに、前回のおまけで記載したBatchNormalizationを採用すると、精度が向上した。(クラスを定義するときにBatchNormalizationをいれるだけ)
class CNN(nn.Module):
def __init__(self, num_class):
super().__init__()
self.feature = nn.Sequential(
# ブロック1
# (チャンネル数, フィルタ枚数, カーネルサイズ)
nn.Conv2d(3, 128, kernel_size=3, padding=(1,1), padding_mode="replicate"),
nn.BatchNorm2d(128),
nn.ReLU(),
nn.Conv2d(128, 128, kernel_size=3, padding=(1,1), padding_mode="replicate"),
nn.BatchNorm2d(128),
nn.ReLU(),
nn.MaxPool2d((2,2)),
nn.Dropout(0.25),
# ブロック2
nn.Conv2d(128, 64, kernel_size=3, padding=(1,1), padding_mode="replicate"),
nn.BatchNorm2d(64),
nn.ReLU(),
nn.Conv2d(64, 64, kernel_size=3, padding=(1,1), padding_mode="replicate"),
nn.BatchNorm2d(64),
nn.ReLU(),
nn.MaxPool2d((2,2)),
nn.Dropout(0.25),
# ブロック3
nn.Conv2d(64, 32, kernel_size=3, padding=(1,1), padding_mode="replicate"),
nn.BatchNorm2d(32),
nn.ReLU(),
nn.Conv2d(32, 32, kernel_size=3, padding=(1,1), padding_mode="replicate"),
nn.BatchNorm2d(32),
nn.ReLU(),
nn.MaxPool2d((2,2)),
nn.Dropout(0.25)
)
# 平滑化
self.flatten = nn.Flatten()
# 全結合
self.classifier = nn.Sequential(
nn.Linear(4*4*32, 512),
nn.Dropout(0.6),
nn.Linear(512, num_class)
)
def forward(self, input_data):
input_data = self.feature(input_data)
input_data = self.flatten(input_data)
input_data = self.classifier(input_data)
return input_data
testデータの正解率が上がっているが、同時にtrainデータの正解率も上がっている。やはり過学習という問題が付きまとうが、画像分類の練習としては十分に理解が深まった。より精度を上げたいなら、CNNクラスの中身や損失関数、最適化手法、学習率のパラメータなどを調整していくことになる。
おまけ①
今回の学習はcolab ProのGPUを使ってだいたい20~30分くらいかかるが、回していた時にランタイムが切断されてしまうことがあった(頑張れ有料版)。
↑GPUのメモリをけっこうギリギリまで使っている。(犯人は当然ながら学習部分)
無料版で回す際には、それぞれの関数の中身を一つのセルに入れて回すと上手くいったりもしたので、載せておく。
#############################
# BatchNormalizeなし
#############################
# 畳み込みニューラルネットワーク
class CNN(nn.Module):
def __init__(self, num_class):
super().__init__()
self.feature = nn.Sequential(
nn.Conv2d(3, 128, kernel_size=3, padding=(1,1), padding_mode="replicate"), # (チャネル数, フィルタ枚数, カーネルサイズ)
# nn.BatchNorm2d(128),
nn.ReLU(),
nn.Conv2d(128, 128, kernel_size=3, padding=(1,1), padding_mode="replicate"),
# nn.BatchNorm2d(128),
nn.ReLU(),
nn.MaxPool2d((2,2)),
nn.Dropout(0.25),
nn.Conv2d(128, 64, kernel_size=3, padding=(1,1), padding_mode="replicate"), # (チャネル数, フィルタ枚数, カーネルサイズ)
# nn.BatchNorm2d(64),
nn.ReLU(),
nn.Conv2d(64, 64, kernel_size=3, padding=(1,1), padding_mode="replicate"),
# nn.BatchNorm2d(64),
nn.ReLU(),
nn.MaxPool2d((2,2)),
nn.Dropout(0.25),
nn.Conv2d(64, 32, kernel_size=3, padding=(1,1), padding_mode="replicate"), # (チャネル数, フィルタ枚数, カーネルサイズ)
# nn.BatchNorm2d(32),
nn.ReLU(),
nn.Conv2d(32, 32, kernel_size=3, padding=(1,1), padding_mode="replicate"),
# nn.BatchNorm2d(32),
nn.ReLU(),
nn.MaxPool2d((2,2)),
nn.Dropout(0.25)
)
self.flatten = nn.Flatten()
self.classifier = nn.Sequential(
nn.Linear(4*4*32, 512),
nn.Dropout(0.6),
nn.Linear(512, num_class)
)
def forward(self, input_data):
input_data = self.feature(input_data)
input_data = self.flatten(input_data)
input_data = self.classifier(input_data)
return input_data
#############################
# def load_data(BATCH_SIZE):
#############################
BATCH_SIZE = 1024
# transform定義
totensor = transforms.ToTensor() # (高さ,幅,チャネル)で画素値[0,255]を(チャネル,幅,高さ)で画素値[0,1]のTensorに変換
normalize = transforms.Normalize(mean=[0.5,0.5,0.5], std=[0.5,0.5,0.5]) # [0,1]の範囲の値を[-1,1]の範囲の値に変換
transform = transforms.Compose([totensor, normalize])
# transform = totensor
# データ読み込み
data_folder = "./data2"
train_data = CIFAR10(root=data_folder, train=True, download=True, transform=transform)
train_data_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=2) # multi-process data loadingとなり、処理が高速化
test_data = CIFAR10(root=data_folder, train=False, download=True, transform=transform)
test_data_loader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
# クラス名
class_name = ("Plane", "Car", "Bird", "Cat", "Deer", "Dog", "Frog", "Horse", "Ship", "Truck")
################################################################
# def train(train_data_loader, test_data_loader, class_name):
################################################################
# GPU設定
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
num_class = len(class_name)
# モデルとGPUセット
model = CNN(num_class)
model.to(device)
# 損失関数
criterion = nn.CrossEntropyLoss()
# 最適化手法
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 学習経過を格納するdict
history = {"loss":[], "accuracy":[], "val_loss":[], "val_accuracy":[]}
MAX_EPOCH = 50
for epoch in range(MAX_EPOCH):
train_loss = 0.
train_acc = 0.
valid_loss = 0.
valid_acc = 0.
train_total = 0
valid_total = 0
# 学習
for data in train_data_loader:
images, labels = data # データ取り出し
images, labels = Variable(images), Variable(labels) # torch.autograd.Variableに変換
images, labels = images.to(device), labels.to(device) # GPUに転送
optimizer.zero_grad() # 勾配情報をリセット
pred = model(images) # モデルから予測を計算(順伝播計算):tensor(BATCH_SIZE, 確率×10)
loss = criterion(pred, labels) # 誤差逆伝播の微分計算
train_loss += loss.item() # 誤差(train)を格納
loss.backward()
optimizer.step() # 勾配を計算
values, indices = torch.max(pred.data, axis=1) # 最も確率が高いラベルの確率と引数をbatch_sizeの数だけ取り出す
train_acc += (indices==labels).sum().item() # labelsと一致した個数
train_total += labels.size(0) # データ数(=batch_size)
history["loss"].append(train_loss) # 1epochあたりの誤差の平均を格納
history["accuracy"].append(train_acc/train_total) # 正解数/使ったtrainデータの数
# 学習ごとの検証
with torch.no_grad():
for data in test_data_loader:
images, labels = data # データ取り出し
images, labels = Variable(images), Variable(labels) # torch.autograd.Variableに変換
images, labels = images.to(device), labels.to(device) # GPUに転送
pred = model(images) # モデルから予測を計算(順伝播計算):tensor(BATCH_SIZE, num_class)
loss = criterion(pred, labels) # 誤差の計算
valid_loss += loss.item() # 誤差(valid)を格納
values, indices = torch.max(pred.data, axis=1) # 最も確率が高い引数をbatch_sizeの数だけ取り出す
valid_acc += (indices==labels).sum().item()
valid_total += labels.size(0) # データ数(=batch_size)
history["val_loss"].append(valid_loss) # 1epochあたりの検証誤差の平均を格納
history["val_accuracy"].append(valid_acc/valid_total) # 正解数/使ったtestデータの数
# 5の倍数回で結果表示
if (epoch+1)%5==0:
# f文字列では``と""は共存できないので変える
print(f'Epoch:{epoch+1:d} loss:{history["loss"][-1]:.3f} accuracy{history["accuracy"][-1]:.3f} val_loss{history["val_loss"][-1]:.3f} val_accuracy{history["val_accuracy"][-1]:.3f}')
#######################################################################################
# def cal_accuracy(train_data_loader, test_data_loader, class_name, model):
#######################################################################################
# GPU設定
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# 個数を格納するlist
total = 0
sum_correct = 0
num_class = len(class_name)
class_total = [0 for i in range(num_class)]
class_correct = [0 for i in range(num_class)]
# 学習後のモデルで正解率算出
with torch.no_grad():
for data in test_data_loader:
images, labels = data # データ取り出し
images, labels = Variable(images), Variable(labels) # torch.autograd.Variableに変換
images, labels = images.to(device), labels.to(device) # GPUに転送
pred = model(images) # モデルから予測を計算(順伝播計算):tensor(BATCH_SIZE, num_class)
values, indices = torch.max(pred.data, axis=1) # 最も確率が高い引数をbatch_sizeの数だけ取り出す
total += labels.size(0) # データ数(=batch_size)
for i in range(len(labels)):
label = labels[i] # 0~9のいずれか
sum_correct += (1 if indices[i]==labels[i] else 0) # 正解した個数を累積
class_correct[label] += (1 if indices[i]==labels[i] else 0) # 正解ならlabel番目の正解数を+1
class_total[label] += 1 # label番目の個数を+1
print("-"*100)
print("正解数:", sum_correct)
print("データ数:", total)
print("正解率:", sum_correct/total*100)
# クラスごとの正解率
print("-"*100)
print("testデータの分類結果")
for i in range(num_class):
print(f"{class_name[i]:5s} クラスの正解率:{class_correct[i]/class_total[i]*100:.1f}%")
#############################
# def plot_fig(history):
#############################
# lossとaccuracyを可視化
print("-"*100)
print("BatchNormalizationなし")
plt.figure(1, figsize=(13,4))
plt.subplots_adjust(wspace=0.5)
# 学習曲線
plt.subplot(1, 2, 1)
plt.plot(history["loss"], label="train")
plt.plot(history["val_loss"], label="valid")
plt.title("train and valid loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.legend()
plt.grid()
# 精度表示
plt.subplot(1, 2, 2)
plt.plot(history["accuracy"], label="train")
plt.plot(history["val_accuracy"], label="valid")
plt.title("train and valid accuracy")
plt.xlabel("epoch")
plt.ylabel("accuracy")
plt.legend()
plt.grid()
plt.show()
おまけ②
余談だが、データ数に対してバッチサイズが中途半端な値の時ってどうなるんだろう?と思い調べたところ、端数のデータは使用されないらしい(ドキュメントにもここら辺の扱いは特には書かれてないそうな)。まぁシャッフルしてバッチサイズ分のデータセットを作る過程でどのデータも学習には使われると思うが、ますますバッチサイズって2の階乗である意味が無いように思えてきた...いやないと思うけど笑
参考
CIFAR-10:物体カラー写真(乗り物や動物など)の画像データセット
PyTorchは誤差逆伝播とパラメータ更新をどうやって行っているのか?
Pytorchの基礎 forwardとbackwardを理解する
PyTorch 1.8 : PyTorch の学習 : 分類器を訓練する – CIFAR-10
PyTorch 学習メモ (Karasと同じモデルを作ってみた)
PyTorch学習メモ2 (事前学習済みモデルを使ってみた)