5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

論文の勉強1 VGG

Last updated at Posted at 2021-12-11

VGGについて構造の説明と実装のメモ書きです。
ただし、論文すべてを見るわけでなく構造のところを中心に見ていきます。

以下の論文についてです。
Very Deep Convolutional Networks for Large-Scale Image Recognition

構造

入力~畳み込み層

入力画像は224×224のRGB画像とします。前処理として、平均RGB値を各ピクセルから引きます。
カーネルサイズが3×3のフィルタを使用しますが、線形変換として見ることができる1×1の畳み込みフィルタも使用します。
ストライドは1で、パディングは3×3の畳み込み処理に対して1ピクセルとします。
5つ使われるプーリング層としては最大プーリングを使用し、いくつかの畳み込み層のあとに配置します。プーリングは、カーネルサイズは2×2でストライドを2で実行します。

全結合層

上記の畳み込み層に3つの全結合層が続きます。
最初の2つはそれぞれ4096チャンネルを持ち、3つ目の層は1000チャンネル(ILSVRの分類問題のクラス数)を持っています。最後の層はソフトマックス関数を使用します。

すべての隠れ層は非線形の整流性(ReLU)を持っていますが、1つを除いてLocal Response Normalizationを持っていません。

設定

設定を見てみます。
層の数が違うAからEのネットワークが紹介されています。
ここでは全部で19層と1番層の多いEのネットワークを実装します。

image.png

###実装
kerasとpytorch両方で実装します。
全結合層にはDropoutを0.5で設定したものを入れて、weight decayなども設定しています。
学習させるのには時間がかかるので、動くことは確認していますが、結果は出していません。

keras

まずは、kerasです。
必要なライブラリのインポートを行います。

import tensorflow.keras as keras
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Conv2D, Activation, MaxPooling2D, Flatten, Dense, Dropout, GlobalAveragePooling2D
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.callbacks import ReduceLROnPlateau
from keras.datasets import cifar10
import numpy as np
import cv2

次にデータセットを準備します。
cifar10のデータです。

# データセットの読み込み
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# バッチサイズ、クラス数、エポック数の設定
batch_size=32
num_classes=10
epochs=5

# データリサイズ
img_rows=224
img_cols=224

x_train = np.array([cv2.resize(img, (img_rows,img_cols)) for img in x_train[::100,:,:,:]])
x_test = np.array([cv2.resize(img, (img_rows,img_cols)) for img in x_test[::100,:,:,:]])

# データ正規化
x_train=x_train.astype('float32')
x_train/=255
x_test=x_test.astype('float32')
x_test/=255

# one-hotベクトル化
y_train = y_train[::100]
y_test = y_test[::100]
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 4, figsize=(12,3))
[axes[i].imshow(x_train[i]) for i in range(4)];

image.png

ここで、VGG19を実装します。

input_shape=(224, 224, 3)

model = Sequential()

# 第1層
model.add(Conv2D(filters=64, kernel_size=(3,3), strides=(1,1), padding='same', input_shape=input_shape, name='block1_conv1'))
model.add(Activation('relu'))
model.add(Conv2D(filters=64, kernel_size=(3,3), strides=(1,1), padding='same', name='block1_conv2'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same', name='block1_pool'))

# 第2層
model.add(Conv2D(filters=128, kernel_size=(3,3), strides=(1,1), padding='same', name='block2_conv1'))
model.add(Activation('relu'))
model.add(Conv2D(filters=128, kernel_size=(3,3), strides=(1,1), padding='same', name='block2_conv2'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same', name='block2_pool'))

# 第3層
model.add(Conv2D(filters=256, kernel_size=(3,3), strides=(1,1), padding='same', name='block3_conv1'))
model.add(Activation('relu'))
model.add(Conv2D(filters=256, kernel_size=(3,3), strides=(1,1), padding='same', name='block3_conv2'))
model.add(Activation('relu'))
model.add(Conv2D(filters=256, kernel_size=(3,3), strides=(1,1), padding='same', name='block3_conv3'))
model.add(Activation('relu'))
model.add(Conv2D(filters=256, kernel_size=(3,3), strides=(1,1), padding='same', name='block3_conv4'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same', name='block3_pool'))

# 第4層
model.add(Conv2D(filters=512, kernel_size=(3,3), strides=(1,1), padding='same', name='block4_conv1'))
model.add(Activation('relu'))
model.add(Conv2D(filters=512, kernel_size=(3,3), strides=(1,1), padding='same', name='block4_conv2'))
model.add(Activation('relu'))
model.add(Conv2D(filters=512, kernel_size=(3,3), strides=(1,1), padding='same', name='block4_conv3'))
model.add(Activation('relu'))
model.add(Conv2D(filters=512, kernel_size=(3,3), strides=(1,1), padding='same', name='block4_conv4'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same', name='block4_pool'))

# 第5層
model.add(Conv2D(filters=512, kernel_size=(3,3), strides=(1,1), padding='same', name='block5_conv1'))
model.add(Activation('relu'))
model.add(Conv2D(filters=512, kernel_size=(3,3), strides=(1,1), padding='same', name='block5_conv2'))
model.add(Activation('relu'))
model.add(Conv2D(filters=512, kernel_size=(3,3), strides=(1,1), padding='same', name='block5_conv3'))
model.add(Activation('relu'))
model.add(Conv2D(filters=512, kernel_size=(3,3), strides=(1,1), padding='same', name='block5_conv4'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same', name='block5_pool'))

# 全結合層
model.add(Flatten(name='flatten'))
model.add(Dense(units=4096, activation='relu', name='fc1'))
model.add(Dropout(0.5))
model.add(Dense(units=4096, activation='relu', name='fc2'))
model.add(Dropout(0.5))

# 出力層
model.add(Dense(units=num_classes, activation='softmax', name='predictions'))

作ったモデルの構造を確認します。

model.summary()

image.png

パラメータ数は140万ほどで論文とほぼ同じになりました。
最後の出力層の出力数は10に変更します。

10回更新がなかった場合には学習率を0.1倍するという設定には、ReduceLROnPlateauを使います。
weight decayはSGDの引数として設定します。

reduce_lr = ReduceLROnPlateau(
    monitor='val_loss', 
    factor=0.1,   
    patience=10, 
    min_lr=0.000001,
    verbose=2
)

sgd = SGD(lr=0.01, momentum=0.9, decay=5e-4, nesterov=False)

model.compile(loss=['categorical_crossentropy'], optimizer=sgd, metrics=['accuracy'])

ここで学習を実行します。
メモリ不足で動きませんでした。

history=model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test), callbacks=[reduce_lr])

転移学習

VGGは本来,学習済みのものが提供されています。
実際にはこちらを使うことになると思います。

出力部分を付けくわえて、10個の分類モデルにします。
次いで,途中の層までは重みを固定して学習を行います。

from keras.applications.vgg19 import VGG19

base_model = VGG19(weights='imagenet', include_top=False, input_shape=input_shape)

x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(1024, activation='relu')(x)
output = Dense(num_classes, activation='softmax')(x)

model = Model(inputs=base_model.input, outputs=output)

for layer in base_model.layers[:17]:
    layer.trainable = False

モデルの設定が終わればその後は同じです。

reduce_lr = ReduceLROnPlateau(
    monitor='val_loss', 
    factor=0.1,   
    patience=10, 
    min_lr=0.000001,
    verbose=2
)

sgd = SGD(lr=0.01, momentum=0.9, decay=5e-4, nesterov=False)

model.compile(loss=['categorical_crossentropy'], optimizer=sgd, metrics=['accuracy'])

history=model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test), callbacks=[reduce_lr])

pytorch

次にpytorchについて述べます。
流れはkerasと変わりませんが、torch-lightningを使います。
必要なライブラリをインポートします。

import torch
from torch import nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import random_split, DataLoader

from torchsummary import summary
import pytorch_lightning as pl
from torchmetrics import Accuracy as accuracy
import torchvision
from torchvision import models, transforms
from torchvision.datasets import CIFAR10

import matplotlib.pyplot as plt

データを取得します。
torch-lightningではLightningDataModuleを使ってデータのクラスを作成します。
次いで、DataLoaderなどはこの中で定義します。

class CIFAR10DataModule(pl.LightningDataModule):
    def __init__(self, batch_size, data_dir: str = './'):
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size

        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
        
        self.dims = (3, 32, 32)
        self.num_classes = 10
    
    def prepare_data(self):
        CIFAR10(self.data_dir, train= True, download= True)
        CIFAR10(self.data_dir, train= False, download= True)
    
    def setup(self, stage=None):
        # Assign train/val datasets for use in dataloaders
        if stage == 'fit' or stage is None:
            cifar_full = CIFAR10(self.data_dir, train=True, transform=self.transform)
            self.cifar_train, self.cifar_val = random_split(cifar_full, [45000, 5000])

        # Assign test dataset for use in dataloader(s)
        if stage == 'test' or stage is None:
            self.cifar_test = CIFAR10(self.data_dir, train=False, transform=self.transform)

    def train_dataloader(self):
        return DataLoader(self.cifar_train, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.cifar_val, batch_size=self.batch_size)

    def test_dataloader(self):
        return DataLoader(self.cifar_test, batch_size=self.batch_size)

データを準備し、中身を見ます。

# Init our data pipeline
dm = CIFAR10DataModule(batch_size=32)
# To access the x_dataloader we need to call prepare_data and setup.
dm.prepare_data()
dm.setup()

# Samples required by the custom ImagePredictionLogger callback to log image predictions.
val_samples = next(iter(dm.val_dataloader()))
val_imgs, val_labels = val_samples[0], val_samples[1]

fig, axes = plt.subplots(1, 4, figsize=(12,3))
[axes[i].imshow(val_imgs[i].transpose(0,2)) for i in range(4)];

image.png

次にVGG19を実装します。

class VGG(nn.Module):
    def __init__(self):
        super(VGG, self).__init__()
        self.layers1 = []
        in_channels = 3
        
        # 第1層
        conv2d = nn.Conv2d(in_channels, 64, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        conv2d = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        self.layers1 += [nn.MaxPool2d(kernel_size=2, stride=2)]
        
        # 第2層
        conv2d = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        conv2d = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        self.layers1 += [nn.MaxPool2d(kernel_size=2, stride=2)]
        
        # 第3層
        conv2d = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        conv2d = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        conv2d = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        conv2d = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        self.layers1 += [nn.MaxPool2d(kernel_size=2, stride=2)]
        
        # 第4層
        conv2d = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        conv2d = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        conv2d = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        conv2d = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        self.layers1 += [nn.MaxPool2d(kernel_size=2, stride=2)]
        
        # 第5層
        conv2d = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        conv2d = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        conv2d = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        conv2d = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.layers1 += [conv2d, nn.ReLU(inplace=True)]
        self.layers1 += [nn.MaxPool2d(kernel_size=2, stride=2)]
        self.layers1 += [nn.AdaptiveAvgPool2d((7, 7))]
        self.features = nn.Sequential(*self.layers1)

        # 全結合層
        self.layers2 = []
        
        conv5 = nn.Linear(512*7*7, 4096)
        conv6 = nn.Linear(4096, 4096)
        conv7 = nn.Linear(4096, 10)
        self.layers2 += [conv5, nn.ReLU(inplace=True), nn.Dropout(p=0.5),
                         conv6, nn.ReLU(inplace=True), nn.Dropout(p=0.5), conv7]
        self.classifier = nn.Sequential(*self.layers2)

    def forward(self, x):
        out = self.features(x)
        out = out.view(out.shape[0], -1)
        out = self.classifier(out)
        return out

構造を確認します。

summary(VGG(), (3,224,224))

image.png

この場合もパラメータ数が140万近くになっています。
LightningModuleを使ってネットワークの定義をします。
書き方については別途まとめようと思っています。
更新がない場合には,学習率を変える設定をReduceLROnPlateauで行います。

class VGGTrainer(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.model = VGG()
        
    def forward(self, x):
        x = self.model(x)
        return x
    
    def training_step(self, batch, batch_idx):
        x, y = batch 
        #x, y = x.to(device), y.to(device)
        y_hat = self.forward(x)
        loss = nn.CrossEntropyLoss()(y_hat, y)
        return {'loss': loss, 'y_hat':y_hat, 'y':y, 'batch_loss': loss.item()*x.size(0)}
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        #x, y = x.to(device), y.to(device)
        y_hat = self.forward(x)
        loss = nn.CrossEntropyLoss()(y_hat, y)
        return {'y_hat':y_hat, 'y':y, 'batch_loss': loss.item()*x.size(0)}
    
    def test_step(self, batch, batch_nb):
        x, y = batch
        #x, y = x.to(device), y.to(device)
        y_hat = self.forward(x)
        loss = nn.CrossEntropyLoss()(y_hat, y)
        y_label = torch.argmax(y_hat, dim=1)
        acc = accuracy()(y_label, y)
        return {'test_loss': loss, 'test_acc': acc}
    
    def training_epoch_end(self, train_step_output):
        y_hat = torch.cat([val['y_hat'] for val in train_step_outputs], dim=0)
        y = torch.cat([val['y'] for val in train_step_outputs], dim=0)
        epoch_loss = sum([val['batch_loss'] for val in train_step_outputs]) / y_hat.size(0)
        preds = torch.argmax(y_hat, dim=1)
        acc = accuracy()(preds, y)
        self.log('train_loss', epoch_loss, prog_bar=True, on_epoch=True)
        self.log('train_acc', acc, prog_bar=True, on_epoch=True)
        
        print('---------- Current Epoch {} ----------'.format(self.current_epoch + 1))
        print('train Loss: {:.4f} train Acc: {:.4f}'.format(epoch_loass, acc))
    
    def validation_epoch_end(self, val_step_outputs):
        y_hat = torch.cat([val['y_hat'] for val in val_step_outputs], dim=0)
        y = torch.cat([val['y'] for val in val_step_outputs], dim=0)
        epoch_loss = sum([val['batch_loss'] for val in val_step_outputs]) / y_hat.size(0)
        preds = torch.argmax(y_hat, dim=1)
        acc = accuracy()(preds, y)
        self.log('val_loss', epoch_loss, prog_bar=True, on_epoch=True)
        self.log('val_acc', acc, prog_bar=True, on_epoch=True)
        
        print('valid Loss: {:.4f} valid Acc: {:.4f}'.format(epoch_loss, acc))
    
    def test_epoch_end(self, test_step_outputs):
        y_hat = torch.cat([val['y_hat'] for val in test_step_outputs], dim=0)
        y = torch.cat([val['y'] for val in test_step_outputs], dim=0)
        epoch_loss = sum([val['batch_loss'] for val in test_step_outputs]) / y_hat.size(0)
        preds = torch.argmax(y_hat, dim=1)
        acc = accuracy()(preds, y)
        self.log('test_loss', epoch_loss, prog_bar=True, on_epoch=True)
        self.log('test_acc', acc, prog_bar=True, on_epoch=True)
        
        print('test Loss: {:.4f} test Acc: {:.4f}'.format(epoch_loss, acc))
        
    def configure_optimizers(self):
        optimizer = optim.SGD(self.parameters(), lr=0.001, momentum=0.9, weight_decay=5e-4)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10, threshold=0.0001)
        return {'optimizer': optimizer, 'lr_scheduler': scheduler, 'monitor': 'val_loss'}

ここで、Trainerを作ります。
(GPUを使おうとしましたがうまくいかず、今後訂正します。)

net = VGGTrainer().to(device)

trainer = pl.Trainer(gpus=0, max_epochs=5)

あとはfitで学習します。

trainer.fit(net, dm)

転移学習

pytorchでも学習済みのものが提供されています。

さきほどと実装内容は変わりません。__init__部分だけ書き換えます。
こちらも全結合層の最後の層の出力を10個にします。
また,出力層以外は重みの学習をおこなわないように設定します。

class VGGTrainer(pl.LightningModule):
    def __init__(self):
        super(VGGTrainer, self).__init__()
        vgg19 = models.vgg19(pretrained=True)
        
        vgg19.classifier[6] = nn.Linear(in_features=4096, out_features=10)
        self.model = vgg19
        
        update_param_names = ['classifier.6.weight', 'classifier.6.bias']
        
        for name, param in self.model.named_parameters():
            if name in update_param_names:
                param.requires_grad = True
            else:
                param.requires_grad = False
        
    def forward(self, x):
        x = self.model(x)
        return x
    
    def training_step(self, batch, batch_idx):
        x, y = batch 
        #x, y = x.to(device), y.to(device)
        y_hat = self.forward(x)
        loss = nn.CrossEntropyLoss()(y_hat, y)
        return {'loss': loss, 'y_hat':y_hat, 'y':y, 'batch_loss': loss.item()*x.size(0)}
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        #x, y = x.to(device), y.to(device)
        y_hat = self.forward(x)
        loss = nn.CrossEntropyLoss()(y_hat, y)
        return {'y_hat':y_hat, 'y':y, 'batch_loss': loss.item()*x.size(0)}
    
    def test_step(self, batch, batch_nb):
        x, y = batch
        #x, y = x.to(device), y.to(device)
        y_hat = self.forward(x)
        loss = nn.CrossEntropyLoss()(y_hat, y)
        y_label = torch.argmax(y_hat, dim=1)
        acc = accuracy()(y_label, y)
        return {'test_loss': loss, 'test_acc': acc}
    
    def training_epoch_end(self, train_step_output):
        y_hat = torch.cat([val['y_hat'] for val in train_step_outputs], dim=0)
        y = torch.cat([val['y'] for val in train_step_outputs], dim=0)
        epoch_loss = sum([val['batch_loss'] for val in train_step_outputs]) / y_hat.size(0)
        preds = torch.argmax(y_hat, dim=1)
        acc = accuracy()(preds, y)
        self.log('train_loss', epoch_loss, prog_bar=True, on_epoch=True)
        self.log('train_acc', acc, prog_bar=True, on_epoch=True)
        
        print('---------- Current Epoch {} ----------'.format(self.current_epoch + 1))
        print('train Loss: {:.4f} train Acc: {:.4f}'.format(epoch_loass, acc))
    
    def validation_epoch_end(self, val_step_outputs):
        y_hat = torch.cat([val['y_hat'] for val in val_step_outputs], dim=0)
        y = torch.cat([val['y'] for val in val_step_outputs], dim=0)
        epoch_loss = sum([val['batch_loss'] for val in val_step_outputs]) / y_hat.size(0)
        preds = torch.argmax(y_hat, dim=1)
        acc = accuracy()(preds, y)
        self.log('val_loss', epoch_loss, prog_bar=True, on_epoch=True)
        self.log('val_acc', acc, prog_bar=True, on_epoch=True)
        
        print('valid Loss: {:.4f} valid Acc: {:.4f}'.format(epoch_loss, acc))
    
    # New: テストデータに対するエポックごとの処理
    def test_epoch_end(self, test_step_outputs):
        y_hat = torch.cat([val['y_hat'] for val in test_step_outputs], dim=0)
        y = torch.cat([val['y'] for val in test_step_outputs], dim=0)
        epoch_loss = sum([val['batch_loss'] for val in test_step_outputs]) / y_hat.size(0)
        preds = torch.argmax(y_hat, dim=1)
        acc = accuracy()(preds, y)
        self.log('test_loss', epoch_loss, prog_bar=True, on_epoch=True)
        self.log('test_acc', acc, prog_bar=True, on_epoch=True)
        
        print('test Loss: {:.4f} test Acc: {:.4f}'.format(epoch_loss, acc))
        
    def configure_optimizers(self):
        optimizer = optim.SGD(self.parameters(), lr=0.001, momentum=0.9, weight_decay=5e-4)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10, threshold=0.0001)
        return {'optimizer': optimizer, 'lr_scheduler': scheduler, 'monitor': 'val_loss'}

この処理の後,学習させます。

net = VGGTrainer().to(device)

trainer = pl.Trainer(gpus=0, max_epochs=5)
trainer.fit(net, dm)

以上です。
今後,学習の結果が得られた場合に,追加掲載します。

5
4
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?