Yolo_アンカーボックスはクラス毎に必要だと思うのですが...
アンカーボックスってクラス毎に必要ではないのでしょうか?
背景
2024年にもなって今更ですが、KerasでYoloV3の実装を行っています
別プロジェクトチーム(以後,別チーム)からプログラムを頂いて、編集を加えているところです。
ついでに、
中身を精査していて疑問に思い調べていたのですが解決に至らず...。
疑問点
・アンカーボックスはクラス毎に設定するものではない?
・上記認識が合っているとして、どう変更を加えたらいいのか
環境
ubuntu 20.04
Docker
Python 3.8
keras 2.2.4
疑問に思った経緯
頂いたプログラムを読んでいて、
アンカーボックス作成はkmeans手法で行っていて
1つのクラスに対してアンカーボックスを9つ用意している中身でした
cluster_number = 9 #アンカーボックスの数
ここで作成されるアンカーボックスは、
クラス毎にバウンディングボックスの形状特徴を反映させているものだと解釈しています
(ここが違う?)
そのため、クラス毎にアンカーボックスを最適化する必要があると感じました
実際に別チームでは、
複数クラスをk-means手法でアンカー作成→学習すると精度が悪化した為、
1クラス毎にk-meansと学習を行っているみたいです。
(最終的にはクラス毎にモデルがある)
じゃあクラス毎にアンカーボックス必要でしょ!!ってなりました。
※アンカーボックス最適化の中身は問題ないかなと思っているので
中身は貼りませんが、必要になれば貼ります
とりあえずクラス毎にアンカーボックス作成しようとするが...
ここで2つ目の疑問です
学習プログラムに組み込もうと思ったけどどうやるん...
ここからはプログラム貼りながら書いていきます
# アノテーションラベル名
class_names = get_classes(conf.classes_path)
# クラス数
num_classes = len(class_names)
# k_meansでアンカーボックスサイズの最適化を行ったアウトプットファイルの中身
# 9x2の配列
anchors = get_anchors(conf.anchors_path)
ラベル名、
ラベル数取得
前工程で算出したアンカーボックスを読み込む(1クラス分のみ)
あとは画像サイズなんかも設定しています
# ifは通さない前提か? Lite版モデル? 早いけど精度落ちる
is_tiny_version = len(anchors)==6 # default setting
if is_tiny_version:
model = create_tiny_model(input_shape, anchors, num_classes,
freeze_body=2, weights_path='model_data/tiny_yolo_weights.h5 ← こんなのない')
# 別チームはこっちルートのみ
else:
# ベースとなるモデルの作成?
model = create_model(input_shape, anchors, num_classes,
freeze_body=2, weights_path=conf.weight_path)
アンカーボックス数が6のときだけはtinyバージョン使う?
そういう決まりがあるんですかね?
create_modelの中身は以下
def create_model(input_shape, anchors, num_classes, load_pretrained=True, freeze_body=2,
weights_path=conf.weight_path):
'''create the training model
input_shape:画像サイズ(縦横)
anchors:k_meansで吐き出したクラスに対応したアンカーボックスサイズ
num_classes:アノテーションラベル数
load_pretrained:
freeze_body:
weights_path:重みファイル
'''
K.clear_session() # get a new session
# 入力層
image_input = Input(shape=(None, None, 3))
h, w = input_shape
num_anchors = len(anchors)
# // = 切り捨て除算 0:32 = 32, 1:16 = 16, 2:8 = 8 で計算しても同じ結果 それぞれの数値の意味は?
# 決め打ちで書かれてる数値の意図が不明 (num_anchors//3 rangeの3に合わせてる? +5は?)
y_true = [Input(shape=(h//{0:32, 1:16, 2:8}[l], w//{0:32, 1:16, 2:8}[l], \
num_anchors//3, num_classes+5)) for l in range(3)]
# 多分層の作成
model_body = yolo_body(image_input, num_anchors//3, num_classes)
print('Create YOLOv3 model with {} anchors and {} classes.'.format(num_anchors, num_classes))
# 別チームはifを必ず通してる
if load_pretrained:
# 作成したレイヤーで重みのロード
model_body.load_weights(weights_path, by_name=True, skip_mismatch=True)
print('Load weights {}.'.format(weights_path))
# 別チームはifを必ず通してる
# freeze bodyとは???
if freeze_body in [1, 2]:
# Freeze darknet53 body or freeze all but 3 output layers.
# 185?? freezebody で ヘッドの場合必ず1になるので185は残らない darknetのレイヤー数(darknet = Model(inputs, darknet_body(inputs)))
# 出力層は計算外にしてるっぽい
# 出力層以外のレイヤー数を取得
# freeze_bodyが1のときは185(darknetのレイヤー?)を選択することになる
# 2はそのままで良さそう(基本的に今あるレイヤーに対して重みの固定を行いたい感じなので)
num = (185, len(model_body.layers)-3)[freeze_body-1]
# 全部のレイヤーに対して重みの固定を行う(転移学習では必須)
for i in range(num): model_body.layers[i].trainable = False
# print('Freeze the first {} layers of total {} layers.'.format(num, len(model_body.layers)))
# ここやばそうなので一旦パス
model_loss = Lambda(yolo_loss, output_shape=(1,), name='yolo_loss',arguments={'anchors': anchors, 'num_classes': num_classes, 'ignore_thresh': 0.5})([*model_body.output, *y_true])
# 転移完了?
model = Model([model_body.input, *y_true], model_loss)
return model
プログラムを見て頂くとお分かり頂けますが疑問だらけです(笑)
とりあえず層を重ねてってるなーくらいの解釈ですが、
この段階で何かしないといけないのかなと感じてます
例えば、model_bodyを定義している部分。
yolo_bodyにアンカーボックス数やクラス数を渡してる
def yolo_body(inputs, num_anchors, num_classes):
"""Create YOLO_V3 model CNN body in Keras."""
darknet = Model(inputs, darknet_body(inputs))
x, y1 = make_last_layers(darknet.output, 512, num_anchors*(num_classes+5))
x = compose(
DarknetConv2D_BN_Leaky(256, (1,1)),
UpSampling2D(2))(x)
x = Concatenate()([x,darknet.layers[152].output])
x, y2 = make_last_layers(x, 256, num_anchors*(num_classes+5))
x = compose(
DarknetConv2D_BN_Leaky(128, (1,1)),
UpSampling2D(2))(x)
x = Concatenate()([x,darknet.layers[92].output])
x, y3 = make_last_layers(x, 128, num_anchors*(num_classes+5))
return Model(inputs, [y1,y2,y3])
この時点で,アンカーの数とクラス数に5を足した値(何故足す!?)を掛け算してる
アンカーボックス数 x (クラス数+5)はお決まり?
あ、でもここはまだ数を見ているだけで
クラスとアンカーボックスを対応している訳ではないからいいのか?
(もう迷子です)
あと、
model_loss定義部分も怪しい
model_loss = Lambda(yolo_loss, output_shape=(1,), name='yolo_loss',arguments={'anchors': anchors, 'num_classes': num_classes, 'ignore_thresh': 0.5})([*model_body.output, *y_true])
yolo_lossの中身は以下です
def yolo_loss(args, anchors, num_classes, ignore_thresh=.5, print_loss=False):
'''Return yolo_loss tensor
Parameters
----------
yolo_outputs: list of tensor, the output of yolo_body or tiny_yolo_body
y_true: list of array, the output of preprocess_true_boxes
anchors: array, shape=(N, 2), wh
num_classes: integer
ignore_thresh: float, the iou threshold whether to ignore object confidence loss
Returns
-------
loss: tensor, shape=(1,)
'''
# 3
num_layers = len(anchors)//3 # default setting
# [*model_body.output, *y_true] で受け取るので一旦分けてる
yolo_outputs = args[:num_layers]
y_true = args[num_layers:]
# 3なら左 それ以外は右
anchor_mask = [[6,7,8], [3,4,5], [0,1,2]] if num_layers==3 else [[3,4,5], [1,2,3]]
# castは情報の付け足しや変換?(これならdtypeを置き換えてる)
# float32に置き換え
input_shape = K.cast(K.shape(yolo_outputs[0])[1:3] * 32, K.dtype(y_true[0]))
grid_shapes = [K.cast(K.shape(yolo_outputs[l])[1:3], K.dtype(y_true[0])) for l in range(num_layers)]
loss = 0
# float32に置き換え
m = K.shape(yolo_outputs[0])[0] # batch size, tensor
mf = K.cast(m, K.dtype(yolo_outputs[0]))
for l in range(num_layers):
object_mask = y_true[l][..., 4:5]
true_class_probs = y_true[l][..., 5:]
grid, raw_pred, pred_xy, pred_wh = yolo_head(yolo_outputs[l],
anchors[anchor_mask[l]], num_classes, input_shape, calc_loss=True)
pred_box = K.concatenate([pred_xy, pred_wh])
# Darknet raw box to calculate loss.
raw_true_xy = y_true[l][..., :2]*grid_shapes[l][::-1] - grid
raw_true_wh = K.log(y_true[l][..., 2:4] / anchors[anchor_mask[l]] * input_shape[::-1])
raw_true_wh = K.switch(object_mask, raw_true_wh, K.zeros_like(raw_true_wh)) # avoid log(0)=-inf
box_loss_scale = 2 - y_true[l][...,2:3]*y_true[l][...,3:4]
# Find ignore mask, iterate over each of batch.
ignore_mask = tf.TensorArray(K.dtype(y_true[0]), size=1, dynamic_size=True)
object_mask_bool = K.cast(object_mask, 'bool')
def loop_body(b, ignore_mask):
true_box = tf.boolean_mask(y_true[l][b,...,0:4], object_mask_bool[b,...,0])
iou = box_iou(pred_box[b], true_box)
best_iou = K.max(iou, axis=-1)
ignore_mask = ignore_mask.write(b, K.cast(best_iou<ignore_thresh, K.dtype(true_box)))
return b+1, ignore_mask
_, ignore_mask = K.control_flow_ops.while_loop(lambda b,*args: b<m, loop_body, [0, ignore_mask])
ignore_mask = ignore_mask.stack()
ignore_mask = K.expand_dims(ignore_mask, -1)
# K.binary_crossentropy is helpful to avoid exp overflow.
xy_loss = object_mask * box_loss_scale * K.binary_crossentropy(raw_true_xy, raw_pred[...,0:2], from_logits=True)
wh_loss = object_mask * box_loss_scale * 0.5 * K.square(raw_true_wh-raw_pred[...,2:4])
confidence_loss = object_mask * K.binary_crossentropy(object_mask, raw_pred[...,4:5], from_logits=True)+ \
(1-object_mask) * K.binary_crossentropy(object_mask, raw_pred[...,4:5], from_logits=True) * ignore_mask
class_loss = object_mask * K.binary_crossentropy(true_class_probs, raw_pred[...,5:], from_logits=True)
xy_loss = K.sum(xy_loss) / mf
wh_loss = K.sum(wh_loss) / mf
confidence_loss = K.sum(confidence_loss) / mf
class_loss = K.sum(class_loss) / mf
loss += xy_loss + wh_loss + confidence_loss + class_loss
if print_loss:
loss = tf.Print(loss, [loss, xy_loss, wh_loss, confidence_loss, class_loss, K.sum(ignore_mask)], message='loss: ')
return loss
loss...
アンカーの実際の数値を取り出してるから怪しいけど...
クラスとの紐付けは無さそう...
もう...何...?
(学習のときに紐付けしてるのかな!)
この後は、アーリーストッピングの設定やら学習率の設定やらが続いて、
いよいよアノテーションの情報取得しにいって、コンパイル。
model.fit_generatorの部分で学習開始。
val_split = 0.1
# アノテーション情報の取得
with open(conf.annotation_path) as f:
lines = f.readlines()
np.random.seed(10101)
np.random.shuffle(lines) # 順番をシャッフルしてる
np.random.seed(None)
# 10%
num_val = int(len(lines)*val_split)
# その他全部の割合
num_train = len(lines) - num_val
# Train with frozen layers first, to get a stable loss.
# Adjust num epochs to your dataset. This step is enough to obtain a not bad model.
if True:
# fit や predictの前には必要(コンピュータが読める様に変換)
model.compile(optimizer=Adam(lr=1e-3), loss={
# use custom yolo_loss Lambda layer.
'yolo_loss': lambda y_true, y_pred: y_pred})
batch_size = conf.batch
print('Train on {} samples, val on {} samples, with batch size {}.'.format(num_train, num_val, batch_size))
# 学習
history = model.fit_generator(data_generator_wrapper(lines[:num_train], batch_size, input_shape, anchors, num_classes),
steps_per_epoch=max(1, num_train//batch_size),
validation_data=data_generator_wrapper(lines[num_train:], batch_size, input_shape, anchors, num_classes),
validation_steps=max(1, num_val//batch_size),
epochs=conf.epochs,
initial_epoch=0,
callbacks=[logging, checkpoint])
model.save_weights(conf.log_dir + 'trained_weights_stage_1.h5')
# Unfreeze and continue training, to fine-tune.
# Train longer if the result is not good.
if True:
for i in range(len(model.layers)):
# 重み固定を解除?
model.layers[i].trainable = True
model.compile(optimizer=Adam(lr=1e-4), loss={'yolo_loss': lambda y_true, y_pred: y_pred}) # recompile to apply the change
print('Unfreeze all of the layers.')
batch_size = conf.batch # note that more GPU memory is required after unfreezing the body
print('Train on {} samples, val on {} samples, with batch size {}.'.format(num_train, num_val, batch_size))
history = model.fit_generator(data_generator_wrapper(lines[:num_train], batch_size, input_shape, anchors, num_classes),
steps_per_epoch=max(1, num_train//batch_size),
validation_data=data_generator_wrapper(lines[num_train:], batch_size, input_shape, anchors, num_classes),
validation_steps=max(1, num_val//batch_size),
epochs=conf.epochs,
initial_epoch=50,
callbacks=[logging, checkpoint, reduce_lr, early_stopping])
model.save_weights(f"{conf.log_dir}/{conf.model_name}")
model.save_weights(f'{conf.model_save_path}/{conf.model_name}')
data_generator_wrapperの中身は以下
def data_generator_wrapper(annotation_lines, batch_size, input_shape, anchors, num_classes):
n = len(annotation_lines)
if n==0 or batch_size<=0: return None
return data_generator(annotation_lines, batch_size, input_shape, anchors, num_classes)
data_generatorの中身は以下
def data_generator(annotation_lines, batch_size, input_shape, anchors, num_classes):
'''data generator for fit_generator'''
n = len(annotation_lines)
i = 0
while True:
image_data = []
box_data = []
for b in range(batch_size):
if i==0:
np.random.shuffle(annotation_lines)
image, box = get_random_data(annotation_lines[i], input_shape, random=True)
image_data.append(image)
box_data.append(box)
i = (i+1) % n
image_data = np.array(image_data)
box_data = np.array(box_data)
y_true = preprocess_true_boxes(box_data, input_shape, anchors, num_classes)
yield [image_data, *y_true], np.zeros(batch_size)
model.fit_generatorで学習が開始されるが、
VScodeでデバッグで動かしながら中身見ようと思うが何故かブレークポイントをスルー...。
(何でよ。)
心が折れそうになりここに質問することにしました
お力添え頂けると幸いです。よろしくお願いいたします。
必要な情報があればおっしゃってください。