0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PyTorch SSDで転移学習(1/2)

Last updated at Posted at 2025-03-08

PyTorchで実装された物体検出モデルSSD(Single Shot Multibox Detector)の転移学習をやってみました。転移学習(※)とは、学習済パラメータをベースに再学習させる手法のことで、少ない学習データで効率的にモデルを学習できます。

(※)厳密には、モデルの一部レイヤのみを再学習することが「転移学習」で、全レイヤを再学習するのは「ファインチューニング」

目次

長くなったので、記事を2つに分けました。本記事の内容は以下(★)です。

  1. 作成したいモデル(★)
  2. PyTorch環境構築(★)
  3. SSD実装(PyTorch)(★)
  4. 学習データ作成、学習実行
  5. 検知(推論)実行

次の記事は以下をご参照ください。

1.作成したいモデル

以下youtubeチャンネルで、高速道路とかの走行動画を公開しているのですが、動画に映る車のナンバープレートをぼかすのに結構手間がかかります。全部はとても無理で、目立つものだけを車ごと動画編集ソフトでぼかしているのが現状です。

なので、物体検出モデルを使って動画に映る車のナンバープレートを自動でぼかすアプリを作りたいと思い、転移学習で車のナンバープレートを検出するモデルを作成しようと思い立ちました。

上述、作成したいアプリ(ナンバープレートを自動でぼかすアプリ)から、どういうものを検出するモデルを作成すればよいかを以下に挙げました。

  • 車のナンバープレート領域を検出したい
    • 文字、数字の認識は不要
    • 文字、数字が視認できるものは、逃さずに検出したい
    • 文字、数字が不鮮明のものは、検出できればよいが、無理はしない

ただ、検出対象のナンバープレートは小さいので、SSDで検出できるのか半信半疑でした。そこで、未検出が多かったら代わりに車をぼかす、誤検出が多かったら車領域でフィルタリングする、等、補助的に使う想定で、車も検出対象に加えました。

  • 車の領域を検出したい
    • 車種の認識などは不要
    • ナンバープレートが装着されている前面、後面を検出したい
    • ナンバープレートが装着されていない側面は、検出できなくてもよい

2. PyTorch環境構築

以下のPCに環境構築しました。

  • CPU: AMD Ryzen 7 3700X
  • GPU: NVIDIA GeForce GTX 1660 SUPER
  • OS: Windows 11 Home (24H2) , WSL2 + Ubuntu24.04

WSL2 + Ubuntu24.04での環境(docker)は、以下記事で構築できます。昨年(2024)末に、以下方法で構築した環境でのpythonバージョン(本体、主なモジュール)は以下です。

  • Python: 3.12.3
  • PyTorch(torch): 2.5.1
  • nvidia-cudnn-cu12: 9.1.0.70
  • numpy: 2.2.1
  • opencv-python: 4.10.0.84

なお、Windows+Anaconda(or miniconda)で構築したい場合は、Anacondaプロンプト上で以下コマンドを順番に実行すれば構築できます。

Anaconda Powershellプロンプト
conda create -n pytorch_env python
conda activate pytorch_env

pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124

pip3 install --no-cache-dir jupyter ipykernel
pip3 install --no-cache-dir scikit-image matplotlib opencv-python tqdm scikit-learn labelImg
  • 「pytorch_env」は仮想環境名。任意の名前で可
  • 3行目のPyTorchインストールコマンドは、GPUやOSに合わせて以下サイトで調べる。ちなみに、現時点(2025.3.1)でpytorchはanacondaリポジトリから削除されているようなので、condaコマンドでのインストールは不可

3. SSD実装(PyTorch)

以下サイト(ひつじ工房様のgithub)から拝借させていただいたSSDモデル実装をベースに、一部変更を加えました。変更箇所を中心に説明します。

実装は、以下githubに公開しました。

ディレクトリ、ファイル構成は下表です。

ディレクトリ,ファイル 説明
utils/ SSDモデル実装等のソース(モデルはVGG16ベースのSSD300)
common_ssd.py 推論/学習共通部分のソース
train_ssd.py 学習実行ソース
predict_ssd.py 検知(推論)実行ソース
movie_player.py 動画(mp4)から学習用画像切り出しツール(ソース)
data/od_cars_org_F00000.jpg テストデータ(推論用画像)
data/od_cars_sample/ 学習データ(画像、正解xml) ※一部のみ抜粋
weights/ssd_best_od_cars.pth 学習済みSSDパラメータ(車、ナンバープレートを学習済)

実装の全体像のクラス図は下図です(主要クラスのみ。Debugクラス等は省略)。

soft_structure.png

3.1 SSDモデル本体、データローダー等(utils/ssd_model.py)

学習対象のSSDモデル本体や、モデルに学習データ(画像、正解データ)を供給するデータローダーは、一番重要な肝の部分です。ここは、上述ひつじ工房様のgithubから拝借したコードをほぼそのまま使用してます。変更を加えた点は以下になります。

  • (a)今回のpythonバージョンで発生するエラーやWarningの修正
  • (b)画像前処理のリサイズ(縮小)アルゴ変更

(a) 今回のpythonバージョンで発生するエラーやWarningの修正

今回構築した環境のpythonバージョン(3.12.3)に合わず、実行エラーやwarningが出た箇所があったので、その箇所の修正方法を以下示します。

pythonは、C言語等と比べて、こういうバージョン依存が比較的多い印象です。今回のコードも、今後、言語仕様等のバージョンUPに伴って新たなエラーが出てくる可能性があります。互換性を維持したバージョンUPをしてほしいというのが正直なところですが、やはり完全には難しいといったところでしょうか...

  • data_augumentation.py Line.246のエラー
 File "/home/simasaki/work/test_ssd/utils/data_augumentation.py", line 246, in __call__
    mode = random.choice(self.sample_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "numpy/random/mtrand.pyx", line 956, in numpy.random.mtrand.RandomState.choice
ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (6,) + inhomogeneous part.

以下修正(self.sample_optionsの型をタプル→numpy arrayに修正)
※左が修正前、右が修正後。以降も同様
err対策_data_augumentation_line246.png

  • ssd_model.py Line.557等でWarning発生
UserWarning: An output with one or more elements was resized since it had shape …

以下修正(参照: https://qiita.com/d_sato_/items/58eaaa659afe8ab4bfa7
err対策_ssd_model_line557.png

  • ssd_mode.py Line635等でエラー

以下修正(参照: https://hituji-ws.com/code/python/python-od2/
err対策_ssd_model_line635.png

  • ssd_mode.py Line809等でエラー

以下修正(参照: https://hituji-ws.com/code/python/python-od2/
err対策_ssd_model_line809.png

(b) 画像前処理のリサイズ(縮小)アルゴ変更

今回の検出対象であるナンバープレートは、中に文字や数値が書かれているのが一番の特徴であり、リサイズ(縮小)時に文字や数字ができるだけつぶれないようにしたかったので、縮小時には、Defaultのcv2.INTER_LINEAR(バイリニア補間)ではなく、cv2.INTER_AREAにしました。

これで文字がつぶれにくくなる旨は、以下記事にとてもわかりやすい例が載ってました。

data_augumentation.py 修正後コード(L.112~)
class Resize(object):
    def __init__(self, size=300):
        self.size = size

    def __call__(self, image, boxes=None, labels=None):
        img_h, img_w, _ = image.shape
        inter = cv2.INTER_LINEAR
        if img_w > self.size or img_h > self.size:
            # [縮小の場合] INTER_AREAで補間
            inter = cv2.INTER_AREA
        
        image = cv2.resize(image, (self.size,self.size), interpolation=inter)
        # image = cv2.resize(image, (self.size,self.size))
        return image, boxes, labels

3.2 学習実行(train_ssd.py)

2.1のSSDモデルやデータローダー等を組み合わせて、学習を実行するアルゴリズムを実装した部分です。ソース全体は以下です。

train_ssd.py(※Debugクラス(SSDModelTrainerDebug)は省略)
#!/usr/bin/env python3

# パッケージのimport
import os
import sys
import time
from typing import List
from io import TextIOWrapper
import datetime

from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
import torchvision

from utils.ssd_model import SSD, MultiBoxLoss

from common_ssd import SSDModel, VocDataSetMng, Logger, makeVocClassesTxtFpath

# SSDモデル作成&学習
class SSDModelTrainer(SSDModel):

    def __init__(self, device:torch.device, voc_classes:List[str], freeze_layer:int):
        super().__init__(device, voc_classes)

        # SSDネットワークモデル
        self.net_ = SSD(phase="train", cfg=self.ssd_cfg_)

        # SSDのvggに、初期の重みを設定
        vgg_weights = torch.load("./weights/vgg16_reducedfc.pth", weights_only=True) # FutureWarning: You are using torch.load..対策
        self.net_.vgg.load_state_dict(vgg_weights)

        # VGGの重みをfreeze
        for idx,module in enumerate(self.net_.vgg):
            if idx <= freeze_layer:
                for param in module.parameters():
                    param.requires_grad = False

        # SSDのextras, loc, confには、Heの初期値を適用
        self.net_.extras.apply(SSDModelTrainer.initWeight)
        self.net_.loc.apply(SSDModelTrainer.initWeight)
        self.net_.conf.apply(SSDModelTrainer.initWeight)

        # ネットワークをDeviceへ
        self.net_.to(device)

        # 損失関数の設定
        self.criterion_ = MultiBoxLoss(jaccard_thresh=0.5, neg_pos=3, device=device)

        # 最適化手法(optimizer)の設定
        #self.optimizer_ = optim.SGD(self.net_.parameters(), lr=1e-3, momentum=0.9, weight_decay=5e-4)
        #self.optimizer_ = optim.Adam(self.net_.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)
        self.optimizer_ = optim.AdamW(self.net_.parameters(), lr=0.001)

        return
    
    @staticmethod
    def initWeight(m):
        if isinstance(m, nn.Conv2d):
            init.kaiming_normal_(m.weight.data)
            if m.bias is not None:  # バイアス項がある場合
                nn.init.constant_(m.bias, 0.0)
        return

    def train(self, voc_dataset:VocDataSetMng, num_epochs:int, res_weight_fpath:str):
        
        dataloaders_dict = voc_dataset.dataloaders_dict_

        # debug
        debug_train = SSDModelTrainerDebug(True, 1)
        debug_train.openLogFile(self.device_.type, res_weight_fpath, "train_log.csv")

        # ネットワークがある程度固定であれば、高速化させる
        torch.backends.cudnn.benchmark = True

        # イテレーションカウンタをセット
        iteration        = 1
        epoch_train_loss = 0.0  # epochの損失和
        epoch_val_loss   = 0.0  # epochの損失和
        epoch_at_min_loss = 0   # val_loss最小時のepoch
        min_loss         = 9999.0
        #logs             = []

        # epochのループ
        for epoch in range(num_epochs+1):

            # 開始時刻を保存
            t_epoch_start = time.time()
            #t_iter_start  = time.time()

            iter_per_epoch_train = 0
            iter_per_epoch_val   = 0

            # epochごとの訓練と検証のループ
            for phase in ["train", "val"]:
                if phase == "train":
                    self.net_.train()
                else:
                    self.net_.eval()

                # データローダーからminibatchずつ取り出すループ
                with tqdm(dataloaders_dict[phase], desc=phase, file=sys.stdout) as iterator:
                    for batch_idx, (images, targets) in enumerate(iterator):
                        
                        # GPUが使えるならGPUにデータを送る
                        images  = images.to(self.device_)
                        targets = [ann.to(self.device_)
                                for ann in targets]  # リストの各要素のテンソルをGPUへ

                        # debug:入力画像ダンプ
                        debug_train.dumpInputImage(epoch, images, phase, batch_idx)

                        # optimizerを初期化
                        self.optimizer_.zero_grad()

                        # 順伝搬(forward)計算
                        with torch.set_grad_enabled(phase == "train"):
                            # 順伝搬(forward)計算
                            outputs = self.net_(images)

                            # 損失の計算
                            loss_l, loss_c = self.criterion_(outputs, targets)
                            loss = loss_l + loss_c

                            # 訓練時はバックプロパゲーション
                            if phase == "train":
                                loss.backward()
                                nn.utils.clip_grad_value_(self.net_.parameters(), clip_value=2.0)
                                self.optimizer_.step()
                                epoch_train_loss += loss.item()
                                iteration += 1
                                iter_per_epoch_train += 1
                            # 検証時
                            else:
                                epoch_val_loss += loss.item()
                                iter_per_epoch_val += 1

            # epochのphaseごとのlossと正解率(lossは、iteration数で正規化)
            t_epoch_finish = time.time()

            epoch_train_loss /= float(iter_per_epoch_train)
            epoch_val_loss   /= float(iter_per_epoch_val)

            print(f"epoch {epoch}/{num_epochs} {(t_epoch_finish - t_epoch_start):.4f}sec || train_Loss:{epoch_train_loss:.4f}(it:{iter_per_epoch_train}) val_Loss:{epoch_val_loss:.4f}(it:{iter_per_epoch_val})")

            # debug:ログ出力
            debug_train.outputLogEpochSummary(epoch, (t_epoch_finish - t_epoch_start), epoch_train_loss, epoch_val_loss, iter_per_epoch_train, iter_per_epoch_val)

            t_epoch_start  = time.time()

            # vallossが小さい、ネットワークを保存する
            if min_loss > epoch_val_loss:
                min_loss          = epoch_val_loss
                epoch_at_min_loss = epoch
                self.saveWeight(res_weight_fpath)

            epoch_train_loss = 0.0  # epochの損失和
            epoch_val_loss   = 0.0  # epochの損失和

        # debug
        debug_train.outputLogSummary(self.device_.type, voc_dataset.data_path_, epoch_at_min_loss, min_loss)
        debug_train.outputLogDataSetSummary(voc_dataset)
        debug_train.outputLogNetInfo(self.net_)
        debug_train.closeLogFile()

        return
    
    def saveWeight(self, weight_fpath:str):
        # ネットワーク重みをセーブ
        torch.save(self.net_.state_dict(), weight_fpath) 

        # クラス名リスト(voc_classes)をセーブ
        voc_classes_fpath = makeVocClassesTxtFpath(weight_fpath)

        voc_classes_fp = open(voc_classes_fpath,"w")
        if voc_classes_fp is not None:
            for cls_name in self.voc_classes_:
                voc_classes_fp.write(f"{cls_name}\n")
            voc_classes_fp.close()

        return

ここも、概ねひつじ工房様のgithubod_train.ipynbから拝借したソースですが、主に以下変更を行いました。

  • (a)モデルの入力層に近いレイヤのパラメータ更新freezeを追加
  • (b)optimizer変更

(a)モデルの入力層に近いレイヤのパラメータ更新freezeを追加

一般的に、モデルの入力層に近いレイヤは、大体どの検出対象でも見られる汎用的な局所特徴(エッジなど)を学習しており、検出対象への依存が比較的小さいとのことです。なので、ここを更新しない(freeze)ことで、精度を維持しながら学習時間の短縮を図れます。

入力層から指定レイヤ(freeze_layer)までパラメータ(重み)更新をfreezeするコードは以下になります。

train_ssd.py(SSDModelTrainer::__init__()抜粋)
# VGGの重みをfreeze
for idx,module in enumerate(self.net_.vgg):
    if idx <= freeze_layer:
        for param in module.parameters():
            param.requires_grad = False

今回は、色々試した結果、freeze_layer=5(以下[vgg5]までをfreeze)にしました。全レイヤ更新するのと同等の精度ながら、学習時間が約1/3に激減しました。入力層に近いレイヤは解像度が大きい(300x300)ので、ここの時間が支配的なようです。

なお、今回は、これより深いレイヤをfreezeすると、学習時間はさらに短縮できるものの、精度が全然でない(未検出、誤検出が多い)結果となってしまいました。このあたりは、検出対象に合わせて試行錯誤するところかと思います。

[vgg0] Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=False,False,
[vgg1] ReLU(inplace=True)
[vgg2] Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=False,False,
[vgg3] ReLU(inplace=True)
[vgg4] MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
[vgg5] Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=False,False,
[vgg6] ReLU(inplace=True)
[vgg7] Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[vgg8] ReLU(inplace=True)
[vgg9] MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
[vgg10] Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[vgg11] ReLU(inplace=True)
[vgg12] Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[vgg13] ReLU(inplace=True)
[vgg14] Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[vgg15] ReLU(inplace=True)
[vgg16] MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=True)
[vgg17] Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[vgg18] ReLU(inplace=True)
[vgg19] Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[vgg20] ReLU(inplace=True)
[vgg21] Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[vgg22] ReLU(inplace=True)
[vgg23] MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
[vgg24] Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[vgg25] ReLU(inplace=True)
[vgg26] Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[vgg27] ReLU(inplace=True)
[vgg28] Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[vgg29] ReLU(inplace=True)
[vgg30] MaxPool2d(kernel_size=3, stride=1, padding=1, dilation=1, ceil_mode=False)
[vgg31] Conv2d(512, 1024, kernel_size=(3, 3), stride=(1, 1), padding=(6, 6), dilation=(6, 6)) requires_grad=True,True,
[vgg32] ReLU(inplace=True)
[vgg33] Conv2d(1024, 1024, kernel_size=(1, 1), stride=(1, 1)) requires_grad=True,True,
[vgg34] ReLU(inplace=True)
[extras0] Conv2d(1024, 256, kernel_size=(1, 1), stride=(1, 1)) requires_grad=True,True,
[extras1] Conv2d(256, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1)) requires_grad=True,True,
[extras2] Conv2d(512, 128, kernel_size=(1, 1), stride=(1, 1)) requires_grad=True,True,
[extras3] Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1)) requires_grad=True,True,
[extras4] Conv2d(256, 128, kernel_size=(1, 1), stride=(1, 1)) requires_grad=True,True,
[extras5] Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1)) requires_grad=True,True,
[extras6] Conv2d(256, 128, kernel_size=(1, 1), stride=(1, 1)) requires_grad=True,True,
[extras7] Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1)) requires_grad=True,True,
[loc0] Conv2d(512, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[loc1] Conv2d(1024, 24, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[loc2] Conv2d(512, 24, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[loc3] Conv2d(256, 24, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[loc4] Conv2d(256, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[loc5] Conv2d(256, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[conf0] Conv2d(512, 12, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[conf1] Conv2d(1024, 18, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[conf2] Conv2d(512, 18, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[conf3] Conv2d(256, 18, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[conf4] Conv2d(256, 12, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,
[conf5] Conv2d(256, 12, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) requires_grad=True,True,

(b)optimizer変更

optimizerとは、パラメータ更新を行うモジュールです。

モデルの学習は、以下アルゴ(勾配法)で行われているのですが、処理4を担当するのが、optimizerです。

  1. 現在のモデルパラメータで検知(推論)実行(順伝播forward)
  2. 誤差(損失)算出
  3. 誤差を減らす方向(=勾配)を算出(逆伝播backward)
  4. 勾配に基づいてモデルパラメータ(重み)を更新(optimizer)
  5. 1~4を繰り返し

元々の実装では、SGD(Stochastic Gradient Descent : 確率的勾配降下法)が設定されていました。これは、勾配の定数倍(学習率)を適用する、というシンプルなoptimizerながら、今でも多くの場面で活用されている、有用性が高いoptimizerです。

ただ、今回の学習では、正解付け(アノテーション)に何かまずい点があるのか、全然安定しなかったので、かなり手の込んだoptimizerであるAdam(AdamW)を適用しました。これに変更すると、かなり安定感が増しました。正解付けを改善したほうがいいような気はしますが、どうやって改善するのかがわからなかったので、今回はoptimizerのほうに手を入れました。

optimizerの設定コードは以下です。有名どころのoptimizer実装はPyTorchにクラス化されているので、実装クラスのコンストラクタ(optim.AdamW)を呼ぶだけです。

train_ssd.py(SSDModelTrainer::__init__()抜粋)
# 最適化手法(optimizer)の設定
#self.optimizer_ = optim.SGD(self.net_.parameters(), lr=1e-3, momentum=0.9, weight_decay=5e-4)
#self.optimizer_ = optim.Adam(self.net_.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)
self.optimizer_ = optim.AdamW(self.net_.parameters(), lr=0.001)

3.3 検知(推論)実行(predict_ssd.py)

学習したモデルで、検知(推論)を実行するアルゴリズムを実装した部分です。ソース全体は以下です。

predict_ssd.py(※Debugクラス等は省略)
#!/usr/bin/env python3

import sys
import cv2
import os
import time
import glob
from typing import List,Tuple,Any

from utils.ssd_model import SSD, DataTransform, VOCDataset, Anno_xml2list, nm_suppression
import numpy as np
import torch

from common_ssd import ImageProc, SSDModel, DetResult, AnnoData, DrawPen, Logger, makeVocClassesTxtFpath

# SSDモデル作成&推論
class SSDModelDetector(SSDModel):

    def __init__(self, device:torch.device, weight_fpath:str):
        # 学習済み重みをロード
        (net_weights, voc_classes) = self.loadWeight(weight_fpath, device)

        super().__init__(device, voc_classes)

        # SSDネットワークモデル
        self.net_ = SSD(phase="inference", cfg=self.ssd_cfg_)
        self.net_.load_state_dict(net_weights)

        # ネットワークをDeviceへ
        self.net_.to(self.device_)
        return
    
    def loadWeight(self, weight_fpath:str, device:torch.device) -> Tuple[Any, List[str]]:
        # ネットワーク重みをロード
        net_weights = torch.load(weight_fpath, weights_only=True, map_location=device) # FutureWarning: You are using torch.load..対策
        # net_weights = torch.load(weight_fpath, weights_only=True) # FutureWarning: You are using torch.load..対策

        # クラス名リスト(voc_classes)をロード
        voc_classes:List[str] = []

        voc_classes_fpath = makeVocClassesTxtFpath(weight_fpath)
        voc_classes_fp = open(voc_classes_fpath,"r")
        if voc_classes_fp is not None:
            voc_classes = [name.strip() for name in voc_classes_fp.readlines()]
            voc_classes_fp.close()

        return (net_weights, voc_classes)

    def predict(self, img_procs:List[ImageProc], img_org:np.ndarray, data_confidence_level:float=0.5, overlap:float=0.45) -> List[DetResult]:
        
        # 複数範囲の画像バッチ化
        transform = DataTransform(self.input_size_, self.color_mean_)

        imgs_trans:List[np.ndarray] = []
        for img_proc in img_procs:
            # 検出範囲切り出し
            img_det = img_proc.clip(img_org)
            # 画像を前処理
            (img_trans, _ ,_) = transform(img_det, "val", "", "")
            imgs_trans.append(img_trans[:, :, (2, 1, 0)]) # [h,w,ch(BGR→RGB)]
        
        imgs_trans_np = np.array(imgs_trans)                         # [batch_num, h, w, ch(RGB)]
        img_batch = torch.from_numpy(imgs_trans_np).permute(0,3,1,2) # [batch_num, ch(RGB), h, w]
        img_batch = img_batch.to(self.device_)
        # print(f"img_batch = {img_batch.shape}")

        # 推論実行
        torch.backends.cudnn.benchmark = True
        self.net_.eval()
        outputs = self.net_(img_batch)

        # 結果取得
        outputs    = outputs.cpu().detach().numpy()
        find_index = np.where(outputs[:, :, :, 0] >= data_confidence_level) # (batch_num, label, top)
        outputs    = outputs[find_index]

        det_results:List[DetResult] = []

        for i in range(len(find_index[1])):  # 抽出した物体数分ループを回す

            area_no  = find_index[0][i] # 検出領域index (batch index)
            label_no = find_index[1][i] # ラベル番号

            if label_no > 0:  
                # [背景クラスでない場合] 結果を取得

                # 確信度conf
                sc = outputs[i][0]
                # クラス名
                cls_name = self.voc_classes_[label_no-1]
                # Bounding Box: 切り出し前の画像上での座標値に変換
                bb_i = img_procs[area_no].convBBox( outputs[i][1:] )

                det_results.append(DetResult(cls_name, bb_i, sc))

        if len(img_procs) > 1:
            # [検出領域が複数の場合] 重複領域での重複枠を取り除く
            det_results = self.nmSuppression(det_results, overlap)

        return det_results

    def nmSuppression(self, det_results:List[DetResult], iou:float=0.45) -> List[DetResult]:
        det_results_sup:List[DetResult] = []

        for cls_name in self.voc_classes_:
            det_results_cls = [x for x in det_results if x.class_name_ == cls_name]

            if len(det_results_cls) > 0:
                bb_i_cls  = torch.from_numpy( np.array([x.bbox_  for x in det_results_cls]) )
                score_cls = torch.from_numpy( np.array([x.score_ for x in det_results_cls]) )

                (ids, count) = nm_suppression(bb_i_cls, score_cls, overlap=iou)
                
                bb_i_cls_sup  = bb_i_cls[ids[:count]].cpu().detach().numpy()
                score_cls_sup = score_cls[ids[:count]].cpu().detach().numpy()

                for bb_i, score in zip(bb_i_cls_sup, score_cls_sup):
                    det_results_sup.append(DetResult(cls_name, bb_i, score))

        return det_results_sup

ここは、以下サイト(「PyTorchによる発展ディープラーニング」著者 小川雄太郎様のgithub)の2-8_SSD_inference.ipynbから拝借したコードをベースにしております。

主な変更点は以下1点だけですが、これを実現するために、ソースの見た目はだいぶ変わってます。

  • (a)検出範囲の細分化

(a)検出範囲の細分化

入力画像は、今回、自分が保有するドライブレコーダー動画の画像を想定しており、解像度は1280x720です。一方で、SSDモデルに入力できるのは300x300です。なので、画像全体を検出範囲にしてしまうと、画像前処理でかなりきつい縮小(1280x720 → 300x300)がかかってしまいます。最近のドラレコはFHD(1920x1080)以上が当たり前ですが、そうなると尚更です。

ここまできつい縮小がかかってしまうと、今回の検出対象であるナンバープレートがつぶれてしまい、まともな検知ができません。そこで、1枚の画像の中から矩形領域を複数切り出し、それらに対して検知(推論)を行うアルゴリズム(以下)にしました。検出対象が他のものでも、使えるかと思います。

  • (1) 検出範囲の切り出し
  • (2) 検知(推論)実行
  • (3) 結果(枠)の座標変換、重複枠削除

検知アルゴ.png

(1) 検出範囲の切り出し

入力画像から、1枚の画像の中から矩形領域を複数切り出します。
1つの矩形領域を切り出すコードは以下です。pythonだとスライスを使って1行で書けてしまいます。

common_ssd.py ImageProc::clip()から抜粋(変数名は簡略表記)
# 矩形領域{(lu_x, lu_y) - (rb_x, rb_y)}を切り出し
img[lu_y:rb_y, lu_x:rb_x]

今回の検出対象はナンバープレートと車なので、車やナンバープレートがありそうな領域のみを切り出しました(上図の緑枠)。ちょうど範囲の切れ目に検出対象がいても未検出にならないよう、範囲に重なりを持たせてます。

切り出した画像に、画像前処理(300x300に縮小等)を施します。画像前処理は、utils/ssd_model.py DataTransformで行ってます。前出の「(b)画像前処理のリサイズ(縮小)アルゴ変更」は、この中で実行されます。

参考までに、前処理した画像(例)は下図です。リサイズだけでなく、色の引き算も入っているため変色してます。引き算だけでなく色の階調を減らす処理も入っているように見えますが、該当コードを見つけることができませんでした。

画像前処理例.png

切り出して前処理を施した複数画像をbatch化(1まとめ)する処理も行います。
1枚1枚を順にSSDモデルに入力&実行してもよいのですが、SSDモデルは複数画像をまとめて実行(batch実行)できるよう実装されているので、これを活用して処理を高速化するのが目的です。

検出範囲切り出し~batch化を行うコードは以下です。

predict_ssd.py SSDModelDetector::predict()から抜粋
# 前処理を行うクラス(DataTransform)のインスタンス作成
transform = DataTransform(self.input_size_, self.color_mean_)

imgs_trans:List[np.ndarray] = []
for img_proc in img_procs:
    # 検出範囲切り出し
    img_det = img_proc.clip(img_org)
    # 画像前処理
    (img_trans, _ ,_) = transform(img_det, "val", "", "")
    # batch化(リストに追加(SSDモデルにあうようデータ配置組み替えもあわせて実施))
    imgs_trans.append(img_trans[:, :, (2, 1, 0)]) # [h,w,ch(BGR→RGB)]

# batch化(リスト→torchテンソル型に変換(SSDモデルにあうようデータ配置組み替えもあわせて実施))
imgs_trans_np = np.array(imgs_trans)                         # [batch_num, h, w, ch(RGB)]
img_batch = torch.from_numpy(imgs_trans_np).permute(0,3,1,2) # [batch_num, ch(RGB), h, w]

# 入力画像をデバイス(GPU or CPU)に送る
img_batch = img_batch.to(self.device_)

(2) 検知(推論)実行

ここは、特に変更点なしです。コードは以下です。

predict_ssd.py SSDModelDetector::predict()から抜粋
# 推論実行
torch.backends.cudnn.benchmark = True
self.net_.eval()
outputs = self.net_(img_batch)

(3) 結果(枠)の座標変換、重複枠削除

SSDモデルの出力は、検出対象の物体を囲む外接矩形です。座標は、検出範囲内での座標になっているので、それを元の入力画像の座標に変換します。
加えて、今回、検出範囲に重なりを持たせており、重なっている領域内に検出対象がある場合、同じ物体に対して検出結果が複数検出されるため(下図例)、それを1つにする処理(重複枠削除)も行いました。

重複枠の例.png

まず、検出範囲1つ分の外接矩形の座標(左上/右下のxy座標)を、元の入力画像の座標に変換するコードです。SSDモデル出力の座標は、0~1に正規化されているので、検出範囲の幅、高さを掛けて、左上座標を加えれば変換できます。

common_ssd.py ImageProc::convBBox()から抜粋
def convBBox(self, bbox:np.ndarray) -> np.ndarray:
    area_w = self.img_w_
    area_h = self.img_h_
    if self.is_no_proc_ == False:
        area_w = self.darea_w_
        area_h = self.darea_h_

    # 検出範囲の幅、高さを掛ける
    bbox = bbox * [area_w, area_h, area_w, area_h] 
    
    # 検出範囲の左上座標を加える
    bb_f = np.array([bbox[0] + float(self.darea_lu_x_), 
                     bbox[1] + float(self.darea_lu_y_), 
                     bbox[2] + float(self.darea_lu_x_), 
                     bbox[3] + float(self.darea_lu_y_)])
    bb_i = bb_f.astype(np.int64)
    return bb_i

上記を複数の検出範囲(batch)に対して行うコードは以下です。

predict_ssd.py SSDModelDetector::predict()から抜粋
# SSDモデルの出力を閾値処理(確信度confが閾値以上の結果を取り出し)
outputs    = outputs.cpu().detach().numpy() # (batch_num, label, top200, [conf,xmin,ymin,xmax,ymax])
find_index = np.where(outputs[:, :, :, 0] >= data_confidence_level) # (batch_num, label, top200)
outputs    = outputs[find_index]

# 抽出した物体数分ループを回す
det_results:List[DetResult] = []

for i in range(len(find_index[1])):  

    area_no  = find_index[0][i] # 検出範囲index (batch index)
    label_no = find_index[1][i] # ラベル(クラス)番号

    if label_no > 0:  
        # [背景クラスでない場合] 結果を取得

        # 確信度conf
        sc = outputs[i][0]
        # クラス名
        cls_name = self.voc_classes_[label_no-1]
        # Bounding Box: 入力画像上での座標値に変換
        bb_i = img_procs[area_no].convBBox( outputs[i][1:] )

        det_results.append(DetResult(cls_name, bb_i, sc))

検出範囲(batch)のループをまわせばいいと思ってしまいますが、SSDモデルの出力(※)が検出範囲毎には並んでおらず、物体毎に並んでいます。そのため、物体毎のループをまわし、個々の物体毎に、どの検出範囲の物体かを出力outputs、find_indexから取り出します。

(※)正確には、SSDモデルの出力を閾値処理(確信度confが閾値以上の結果を取り出し)した結果が、検出範囲毎に並んでません。

outputsfind_indexのデータ構造は下図です。
predict_出力データ構造.png

outputs赤枠、赤字)は、行方向に物体、列方向に確信度confと外接矩形の座標が並んでます。
find_index黄色枠、黄色字)は、列方向に物体、行方向に検出範囲indexとラベル(クラス)番号が並んでます(3行目は不明)。

なので、検出した物体i(i番目の物体)の、検出範囲index、ラベル(クラス)、確信度conf、外接矩形(BoundingBox)は、以下のように取り出せます。

  • 物体iの検出範囲index: find_index[0][i]
  • 物体iのラベル(クラス)番号: find_index[1][i]
  • 物体iの確信度conf:outputs[i][0]
  • 物体iの外接矩形(左上/右下のxy座標): outputs[i][1:]

次に、重複枠削除のコードです。

predict_ssd.py SSDModelDetector::nmSuppression()から抜粋
def nmSuppression(self, det_results:List[DetResult], iou:float=0.45) -> List[DetResult]:
    # 重複枠の削除
    #   引数iou以上の重なりがある枠が存在する場合、一番確信度が高い枠のみ残し、それ以外を削除する
    #   異なるクラスの枠同士が重なる場合は対象外
    det_results_sup:List[DetResult] = []

    for cls_name in self.voc_classes_:
        det_results_cls = [x for x in det_results if x.class_name_ == cls_name]

        if len(det_results_cls) > 0:
            bb_i_cls  = torch.from_numpy( np.array([x.bbox_  for x in det_results_cls]) )
            score_cls = torch.from_numpy( np.array([x.score_ for x in det_results_cls]) )

            (ids, count) = nm_suppression(bb_i_cls, score_cls, overlap=iou)
            
            bb_i_cls_sup  = bb_i_cls[ids[:count]].cpu().detach().numpy()
            score_cls_sup = score_cls[ids[:count]].cpu().detach().numpy()

            for bb_i, score in zip(bb_i_cls_sup, score_cls_sup):
                det_results_sup.append(DetResult(cls_name, bb_i, score))

    return det_results_sup

これは、SSDモデル内部でも同様の処理(Non-Maximum Suppression)を行っており、utils/ssd_model.pyに関数実装(nm_suppression())があるので、それを使ってます。上記コードは、入出力データをnm_suppression()に合わせるためのラッパー関数です。


記事がだいぶ長くなってしまったので、本記事はここまでとさせていただきました。
続きは以下を参照ください。
以下記事では、上述3章の実装を使って、実際に、学習/検知(推論)を行う方法を掲載しています。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?