I wanted to bring a ResNet-style architecture into my own model, so I implemented ResNet from scratch and am sharing it here. Incidentally, the ResNet architecture has become the de facto standard in the research world as well, and much of the follow-up work focuses on how to make networks wider and deeper. For preprocessing I tried ZCA whitening.
ZCA whitening
Whiten the data with PCA (decorrelation + variance normalization), then rotate it back into the original pixel space.
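Concretely, with the eigendecomposition of the data's second-moment matrix, the transform implemented in the ZCAWhitening class below is (μ is the mean image, ε the small constant added for numerical stability, matching epsilon in the code):

C = V \Lambda V^{\top}, \qquad W_{\mathrm{ZCA}} = V \, (\Lambda + \epsilon I)^{-1/2} \, V^{\top}, \qquad x_{\mathrm{zca}} = W_{\mathrm{ZCA}} \, (x - \mu)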
PyTorch code
import numpy as np
import pandas as pd
import torch
from torchvision import transforms
from PIL import Image
# Training data
x_train = np.load('/content/drive/MyDrive/Lectures/深層学習/chap07/data/x_train.npy')
t_train = np.load('/content/drive/MyDrive/Lectures/深層学習/chap07/data/t_train.npy')
# Test data
x_test = np.load('/content/drive/MyDrive/Lectures/深層学習/chap07/data/x_test.npy')
class train_dataset(torch.utils.data.Dataset):
    def __init__(self, x_train, t_train):
        data = x_train.astype('float32')
        self.x_train = []
        for i in range(data.shape[0]):
            self.x_train.append(Image.fromarray(np.uint8(data[i])))
        self.t_train = t_train
        self.transform = transforms.ToTensor()

    def __len__(self):
        return len(self.x_train)

    def __getitem__(self, idx):
        # use self.t_train here, not the global t_train
        return self.transform(self.x_train[idx]), torch.tensor(self.t_train[idx], dtype=torch.long)
class test_dataset(torch.utils.data.Dataset):
    def __init__(self, x_test):
        data = x_test.astype('float32')
        self.x_test = []
        for i in range(data.shape[0]):
            self.x_test.append(Image.fromarray(np.uint8(data[i])))
        self.transform = transforms.ToTensor()

    def __len__(self):
        return len(self.x_test)

    def __getitem__(self, idx):
        return self.transform(self.x_test[idx])
trainval_data = train_dataset(x_train, t_train)
test_data = test_dataset(x_test)
import random
import os
from torch.optim.lr_scheduler import OneCycleLR

seed = 42
batch_size = 32
def seed_everything(seed):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.benchmark = True  # let cuDNN auto-tune kernels (faster, but trades away exact run-to-run reproducibility)

seed_everything(seed=seed)
class gcn():
    def __init__(self):
        pass

    def __call__(self, x):
        # global contrast normalization: standardize each image by its own mean and std
        mean = torch.mean(x)
        std = torch.std(x)
        return (x - mean) / (std + 10**(-6))  # avoid division by zero
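gcn is defined here but not actually wired into the pipeline below; if you wanted to try it, a minimal sketch of how it would slot into a Compose:

# Hypothetical usage of gcn, not part of the pipeline below
transform_gcn = transforms.Compose([
    transforms.ToTensor(),
    gcn(),  # per-image standardization after conversion to a tensor
])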
class ZCAWhitening():
    def __init__(self, epsilon=1e-4, device="cuda"):  # the fit is compute-heavy, so use the GPU
        self.epsilon = epsilon
        self.device = device

    def fit(self, images):  # estimate the mean and the transform matrix from the data
        x = images[0][0].reshape(1, -1)
        self.mean = torch.zeros([1, x.size()[1]]).to(self.device)
        con_matrix = torch.zeros([x.size()[1], x.size()[1]]).to(self.device)
        for i in range(len(images)):  # accumulate the mean and the (uncentered) second-moment matrix
            x = images[i][0].reshape(1, -1).to(self.device)
            self.mean += x / len(images)
            con_matrix += torch.mm(x.t(), x) / len(images)
            if i % 10000 == 0:
                print("{0}/{1}".format(i, len(images)))
        self.E, self.V = torch.linalg.eigh(con_matrix)  # eigendecomposition (replaces the removed torch.symeig)
        self.E = torch.max(self.E, torch.zeros_like(self.E))  # clamp eigenvalues that went slightly negative due to numerical error
        self.ZCA_matrix = torch.mm(torch.mm(self.V, torch.diag((self.E.squeeze() + self.epsilon)**(-0.5))), self.V.t())
        print("completed!")

    def __call__(self, x):
        size = x.size()
        x = x.reshape(1, -1).to(self.device)
        x -= self.mean
        x = torch.mm(x, self.ZCA_matrix.t())
        x = x.reshape(tuple(size))
        x = x.to("cpu")
        return x
# (The dataset classes above are self-made, so the processing around here differs a little from the original.)
zca = ZCAWhitening()
zca.fit(trainval_data)
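As a quick sanity check (my own addition, optional), whitened samples should have roughly uniform per-dimension variance. It is only approximate here, since fit accumulates the uncentered second moment and epsilon damps the smallest eigenvalues:

# Optional sanity check: whitened data should have a roughly uniform, near-1 variance per dimension
xs = torch.stack([zca(trainval_data[i][0]).reshape(-1) for i in range(256)])  # small sample of whitened images
xc = xs - xs.mean(0)
cov = xc.t() @ xc / (xs.size(0) - 1)
print(torch.diag(cov)[:5])  # diagonal entries should be on the order of 1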
val_size = 3000
train_data, val_data = torch.utils.data.random_split(trainval_data, [len(trainval_data)-val_size, val_size])
# Define the preprocessing
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=(4, 4, 4, 4), padding_mode='constant'),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    zca,
])
transform = transforms.Compose([
    transforms.ToTensor(),
    zca,
])

# Note: train_data and val_data are Subsets backed by the same trainval_data,
# so this line applies the training augmentation to the validation split as well.
trainval_data.transform = transform_train
test_data.transform = transform
dataloader_train = torch.utils.data.DataLoader(
    train_data,
    batch_size=batch_size,
    shuffle=True
)
dataloader_valid = torch.utils.data.DataLoader(
    val_data,
    batch_size=batch_size,
    shuffle=False  # no need to shuffle data that is only evaluated
)
dataloader_test = torch.utils.data.DataLoader(
    test_data,
    batch_size=batch_size,
    shuffle=False
)
import torch.nn as nn
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 3x3 convolution
def conv3x3(in_channels, out_channels, stride=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=3,
                     stride=stride, padding=1, bias=False)
# Residual block
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample is not None:
            residual = self.downsample(x)  # reshape the skip path when shapes differ
        out += residual  # the skip connection
        out = self.relu(out)
        return out
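A quick shape check (my own sketch, not part of the original code) shows how a strided block halves the resolution while the downsample branch keeps the skip connection compatible:

# Sketch: a stride-2 block with a matching downsample branch
block = ResidualBlock(
    16, 32, stride=2,
    downsample=nn.Sequential(conv3x3(16, 32, stride=2), nn.BatchNorm2d(32)),
)
x = torch.randn(1, 16, 32, 32)
print(block(x).shape)  # torch.Size([1, 32, 16, 16])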
# ResNet
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=10):
        super(ResNet, self).__init__()
        self.in_channels = 16
        self.conv = conv3x3(3, 16)
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self.make_layer(block, 16, layers[0])
        self.layer2 = self.make_layer(block, 32, layers[1], 2)
        self.layer3 = self.make_layer(block, 64, layers[2], 2)
        self.avg_pool = nn.AvgPool2d(8)  # 8x8 feature map -> 1x1
        self.fc = nn.Linear(64, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            # project the skip path whenever the resolution or channel count changes
            downsample = nn.Sequential(
                conv3x3(self.in_channels, out_channels, stride=stride),
                nn.BatchNorm2d(out_channels))
        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels
        for i in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv(x)
        out = self.bn(out)
        out = self.relu(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out
conv_net = ResNet(ResidualBlock, [2, 2, 2]).to(device)
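Before training, a one-batch smoke test (my own addition) confirms the wiring; CIFAR-sized inputs should come out as one logit per class:

# Sketch: the model should map 32x32 RGB images to 10 logits each
dummy = torch.randn(4, 3, 32, 32).to(device)
with torch.no_grad():
    print(conv_net(dummy).shape)  # torch.Size([4, 10])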
n_epochs = 100
init_lr = 1e-04
optimizer = optim.Adam(conv_net.parameters(), lr=init_lr)
scheduler = OneCycleLR(optimizer, max_lr=init_lr, steps_per_epoch=len(dataloader_train), epochs=n_epochs, pct_start=0.2)
loss_function = nn.CrossEntropyLoss()
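OneCycleLR is stepped once per batch in the loop below, not once per epoch; with pct_start=0.2 the learning rate ramps up over the first 20% of all steps and then anneals. A throwaway sketch (my own addition, using a dummy optimizer so the real schedule is untouched):

# Sketch: inspect the one-cycle schedule on a dummy optimizer
tmp_opt = optim.Adam([torch.zeros(1, requires_grad=True)], lr=init_lr)
tmp_sched = OneCycleLR(tmp_opt, max_lr=init_lr, total_steps=100, pct_start=0.2)
lrs = []
for _ in range(100):
    tmp_opt.step()
    tmp_sched.step()
    lrs.append(tmp_sched.get_last_lr()[0])
print(lrs.index(max(lrs)), max(lrs))  # peak around step 20 (pct_start * total_steps)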
for epoch in range(n_epochs):
    losses_train = []
    losses_valid = []

    conv_net.train()
    n_train = 0
    acc_train = 0
    for x, t in dataloader_train:
        n_train += t.size()[0]
        conv_net.zero_grad()  # clear accumulated gradients
        x = x.to(device)
        t = t.to(device)
        y = conv_net(x)
        loss = loss_function(y, t)
        loss.backward()
        optimizer.step()
        scheduler.step()  # OneCycleLR steps once per batch
        pred = y.argmax(1)
        acc_train += (pred == t).float().sum().item()
        losses_train.append(loss.item())

    conv_net.eval()
    n_val = 0
    acc_val = 0
    with torch.no_grad():  # no gradients needed for validation
        for x, t in dataloader_valid:
            n_val += t.size()[0]
            x = x.to(device)
            t = t.to(device)
            y = conv_net(x)
            loss = loss_function(y, t)
            pred = y.argmax(1)
            acc_val += (pred == t).float().sum().item()
            losses_valid.append(loss.item())

    print('EPOCH: {}, Train [Loss: {:.3f}, Accuracy: {:.3f}], Valid [Loss: {:.3f}, Accuracy: {:.3f}]'.format(
        epoch,
        np.mean(losses_train),
        acc_train / n_train,
        np.mean(losses_valid),
        acc_val / n_val
    ))
conv_net.eval()
t_pred = []
with torch.no_grad():
    for x in dataloader_test:
        x = x.to(device)
        # forward pass
        y = conv_net(x)
        # convert the logits to scalar class predictions
        pred = y.argmax(1).tolist()
        t_pred.extend(pred)

submission = pd.Series(t_pred, name='label')
submission.to_csv('submission_pred_sub.csv', header=True, index_label='id')
Results
Roughly 86% accuracy, versus roughly 80% with the baseline model below.
Baseline model
conv_net = nn.Sequential(
    nn.Conv2d(3, 32, 3),    # 32x32x3 -> 30x30x32
    nn.BatchNorm2d(32),
    nn.ReLU(),
    nn.AvgPool2d(2),        # 30x30x32 -> 15x15x32
    nn.Conv2d(32, 64, 3),   # 15x15x32 -> 13x13x64
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.AvgPool2d(2),        # 13x13x64 -> 6x6x64
    nn.Conv2d(64, 128, 3),  # 6x6x64 -> 4x4x128
    nn.BatchNorm2d(128),
    nn.ReLU(),
    nn.AvgPool2d(2),        # 4x4x128 -> 2x2x128
    nn.Flatten(),
    nn.Linear(2*2*128, 256),
    nn.ReLU(),
    nn.Linear(256, 10)
)
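For context (my own addition), the two models have broadly similar trainable-parameter counts, both in the low hundreds of thousands, so the gain comes from the residual architecture and the preprocessing rather than sheer capacity; a quick count:

# Sketch: compare trainable parameter counts of the two models
def count_params(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(count_params(ResNet(ResidualBlock, [2, 2, 2])))  # the ResNet defined above
print(count_params(conv_net))                          # the baseline CNN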