- 製造業出身のデータサイエンティストがお送りする記事
- 今回はPyTorchを用いてディープラーニングを活用した画像分類モデルを作成してみました
##はじめに
仕事として、非構造化データも扱ったデータ分析ができるようにディープラーニングを活用した画像分類問題に挑戦しました。
正直、勉強始めたばかりですしので、詳しい理論については整理することができませんが、実装することはできました。
##使用するデータセット
今回使用したデータセットは、「CelebA」というデータセットを使いました。CelebAは、有名人の顔画像をカラー178×218ピクセルで202,599枚集めたデータセットです。
CelebAには、各画像データが40種類の属性について、該当するかどうかをまとめた属性ファイル( list_attr_celeba.txt )が付属しています。これを活用して必要な画像を抽出して実装しました。
##実装
今回はgoogle colabを活用して実装しました。理由としては、無料でGPUが使用できるので活用させて頂きました。
はじめに、CelebA (Large-scale CelebFaces Attributes) データセットのダウンロードと展開(解凍)を実施しました。
- URLのリンク先に飛びます。
- 「Google Drive」をクリック
- 「img」ディレクトリの下の「img_align_celeba.zip」をダウンロード
- フォルダ「data」の直下で「img_align_celeba.zip」を解凍
実装コードは下記の通りになります。
# ライブラリーのインストール
import os
from os.path import join
import sys
import numpy as np
import glob
import pathlib
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from pathlib import Path
from PIL import Image
from sklearn.metrics import confusion_matrix, accuracy_score
import matplotlib.pyplot as plt
%matplotlib inline
# google driveのマウント
from google.colab import drive
drive.mount('/content/drive/')
次にzipファイルを解凍します。
!unzip "drive/MyDrive/CelebA_dataset/img_align_celeba.zip"
取得したデータを確認します。
# CelebAデータセットのディレクトリを指定
path_dir = Path('img_align_celeba')
# glob関数により取得した jpg ファイルの一覧
sorted(path_dir.glob('*.jpg'))[:10]
次に画像を確認します。
plt.figure(figsize=(20, 20))
for i, path_img in enumerate(sorted(path_dir.glob('*00001.jpg'))):
name_img = path_img.name
img = Image.open(path_img)
plt.subplot(4, 5, i+1)
plt.title(name_img)
plt.imshow(img)
次は使用する画像のリサイズを行います。リサイズをする際に入力する画像のサイズをヒストグラムを用いて確認します。
# 画像サイズを集計
list_w, list_h, list_ratio = [], [], []
for path_img in path_dir.glob('*.jpg'):
img = Image.open(path_img)
w, h = img.size
list_w.append(w)
list_h.append(h)
list_ratio.append(w / h)
# ヒストグラムの表示
plt.figure(figsize=(20, 5))
# width
plt.subplot(1, 3, 1)
plt.hist(list_w, bins=10, density=True)
plt.title('width')
plt.xlabel('width')
plt.ylabel('freq')
# width
plt.subplot(1, 3, 2)
plt.hist(list_h, bins=10, density=True)
plt.title('height')
plt.xlabel('height')
plt.ylabel('freq')
# rate
plt.subplot(1, 3, 3)
plt.hist(list_ratio, bins=10, density=True)
plt.title('width / height')
plt.xlabel('w / h')
plt.ylabel('freq')
今回は有名なデータセットを使用するため、画像サイズが綺麗に揃っておりましたが、実務で使用する際はサイズが異なっているので上記作業は必ず確認する必要がありそうです。
2の累乗を約数に含むようにサイズを設定すると良いそうなので、今回は「224 * 224」に設定しました。
def load_image(path_img, size=(224, 224)):
img = Image.open(path_img)
# 短辺長を基準とした正方形の座標を得る
x_center = img.size[0] // 2
y_center = img.size[1] // 2
half_short_side = min(x_center, y_center)
x0 = x_center - half_short_side
y0 = y_center - half_short_side
x1 = x_center + half_short_side
y1 = y_center + half_short_side
img = img.crop((x0, y0, x1, y1))
img = img.resize(size)
img = np.array(img, dtype=np.float32)
return img
# リサイズ後の画像を確認
plt.figure(figsize=(20, 20))
for i, path_img in enumerate(sorted(path_dir.glob('*00001.jpg'))):
name_img = path_img.name
img = load_image(path_img)
plt.subplot(4, 5, i+1)
plt.title(name_img)
plt.imshow(img /255) # matplotlib で float32型の数値を適切に表示するために 0-1の間に収めます。
きちっとリサイズされていることが分かりますね。
次は、CelebAのデータセットを抽出します。今回は下記2パターンのデータセットを抽出しました。
- 笑っている男性
- 笑っていない男性
# outputディレクトリの指定
output_dir = f'drive/MyDrive/CelebA_dataset/'
# フォルダをリストで作成
pass_list = [f'00_smiling_male/',
f'01_Nonsmiling_male/']
for path in pass_list:
path_train_data = join(output_dir, path)
if not os.path.exists(path_train_data):
os.makedirs(path_train_data)
count = 0
with open("drive/MyDrive/CelebA_dataset/list_attr_celeba_ref.txt","r") as f: ### 属性ファイルを開く
for i in range(202599): # 全部で202,599枚処理する
line = f.readline() # 1行データ読み込み
line = line.split() # データを分割
count = count+1
print(count)
# 笑っている男性
if line[3]=="1" and line[16]=="-1" and line[21]=="1" and line[32]=="1" and line[36]=="-1" and line[40]=="1":
image = Image.open("img_align_celeba/"+line[0])
image.save(output_dir + pass_list[0] + line[0])
# 笑っていない男性
elif line[3]=="1" and line[16]=="-1" and line[21]=="1" and line[32]=="-1" and line[36]=="-1" and line[40]=="1":
image = Image.open("img_align_celeba/"+line[0])
image.save(output_dir + pass_list[1] + line[0])
次は、バッチ作成関数を実装します。
- ミニバッチを生成する関数を作成。
- 画像のパスを受け取り、入力データ:x_batch, 教師ラベル:t_batchを返す関数。
- 教師ラベルはフォルダ名から計算。
def make_batch(list_path_img):
x_batch = []
t_batch = []
for path_img in list_path_img:
img = load_image(path_img)
img = np.array(img, dtype=np.float32)
img = img.transpose(2, 0, 1)
x_batch.append(img)
t = int(str(path_img).split('/')[3][:2])
t_batch.append(t)
return torch.tensor(x_batch), torch.tensor(t_batch)
list_path_img = [output_dir+"00_smiling_male/"+"000012.jpg", output_dir+"00_smiling_male/"+"000023.jpg"]
x_batch, t_batch = make_batch(list_path_img)
print(list_path_img)
print(x_batch.shape)
print(t_batch)
#['drive/MyDrive/CelebA_dataset/00_smiling_male/000012.jpg', 'drive/MyDrive/CelebA_dataset/00_smiling_male/000023.jpg']
#torch.Size([2, 3, 224, 224])
#tensor([0, 0])
次にモデルを作成します。
- nn.Moduleというクラスを継承したクラスとしてモデルを定義。
- 3チャネルの入力画像を、Convolution 3層により 16 -> 32 -> 64 チャンネルの特徴マップに変換し、最後に全結合層により4次元のベクトルに変換するネットワークを定義。
- Convolution層の直後にはバッチ正則化を行い活性化関数 relu に通す。
class Model(nn.Module):
def __init__(self):
# スーパークラス(Module クラス)の初期化メソッドを実行
super().__init__()
self.c0 = nn.Conv2d(in_channels=3, # 入力は3チャネル
out_channels=16, # 出力は16チャネル
kernel_size=3, # カーネルサイズは3*3
stride=2, # 1pix飛ばしでカーネルを移動
padding=1) # 画像の外側1pixを埋める
self.c1 = nn.Conv2d(in_channels=16, # 入力は16チャネル
out_channels=32, # 出力は32チャネル
kernel_size=3, # カーネルサイズは3*3
stride=2, # 1pix飛ばしでカーネルを移動
padding=1) # 画像の外側1pixを埋める
self.c2 = nn.Conv2d(in_channels=32, # 入力は32チャネル
out_channels=64, # 出力は64チャネル
kernel_size=3, # カーネルサイズは3*3
stride=2, # 1pix飛ばしでカーネルを移動
padding=1) # 画像の外側1pixを埋める
self.bn0 = nn.BatchNorm2d(num_features=16) # c0用のバッチ正則化
self.bn1 = nn.BatchNorm2d(num_features=32) # c1用のバッチ正則化
self.bn2 = nn.BatchNorm2d(num_features=64) # c2用のバッチ正則化
self.fc = nn.Linear(in_features=64 * 28 * 28, # 入力サイズ
out_features=4) # 各クラスに対応する4次元のベクトルに変換
def __call__(self, x): # 入力から出力を計算するメソッドを定義
h = F.relu(self.bn0(self.c0(x)))
h = F.relu(self.bn1(self.c1(h)))
h = F.relu(self.bn2(self.c2(h)))
h = h.view(-1, 64 * 28 * 28)
y = self.fc(h) # 全結合層
return y
次にモデルを生成します。
model = Model()
model.modules
生成したモデルの挙動を確認します。
ここで、GPUを使用する設定もします。
# GPUを使用できる設定
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
path_img = output_dir+"00_smiling_male/"+"000033.jpg"
img = load_image(path_img)
plt.imshow(img /255)
この画像をモデルに通してみます。データを、PyTorchのモデルが入力画像に要求する(バッチ、チャネル、縦、横)という次元に合わせるために、np.newaxis によりバッチ次元として1次元目を挿入し、transpose メソッドにより次元の順番を変えます。
img = np.array(img, dtype=np.float32)
img_ = img[np.newaxis].transpose(0, 3, 1, 2)
img_.shape
#(1, 3, 224, 224)
x = torch.from_numpy(img_)
x = x.to(device)
model(x)
#tensor([[-0.2852, -0.2660, 0.4112, 0.6463]], device='cuda:0',grad_fn=<AddmmBackward>)
やっとここまでが事前の準備です。
これからモデルを学習します。ただし、データ数が多いのでモデルの学習にはGPUを使用しても1時間ぐらいかかります。
# 一回のパラメータ更新に使うデータ数
size_batch = 64
# 学習データの学習回数
n_epoch = 5
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = Model().to(device)
opt = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
# ロスと精度を保存するリスト(訓練用・テスト用)
list_loss_train = []
list_loss_test = []
list_acc_train = []
list_acc_test = []
# データの分割
data_train = []
data_test = []
for path in pass_list:
path_dir_data = Path(output_dir+path)
list_path_img = sorted(list(path_dir_data.glob('*.jpg')))
count = 0
division_point = int(len(list_path_img)*0.9) # 学習データを90%、評価データを10%
for path_img in list_path_img:
count = count + 1
if count < division_point: # division_pointより小さいときは学習データに割り振る。
data_train.append(path_img)
else: # division_pointより大きいときは評価データに割り振る。
data_test.append(path_img)
# データ数の確認
len(data_train), len(data_test)
#(18038, 2008)
データの型をarrayに変換しておきます。
# データの型を変更
data_train = np.array(data_train)
data_test = np.array(data_test)
for epoch in range(n_epoch):
print("-----------------------------------------")
print('epoch: {}'.format(epoch))
print('train')
perm = np.random.permutation(len(data_train))
sum_loss = 0.
sum_acc = 0.
# 訓練
for i in range(0, len(perm), size_batch):
# ミニバッチの用意
x_batch, t_batch = make_batch(data_train[perm[i:i+size_batch]])
x_batch = x_batch.to(device)
t_batch = t_batch.to(device)
# 順伝播
y = model(x_batch)
loss = F.cross_entropy(y, t_batch)
# 逆伝播
opt.zero_grad()
loss.backward()
# パラメータ更新
opt.step()
# ロスと精度を蓄積
sum_loss += loss.item()
sum_acc += (y.max(1)[1] == t_batch).sum().item()
# 進捗を表示
print(i, "/", len(perm), end="\r")
sys.stdout.flush()
mean_loss = sum_loss / len(data_train)
mean_acc = sum_acc / len(data_train)
list_loss_train.append(mean_loss)
list_acc_train.append(mean_acc)
print("- mean loss:", mean_loss)
print("- mean accuracy:", mean_acc)
# Evaluate
print('test')
sum_loss = 0.
sum_acc = 0.
with torch.no_grad():
for i in range(0, len(data_test), size_batch):
x_batch, t_batch = make_batch(data_test[i:i+size_batch])
x_batch = x_batch.to(device)
t_batch = t_batch.to(device)
# forward
y = model(x_batch)
loss = F.cross_entropy(y, t_batch)
sum_loss += loss.item()
sum_acc += (y.max(1)[1] == t_batch).sum().item()
mean_loss = sum_loss / len(data_test)
mean_acc = sum_acc / len(data_test)
list_loss_test.append(mean_loss)
list_acc_test.append(mean_acc)
print("- mean loss:", mean_loss)
print("- mean accuracy:", mean_acc)
学習が完了したら結果を表示していきます。
最初に正解率の推移を見ます。
# Accuracy
plt.figure(figsize=(8, 5))
plt.grid(True)
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.plot(list_acc_train)
plt.plot(list_acc_test)
plt.legend(['train', 'test'])
plt.show()
次にLossの推移を見ます。
# Loss
plt.figure(figsize=(8, 5))
plt.grid(True)
plt.xlabel('epoch')
plt.ylabel('loss')
plt.plot(list_loss_train)
plt.plot(list_loss_test)
plt.legend(['train', 'test'])
plt.show()
最後にConfusion Matrixを作成します。
ys = []
ts = []
for i in range(0, len(data_test), size_batch):
x_batch, t_batch = make_batch(data_test[i:i+size_batch])
x_batch = x_batch.to(device)
t_batch = t_batch.to(device)
y = model(x_batch)
y = torch.argmax(y, dim=1) # 確率の最大のインデックスを取得
ys.append(y.cpu())
ts.append(t_batch.cpu())
ys = torch.cat(ys, dim=0)
ts = torch.cat(ts, dim=0)
# confusion matrixを表示するための関数
from sklearn import metrics
import itertools
def plot_confusion_matrix(cm,
classes,
normalize=False,
title='Confusion matrix',
cmap=plt.cm.Blues):
"""
This function prints and plots the confusion matrix.
Normalization can be applied by setting `normalize=True`.
"""
if normalize:
cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
print("Normalized confusion matrix")
else:
print('Confusion matrix, without normalization')
print(cm)
plt.imshow(cm, interpolation='nearest', cmap=cmap)
plt.title(title)
plt.colorbar()
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45)
plt.yticks(tick_marks, classes)
fmt = '.2f' if normalize else 'd'
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, format(cm[i, j], fmt),
horizontalalignment="center",
color="white" if cm[i, j] > thresh else "black")
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
confmat = confusion_matrix(ys, ts)
confmat
#array([[879, 91],
# [ 88, 950]])
classes = ['smiling_male', 'Nosmiling_male']
plt.figure(figsize=(12, 12))
plot_confusion_matrix(confmat, classes=classes, normalize=True)
##さいごに
最後まで読んで頂き、ありがとうございました。
今回初めてディープラーニングを活用しましたが、実装はそこまで大変ではなかったですが、学習時間が長過ぎるのが今後の問題ですね。
転移学習とかを次は勉強してみようと思います。
訂正要望がありましたら、ご連絡頂けますと幸いです。