340
359

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Pytorch Template 個人的ベストプラクティス(解説付き)

Last updated at Posted at 2021-10-17

はじめに

  • Pytorchでコードを書き始めるとき、乱数固定やデータローダー、モデルの訓練や学習結果の取得等、毎度色々なサイトを参照するのは面倒だと思い、現時点の個人的ベストプラクティス・テンプレートを作成してみました。
  • 今後のバージョンアップや便利なライブラリの登場で変わるかもしれませんげ、現在はこれで落ち着いています。
  • 個人的な備忘録も兼ねて、前半に簡単な解説付きのコードと最後に全コードを載せています。
  • もっと便利な書き方やライブラリなどあれば、コメントいただけると嬉しいです。

テンプレート(解説付き)

1. ライブラリインポートと初期設定

  • torchやよく利用するライブラリ(numpy, matplotlib)のインポート
  • モデルの訓練時(for文)の進捗を表示するtqdmライブラリ(jupyter notebookとコマンドライン版)
    • 進捗表示は待ち時間の見積もりやエラーに気づくことができるので基本的には設定する
  • torch.deviceではGPUが利用できれば指定(GPU利用ができない場合はCPUを指定)
  • fix_seedとworker_init_fnは乱数固定用の関数
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import random
import numpy as np
import matplotlib.pyplot as plt

# from tqdm import tqdm  #コマンドラインで実行するとき
from tqdm.notebook import tqdm  # jupyter で実行するとき

# リソースの指定(CPU/GPU)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# 乱数シード固定(再現性の担保)
def fix_seed(seed):
    # random
    random.seed(seed)
    # numpy
    np.random.seed(seed)
    # pytorch
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

seed = 42
fix_seed(seed)

# データローダーのサブプロセスの乱数seedが固定
def worker_init_fn(worker_id):
    np.random.seed(np.random.get_state()[1][0] + worker_id)

2. データ準備(データローダー)

  • Datasetクラスではデータの前処理などを定義し、学習データをデータローダーとして準備
    • Datasetクラスは"def len()"と"def getitem()"が必須である点に注意する
    • 他は自由にメソッドを定義できるので、前処理用の関数などを定義できる
  • データローダーは高速化のために、num_workersやpin_memoryを設定
# データセットの作成
class Mydataset(torch.utils.data.Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        feature = self.X[index]
        label = self.y[index]
        # 前処理などを書く -----

        # --------------------
        return feature, label

train_dataset = Mydataset(train_X, train_y)
test_dataset = Mydataset(test_X, test_y)


# データローダーの作成
train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=16,  # バッチサイズ
                                           shuffle=True,  # データシャッフル
                                           num_workers=2,  # 高速化
                                           pin_memory=True,  # 高速化
                                           worker_init_fn=worker_init_fn
                                           )
test_loader = torch.utils.data.DataLoader(test_dataset,
                                          batch_size=16,
                                          shuffle=False,
                                          num_workers=2,
                                          pin_memory=True,
                                          worker_init_fn=worker_init_fn
                                          )

3. モデルの設計・学習準備

  • モデルクラスを定義
    • Convolution-Batch Normalization-Reluの様な頻発するレイヤの組み合わせはSequentialでまとめると見通しが良くなる
    • forwardで順伝播を定義する際は(ネットワークが深くない限りは)1処理ごと書いておくと見直しやすい
  • 損失関数・最適化アルゴリスムは要件に応じて選択
  • モデル訓練用の関数を定義
    • データをバッチごとの取得して、誤差計算・逆伝播の処理を定義する
    • バッチごろのロスを取得しておき、最終的にバッチ平均したロスを出力する(ロスの下がり具合を確認用)
    • テストパート側のループ処理も訓練パート側とほぼ同様だが、model.eval() で評価モードにしておくことを忘れない(pytorchのメソッドによっては訓練モードと評価モードで挙動が異なるものが存在する)
# モデルの定義
class Mymodel(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Sequential(nn.Conv2d(3, 16, 3, 2, 1),
                                   nn.BatchNorm2d(16),
                                   nn.ReLU())
        self.conv2 = nn.Sequential(nn.Conv2d(16, 64, 3, 2, 1),
                                   nn.BatchNorm2d(64),
                                   nn.ReLU())

        self.fc1 = nn.Linear(2 * 2 * 64, 100)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = torch.nn.Linear(100, 2)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# モデル・損失関数・最適化アルゴリスムの設定
model = Mymodel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)


# モデル訓練関数
def train_model(model, train_loader, test_loader):
    # Train loop ----------------------------
    model.train()  # 学習モードをオン
    train_batch_loss = []
    for data, label in train_loader:
        # GPUへの転送
        data, label = data.to(device), label.to(device)
        # 1. 勾配リセット
        optimizer.zero_grad()
        # 2. 推論
        output = model(data)
        # 3. 誤差計算
        loss = criterion(output, label)
        # 4. 誤差逆伝播
        loss.backward()
        # 5. パラメータ更新
        optimizer.step()
        # train_lossの取得
        train_batch_loss.append(loss.item())

    # Test(val) loop ----------------------------
    model.eval()  # 学習モードをオフ
    test_batch_loss = []
    with torch.no_grad():  # 勾配を計算なし
        for data, label in test_loader:
            data, label = data.to(device), label.to(device)
            output = model(data)
            loss = criterion(output, label)
            test_batch_loss.append(loss.item())

    return model, np.mean(train_batch_loss), np.mean(test_batch_loss)

4. モデル訓練の実行

  • 先ほど定義した訓練関数を任意の回数(epoch)で繰り返し、モデルを学習
  • 学習状況(ロスの下がり具合)をグラフで可視化
# 訓練の実行
epoch = 100
train_loss = []
test_loss = []

for epoch in tqdm(range(epoch)):
    model, train_l, test_l = train_model(model)
    train_loss.append(train_l)
    test_loss.append(test_loss)
    # 10エポックごとにロスを表示
    if epoch % 10 == 0:
        print("Train loss: {a:.3f}, Test loss: {b:.3f}".format(a=train_loss[-1], b = test_loss[-1]))

# 学習状況(ロス)の確認
plt.plot(train_loss, label='train_loss')
plt.plot(test_loss, label='test_loss')
plt.legend()

5. モデルの評価

  • 学習済みモデルを用いて、データローダーから予測値と正解値を取得
  • preds, labelsに予測結果がnumpy形式で入っているので評価実施
# 学習済みモデルから予測結果と正解値を取得
def retrieve_result(model, dataloader):
    model.eval()
    preds = []
    labels = []
    # Retreive prediction and labels
    with torch.no_grad():
        for data, label in dataloader:
            data, label = data.to(device), label.to(device)
            output = model(data)
            # Collect data
            preds.append(output)
            labels.append(label)
    # Flatten
    preds = torch.cat(preds, axis=0)
    labels = torch.cat(labels, axis=0)
    # Returns as numpy (CPU環境の場合は不要)
    preds = preds.cpu().detach().numpy()
    labels = labels.cpu().detach().numpy()
    return preds, labels


# 予測結果と正解値を取得
preds, labels = retrieve_result(model, test_loader)

6. その他

  • 学習したモデルを保存・ロード
  • モデルのサマリー(パラメーター数など)
  • 学習済みモデルをロードし、推論に利用する際は、model.eval()を忘れずに実行する
    (実行しないとdrop-outやbatch-norm layerが訓練モードのままで再現性が担保されない)
# 学習済みモデルの保存・ロード
path_saved_model = "./saved_model"
# モデルの保存
torch.save(model.state_dict(), path_saved_model)
# モデルのロード
model = Mymodel()
model.load_state_dict(torch.load(path_saved_model))
model.eval()


# Model summary
from torchsummary import summary
model = model().to(device)
summary(model, input_size=(1, 50, 50))

テンプレート(全コード)

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import random
import numpy as np
import matplotlib.pyplot as plt

# from tqdm import tqdm  #コマンドラインで実行するとき
from tqdm.notebook import tqdm  # jupyter で実行するとき

# リソースの選択(CPU/GPU)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# 乱数シード固定(再現性の担保)
def fix_seed(seed):
    # random
    random.seed(seed)
    # numpy
    np.random.seed(seed)
    # pytorch
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

seed = 42
fix_seed(seed)

# データローダーのサブプロセスの乱数のseedが固定
def worker_init_fn(worker_id):
    np.random.seed(np.random.get_state()[1][0] + worker_id)


# Data preprocessing ----------------------------------------------------------

# データセットの作成
class Mydataset(torch.utils.data.Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        feature = self.X[index]
        label = self.y[index]
        # 前処理などを書く -----

        # --------------------
        return feature, label

train_dataset = Mydataset(train_X, train_y)
test_dataset = Mydataset(test_X, test_y)


# データローダーの作成
train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=16,  # バッチサイズ
                                           shuffle=True,  # データシャッフル
                                           num_workers=2,  # 高速化
                                           pin_memory=True,  # 高速化
                                           worker_init_fn=worker_init_fn
                                           )
test_loader = torch.utils.data.DataLoader(test_dataset,
                                          batch_size=16,
                                          shuffle=False,
                                          num_workers=2,
                                          pin_memory=True,
                                          worker_init_fn=worker_init_fn
                                          )


# Modeling --------------------------------------------------------------------

# モデルの定義
class Mymodel(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = torch.nn.Sequential(nn.Conv2d(3, 16, 3, 2, 1),
                                         nn.BatchNorm2d(16),
                                         nn.ReLU())
        self.conv2 = torch.nn.Sequential(nn.Conv2d(16, 64, 3, 2, 1),
                                         nn.BatchNorm2d(64),
                                         nn.ReLU())

        self.fc1 = nn.Linear(2 * 2 * 64, 100)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = torch.nn.Linear(100, 2)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# モデル・損失関数・最適化アルゴリスムの設定
model = Mymodel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)


# モデル訓練関数
def train_model(model, train_loader, test_loader):
    # Train loop ----------------------------
    model.train()  # 学習モードをオン
    train_batch_loss = []
    for data, label in train_loader:
        # GPUへの転送
        data, label = data.to(device), label.to(device)
        # 1. 勾配リセット
        optimizer.zero_grad()
        # 2. 推論
        output = model(data)
        # 3. 誤差計算
        loss = criterion(output, label)
        # 4. 誤差逆伝播
        loss.backward()
        # 5. パラメータ更新
        optimizer.step()
        # train_lossの取得
        train_batch_loss.append(loss.item())

    # Test(val) loop ----------------------------
    model.eval()  # 学習モードをオフ
    test_batch_loss = []
    with torch.no_grad():  # 勾配を計算なし
        for data, label in test_loader:
            data, label = data.to(device), label.to(device)
            output = model(data)
            loss = criterion(output, label)
            test_batch_loss.append(loss.item())

    return model, np.mean(train_batch_loss), np.mean(test_batch_loss)


# 訓練の実行
epoch = 100
train_loss = []
test_loss = []

for epoch in tqdm(range(epoch)):
    model, train_l, test_l = train_model(model)
    train_loss.append(train_l)
    test_loss.append(test_loss)


# 学習状況(ロス)の確認
plt.plot(train_loss, label='train_loss')
plt.plot(test_loss, label='test_loss')
plt.legend()

# Evaluation ----------------------------------------------------------------

# 学習済みモデルから予測結果と正解値を取得
def retrieve_result(model, dataloader):
    model.eval()
    preds = []
    labels = []
    # Retreive prediction and labels
    with torch.no_grad():
        for data, label in dataloader:
            data, label = data.to(device), label.to(device)
            output = model(data)
            # Collect data
            preds.append(output)
            labels.append(label)
    # Flatten
    preds = torch.cat(preds, axis=0)
    labels = torch.cat(labels, axis=0)
    # Returns as numpy (CPU環境の場合は不要)
    preds = preds.cpu().detach().numpy()
    labels = labels.cpu().detach().numpy()
    return preds, labels


# 予測結果と正解値を取得
preds, labels = retrieve_result(model, test_loader)


# Other ----------------------------------------------------------------------

# 学習済みモデルの保存・ロード
path_saved_model = "./saved_model"

# モデルの保存
torch.save(model.state_dict(), path_saved_model)
# モデルのロード
model = Mymodel()
model.load_state_dict(torch.load(path_saved_model))


# Model summary
from torchsummary import summary
model = model().to(device)
summary(model, input_size=(1, 50, 50))
340
359
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
340
359

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?