LoginSignup
7
5

More than 3 years have passed since last update.

TensorFlow Object Detection APIで物体検出モデルを簡易トレーニング

Last updated at Posted at 2021-01-01

手軽に物体検出モデルをトレーニング

Colabサンプル
事前トレーニング済みモデルのファインチューニングで、
新たなオブジェクトを検出できるようになります。
トレーニング後はモデルの保存、復元もできます。
事前トレーニングしたモデルの最後の層の転移学習です。

duckies_test.gif

kites_detections_output.jpg
ダウンロード.png

手順(Colabリンクでも実行できます)

0.TensorFlow2をインストール

!pip install -U --pre tensorflow=="2.2.0"

1.リポジトリのクローン


import os
import pathlib

if "models" in pathlib.Path.cwd().parts:
  while "models" in pathlib.Path.cwd().parts:
    os.chdir('..')
elif not pathlib.Path('models').exists():
  git clone --depth 1 https://github.com/tensorflow/models

2.Object Detection APIをインストール


%%bash
cd models/research/
protoc object_detection/protos/*.proto --python_out=.
cp object_detection/packages/tf2/setup.py .
python -m pip install .

3.モジュールのインポート


import matplotlib
import matplotlib.pyplot as plt

import os
import random
import io
import imageio
import glob
import scipy.misc
import numpy as np
from six import BytesIO
from PIL import Image, ImageDraw, ImageFont
from IPython.display import display, Javascript
from IPython.display import Image as IPyImage

import tensorflow as tf

from object_detection.utils import label_map_util
from object_detection.utils import config_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.utils import colab_utils
from object_detection.builders import model_builder

%matplotlib inline

4.画像読み込み関数

画像を Numpy array にします。

def load_image_into_numpy_array(path):
  """画像ファイルをNumpy配列にする.

    TensorFlowのグラフに食わせるために画像をNumpy配列に。
  慣例として(高さ、幅、カラーチャネル)形状のNumpy配列にする。

  引数:
    path: 画像ファイルのパス.

  戻り値:
    uint8、(高さ, 幅, 3チャネル)形状のnumpy配列。 
  """
  img_data = tf.io.gfile.GFile(path, 'rb').read()
  image = Image.open(BytesIO(img_data))
  (im_width, im_height) = image.size
  return np.array(image.getdata()).reshape(
      (im_height, im_width, 3)).astype(np.uint8)

5.結果を視覚化する関数


def plot_detections(image_np,
                    boxes,
                    classes,
                    scores,
                    category_index,
                    figsize=(12, 16),
                    image_name=None):
  """検出結果を視覚化するAPI機能のラッパー関数.

  引数:
    image_np: uint8、(高さ, 幅, 3チャネル)形状のnumpy配列。 
    boxes: [数, 4]形状のnumpy配列。
    classes: [数]形状のNumpy配列。 注:クラスのインデックスは「1」からはじまり(0ではなく)
      ラベルマップのキーの数と一致する。
    scores: [数]形状のNumpy配列もしくはNone。  scores=Noneの場合, then
      この関数はプロットするボックスをグランドトゥルース(完全な正解)のボックスとして扱い、全てのボックスをクラスとスコアなしの
    黒のボックスとしてプロットする。
    category_index: カテゴリインデックスでキー付されたカテゴリ・ディクショナリ
    (それぞれがカテゴリ・インデックス:idとカテゴリ名:nameを持つ)を含む辞書。
    figsize: 表示するサイズ。
    image_name: 画像ファイルの名前。
  """
  image_np_with_annotations = image_np.copy()
  viz_utils.visualize_boxes_and_labels_on_image_array(
      image_np_with_annotations,
      boxes,
      classes,
      scores,
      category_index,
      use_normalized_coordinates=True,
      min_score_thresh=0.8)
  if image_name:
    plt.imsave(image_name, image_np_with_annotations)
  else:
    plt.imshow(image_np_with_annotations)

6.画像とラベルマップ、アノテーション・データを用意

必要なものは以下。
1、画像パスの配列
2、ラベルマップ(どのIDにどのラベル名が対応するかの辞書)
3、ラベルIDの配列
4、バウンディング・ボックスの配列

<例>


 # 画像パスの配列
train_image_filenames = [
     './datasets/train_images/train_image0001.jpg',
     './datasets/train_images/train_image0002.jpg'
     ]
 # ラベルマップ idは1から
category_index = {
    1: {'id': 1, 'name': 'cat'},
    2: {'id': 2, 'name': 'dog'}
    }

 # クラス数
num_classes = 2

 # ラベルIDの配列 
gt_labels = [
   np.array([1,1]),
   np.array([1,2,2])
   ]

 # バウンディング・ボックス[ miny, minx, maxy, maxx]のnumpy配列 
gt_boxes = [
    np.array([[0.436, 0.591, 0.629, 0.712],[0.539, 0.583, 0.73, 0.71]], dtype=np.float32),
    np.array([[0.464, 0.414, 0.626, 0.548],[0.313, 0.308, 0.648, 0.526],[0.256, 0.444, 0.484, 0.629]], dtype=np.float32)
   ]


<要件>
画像はモデルの入力サイズにリサイズしておく必要があります。
画像・ラベル・ボックスの、配列内インデックスが一致している必要があります。

<参考記事>

画像を一括リサイズ

7、画像を numpy array に


train_image_dir = 'models/research/object_detection/test_images/ducky/train/' # 1、画像ディレクトリのパス
train_images_np = []
for filename in train_image_filenames:
  train_images_np.append(load_image_into_numpy_array(filename))

# 画像を読み込んで表示してみる
plt.imshow(train_image_np[0])
plt.show()

8.クラスラベルをワンホットTensorに、画像とBoxデータをTensorに

ワンホットとは番号を0と1の配列で表したもの。
例えば、2クラス内の1は[1,0] 2は[0,1] と該当順番のみ1になっている。

# クラスラベルをワンホットに変換; 全てをTensorに変換。
# ここで `label_id_offset`は、すべてのクラスを特定の数のインデックスだけシフト
# バックグラウンド以外はモデルがワンホットラベルを受け取るように、ここでこれを行う
# クラスは0から数え始める。 
label_id_offset = 1
train_image_tensors = []
gt_classes_one_hot_tensors = []
gt_box_tensors = []
for (train_image_np, gt_box_np, gt_label_np) in zip(
    train_images_np, gt_boxes, gt_labels):
  train_image_tensors.append(tf.expand_dims(tf.convert_to_tensor(
      train_image_np, dtype=tf.float32), axis=0)) # Numpy画像をTensorに
  gt_box_tensors.append(tf.convert_to_tensor(gt_box_np, dtype=tf.float32)) # Numpy boxをTensorに
  zero_indexed_groundtruth_classes = tf.convert_to_tensor(
      gt_label_np - label_id_offset) # Numpy labelを最小値0の配列にして、Tensorに
  gt_classes_one_hot_tensors.append(tf.one_hot(
      zero_indexed_groundtruth_classes, num_classes)) # label Tensorをワンホットに
print('データの準備が終わりました')

たとえば、ある一枚の画像に対する
[1,1,2]
のラベル配列が
array([ [1., 0.],[1., 0.],[0., 1.] ], dtype=float32)>
のワンホット配列になります

9.アノテーションした正解ボックスを視覚化してチェック

dummy_scores = np.array([1.0], dtype=np.float32)  # 100%のボックススコアを仮で入れる

plt.figure(figsize=(30, 15))
for idx in range(5):
  plt.subplot(2, 3, idx+1)
  plot_detections(
      train_images_np[idx],
      gt_boxes[idx],
      gt_labels[idx],
      dummy_scores, category_index)
plt.show()

10.モデルをビルドして重みをリストアする

最後レイヤー以外の重みをリストアします。最後のレイヤーのみトレーニング用にランダムな重みで初期化されます。
ここではResnetバックボーンのRetinanetを使っています。
Object Detection API にはさまざまなモデルがあります。


# モデルをダウンロード
!wget http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
!tar -xf ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
!mv ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/checkpoint models/research/object_detection/test_data/

モデルごとにパイプライン定義辞書ファイルがあり、クラス数などパラメーターが書き込まれています。
パイプライン定義辞書ファイルは、Object Detection リポジトリの configs フォルダ、もしくはダウンロードしたモデルのディレクトリにあります。
定義ファイルのクラス数を、自前のデータのクラス数に書き換えます。

チェックポイントからリストアする層をHeadで指定しています。今回は、クラス分類用の部分の重みはリストアしないので、ボックス分類用の部分の重みのみ指定しています。


tf.keras.backend.clear_session()

print('簡易トレーニングのためにモデルをビルドして重みをリストアしています...', flush=True)
pipeline_config = 'models/research/object_detection/configs/tf2/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config'
checkpoint_path = 'models/research/object_detection/test_data/checkpoint/ckpt-0'

# パイプライン定義を読み込んで物体検出モデルをビルド。

# デフォルトでは90クラスを検出するCOCOアーキテクチュアで作業しているので、
# パイプライン定義のクラス数をデータのクラス数に上書きする。

configs = config_util.get_configs_from_pipeline_file(pipeline_config)
model_config = configs['model']
model_config.ssd.num_classes = num_classes
model_config.ssd.freeze_batchnorm = True
detection_model = model_builder.build(
      model_config=model_config, is_training=True)

# 物体分類・検出の重みをリストアする --- RetinaNetは2つの推論チェックポイントHeadをもっている
#  --- 一つはクラス分類用, もう一つはボックス検出用.  We will
# ボックス検出用のチェックポイントHeadから重みをリストアするが、クラス分類用の重みはスクラッチで初期化する
両方のヘッドからリストアしたい場合は追加するラインをコメントアウトで以下に示している

fake_box_predictor = tf.compat.v2.train.Checkpoint(
    _base_tower_layers_for_heads=detection_model._box_predictor._base_tower_layers_for_heads,
    # _prediction_heads=detection_model._box_predictor._prediction_heads,
    #    (今回はリストアしないクラス分類用のHead)
    _box_prediction_head=detection_model._box_predictor._box_prediction_head,
    )
fake_model = tf.compat.v2.train.Checkpoint(
          _feature_extractor=detection_model._feature_extractor,
          _box_predictor=fake_box_predictor)
ckpt = tf.compat.v2.train.Checkpoint(model=fake_model)
ckpt.restore(checkpoint_path).expect_partial() # 部分的にリストア

# モデルの重み値が作られるように、ダミーインプット(0配列)で実行
image, shapes = detection_model.preprocess(tf.zeros([1, 640, 640, 3]))
prediction_dict = detection_model.predict(image, shapes)
_ = detection_model.postprocess(prediction_dict, shapes)
print('重みをリストアしました!')

11.トレーニング

トレーニング時間は数分。


tf.keras.backend.set_learning_phase(True)

# これらのパラメーターは調整できる; サンプルでは5枚の画像でトレーニングするため
# 大きなバッチサイズは機能しない, 必要に応じてもっと大きいバッチでトレーニング画像を
# メモリにフィットできる.
batch_size = 4
learning_rate = 0.01
num_batches = 100

# 今回簡易トレーニングするトップ層の値を選択する.
trainable_variables = detection_model.trainable_variables
to_fine_tune = []
prefixes_to_train = [
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead',
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead']
for var in trainable_variables:
  if any([var.name.startswith(prefix) for prefix in prefixes_to_train]):
    to_fine_tune.append(var)

# 一回のトレーニングステップのフォワード・バックワード処理を設定
def get_model_train_step_function(model, optimizer, vars_to_fine_tune):
  """Get a tf.function for training step."""

  # 速度をあげるために tf.function を使用.
  # 内部の具体的な値が欲しい場合(Eager Execution)は  @tf.functionデコレーターをコメントアウト

  @tf.function
  def train_step_fn(image_tensors,
                    groundtruth_boxes_list,
                    groundtruth_classes_list):
    """一回分のトレーニングの計算.

    引数:
      image_tensors:  [1, 高さ, 幅, 3]のtf.float32タイプのTensorのリスト.
        注。画像によってサイズは異なり、この関数内で640x640にリシェイプされる.
      groundtruth_boxes_list: バッチ内の各画像の正解ボックスを表す
        [N_i, 4]形状のtf.float32 タイプのTensorのリスト 
      groundtruth_classes_list: バッチ内の各画像の正解ボックスを表す
        [N_i, num_classes]形状のtf.float32 タイプのTensorのリスト 

    戻り値:
      入力バッチのトータルロスを表すスカラー(単一値)のTensor
    """
    shapes = tf.constant(batch_size * [[640, 640, 3]], dtype=tf.int32)
    model.provide_groundtruth(
        groundtruth_boxes_list=groundtruth_boxes_list,
        groundtruth_classes_list=groundtruth_classes_list)
    with tf.GradientTape() as tape:
      preprocessed_images = tf.concat(
          [detection_model.preprocess(image_tensor)[0]
           for image_tensor in image_tensors], axis=0)
      prediction_dict = model.predict(preprocessed_images, shapes)
      losses_dict = model.loss(prediction_dict, shapes)
      total_loss = losses_dict['Loss/localization_loss'] + losses_dict['Loss/classification_loss']
      gradients = tape.gradient(total_loss, vars_to_fine_tune)
      optimizer.apply_gradients(zip(gradients, vars_to_fine_tune))
    return total_loss

  return train_step_fn

optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9)
train_step_fn = get_model_train_step_function(
    detection_model, optimizer, to_fine_tune)

print('簡易トレーニングをスタート!', flush=True)
for idx in range(num_batches):
  # データセットのランダムなサブセットのキーを取得
  all_keys = list(range(len(train_images_np)))
  random.shuffle(all_keys)
  example_keys = all_keys[:batch_size]

  # 注 このサンプルデモではデータ拡張を行わないが、楽しい練習のためにランダム横フリップと
  # ランダムクロップをお勧めする
  gt_boxes_list = [gt_box_tensors[key] for key in example_keys]
  gt_classes_list = [gt_classes_one_hot_tensors[key] for key in example_keys]
  image_tensors = [train_image_tensors[key] for key in example_keys]

  # トレーニングステップ (フォワードパス + バックワードパス)
  total_loss = train_step_fn(image_tensors, gt_boxes_list, gt_classes_list)

  if idx % 10 == 0:
    print('batch ' + str(idx) + ' of ' + str(num_batches)
    + ', loss=' +  str(total_loss.numpy()), flush=True)

print('簡易トレーニングが終わりました!')

batch 0 of 100, loss=1.2068503
batch 10 of 100, loss=0.12002414
batch 20 of 100, loss=0.10228661
batch 30 of 100, loss=0.0361837
batch 40 of 100, loss=0.011348422
batch 50 of 100, loss=0.0028579112
batch 60 of 100, loss=0.0032960502
batch 70 of 100, loss=0.0023721359

12.未知の画像でテスト!

テスト画像を numpy array にしてモデルの推論にかけます。結果は100個のバウンディングボックス、
100個のラベル、100個のスコアで返ってきます。
バウンディングボックスのi番目は、ラベルのi番目、スコアのi番目に対応しています。
どんな場合もこの100個という数は変わりません。

この中で、スコアの高いものを視覚化したり、推論結果のボックスとして用います。
視覚化する場合のデフォルトのスコアの閾値は0.8です。(plot_detection関数で指定される)
手元で試した時も、大体の場合、例えばぼくが目で画像をみて推論対象のオブジェクトが2つ見える場合、モデルが出力した100個のスコアのうち0.5を超えるものは2つです。他のスコアは0.02など極端に低いです。なので、100個の結果のうち信頼できるボックスやラベルを見つけるのはそれほど難しくありません(トレーニングがうまくいっていれば)。

pip install natsort #テスト画像の順番を保って推論するために、名前でソートするライブラリをインストールしています。
from natsort import natsorted

print(sorted_file_names)
test_image_dir = './dataset/test'
test_images_np = []
file_names = os.listdir(test_image_dir)
test_paths = natsorted(file_names)

for test_path in test_paths:
  test_images_np.append(np.expand_dims(
      load_image_into_numpy_array(test_path), axis=0))

# Again, uncomment this decorator if you want to run inference eagerly
@tf.function
def detect(input_tensor):
  """Run detection on an input image.

  Args:
    input_tensor: A [1, height, width, 3] Tensor of type tf.float32.
      Note that height and width can be anything since the image will be
      immediately resized according to the needs of the model within this
      function.

  Returns:
    A dict containing 3 Tensors (`detection_boxes`, `detection_classes`,
      and `detection_scores`).
  """
  preprocessed_image, shapes = detection_model.preprocess(input_tensor)
  prediction_dict = detection_model.predict(preprocessed_image, shapes)
  return detection_model.postprocess(prediction_dict, shapes)

# Note that the first frame will trigger tracing of the tf.function, which will
# take some time, after which inference should be fast.

label_id_offset = 1
for i in range(len(test_images_np)):
  input_tensor = tf.convert_to_tensor(test_images_np[i], dtype=tf.float32)
  detections = detect(input_tensor) # このdetectionsで結果が取れます。

  plot_detections(
      test_images_np[i][0],
      detections['detection_boxes'][0].numpy(),
      detections['detection_classes'][0].numpy().astype(np.uint32)
      + label_id_offset,
      detections['detection_scores'][0].numpy(),
      category_index, figsize=(15, 20), image_name="gif_frame_" + ('%02d' % i) + ".jpg") # 指定したパスにスコア0.8を超えるボックスを描画した画像が保存されます。
print(detections)
# 以下出力結果。省略してあるがそれぞれ100個ある
# 'detection_boxes' 'detection_classes' 'detection_scores'が最終結果
# 'detection_anchor_indices''raw_detection_boxes''raw_detection_scores'は最終結果を計算する際使った途中データ(だと思う。たぶん)

{'detection_anchor_indices': <tf.Tensor: shape=(1, 100), dtype=int32, numpy=
 array([[49416,
         50753, 
        ...
         51112,
         26364]], dtype=int32)>,
 'detection_boxes': <tf.Tensor: shape=(1, 100, 4), dtype=float32, numpy=
 array([[[0.43758985, 0.7465773 , 0.63472795, 0.9252911 ],
         [0.1677289 , 0.6480559 , 0.890319  , 1.        ],
        ...
         [0.40918362, 0.3183376 , 1.        , 0.9439225 ],
         [0.639281  , 0.8898159 , 0.7221419 , 0.97141266]]], dtype=float32)>,
 'detection_classes': <tf.Tensor: shape=(1, 100), dtype=float32, numpy=
 array([[0.,
         0.,
        ...
         1.,
         0.]], dtype=float32)>,
 'detection_multiclass_scores': <tf.Tensor: shape=(1, 100, 3), dtype=float32, numpy=
 array([[[5.47093153e-03, 3.10172260e-01, 1.57460570e-03],
         [3.18378210e-03, 2.98067868e-01, 1.27398968e-03],
        ...
         [1.98462605e-03, 7.14010894e-02, 1.30185485e-03]]], dtype=float32)>,
 'detection_scores': <tf.Tensor: shape=(1, 100), dtype=float32, numpy=
 array([[0.31017226, 0.29806787, 0.26563442, 0.23411435, 0.22276634,
         0.21396422, 0.20716852, 0.18401867, 0.17277354, 0.16559672,
        ...
         0.14484483, 0.14467192, 0.13986477, 0.13589099, 0.13474342,
         0.07329145, 0.0723871 , 0.07223672, 0.07157233, 0.07140109]],
       dtype=float32)>,
 'num_detections': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([100.], dtype=float32)>,
 'raw_detection_boxes': <tf.Tensor: shape=(1, 51150, 4), dtype=float32, numpy=
 array([[[-3.6555314e-03, -1.2414398e-02,  1.4784184e-02,  1.0699857e-02],
         [-9.5088510e-03, -2.2957223e-02,  3.9035182e-02,  1.7941574e-02],
         ...,
         [ 3.1216300e-01,  6.6491508e-01,  1.3707981e+00,  1.0911807e+00],
         [ 6.6202581e-02,  4.6959493e-01,  1.5031044e+00,  1.2707567e+00]]],
       dtype=float32)>,
 'raw_detection_scores': <tf.Tensor: shape=(1, 51150, 3), dtype=float32, numpy=
 array([[[9.3629062e-03, 7.2856843e-03, 4.1753352e-03],
         [4.8707724e-03, 1.5826846e-06, 3.3203959e-03],
         ...,
         [7.2056055e-03, 1.9515157e-02, 1.4944762e-02],
         [8.9454055e-03, 1.9429326e-03, 1.5336275e-03]]], dtype=float32)>}

12’.結果をGifで表示

imageio.plugins.freeimage.download()

anim_file = 'test.gif'

filenames = glob.glob('gif_frame_*.jpg')
filenames = sorted(filenames)
last = -1
images = []
for filename in filenames:
  image = imageio.imread(filename)
  images.append(image)

imageio.mimsave(anim_file, images, 'GIF-FI', fps=5)

display(IPyImage(open(anim_file, 'rb').read()))

13.モデルの保存

import os

ckpt_path = 'ckpt/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8'
os.makedirs(ckpt_path, exist_ok=True)

checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=detection_model)
manager = tf.train.CheckpointManager(checkpoint, directory=ckpt_path, max_to_keep=5)
manager.save()

14.モデルの復元


trained_model = model_builder.build(model_config=model_config, is_training=False)

ckpt_trained = tf.compat.v2.train.Checkpoint(model=ssd_model)

# ダミー入力で実行して重み値を生成
image, shapes = trained_model.preprocess(tf.zeros([1, 640, 640, 3]))
prediction_dict = trained_model.predict(image, shapes)
_ = trained_model.postprocess(prediction_dict, shapes)

ckpt_trained.restore('ckpt/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/ckpt-1')
print('重みをリストアしました!')

15.復元したモデルの実行

12、のテストの detect_model を trained_modelに書き換えて実行します。

🐣


フリーランスエンジニアです。
お仕事のご相談こちらまで
rockyshikoku@gmail.com

Core MLを使ったアプリを作っています。
機械学習関連の情報を発信しています。

Twitter
Medium

7
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
5