LoginSignup
0
3

More than 1 year has passed since last update.

Kaggleに挑戦 DigitRecognizer🔢

Last updated at Posted at 2023-02-06

はじめに

KaggleのDigitRecognizerタスクの解説です.
全コードを載せているので,参考にしてください.
KaggleのNotebook

IMPORT

必要なライブラリのImportです.

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import TensorDataset
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
# Validation
from torchvision import datasets
from torch.utils.data.dataset import Subset

import csv
import os
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import pandas as pd
import seaborn as sn 

from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold
from sklearn.model_selection import StratifiedKFold

import timm
import random

import pickle
# 'seaborn-darkgrid' was deprecated in matplotlib 3.6 and removed in 3.8,
# where the bundled seaborn styles were renamed with a 'seaborn-v0_8-' prefix.
# Try the old name first so behavior is unchanged on older matplotlib.
try:
    plt.style.use('seaborn-darkgrid')
except OSError:
    plt.style.use('seaborn-v0_8-darkgrid')

# Run on the GPU when one is available, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

ReadData

データを読み込みます.
画像ですが,csv形式のデータになっていて,28×28の画像が784個の画素値として一列に並んでいます.
また,TRAINの方は1つ目の値が正解ラベルとなっているため,POPして取り出しています.

# Kaggle Digit Recognizer inputs: each CSV row is a flattened 28x28 image;
# train.csv additionally carries the class label in its first column.
TRAIN_DIR  = "/kaggle/input/digit-recognizer/train.csv"
TEST_DIR   = "/kaggle/input/digit-recognizer/test.csv"

TRAIN_DATA = pd.read_csv(TRAIN_DIR)
TEST_DATA  = pd.read_csv(TEST_DIR) 

# Convert the DataFrame to a plain list of row lists.
TRAIN_DATA = TRAIN_DATA.values.tolist()
# NOTE(review): pd.read_csv has already consumed the header row, so this
# deletes a real training sample — the shapes printed below are 41999 even
# though train.csv has 42000 data rows. Presumably meant to drop a header
# that is no longer present; confirm before removing, since the reshape
# below hard-codes 41999.
del TRAIN_DATA[0]

TEST_DATA = TEST_DATA.values.tolist()

# Pop the first value of every row off as the label (TRAIN only).
TRAIN_Y = []
for i in range(len(TRAIN_DATA)):
    y = TRAIN_DATA[i][0]
    del TRAIN_DATA[i][0]
    TRAIN_Y.append(y)

TRAIN_X = np.array(TRAIN_DATA)
TRAIN_Y = np.array(TRAIN_Y)
TEST_X = np.array(TEST_DATA)

print(TRAIN_X.shape)
print(TRAIN_Y.shape)
print(TEST_X.shape)
#(41999, 784)
#(41999,)
#(28000, 784)

これをDeepに入れる形式にReshapeします
画像は([枚数,チャンネル数,タテ,ヨコ])の形にします.
チャンネル数とはRGBです.カラーなら3,モノクロなら1になります.

# Reshape the flat 784-vectors into (N, C, H, W) = (N, 1, 28, 28) batches.
# Use -1 for the sample count instead of hard-coding 41999/28000 so this
# keeps working if the number of rows read above ever changes.
TRAIN_X = TRAIN_X.reshape([-1, 1, 28, 28])
TEST_X = TEST_X.reshape([-1, 1, 28, 28])
print(TRAIN_X.shape)
print(TRAIN_Y.shape)
print(TEST_X.shape)
# e.g. (41999, 1, 28, 28) / (41999,) / (28000, 1, 28, 28)

MODEL

Tensor型にして,Datasetに入れておきます.
今回はデータも多いので,BatchSize32にしています.

# Wrap the numpy arrays as float32 tensors inside TensorDatasets so they can
# be fed to DataLoaders. Labels stay float here; they are cast to Long in the
# train/validation loops.
dataset = TensorDataset(torch.Tensor(TRAIN_X), torch.Tensor(TRAIN_Y))
test_dataset = TensorDataset(torch.Tensor(TEST_X))

batch_size = 32
IMG_SIZE = 28

DataAugmentation用のTransform実装です.
また,シード値固定関数もここで定義しています.(Segmentation用に実装しただけなので今回はなくてもいいです)

# Photometric augmentations, applied to the uint8 image batch.
image_transform = transforms.RandomOrder([
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.RandomPosterize(bits=4),
])

# Geometric augmentations.
# FIX: dropped RandomHorizontalFlip — handwritten digits are not
# mirror-symmetric (a flipped 2, 3, 5 or 7 is no longer a valid glyph),
# so horizontal flips feed the network label-noise instead of useful variety.
share_transform = transforms.RandomOrder([
    transforms.RandomErasing(),
    transforms.RandomResizedCrop(IMG_SIZE, scale=(0.8, 1.2)),
    transforms.RandomAffine(degrees=[-10, 10], translate=(0.2, 0.2)),
])

def fix_seed(seed):
    """Seed Python, NumPy and PyTorch RNGs so augmentation is reproducible."""
    for seeder in (random.seed, np.random.seed,
                   torch.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)
    # Force cuDNN to pick deterministic kernels.
    torch.backends.cudnn.deterministic = True

TRAINの処理です.DataAugmentationを入れていますが,データ数多いしなくても精度がガクッと下がるとかはなさそうです.

def train_epoch(model, optimizer, criterion, dataloader, device):
    """Run one training epoch and return the average loss per sample.

    Applies the module-level augmentation transforms to every batch before
    the forward pass.
    """
    train_loss = 0.0
    model.train()

    for images, labels in dataloader:
        # DataAugmentation ------#
        # Re-seed all RNGs per batch so paired transforms stay in sync.
        seed = random.randint(0, 2**32)
        images = images.to(torch.uint8)  # RandomPosterize requires uint8 input
        fix_seed(seed)
        images = share_transform(images)
        images = image_transform(images)
        images = images.to(torch.float32)
        #------------------------#
        labels = labels.type(torch.LongTensor)  # CrossEntropyLoss wants int64
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # FIX: criterion returns the batch *mean*; weight it by the batch size
        # so dividing by the dataset size below yields a true per-sample
        # average (the original summed batch means, underestimating the loss
        # by roughly a factor of batch_size).
        train_loss += loss.item() * images.size(0)
        del images, labels, loss, outputs

    return train_loss / len(dataloader.dataset)

Validation処理です.

def validation(model, optimizer, criterion, dataloader, device):
    """Evaluate `model` on `dataloader`; return the average loss per sample.

    `optimizer` is unused but kept for interface symmetry with train_epoch.
    """
    model.eval()
    val_loss = 0.0

    with torch.no_grad():
        for images, labels in dataloader:
            labels = labels.type(torch.LongTensor)  # CrossEntropyLoss wants int64
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            # FIX: criterion returns the batch *mean*; weight it by the batch
            # size so dividing by the dataset size yields a true per-sample
            # average (the original summed batch means, underestimating the
            # loss by roughly a factor of batch_size).
            val_loss += loss.item() * images.size(0)

    return val_loss / len(dataloader.dataset)

EarlyStoppingの定義です.

class EarlyStopping:
    """Stop training when validation loss stops improving.

    Tracks the best validation loss seen so far; every improvement checkpoints
    the model's state_dict to `path`. After `patience` consecutive epochs
    without improvement, `early_stop` is set to True.
    """

    def __init__(self, patience=10, verbose=False, path='checkpoint_model.pth'):
        self.patience = patience      # epochs to wait after the last improvement
        self.verbose = verbose        # print progress messages when True
        self.counter = 0              # epochs since the last improvement
        self.best_score = None        # best (negated) validation loss so far
        self.early_stop = False       # set True once patience is exhausted
        # FIX: np.Inf was removed in NumPy 2.0; np.inf works on all versions.
        self.val_loss_min = np.inf
        self.path = path              # checkpoint file path

    def __call__(self, val_loss, model):
        """Record this epoch's `val_loss`; checkpoint on improvement."""
        score = -val_loss  # negate so that higher score == lower loss

        if self.best_score is None:
            # First call always counts as an improvement.
            self.best_score = score
            self.checkpoint(val_loss, model)
        elif score < self.best_score:
            # No improvement: count toward patience.
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            # Improvement: save a checkpoint and reset the counter.
            self.best_score = score
            self.checkpoint(val_loss, model)
            self.counter = 0

    def checkpoint(self, val_loss, model):
        """Save the model's state_dict and record the new best loss."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

RUN部分です.

def run(num_epochs, optimizer, criterion, device, train_loader, val_loader, model):
    """Train for up to `num_epochs`, early-stopping on validation loss.

    Returns the per-epoch (train_loss_list, val_loss_list). The best model
    weights are checkpointed by EarlyStopping to 'checkpoint_model.pth'.
    """
    history_train = []
    history_val = []
    stopper = EarlyStopping(verbose=True)

    for epoch in range(num_epochs):
        tr_loss = train_epoch(model, optimizer, criterion, train_loader, device)
        va_loss = validation(model, optimizer, criterion, val_loader, device)

        print(f'Epoch [{epoch+1}], train_Loss : {tr_loss:.4f}')
        history_train.append(tr_loss)
        history_val.append(va_loss)

        # Feed the newest validation loss to the early-stopping monitor.
        stopper(history_val[-1], model)
        if stopper.early_stop:
            print("Early Stopping!")
            break

    return history_train, history_val

Loss遷移グラフ出力関数です.

def graph(train_loss_list, val_loss_list):
    """Plot the training and validation loss curves over epochs."""
    epochs = range(len(train_loss_list))
    fig, ax = plt.subplots(figsize=(4, 3), dpi=100)
    ax.plot(epochs, train_loss_list, c='b', label='train loss')
    ax.plot(epochs, val_loss_list, c='r', label='test loss')
    ax.set_xlabel('epoch', fontsize='10')
    ax.set_ylabel('loss', fontsize='10')
    ax.set_title('training and test loss', fontsize='10')
    ax.grid()
    ax.legend(fontsize='10')
    plt.show()

混同行列の出力関数です.

## CV ALL CONFUSION MATRIX
# Accumulates every fold's labels/predictions so an all-fold confusion
# matrix can be built after cross-validation.
cv_y_true, cv_y_pred = [], []

def print_confusion_matrix(test_loader, model):
    """Print a classification report and accuracy for `model` on `test_loader`.

    Side effect: appends this run's label/prediction lists to the
    module-level cv_y_true / cv_y_pred accumulators.
    """
    model.eval()
    y_true, y_pred = [], []

    with torch.no_grad():
        for images, labels in test_loader:
            labels = labels.type(torch.LongTensor)
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            # Collect plain Python ints (argmax over class logits per sample).
            y_true.extend(int(lab.item()) for lab in labels)
            y_pred.extend(int(torch.argmax(out).item()) for out in outputs)

    ## CV ALL CONFUSION MATRIX
    cv_y_true.append(y_true)
    cv_y_pred.append(y_pred)

    # FIX: the original referenced `target_names`, which was only defined in
    # commented-out code and raised NameError at runtime. Derive the display
    # names from the classes actually present in labels and predictions.
    target_names = [str(c) for c in sorted(set(y_true) | set(y_pred))]
    print(classification_report(y_true, y_pred, target_names=target_names))
    print("accuracy: ", accuracy_score(y_true, y_pred))

層化分割交差検証の準備と,FOLD毎のLossを保存するリストを用意しておきます.

# Stratified 5-fold cross-validation (label-balanced splits) and per-fold
# loss trackers.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
fold_train_list, fold_val_list, fold_test_list = [], [], []

メイン部分です.
今回モデルは自分で書かずに,TIMMのEfficientNetV2を使用しています.
TIMMはImageNetで事前学習したモデルが手軽にダウンロードできます.様々なモデルが用意されているので,めちゃくちゃに使えます.
`pretrained=False`にすることで事前学習していないモデルを使うことも可能です.後は,チャンネル数とクラス数を入れておきましょう.

# Train one model per stratified fold; each fold gets a fresh pretrained net.
for i, (train_index, test_index) in enumerate(cv.split(TRAIN_X, TRAIN_Y)):

    # EfficientNetV2-S pretrained on ImageNet-21k/1k, adapted to
    # 1 input channel and 10 output classes.
    model = timm.create_model(
        'tf_efficientnetv2_s_in21ft1k',
        pretrained=True,
        num_classes=10,
        in_chans=1,
    ).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)

    # This fold's train/validation subsets and loaders.
    train_loader = DataLoader(Subset(dataset, train_index),
                              batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(Subset(dataset, test_index),
                            batch_size=batch_size, shuffle=True)

    # Train with early stopping, then restore the best checkpoint.
    print(f"***FOLD {i}")
    train_loss_list, val_loss_list = run(100, optimizer, criterion, device,
                                         train_loader, val_loader, model)
    model.load_state_dict(torch.load('checkpoint_model.pth'))

    # Persist this fold's best weights for the ensembling step.
    ModelSavePath = 'model' + str(i) + '.pth'
    torch.save(model.state_dict(), ModelSavePath)

    # Loss-curve plot for this fold.
    graph(train_loss_list, val_loss_list)

    # Keep the final loss of each fold.
    fold_train_list.append(train_loss_list[-1])
    fold_val_list.append(val_loss_list[-1])
    print("-----------------\n")

OUTPUT

出力の準備,学習したモデルを使ってテストデータに対する予測を行います.
5つのモデルそれぞれの結果を出して,Votingを行っています.
指定形式にして,csvとしてOUTPUTフォルダに保存すれば完成です.

# Collect softmax probability vectors from each of the five fold models.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
y_pred = []

for i in range(5):

    # Load this fold's saved weights into the existing model instance.
    ModelSavePath = 'model' + str(i) + '.pth'
    model.load_state_dict(torch.load(ModelSavePath))
    model.eval()

    fold_probs = []

    with torch.no_grad():
        for batch in test_loader:
            images = batch[0].clone().detach().to(device)
            probs = F.softmax(model(images), dim=1)
            # One probability vector per test image.
            fold_probs.extend(probs[idx] for idx in range(len(probs)))

    y_pred.append(fold_probs)
y_pred_bote = []

# Soft voting: for each test image, sum the five folds' softmax vectors and
# pick the class with the largest total probability.
# FIX: the original re-initialized `tmp = [0]*10` INSIDE the fold loop, so
# the accumulated scores were wiped each iteration and only the last fold's
# predictions ever influenced the vote.
for i in range(len(y_pred[0])):
    tmp = [0.0] * 10
    for j in range(5):
        for k in range(10):
            tmp[k] += float(y_pred[j][i][k])
    y_pred_bote.append(tmp.index(max(tmp)))
# Build the submission table: 1-based ImageId plus the voted Label.
output = [["ImageId", "Label"]]
output.extend([idx + 1, label] for idx, label in enumerate(y_pred_bote))

output_path = "/kaggle/working/submission.csv"

# Write all rows in one pass as the Kaggle-format CSV.
with open(output_path, 'w', newline='') as file:
    csv.writer(file).writerows(output)

結果

結果は0.96となりました.
時間がかかるため,Epoch10にしていますからこんなもんでしょう.
Epoch500,EarlyStopping20等にしとけば精度は結構あがるんじゃないかなぁとおもいます.
暇な人はやってみて~:v:

さいごに

また詳しい解説を追記していきます.
ちょっとまってね🥰

0
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
3