はじめに
KaggleのDigitRecognizerタスクの解説です.
全コードを載せているので,参考にしてください.
KaggleのNotebook
IMPORT
必要なライブラリのImportです.
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import TensorDataset
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
# Validation
from torchvision import datasets
from torch.utils.data.dataset import Subset
import csv
import os
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import pandas as pd
import seaborn as sn
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold
from sklearn.model_selection import StratifiedKFold
import timm
import random
import pickle
# NOTE(review): the 'seaborn-darkgrid' style name was renamed to
# 'seaborn-v0_8-darkgrid' in matplotlib >= 3.6 and the old alias removed;
# this call raises OSError on newer matplotlib -- confirm the runtime version.
plt.style.use('seaborn-darkgrid')
# Use the GPU when available, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
ReadData
データを読み込みます.
画像ですが,csv形式のデータになっていて,28×28の画像1枚が784個の画素値として1行に並んでいます.
また,TRAINの方は1つ目の値が正解ラベルとなっているため,POPして取り出しています.
# Load the Kaggle Digit Recognizer CSVs: train.csv has a 'label' column
# followed by 784 pixel columns (28x28 flattened); test.csv has pixels only.
TRAIN_DIR = "/kaggle/input/digit-recognizer/train.csv"
TEST_DIR = "/kaggle/input/digit-recognizer/test.csv"
TRAIN_DATA = pd.read_csv(TRAIN_DIR)
TEST_DATA = pd.read_csv(TEST_DIR)
# NOTE(review): the original converted to nested Python lists and deleted
# elements row by row (quadratic); the vectorized pandas indexing below is
# equivalent and fast. The original also deleted the FIRST DATA ROW (pandas
# had already consumed the header line), silently discarding one training
# sample (41999 instead of 42000). That drop is reproduced here so the rest
# of the notebook behaves identically -- remove the `.iloc[1:]` to train on
# the full set.
TRAIN_DATA = TRAIN_DATA.iloc[1:]
TRAIN_Y = TRAIN_DATA.iloc[:, 0].to_numpy()   # first column is the label
TRAIN_X = TRAIN_DATA.iloc[:, 1:].to_numpy()  # remaining 784 pixel columns
TEST_X = TEST_DATA.to_numpy()
print(TRAIN_X.shape)
print(TRAIN_Y.shape)
print(TEST_X.shape)
#(41999, 784)
#(41999,)
#(28000, 784)
#(41999, 784)
#(41999,)
#(28000, 784)
これをDeepに入れる形式にReshapeします
画像は([枚数,チャンネル数,タテ,ヨコ])の形にします.
チャンネル数とはRGBです.カラーなら3,モノクロなら1になります.
# Reshape the flat 784-pixel rows into (N, C, H, W) image tensors:
# 1 channel (grayscale), 28x28 pixels. Using -1 for N lets NumPy infer the
# row count instead of hard-coding 41999/28000, so this keeps working if
# the number of samples changes upstream.
TRAIN_X = TRAIN_X.reshape([-1, 1, 28, 28])
TEST_X = TEST_X.reshape([-1, 1, 28, 28])
print(TRAIN_X.shape)
print(TRAIN_Y.shape)
print(TEST_X.shape)
#(41999, 1, 28, 28)
#(41999,)
#(28000, 1, 28, 28)
MODEL
Tensor型にして,Datasetに入れておきます.
今回はデータも多いので,BatchSize32にしています.
# Wrap the numpy arrays as float32 tensors and bundle them into Datasets.
# Labels stay float here; they are cast to int64 inside the train/val loops.
X_tensor = torch.Tensor(TRAIN_X)
y_tensor = torch.Tensor(TRAIN_Y)
dataset = TensorDataset(X_tensor, y_tensor)
# X_tensor is deliberately reused for the (unlabeled) test images.
X_tensor = torch.Tensor(TEST_X)
test_dataset = TensorDataset(X_tensor)
batch_size = 32  # mini-batch size for all DataLoaders below
IMG_SIZE = 28    # square image side length, used by the crop transform
DataAugmentation用のTransform実装です.
また,シード値固定関数もここで定義しています.(Segmentation用に実装しただけなので今回はなくてもいいです)
# Pixel-value augmentations, applied in a random order each call.
# NOTE(review): inputs are single-channel, so ColorJitter's saturation
# jitter is presumably a no-op on grayscale -- verify.
# RandomPosterize requires uint8 input (the training loop converts first).
image_transform = transforms.RandomOrder([
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.RandomPosterize(bits=4),
])
# Geometric augmentations ("share" = shared with masks in the segmentation
# pipeline this was copied from), applied in a random order each call.
# NOTE(review): RandomHorizontalFlip mirrors digits (e.g. 2, 5, 7), which
# can hurt digit classification -- confirm it actually helps here.
share_transform = transforms.RandomOrder([
    transforms.RandomErasing(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomResizedCrop(IMG_SIZE, scale=(0.8, 1.2)),
    transforms.RandomAffine(degrees=[-10, 10],translate=(0.2, 0.2)),
])
def fix_seed(seed):
    """Seed every RNG in use (random, NumPy, PyTorch CPU and CUDA) so that
    subsequent draws are reproducible."""
    for seeder in (random.seed, np.random.seed,
                   torch.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)
    # Force deterministic cuDNN kernel selection (may cost some speed).
    torch.backends.cudnn.deterministic = True
TRAINの処理です.DataAugmentationを入れていますが,データ数多いしなくても精度がガクッと下がるとかはなさそうです.
def train_epoch(model, optimizer, criterion, dataloader, device):
    """Run one training epoch with on-the-fly data augmentation.

    Args:
        model: network to train (updated in place).
        optimizer: optimizer stepping `model`'s parameters.
        criterion: loss function taking (logits, int64 class indices).
        dataloader: yields (images, labels) batches.
        device: torch device for the forward/backward pass.

    Returns:
        Average training loss per sample over the epoch.
    """
    train_loss = 0.0
    model.train()
    for images, labels in dataloader:
        # DataAugmentation ------#
        # Re-seed all RNGs with one shared seed; a leftover from a
        # segmentation pipeline where image and mask transforms must match.
        seed = random.randint(0, 2**32)
        # RandomPosterize requires uint8 input.
        images = images.to(torch.uint8)
        fix_seed(seed)
        images = share_transform(images)
        images = image_transform(images)
        images = images.to(torch.float32)
        #------------------------#
        labels = labels.type(torch.LongTensor)  # CrossEntropyLoss needs int64 targets
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)  # raw logits; CrossEntropyLoss applies log-softmax
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # Weight the batch-mean loss by the batch size so that dividing by
        # the dataset size below yields a true per-sample average (the
        # original summed batch means, under-reporting by ~batch_size and
        # biased by a short final batch).
        train_loss += loss.item() * images.size(0)
        del images, labels, loss, outputs
    return train_loss / len(dataloader.dataset)
Validation処理です.
def validation(model, optimizer, criterion, dataloader, device):
    """Evaluate `model` on `dataloader` without updating weights.

    Args:
        model: network to evaluate (switched to eval mode).
        optimizer: unused; accepted only to mirror train_epoch's signature.
        criterion: loss function taking (logits, int64 class indices).
        dataloader: yields (images, labels) batches.
        device: torch device for the forward pass.

    Returns:
        Average validation loss per sample.
    """
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, labels in dataloader:
            labels = labels.type(torch.LongTensor)  # int64 targets for CrossEntropyLoss
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            # Weight by batch size so dividing by the dataset size yields a
            # true per-sample average even when the last batch is short
            # (matches the same fix in train_epoch).
            val_loss += loss.item() * images.size(0)
    return val_loss / len(dataloader.dataset)
EarlyStoppingの定義です.
class EarlyStopping:
    """Stop training when validation loss stops improving.

    Saves a checkpoint of the best model seen so far and raises the
    `early_stop` flag after `patience` consecutive non-improving epochs.
    """

    def __init__(self, patience=10, verbose=False, path='checkpoint_model.pth'):
        """
        Args:
            patience: epochs to wait after the last improvement before stopping.
            verbose: print a message on every counter update / checkpoint.
            path: file path for the best-model checkpoint.
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0          # epochs since the last improvement
        self.best_score = None    # negated best val loss (higher is better)
        self.early_stop = False   # set True once patience is exhausted
        # np.inf -- the capitalized alias np.Inf was removed in NumPy 2.0.
        self.val_loss_min = np.inf
        self.path = path

    def __call__(self, val_loss, model):
        score = -val_loss  # negate so that "greater is better"
        if self.best_score is None:
            self.best_score = score
            self.checkpoint(val_loss, model)
        elif score < self.best_score:
            # Strictly worse than the best so far (ties count as an
            # improvement and reset the counter, as in the original).
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.checkpoint(val_loss, model)
            self.counter = 0

    def checkpoint(self, val_loss, model):
        """Persist `model`'s weights and record the new best loss."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss
RUN部分です.
def run(num_epochs, optimizer, criterion, device, train_loader, val_loader, model):
    """Train for up to `num_epochs` epochs, early-stopping on validation loss.

    The best weights are checkpointed by EarlyStopping (default path
    'checkpoint_model.pth'); the caller reloads them after this returns.

    Returns:
        (train_loss_list, val_loss_list): per-epoch loss histories.
    """
    train_loss_list = []
    val_loss_list = []
    earlystopping = EarlyStopping(verbose=True)
    for epoch in range(num_epochs):
        train_loss = train_epoch(model, optimizer, criterion, train_loader, device)
        val_loss = validation(model, optimizer, criterion, val_loader, device)
        # Log the validation loss too -- it is what drives early stopping;
        # the original printed only the training loss.
        print(f'Epoch [{epoch+1}], train_Loss : {train_loss:.4f}, val_Loss : {val_loss:.4f}')
        train_loss_list.append(train_loss)
        val_loss_list.append(val_loss)
        earlystopping(val_loss, model)
        if earlystopping.early_stop:
            print("Early Stopping!")
            break
    return train_loss_list, val_loss_list
Loss遷移グラフ出力関数です.
def graph(train_loss_list, val_loss_list):
    """Plot the per-epoch training and validation loss curves."""
    epochs = range(len(train_loss_list))
    fig, ax = plt.subplots(figsize=(4, 3), dpi=100)
    # Blue = training, red = validation (labelled 'test' as in the original).
    ax.plot(epochs, train_loss_list, c='b', label='train loss')
    ax.plot(epochs, val_loss_list, c='r', label='test loss')
    ax.set_title('training and test loss', fontsize='10')
    ax.set_xlabel('epoch', fontsize='10')
    ax.set_ylabel('loss', fontsize='10')
    ax.grid()
    ax.legend(fontsize='10')
    plt.show()
混同行列の出力関数です.
## CV ALL CONFUSION MATRIX
## CV ALL CONFUSION MATRIX
# Per-fold labels/predictions, accumulated across calls for a later
# all-fold confusion matrix.
cv_y_true, cv_y_pred = [], []


def print_confusion_matrix(test_loader, model):
    """Print a classification report and accuracy for `model` on `test_loader`.

    Also appends this fold's label and prediction lists to the module-level
    `cv_y_true` / `cv_y_pred` accumulators.
    """
    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for images, labels in test_loader:
            labels = labels.type(torch.LongTensor)
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            # Collect plain-int class ids (argmax over the 10 logits).
            y_true.extend(int(lab) for lab in labels)
            y_pred.extend(int(torch.argmax(out)) for out in outputs)
    ## CV ALL CONFUSION MATRIX
    cv_y_true.append(y_true)
    cv_y_pred.append(y_pred)
    # BUG FIX: the original passed target_names=target_names, but the
    # target_names definition was commented out above, so the call raised
    # NameError. classification_report derives the labels itself when the
    # argument is omitted.
    print(classification_report(y_true, y_pred))
    print("accuracy: ", accuracy_score(y_true, y_pred))
層化分割交差検証の準備と,FOLD毎のLossを保存するリストを用意しておきます.
# Stratified 5-fold CV: each fold preserves the per-digit class balance;
# fixed random_state makes the splits reproducible.
cv = StratifiedKFold(n_splits=5, random_state=0, shuffle=True)
fold_train_list = []  # final training loss of each fold
fold_val_list = []    # final validation loss of each fold
fold_test_list = []   # NOTE(review): never filled or read below -- dead?
メイン部分です.
今回モデルは自分で書かずに,TIMMのEfficientNetV2を使用しています.
TIMMはImageNetで事前学習したモデルが手軽にダウンロードできます.様々なモデルが用意されているので,めちゃくちゃに使えます.
”PreTrained=False”にすることで事前学習していないモデルを使うことも可能です.後は,チャンネル数とクラス数を入れておきましょう.
# Cross-validated training: a fresh pretrained model per fold, trained with
# early stopping, then saved as model<i>.pth for the ensemble below.
for i,(train_index, test_index) in enumerate(cv.split(TRAIN_X,TRAIN_Y)):
    # Model: timm EfficientNetV2-S pretrained on ImageNet-21k then
    # fine-tuned on ImageNet-1k; 1 input channel, 10 digit classes.
    TIMM = timm.create_model('tf_efficientnetv2_s_in21ft1k', pretrained=True, num_classes=10, in_chans=1)
    model = TIMM.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(),lr=0.0001)
    # train/validation split for this fold
    cv_train_dataset = Subset(dataset, train_index)
    cv_val_dataset = Subset(dataset, test_index)
    train_loader = DataLoader(cv_train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(cv_val_dataset, batch_size=batch_size, shuffle=True)
    # run: up to 100 epochs; early stopping usually ends sooner.
    # NOTE(review): the closing prose mentions "Epoch10" -- confirm which
    # epoch budget produced the reported score.
    print(f"***FOLD {i}")
    train_loss_list, val_loss_list = run(100, optimizer, criterion, device, train_loader, val_loader,model)
    # Restore the best (lowest val loss) weights checkpointed by EarlyStopping.
    model.load_state_dict(torch.load('checkpoint_model.pth'))
    # Save this fold's model for the ensemble prediction step.
    ModelSavePath='model'+str(i)+'.pth'
    torch.save(model.state_dict(), ModelSavePath)
    # Plot the loss curves for this fold.
    graph(train_loss_list, val_loss_list)
    #print_confusion_matrix(val_loader,model)
    # Record each fold's final losses.
    fold_train_list.append(train_loss_list[-1])
    fold_val_list.append(val_loss_list[-1])
    print("-----------------\n")
OUTPUT
出力の準備,学習したモデルを使ってテストデータに対する予測を行います.
5つのモデルそれぞれの結果を出して,Votingを行っています.
指定形式にして,csvとしてOUTPUTフォルダに保存すれば完成です.
# Predict on the test set with each of the 5 fold models, keeping the
# per-sample softmax probability vectors for the voting step below.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
y_pred = []
for fold in range(5):
    # Load this fold's saved weights into the existing model instance.
    model.load_state_dict(torch.load('model' + str(fold) + '.pth'))
    model.eval()
    fold_probs = []
    with torch.no_grad():
        for batch in test_loader:
            images = batch[0].clone().detach().to(device)
            probs = F.softmax(model(images), dim=1)
            # One probability vector (length 10) per test image.
            fold_probs.extend(probs)
    y_pred.append(fold_probs)
# Soft voting: for each test sample, sum the 5 folds' softmax probability
# vectors and predict the class with the largest summed probability.
y_pred_bote = []
for i in range(len(y_pred[0])):
    # BUG FIX: the accumulator must be created once per sample, OUTSIDE the
    # fold loop. The original re-initialized tmp inside `for j in range(5)`,
    # so only the last fold's probabilities ever influenced the vote.
    tmp = [0] * 10
    for j in range(5):
        for k in range(10):
            tmp[k] += y_pred[j][i][k]
    ArgMax = max(tmp)
    MaxIndex = tmp.index(ArgMax)
    y_pred_bote.append(MaxIndex)
# Build the submission table: a header row followed by one
# (1-based ImageId, predicted label) row per test image.
output = [["ImageId", "Label"]]
for image_id, label in enumerate(y_pred_bote, start=1):
    output.append([image_id, label])
output_path = "/kaggle/working/submission.csv"
with open(output_path, 'w', newline='') as file:
    csv.writer(file).writerows(output)
結果
結果は0.96となりました.
時間がかかるため,Epoch10にしていますからこんなもんでしょう.
Epoch500,EarlyStopping20等にしとけば精度は結構あがるんじゃないかなぁとおもいます.
暇な人はやってみて~
さいごに
また詳しい解説を追記していきます.
ちょっとまってね🥰