ResNetについて構造の説明と実装のメモ書きです。
ただし、論文すべてを見るわけでなく構造のところを中心に見ていきます。
以下の論文について実装を行っていきます。
Deep Residual Learning for Image Recognition
Residual Learning(残差学習)
入力を$x$としたときに、複数の層で構成されるネットワークによって変換された出力結果が$H(x)$と表されるとします。
ここで、residual function(残差関数)を$F(x)=H(x)-x$と定義します。
もとの変換を表す関数$H(x)$は、$H(x)=F(x)+x$と書くことができるので、入力と残差の和として近似できることがわかります。
本論文では、以下の式で表されるネットワークを定義します。
y=F(x, \{W_i\})+x
$F()$は学習対象の残差マッピングを表しています。
例えば、2層のネットワークで構成されているとすると、$F(x)=W_2\sigma(W_1x)$のように書くことができます。
$\sigma()$はReLUによる非線形変換とします。
ここで、$F()$で表されるネットワークを残差ネットワーク(residual network)、$x$の方を恒等ネットワーク(identify network)またはショートカット(shortcut connection)と呼ぶこととします。
$F(x)$と$x$の次元は一致しなければいけません。
チャンネル数を変化させた場合は、ショートカットの方も線形変換(1×1の畳み込み)によってチャンネル数を変える必要があります。
この場合のショートカットは射影ネットワーク(projection network)と呼ぶことにします。
y=F(x, \{W_i\})+W_sx
bottleneck
bottleneckと呼ばれる構造も導入します。
下図の右側のネットワークであり、3つの層から構成されます。
3×3の畳み込み層が1×1の畳み込み層に挟み込まれたような構造であることが分かります。
1番目の1×1の畳み込み層では、チャンネル数の削減を行います。
2番目の3×3の畳み込み層では、通常の畳み込み処理を行いますがstrideでの次元削減はここに設定します。
最後の1×1の畳み込み層でチャンネル数を増加させショートカットとの和を取り出力とします。
ここでも、残差ネットワークとショートカットで次元が一致しなければ射影ネットワークを使用します。
構造
下図にResNetの概要を示します。
これは、34層のネットワークのものになります。
ショートカットの構造が続いていますが、残差ネットワークとショートカットの次元が一致すれば恒等変換を使い、
次元が異なる場合は、先ほど説明したネットワーク(1×1の畳み込み)で次元を一致させます。(または、ゼロパディングを行う。)
対象となるネットワークは図の破線になっているショートカットとなります。
ここでは、チャンネル数が増加し、strideを2として画像のサイズを半分にしています。
従って、射影ネットワークではチャンネル数だけでなく画像のサイズも一致させる処理が必要となります。
ここで、構造の詳細を見ていきます。
いくつか種類がありますが、ここでは34-layerと50-layer(Res34,Res50と表すこととします)の実装を行っていきます。
最初の畳み込み層を除けば、カーネルサイズはすべて3×3となります。
各ブロックで同じ残差ネットワークが使われますが、strideを2に設定したり画像のサイズを変換するのは最初のネットワークのみとなります。
また、各畳み込み層の後には、バッチ正規化(BN)と非線形変換(ReLU)を配置します。
このBNとReLUを畳み込み層の前に配置するという方法もありますが、今回はあとに置くこととします。
学習
学習はSGDを使い、学習率を0.1として、weight decayは0.0001、そしてmomentumを0.9としています。
実装
ここではcifar10の画像セットを意識して10分類モデルを作成しますが、画像の読み込みはどは行いません。
cifar10の画像セットの利用は次の記事を参考にしてください。
keras
実装を行っていきます。
必要なライブラリのインポートをします。
import tensorflow.keras as keras
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Conv2D, Activation, MaxPooling2D, Flatten, Dense, Dropout, GlobalAveragePooling2D, BatchNormalization, Add
from keras import backend as K
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.callbacks import ReduceLROnPlateau
from keras.datasets import cifar10
import numpy as np
import cv2
最初にResidualブロックの実装をします。
今回は、ブロックの番号とブロック内の何個目のネットワークであるかの番号を引数として与えることで、恒等ネットワークか射影ネットワーク(とその設定)を指定するようにしています。
class res_block(Model):
"""residual block"""
def __init__(self, out_channels, block_num, layer_num):
super(res_block, self).__init__(name='block'+'_'+str(block_num)+'_'+str(layer_num))
block_name = '_'+str(block_num)+'_'+str(layer_num)
# shortcutとstrideの設定
if (layer_num == 0):
# 最初のresblockは(W、 H)は変更しないのでstrideは1にする
if (block_num==1):
self._is_change = False
stride = 1
else:
self._is_change = True
stride = 2
self.conv_sc = Conv2D(out_channels, kernel_size=1, strides=stride, padding='same', use_bias=False, name='conv_sc'+block_name)
self.bn_sc = BatchNormalization(name='bn_sc'+block_name)
else:
self._is_change = False
stride = 1
# 1層目 3×3 畳み込み処理を行います
self.conv1 = Conv2D(out_channels, kernel_size=3, strides=stride, padding='same', use_bias=False, name='conv1'+block_name)
self.bn1 = BatchNormalization(name='bn1'+block_name)
self.act1 = Activation('relu', name='act1'+block_name)
# 2層目 3×3 畳み込み処理を行います
self.conv2 = Conv2D(out_channels, kernel_size=3, strides=1, padding='same', use_bias=False, name='conv2'+block_name)
self.bn2 = BatchNormalization(name='bn2'+block_name)
self.add = Add(name='add'+block_name)
self.act = Activation('relu', name='act'+block_name)
def call(self, x):
out = self.conv1(x)
out = self.bn1(out)
out = self.act1(out)
out = self.conv2(out)
out = self.bn2(out)
if K.int_shape(x) != K.int_shape(out):
shortcut = self.conv_sc(x)
shortcut = self.bn_sc(shortcut)
else:
shortcut = x
out = self.add([out, shortcut])
out = self.act(out)
return out
同様にbottleneckの実装を行います。
class res_bottleneck_block(Model):
"""residual block(bottlenack)"""
def __init__(self, out_channels, block_num, layer_num):
super(res_bottleneck_block, self).__init__(name='block'+'_'+str(block_num)+'_'+str(layer_num))
block_name = '_'+str(block_num)+'_'+str(layer_num)
# 今回はbottleneckのチャネル数はout_channelsの4分の1とします
bneck_channels = out_channels // 4
# 各ブロックの最初の層ではstrideを2とする(2層目の3×3畳み込み層)
if (layer_num == 0)&(out_channels!=256):
stride = 2
else:
stride = 1
# 1層目 1×1 畳み込み処理は行わず(線形変換)、チャネル数をbneck_channelsにします
self.conv1 = Conv2D(bneck_channels, kernel_size=1, strides=1, padding='valid', use_bias=False, name='conv1'+block_name)
self.bn1 = BatchNormalization(name='bn1'+block_name)
self.act1 = Activation('relu', name='act1'+block_name)
# 2層目 3×3 畳み込み処理を行います
self.conv2 = Conv2D(bneck_channels, kernel_size=3, strides=stride, padding='same', use_bias=False, name='conv2'+block_name)
self.bn2 = BatchNormalization(name='bn2'+block_name)
self.act2 = Activation('relu', name='act2'+block_name)
# 3層目 1×1 畳み込み処理は行わず(線形変換)、チャネル数をout_channelsにします
self.conv3 = Conv2D(out_channels, kernel_size=1, strides=1, padding='valid', use_bias=False, name='conv3'+block_name)
self.bn3 = BatchNormalization(name='bn3'+block_name)
# inputとoutputでチャネル数が異なる場合はinputのチャネル数を1×1の畳み込み層で変換します
# stride=2とした場合は次元を合わせるためstirde=2とします
self.conv_sc = Conv2D(out_channels, kernel_size=1, strides=stride, padding='same', use_bias=False, name='conv_sc'+block_name)
self.bn_sc = BatchNormalization(name='bn_sc'+block_name)
self.add = Add(name='add'+block_name)
self.act = Activation('relu', name='act'+block_name)
def call(self, x):
out = self.conv1(x)
out = self.bn1(out)
out = self.act1(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.act2(out)
out = self.conv3(out)
out = self.bn3(out)
if K.int_shape(x) != K.int_shape(out):
shortcut = self.conv_sc(x)
shortcut = self.bn_sc(shortcut)
else:
shortcut = x
out = self.add([out, shortcut])
return out
まずは、実装したResidualブロックを使ってRes34を定義します。
class ResNet34(Model):
def __init__(self):
super().__init__()
self._layers = []
# 入力層
self._layers += [
Conv2D(filters = 64, kernel_size = (7,7), strides = 2, padding = 'same', name='conv_input'),
BatchNormalization(name='bn_input'),
Activation('relu', name='act_input'),
MaxPooling2D(pool_size = (2,2), padding = 'same', name='pool_input'),
]
# Residualブロック
self._layers += [res_block(out_channels=64, block_num=1, layer_num=i) for i in range(3)]
self._layers += [res_block(out_channels=128, block_num=2, layer_num=i) for i in range(4)]
self._layers += [res_block(out_channels=256, block_num=3, layer_num=i) for i in range(6)]
self._layers += [res_block(out_channels=512, block_num=4, layer_num=i) for i in range(3)]
# 出力層
self._layers += [
GlobalAveragePooling2D(name='pool_output'),
Dense(10, activation='softmax', name='output')
]
def call(self, x):
for layer in self._layers:
x = layer(x)
return x
構造を確認してみます。
うまい方法が見つからなかったため、Modelを作成してsummary()で確認します。
model = ResNet34()
model.build((None, 224, 224, 64)) # build with input shape.
dummy_input = Input(shape=(224, 224, 64)) # declare without batch demension.
model_summary = Model(inputs=[dummy_input], outputs=model.call(dummy_input), name="pretrained")
model_summary.summary()
そして、学習させます。
今回は実行させていません。以後同様ですので、学習部分は省略します。
sgd = SGD(lr=0.1, momentum=0.9, decay=0.0001, nesterov=False)
model.compile(loss=['categorical_crossentropy'], optimizer=sgd, metrics=['accuracy'])
history=model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test))
次にbottleneckを使って、Res50を実装します。
class ResNet50(Model):
def __init__(self):
super().__init__()
self._layers = []
# 入力層
self._layers += [
Conv2D(filters = 64, kernel_size = (7,7), strides = 2, padding = 'same', name='conv_input'),
BatchNormalization(name='bn_input'),
Activation('relu', name='act_input'),
MaxPooling2D(pool_size = (2,2), padding = 'same', name='pool_input'),
]
# Residualブロック
self._layers += [res_bottleneck_block(out_channels=256, block_num=1, layer_num=i) for i in range(3)]
self._layers += [res_bottleneck_block(out_channels=512, block_num=2, layer_num=i) for i in range(4)]
self._layers += [res_bottleneck_block(out_channels=1024, block_num=3, layer_num=i) for i in range(6)]
self._layers += [res_bottleneck_block(out_channels=2048, block_num=4, layer_num=i) for i in range(3)]
# 出力層
self._layers += [
GlobalAveragePooling2D(name='pool_output'),
Dense(10, activation='softmax', name='output')
]
def call(self, x):
for layer in self._layers:
x = layer(x)
return x
構造を確認してみます。
model = ResNet50()
model.build((None, 224, 224, 64)) # build with input shape.
dummy_input = Input(shape=(224, 224, 64)) # declare without batch demension.
model_summary = Model(inputs=[dummy_input], outputs=model.call(dummy_input), name="pretrained")
model_summary.summary()
転移学習
学習済みモデルを使用します。
ネットワークの最後に全結合層を追加して、分類モデルを作成します。
from keras.applications.resnet50 import ResNet50
base_model = ResNet50(include_top=False, weights='imagenet',input_shape=(224, 224, 3))
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(1024, activation='relu')(x)
output = Dense(10, activation='softmax')(x)
model = Model(inputs=base_model.input, outputs=output)
for layer in base_model.layers:
layer.trainable = False
model.summary()
pytorch
次にpytorchでも同様に実装を行います。
必要なライブラリのインポートを行います。
import torch
from torch import nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import random_split, DataLoader
from torchsummary import summary
import pytorch_lightning as pl
from torchmetrics import Accuracy as accuracy
import torchvision
from torchvision import models, transforms
from torchvision.datasets import CIFAR10
import matplotlib.pyplot as plt
最初にResidualブロックの実装をします。
class res_block(nn.Module):
"""residual block"""
def __init__(self, out_channels, block_num, layer_num):
super(res_block, self).__init__()
# 1番目のブロック以外はチャンネル数がinputとoutputで変わる(output=4×input)
if (block_num!=1)&(layer_num==0):
input_channels = out_channels//2
else:
input_channels = out_channels
# shortcutとstrideの設定
if (layer_num == 0):
# 最初のresblockは(W、 H)は変更しないのでstrideは1にする
if (block_num==1):
self._is_change = False
stride = 1
else:
self._is_change = True
stride = 2
self.conv_sc = nn.Conv2d(input_channels, out_channels, kernel_size=1, stride=stride)
self.bn_sc = nn.BatchNorm2d(out_channels)
else:
self._is_change = False
stride = 1
# 1層目 3×3 畳み込み処理を行います
self.conv1 = nn.Conv2d(input_channels, out_channels, kernel_size=3, stride=stride, padding=1)
self.bn1 = nn.BatchNorm2d(out_channels)
# 2層目 3×3 畳み込み処理を行います
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
self.bn2 = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
shortcut = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
# Projection shortcutの場合
if self._is_change:
shortcut = self.conv_sc(shortcut)
shortcut = self.bn_sc(shortcut)
out += shortcut
out = self.relu(out)
return out
次にbottleneckの実装をします。
class res_bottleneck_block(nn.Module):
"""residual block"""
def __init__(self, out_channels, block_num, layer_num):
super(res_bottleneck_block, self).__init__()
# 1番目のブロック以外はチャンネル数がinputとoutputで変わる(output=4×input)
if (layer_num==0):
if (block_num!=1):
input_channels = out_channels//2
else:
input_channels = out_channels//4
else:
input_channels = out_channels
# shortcutとstrideの設定
if (layer_num == 0):
# 最初のresblockは(W、 H)は変更しないのでstrideは1にする
if (block_num==1):
self._is_change = True
stride = 1
else:
self._is_change = True
stride = 2
self.conv_sc = nn.Conv2d(input_channels, out_channels, 1, stride=stride)
self.bn_sc = nn.BatchNorm2d(out_channels)
else:
self._is_change = False
stride = 1
# チャネル数の削減
bottleneck_channels = int(out_channels // 4)
# 1層目 1×1 畳み込み処理は行わず(線形変換)、チャネル数をbneck_channelsにします
self.conv1 = nn.Conv2d(input_channels, bottleneck_channels, kernel_size=1)
self.bn1 = nn.BatchNorm2d(bottleneck_channels)
# 2層目 3×3 畳み込み処理を行います
self.conv2 = nn.Conv2d(bottleneck_channels, bottleneck_channels, kernel_size=3, stride=stride, padding=1)
self.bn2 = nn.BatchNorm2d(bottleneck_channels)
# 3層目 1×1 畳み込み処理は行わず(線形変換)、チャネル数をout_channelsにします
self.conv3 = nn.Conv2d(bottleneck_channels, out_channels, kernel_size=1)
self.bn3 = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
shortcut = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
# Projection shortcutの場合
if self._is_change:
print('w')
shortcut = self.conv_sc(x)
shortcut = self.bn_sc(shortcut)
out += shortcut
out = self.relu(out)
return out
Residualブロックを使用してRes34の実装を行います。
class ResNet34(nn.Module):
def __init__(self, num_classes):
super(ResNet34, self).__init__()
conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
bn1 = nn.BatchNorm2d(64)
relu = nn.ReLU()
maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.conv1 = nn.Sequential(*[conv1, bn1, relu, maxpool])
self.conv2_x = nn.Sequential(*[res_block(out_channels=64, block_num=1, layer_num=i) for i in range(3)])
self.conv3_x = nn.Sequential(*[res_block(out_channels=128, block_num=2, layer_num=i) for i in range(1)])
self.conv4_x = nn.Sequential(*[res_block(out_channels=256, block_num=3, layer_num=i) for i in range(6)])
self.conv5_x = nn.Sequential(*[res_block(out_channels=512, block_num=4, layer_num=i) for i in range(3)])
self.pool = nn.AdaptiveAvgPool2d((1,1))
self.linear = nn.Linear(in_features=512, out_features=num_classes)
def forward(self, x):
out = self.conv1(x)
out = self.conv2_x(out)
out = self.conv3_x(out)
out = self.conv4_x(out)
out = self.conv5_x(out)
out = self.pool(out)
out = out.view(out.shape[0], -1)
out = self.linear(out)
return out
構造の確認を行います。
summary(ResNet34(10), (3,224,224))
学習のためにtorch-lightningのLightningModuleを使います。
これ以降は、この実装はmodelの部分以外は同じなので省略します。
class ResTrainer(pl.LightningModule):
def __init__(self):
super().__init__()
self.model = ResNet34(10)
def forward(self, x):
x = self.model(x)
return x
def training_step(self, batch, batch_idx):
x, y = batch
#x, y = x.to(device), y.to(device)
y_hat = self.forward(x)
loss = nn.CrossEntropyLoss()(y_hat, y)
return {'loss': loss, 'y_hat':y_hat, 'y':y, 'batch_loss': loss.item()*x.size(0)}
def validation_step(self, batch, batch_idx):
x, y = batch
#x, y = x.to(device), y.to(device)
y_hat = self.forward(x)
loss = nn.CrossEntropyLoss()(y_hat, y)
return {'y_hat':y_hat, 'y':y, 'batch_loss': loss.item()*x.size(0)}
def test_step(self, batch, batch_nb):
x, y = batch
#x, y = x.to(device), y.to(device)
y_hat = self.forward(x)
loss = nn.CrossEntropyLoss()(y_hat, y)
y_label = torch.argmax(y_hat, dim=1)
acc = accuracy()(y_label, y)
return {'test_loss': loss, 'test_acc': acc}
def training_epoch_end(self, train_step_output):
y_hat = torch.cat([val['y_hat'] for val in train_step_outputs], dim=0)
y = torch.cat([val['y'] for val in train_step_outputs], dim=0)
epoch_loss = sum([val['batch_loss'] for val in train_step_outputs]) / y_hat.size(0)
preds = torch.argmax(y_hat, dim=1)
acc = accuracy()(preds, y)
self.log('train_loss', epoch_loss, prog_bar=True, on_epoch=True)
self.log('train_acc', acc, prog_bar=True, on_epoch=True)
print('---------- Current Epoch {} ----------'.format(self.current_epoch + 1))
print('train Loss: {:.4f} train Acc: {:.4f}'.format(epoch_loass, acc))
def validation_epoch_end(self, val_step_outputs):
y_hat = torch.cat([val['y_hat'] for val in val_step_outputs], dim=0)
y = torch.cat([val['y'] for val in val_step_outputs], dim=0)
epoch_loss = sum([val['batch_loss'] for val in val_step_outputs]) / y_hat.size(0)
preds = torch.argmax(y_hat, dim=1)
acc = accuracy()(preds, y)
self.log('val_loss', epoch_loss, prog_bar=True, on_epoch=True)
self.log('val_acc', acc, prog_bar=True, on_epoch=True)
print('valid Loss: {:.4f} valid Acc: {:.4f}'.format(epoch_loss, acc))
# New: テストデータに対するエポックごとの処理
def test_epoch_end(self, test_step_outputs):
y_hat = torch.cat([val['y_hat'] for val in test_step_outputs], dim=0)
y = torch.cat([val['y'] for val in test_step_outputs], dim=0)
epoch_loss = sum([val['batch_loss'] for val in test_step_outputs]) / y_hat.size(0)
preds = torch.argmax(y_hat, dim=1)
acc = accuracy()(preds, y)
self.log('test_loss', epoch_loss, prog_bar=True, on_epoch=True)
self.log('test_acc', acc, prog_bar=True, on_epoch=True)
print('test Loss: {:.4f} test Acc: {:.4f}'.format(epoch_loss, acc))
def configure_optimizers(self):
optimizer = optim.SGD(self.parameters(), lr=0.001, momentum=0.9, weight_decay=1e-4)
return {'optimizer': optimizer, 'lr_scheduler': scheduler, 'monitor': 'val_loss'}
そして、学習させます。
ここでは実行させません。
net = VGGTrainer().to(device)
trainer = pl.Trainer(gpus=0, max_epochs=5)
trainer.fit(net, dm)
次にRes50の実装を行います。
class ResNet50(nn.Module):
def __init__(self, num_classes):
super(ResNet50, self).__init__()
conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
bn1 = nn.BatchNorm2d(64)
relu = nn.ReLU()
maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.conv1 = nn.Sequential(*[conv1, bn1, relu, maxpool])
self.conv2_x = nn.Sequential(*[res_bottleneck_block(out_channels=256, block_num=1, layer_num=i) for i in range(3)])
self.conv3_x = nn.Sequential(*[res_bottleneck_block(out_channels=512, block_num=2, layer_num=i) for i in range(4)])
self.conv4_x = nn.Sequential(*[res_bottleneck_block(out_channels=1024, block_num=3, layer_num=i) for i in range(6)])
self.conv5_x = nn.Sequential(*[res_bottleneck_block(out_channels=2048, block_num=4, layer_num=i) for i in range(3)])
self.pool = nn.AdaptiveAvgPool2d((1,1))
self.linear = nn.Linear(in_features=2048, out_features=num_classes)
def forward(self, x):
out = self.conv1(x)
out = self.conv2_x(out)
out = self.conv3_x(out)
out = self.conv4_x(out)
out = self.conv5_x(out)
out = self.pool(out)
out = out.view(out.shape[0], -1)
out = self.linear(out)
return out
構造を確認します。
summary(ResNet50(10), (3,224,224))
転移学習
最後に学習済みモデルの使い方を確認します。
res50 = models.resnet50(pretrained=True)
res50.fc = nn.Linear(in_features=2048, out_features=10)
model = res50
update_param_names = ['fc.weight', 'fc.bias']
for name, param in model.named_parameters():
if name in update_param_names:
param.requires_grad = True
else:
param.requires_grad = False
summary(res50, input_size=(3,224,224))
これでResNetの実装を終わります。
新しいバージョンのものが出ているので今後扱えたらと思います。