MobileNet V1について構造の説明と実装のメモ書きです。
ただし、論文すべてを見るわけでなく構造のところを中心に見ていきます。
勉強のメモ書き程度でありあまり正確に実装されていませんので、ご了承ください。
自分の実力不足で読み解けなくなってきています。難しいです。
以下の論文について実装を行っていきます。
タイトル:MobileNets: Efficient Convolutional Neural Networks for Mobile Vision Applications
MobileNet V1
Depthwise Separable Convolution
MobileNetは通常のConvolutionをDepthwise ConvolutionとPointwise Convolutionに分解したもの(depthwise separable convolution)が基本となり構成されます。
分解することで計算コストを削減することができます。
depthwise convolution
入力の各チャンネルに対して、チャンネルごとに別々のフィルタで独立に畳み込み処理を行います(チャンネル間の混合は行いません)。
pointwise convolution
1×1のカーネルで畳み込み処理を行い、depthwise convolutionの出力をチャンネル方向に結合します。
構造
構造の詳細を次の表に示します。
depthwise convolutionとpointwise convolutionの組み合わせが基本単位ですが、下の図の右側で示すように各conv層の後にはBatchNormalizationとReLUが挿入されます。
学習
RMSPropを使用し、decayは0.9、$\epsilon$は1.0です。
初期学習率は0.045で、2エポックごとに0.94倍して指数的に減衰させていきます。
実装
keras
まず、必要なライブラリのインポートをします。
import tensorflow.keras as keras
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Conv2D, Activation, Dense, GlobalAveragePooling2D, BatchNormalization, DepthwiseConv2D
from keras.layers.merge import concatenate
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.callbacks import LearningRateScheduler
from keras.datasets import cifar10
import numpy as np
import cv2
Depthwise Separable Convolutionの実装をします。
DepthwiseConv2Dを使用します。
class Depthwise_Separable_convolutions(Model):
    """Depthwise separable convolution unit (Keras).

    A 3x3 depthwise convolution followed by a 1x1 pointwise convolution.
    Each of the two convolutions is followed by batch normalization and a
    ReLU activation.

    Args:
        out_channels: Number of filters of the pointwise convolution.
        strides: Stride of the depthwise convolution.
    """

    def __init__(self, out_channels, strides):
        super().__init__()
        # Depthwise stage.
        self.conv_dw = DepthwiseConv2D(
            kernel_size=(3, 3),
            strides=strides,
            padding='same',
        )
        self.bn1 = BatchNormalization()
        self.relu1 = Activation('relu')
        # Pointwise stage.
        self.conv_pw = Conv2D(
            filters=out_channels,
            kernel_size=(1, 1),
            padding='same',
        )
        self.bn2 = BatchNormalization()
        self.relu2 = Activation('relu')

    def call(self, x):
        """Pass ``x`` through the six layers in order and return the result."""
        stages = (
            self.conv_dw, self.bn1, self.relu1,
            self.conv_pw, self.bn2, self.relu2,
        )
        for stage in stages:
            x = stage(x)
        return x
MobileNet V1本体の実装をします。
class MobileNetV1(Model):
    """MobileNet V1 classifier (Keras).

    Layout: a strided 3x3 stem convolution, thirteen depthwise separable
    units, global average pooling and a softmax classifier.

    Args:
        num_classes: Number of units of the final softmax layer. Defaults
            to 10, the value that used to be hard-coded.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # Stem. Like every other convolution in the network it is followed
        # by batch normalization and ReLU; the original code omitted both.
        self.conv1 = Conv2D(32, kernel_size=3, strides=2, padding='same')
        self.bn1 = BatchNormalization()
        self.relu1 = Activation('relu')

        self.DSconv1 = Depthwise_Separable_convolutions(out_channels=64, strides=1)
        self.DSconv2 = Depthwise_Separable_convolutions(out_channels=128, strides=2)
        self.DSconv3 = Depthwise_Separable_convolutions(out_channels=128, strides=1)
        self.DSconv4 = Depthwise_Separable_convolutions(out_channels=256, strides=2)
        self.DSconv5 = Depthwise_Separable_convolutions(out_channels=256, strides=1)
        self.DSconv6 = Depthwise_Separable_convolutions(out_channels=512, strides=2)
        # Five identical 512-channel units.
        self.DSconv7 = Sequential(
            [Depthwise_Separable_convolutions(out_channels=512, strides=1) for _ in range(5)]
        )
        self.DSconv8 = Depthwise_Separable_convolutions(out_channels=1024, strides=2)
        self.DSconv9 = Depthwise_Separable_convolutions(out_channels=1024, strides=1)

        self.pool = GlobalAveragePooling2D()
        self.fc = Dense(num_classes, activation='softmax')

    def call(self, x):
        """Return the softmax class scores for the input batch ``x``."""
        x = self.relu1(self.bn1(self.conv1(x)))
        x = self.DSconv1(x)
        x = self.DSconv2(x)
        x = self.DSconv3(x)
        x = self.DSconv4(x)
        x = self.DSconv5(x)
        x = self.DSconv6(x)
        x = self.DSconv7(x)
        x = self.DSconv8(x)
        x = self.DSconv9(x)
        x = self.pool(x)
        return self.fc(x)
構造を確認します。
# Build the subclassed model for batched 224x224x3 inputs.
model = MobileNetV1()
model.build((None, 224, 224, 3))

# Wrap call() in a functional Model over a symbolic input and print that
# model's summary. ``shape`` is declared without the batch dimension.
dummy_input = Input(shape=(224, 224, 3))
traced_output = model.call(dummy_input)
model_summary = Model(inputs=[dummy_input], outputs=traced_output)
model_summary.summary()
学習の設定をします。
import math  # needed by decay(); it was missing from the import section

epochs = 100
initial_lrate = 0.045


def decay(epoch, steps=100):
    """Return the learning rate to use for ``epoch``.

    The rate starts at 0.045 (the value given in the text and passed to the
    optimizer; the previous 0.0045 was inconsistent with both) and is
    multiplied by 0.94 ``floor((1 + epoch) / 2)`` times.

    Args:
        epoch: Zero-based epoch index.
        steps: Unused; kept so that existing callers keep working.

    Returns:
        The learning rate as a float.
    """
    initial_lrate = 0.045
    drop = 0.94
    epochs_drop = 2
    lrate = initial_lrate * math.pow(drop, math.floor((1 + epoch) / epochs_drop))
    return lrate
# RMSprop with moving-average decay 0.9 (``rho``) and epsilon 1.0.
# The former ``decay=0.9`` argument was removed: in Keras it is a per-update
# learning-rate decay, not the RMSprop decay, and the learning rate is
# already scheduled by ``lr_sc`` below. ``learning_rate`` replaces the
# deprecated ``lr`` alias.
sgd = RMSprop(learning_rate=0.045, rho=0.9, epsilon=1.0)

# Callback that applies decay() at the start of every epoch; it only takes
# effect when passed to model.fit(..., callbacks=[lr_sc]).
lr_sc = LearningRateScheduler(decay, verbose=1)

model = MobileNetV1()
model.compile(loss=['categorical_crossentropy'], optimizer=sgd, metrics=['accuracy'])
転移学習(keras)
学習済みのモデルを使用します。
全結合層を付け加えて、追加した部分だけ学習させます。
# Import MobileNet from tensorflow.keras so that the pretrained base and the
# layers stacked on top of it (imported from tensorflow.keras earlier) come
# from the same Keras implementation.
from tensorflow.keras.applications.mobilenet import MobileNet

input_shape = (224, 224, 3)
num_classes = 10

# ImageNet-pretrained backbone without its classification head.
base_model = MobileNet(weights='imagenet', include_top=False, input_shape=input_shape)

# New head: global average pooling followed by a softmax classifier.
# Optional: insert Dense(1024, activation='relu') between the two for a
# larger head.
x = base_model.output
x = GlobalAveragePooling2D()(x)
output = Dense(num_classes, activation='softmax')(x)
model = Model(inputs=base_model.input, outputs=output)

# Freeze the backbone so that only the newly added layers are trained.
for layer in base_model.layers:
    layer.trainable = False
import math  # needed by decay(); it was missing from the import section

epochs = 100
initial_lrate = 0.045


def decay(epoch, steps=100):
    """Return the learning rate to use for ``epoch``.

    The rate starts at 0.045 (the value given in the text and passed to the
    optimizer; the previous 0.0045 was inconsistent with both) and is
    multiplied by 0.94 ``floor((1 + epoch) / 2)`` times.

    Args:
        epoch: Zero-based epoch index.
        steps: Unused; kept so that existing callers keep working.

    Returns:
        The learning rate as a float.
    """
    initial_lrate = 0.045
    drop = 0.94
    epochs_drop = 2
    lrate = initial_lrate * math.pow(drop, math.floor((1 + epoch) / epochs_drop))
    return lrate
# RMSprop with moving-average decay 0.9 (``rho``) and epsilon 1.0.
# The former ``decay=0.9`` argument was removed: in Keras it is a per-update
# learning-rate decay, not the RMSprop decay, and the learning rate is
# already scheduled by ``lr_sc`` below. ``learning_rate`` replaces the
# deprecated ``lr`` alias.
sgd = RMSprop(learning_rate=0.045, rho=0.9, epsilon=1.0)

# Callback that applies decay() at the start of every epoch; it only takes
# effect when passed to model.fit(..., callbacks=[lr_sc]).
lr_sc = LearningRateScheduler(decay, verbose=1)

model.compile(loss=['categorical_crossentropy'], optimizer=sgd, metrics=['accuracy'])
pytorch
必要なライブラリのインポートをします。
import torch
import torch.nn as nn
import torch.optim as optim
import pytorch_lightning as pl
from torchmetrics import Accuracy as accuracy
Depthwise Separable Convolutionの実装をします。
nn.Conv2dのgroupsを入力チャンネル数とすることで、Depthwise Convolutionを行います。
class Depthwise_Separable_convolutions(nn.Module):
    """Depthwise separable convolution unit (PyTorch).

    A 3x3 convolution with ``groups=in_channels`` (one filter per input
    channel) followed by a 1x1 convolution that maps ``in_channels`` to
    ``out_channels``. Each convolution is followed by batch normalization
    and ReLU.

    Args:
        in_channels: Number of channels of the input.
        out_channels: Number of channels produced by the 1x1 convolution.
        stride: Stride of the 3x3 depthwise convolution.
    """

    def __init__(self, in_channels, out_channels, stride):
        super().__init__()
        # Depthwise stage: the channel count is unchanged.
        self.conv_dw = nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size=(3, 3),
            stride=stride,
            padding=1,
            groups=in_channels,
        )
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.relu1 = nn.ReLU(inplace=False)
        # Pointwise stage: changes the channel count.
        self.conv_pw = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=(1, 1),
            padding='same',
        )
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu2 = nn.ReLU(inplace=False)

    def forward(self, x):
        """Pass ``x`` through the six layers in order and return the result."""
        stages = (
            self.conv_dw, self.bn1, self.relu1,
            self.conv_pw, self.bn2, self.relu2,
        )
        for stage in stages:
            x = stage(x)
        return x
MobileNet V1本体の実装をします。
class MobileNetV1(nn.Module):
    """MobileNet V1 classifier (PyTorch).

    Layout: a strided 3x3 stem convolution on a 3-channel input, thirteen
    depthwise separable units, global average pooling and a linear
    classifier that returns raw logits (no softmax).

    Args:
        num_classes: Number of output logits. Defaults to 10, the value
            that used to be hard-coded.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # Stem. Like every other convolution in the network it is followed
        # by batch normalization and ReLU; the original code omitted both.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu1 = nn.ReLU(inplace=False)

        self.DSconv1 = Depthwise_Separable_convolutions(in_channels=32, out_channels=64, stride=1)
        self.DSconv2 = Depthwise_Separable_convolutions(in_channels=64, out_channels=128, stride=2)
        self.DSconv3 = Depthwise_Separable_convolutions(in_channels=128, out_channels=128, stride=1)
        self.DSconv4 = Depthwise_Separable_convolutions(in_channels=128, out_channels=256, stride=2)
        self.DSconv5 = Depthwise_Separable_convolutions(in_channels=256, out_channels=256, stride=1)
        self.DSconv6 = Depthwise_Separable_convolutions(in_channels=256, out_channels=512, stride=2)
        # Five identical 512-channel units.
        self.DSconv7 = nn.Sequential(
            *[Depthwise_Separable_convolutions(in_channels=512, out_channels=512, stride=1) for _ in range(5)]
        )
        self.DSconv8 = Depthwise_Separable_convolutions(in_channels=512, out_channels=1024, stride=2)
        self.DSconv9 = Depthwise_Separable_convolutions(in_channels=1024, out_channels=1024, stride=1)

        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(1024, num_classes)

    def forward(self, x):
        """Return the class logits for the input batch ``x``."""
        x = self.relu1(self.bn1(self.conv1(x)))
        x = self.DSconv1(x)
        x = self.DSconv2(x)
        x = self.DSconv3(x)
        x = self.DSconv4(x)
        x = self.DSconv5(x)
        x = self.DSconv6(x)
        x = self.DSconv7(x)
        x = self.DSconv8(x)
        x = self.DSconv9(x)
        x = self.pool(x)
        # Collapse the pooled 1x1 spatial dimensions before the linear layer.
        x = torch.flatten(x, 1)
        return self.fc(x)
構造の確認をします
from torchsummary import summary

# torchsummary creates its probe input on ``device`` (default "cuda"), so the
# model must live on the same device. Select the device explicitly, move the
# model there and pass the same device to summary(); this also makes the call
# work on machines without a GPU.
summary_device = "cuda" if torch.cuda.is_available() else "cpu"
summary(MobileNetV1().to(summary_device), (3, 224, 224), device=summary_device)
学習の設定をします。
class MobileNetV1Trainer(pl.LightningModule):
    """LightningModule that trains, validates and tests ``MobileNetV1``.

    Every step returns the batch predictions, the targets and the batch loss
    multiplied by the batch size. The ``*_epoch_end`` hooks aggregate these
    into a sample-weighted mean loss and an accuracy, log both and print them.

    NOTE(review): the ``*_epoch_end`` hooks are only called by
    pytorch_lightning releases before 2.0 -- confirm the installed version.
    """

    def __init__(self):
        super().__init__()
        self.model = MobileNetV1()
        # Created once instead of on every step; it holds no parameters.
        self.criterion = nn.CrossEntropyLoss()

    def forward(self, x):
        """Return the model output for the batch ``x``."""
        return self.model(x)

    def _shared_step(self, batch):
        """Run one batch; return targets, predictions, loss and weighted loss."""
        x, y = batch
        y_hat = self.forward(x)
        loss = self.criterion(y_hat, y)
        return y, y_hat, loss, loss.item() * x.size(0)

    @staticmethod
    def _epoch_metrics(step_outputs):
        """Aggregate step outputs into (mean loss per sample, accuracy).

        Accuracy is computed directly from the tensors, so no metric object
        has to be created per call. Assumes ``y`` holds class indices --
        TODO confirm against the data loader.
        """
        y_hat = torch.cat([out['y_hat'] for out in step_outputs], dim=0)
        y = torch.cat([out['y'] for out in step_outputs], dim=0)
        epoch_loss = sum(out['batch_loss'] for out in step_outputs) / y_hat.size(0)
        preds = torch.argmax(y_hat, dim=1)
        acc = (preds == y).float().mean()
        return epoch_loss, acc

    def training_step(self, batch, batch_idx):
        """Compute the training loss for one batch."""
        y, y_hat, loss, batch_loss = self._shared_step(batch)
        # Detach the predictions so the collected outputs do not keep every
        # batch's autograd graph alive until the end of the epoch.
        return {'loss': loss, 'y_hat': y_hat.detach(), 'y': y, 'batch_loss': batch_loss}

    def validation_step(self, batch, batch_idx):
        """Evaluate one validation batch."""
        y, y_hat, _, batch_loss = self._shared_step(batch)
        return {'y_hat': y_hat, 'y': y, 'batch_loss': batch_loss}

    def test_step(self, batch, batch_nb):
        """Evaluate one test batch.

        Returns the keys consumed by ``test_epoch_end`` (``y_hat``, ``y``,
        ``batch_loss``) in addition to the previously returned ``test_loss``
        and ``test_acc``; the former were missing, so ``test_epoch_end``
        raised ``KeyError``.
        """
        y, y_hat, loss, batch_loss = self._shared_step(batch)
        acc = (torch.argmax(y_hat, dim=1) == y).float().mean()
        return {
            'y_hat': y_hat,
            'y': y,
            'batch_loss': batch_loss,
            'test_loss': loss,
            'test_acc': acc,
        }

    def training_epoch_end(self, train_step_output):
        """Log and print the loss and accuracy over the training epoch."""
        epoch_loss, acc = self._epoch_metrics(train_step_output)
        self.log('train_loss', epoch_loss, prog_bar=True, on_epoch=True)
        self.log('train_acc', acc, prog_bar=True, on_epoch=True)
        print('---------- Current Epoch {} ----------'.format(self.current_epoch + 1))
        print('train Loss: {:.4f} train Acc: {:.4f}'.format(epoch_loss, acc))

    def validation_epoch_end(self, val_step_outputs):
        """Log and print the loss and accuracy over the validation epoch."""
        epoch_loss, acc = self._epoch_metrics(val_step_outputs)
        self.log('val_loss', epoch_loss, prog_bar=True, on_epoch=True)
        self.log('val_acc', acc, prog_bar=True, on_epoch=True)
        print('valid Loss: {:.4f} valid Acc: {:.4f}'.format(epoch_loss, acc))

    # Per-epoch processing for the test data.
    def test_epoch_end(self, test_step_outputs):
        """Log and print the loss and accuracy over the test set."""
        epoch_loss, acc = self._epoch_metrics(test_step_outputs)
        self.log('test_loss', epoch_loss, prog_bar=True, on_epoch=True)
        self.log('test_acc', acc, prog_bar=True, on_epoch=True)
        print('test Loss: {:.4f} test Acc: {:.4f}'.format(epoch_loss, acc))

    def configure_optimizers(self):
        """Return RMSprop plus a step scheduler.

        The RMSprop decay of 0.9 is the smoothing constant ``alpha`` in
        PyTorch (it was passed as ``momentum`` before). The learning rate
        is multiplied by 0.94 every 2 epochs; the previous ``gamma=0.06``
        cut it to 6% instead.
        """
        optimizer = optim.RMSprop(self.parameters(), lr=0.045, alpha=0.9, eps=1.0)
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.94)
        return {'optimizer': optimizer, 'lr_scheduler': scheduler}