Edited at

Deep Learningの気持ちを理解する - Classification編

More than 1 year has passed since last update.

画像分野では今やDeep Learningによる解析が主流となりつつありますが、結果の解釈が難しいのが難点です。医療分野などでは特に、モデルの出力に対する説明力が強く求められるのではないでしょうか。今回は、そういった時に活用できる可視化手法を紹介します。

紹介する手法はOBJECT DETECTORS EMERGE IN DEEP SCENE CNNs, Zhou+, '14で提案されている方法です。


論文中でやっていること


  1. classificationモデルの学習(通常の学習)

  2. 1と同じ前処理を適用した画像を入力として、1で学習したモデルでclass毎の確率値を計算

  3. 2で使用した画像の一部の領域をマスクした画像を入力として、class毎の確率値を計算

  4. 3を5000個程度の領域で行い、値を取得

  5. 4の個々の出力値と2の出力値の差分をとる

  6. 任意の閾値を設定し、5の値が閾値を超えていない領域は0埋めする(黒くマスクする)


Occluder

上の3で書いた、入力画像を一部マスクする処理は論文中ではOccluderと記載されています。

当該手法の勘所は、このOccluderだと思います。

3〜5の処理は、入力画像の一部を分類器から見えなくし、どの程度分類器の確信度が下がるかを見ている事に相当します。

これにより、分類器にとって重要な判断ポイント(領域)を見つけよう、というものです。

Occluderを通した画像のイメージは以下のようになります。

Occlude前




Occlude後


コード作成にあたり改善したポイント

論文の手法から改善したポイントは主に以下の2点です。


  1. Occluderをpatch切り出しに変更

  2. 閾値を動的に変化


patch切り出しへの変更

1については、入力画像サイズが大きくなるにつれて、可視化にかかる負荷が大きくなるためです。

Occluderで使用するマスクの大きさは8pix程度、strideは3程度が想定されるので、2,000×2,000pixの画像だとOccluderでマスクされた画像が約44万個も出来てしまう事になります。

これを全て計算して、やっと1枚の画像の可視化が出来る...

これ、あまり嬉しくないような気がしました。

そこで、私のコード中では本来Occluderでマスクをかける領域に対してpatchを切り出し、分類器への入力としています。

入力画像のサイズが異なるので、出力される値が全体的に低くなり不安定になりはします。

それを補うためにも入力画像により閾値を動的に変化させています。

patchを入力とする場合の留意点として、Occluderの時とscoreを反転させる必要があります。

つまり、どのpatchを入力した時に2の値に近づけるか、で判断することになります。


閾値を動的に変化

固定の閾値で計算する場合の問題点は、「論文中でやっていること」の2で出力される確率値が低いと、閾値を全く超えない画像が出てくる事です。

これでは、根拠を示すことが出来なくなってしまいます。

(判断に迷ったものは真っ黒な画像で良い、という場合はこの限りではありません。)

そこで、1画像毎に閾値を変化させています。

やっていることはかなり単純で、出力値の差分行列(5の処理で出来る行列)に対するpercentile値を閾値としています。

実験した感じでは95〜99で設定すると納得感のある可視化が出来ます。


参考コード

実験したコードをほぼそのまま貼り付けています。

汚いコードで申し訳ないのですが、参考になれば幸いです。

CPUでも計算出来る手頃なものを考えて、Cifar10の分類器を作って実験しています。

使用しているフレームワークはChainer(1.17.0)で、CPU環境で実験を行いました。

コードとしては以下のブロックに分かれています。


  • 設定

  • モデル

  • 訓練/テスト

  • 可視化


設定ファイル

Deep Learning系のコード管理で一番面倒なのは実験した設定の管理だと思っています。

このやり方が良いかどうかはわかりませんが、私は設定ファイルを分離して、複製/保存出来るようにしています(分離していなかった時よりかは快適に実験出来ています)。


visualization_settings.py

import os

from easydict import EasyDict as edict
from math import sqrt
import platform
import inflection

local_os_name = 'Darwin'
data_root_path = './data' if platform.system()==local_os_name else '/data'

# key is model file name
mult_dir = {'n_i_n': 32,
'squeeze_net': 16,
'cifar10': 4}

augmentation_params = {
'scale':[0.5, 0.75, 1.25],
'lr_shift':[-64, -32, -16, 16, 32, 64],
'ud_shift':[-64, -32, -16, 16, 32, 64],
'rotation_angle': list(range(5,360,5))
}

net_dir = {net_name:{'module_name':net_name, \
'class_name':inflection.camelize(net_name)} \
for net_name, _ in mult_dir.items()}

image_normalize_types_dir = {'ZCA': {'method':'zca_whitening', 'opts':{'eps':1e-5}},
'LCN': {'method':'local_contrast_normalization', 'opts':None},
'GCN': {'method':'global_contrast_normalization', 'opts':None}
}

# 以下は実験毎に変更するパラメタ
debug_mode = False
converse_gray = False
gpu = -1 if platform.system()==local_os_name else 1
use_net = 'cifar10'
n_class = 10 # number of class is 2, if you use ne_class classifier.
crop_size = 224
normalize_type = 'LCN'
output_path = os.path.join(data_root_path+'/results', use_net)
im_norm_type = image_normalize_types_dir[normalize_type]
model_module = net_dir[use_net]
experiment_criteria = ''
initial_model = os.path.join(data_root_path+'/results'+'/'+use_net+experiment_criteria, 'model_iter_3868')
resume = os.path.join(data_root_path+'/results'+'/'+use_net+experiment_criteria, 'snapshot_iter_xxx')
aug_flags = {'do_scale':False, 'do_flip':False,
'change_britghtness':False, 'change_contrast':False,
'do_shift':False, 'do_rotate':False}

trainig_params = { \
'lr': 1e-5,
'batch_size': 256,
'weight_decay': 0.0005,
'epoch': 100,
'decay_factor': 0.90, # as lr time decay
'decay_epoch': 50,
'snapshot_epoch': 3,
'report_epoch': 1,
}

# a body of args
train_args = \
{
'train': True,
'debug_mode': debug_mode,
'gpu': gpu,
'n_class': n_class,
'in_ch': 1 if converse_gray else 3,
'image_pointer_path': data_root_path+'/cifar-10-batches-py/data_batch_1',
'output_path': output_path,
'initial_model': initial_model,
'resume': resume,
'im_norm_type': im_norm_type,
'archtecture': model_module,
'converse_gray': converse_gray,
'do_resize': True,
'crop_params': {'flag':False, 'size': crop_size},
'multiple': mult_dir[use_net], # total stride multiple
'aug_params': {'do_augment':True,
'params': dict(augmentation_params, **aug_flags),
},
'shuffle': True, # data shuffle in SerialIterator
'training_params': trainig_params
}

test_args = \
{
'train': False,
'debug_mode': debug_mode,
'gpu': gpu,
'n_class': n_class,
'in_ch': 1 if converse_gray else 3,
'image_pointer_path': data_root_path+'/cifar-10-batches-py/data_batch_1', #test_batch
'output_path': output_path,
'initial_model': initial_model,
'im_norm_type': im_norm_type,
'archtecture': model_module,
'converse_gray': converse_gray,
'do_resize': True,
'crop_params': {'flag':False, 'size': crop_size},
'multiple': mult_dir[use_net], # total stride multiple
'aug_params': {'do_augment':False},
'multiple': mult_dir[use_net], # total stride multiple
}

def get_args(args_type='train'):
if args_type=='train':
return edict(train_args)
else:
return edict(test_args)



モデル


cifar10.py

import chainer

import chainer.functions as F
import chainer.links as L

class Cifar10(chainer.Chain):

def __init__(self, n_class, in_ch):
super().__init__(
conv1=L.Convolution2D(in_ch, 32, 5, pad=2),
conv2=L.Convolution2D(32, 32, 5, pad=2),
conv3=L.Convolution2D(32, 64, 5, pad=2),
fc4=F.Linear(1344, 4096),
fc5=F.Linear(4096, n_class),
)
self.train = True
self.n_class = n_class

def __call__(self, x, t):
x.volatile = not self.train

h = F.max_pooling_2d(F.elu(self.conv1(x)), 3, stride=2)
h = F.max_pooling_2d(F.elu(self.conv2(h)), 3, stride=2)
h = F.elu(self.conv3(h))
h = F.spatial_pyramid_pooling_2d(h, 3, F.MaxPooling2D)
h = F.dropout(F.elu(self.fc4(h)), ratio=0.5, train=self.train)
h = self.fc5(h)

self.prob = F.softmax(h)
self.loss = F.softmax_cross_entropy(h, t)
self.accuracy = F.accuracy(h, t)
chainer.report({'loss': self.loss, 'accuracy': self.accuracy}, self)

return self.loss



訓点/テスト

主にTrainer絡みの設定を行っているファイルです。

コード中で登場するDatasetPreProcessorは"chainer.dataset.DatasetMixin"を継承して作成した画像のloaderで、prepare_modelは設定ファイルの記述にあうモデルを準備してloadしてくるものです。

このあたりは必要に応じて書いて頂ければと思います。


train.py

import sys, os

sys.path.append('./src/net')
sys.path.append('./experiment_settings')
from mini_batch_loader import DatasetPreProcessor
from visualization import get_args
from model_loader import prepare_model

import chainer
import chainer.functions as F
from chainer import serializers
from chainer import cuda, optimizers, Variable
from chainer import training
from chainer.training import extensions
from chainer import Reporter, report, report_scope

import cv2
import importlib
import numpy as np

def prepare_optimizer(model, args):
optimizer = chainer.optimizers.RMSpropGraves(args.training_params.lr)
optimizer.setup(model)
optimizer.add_hook(chainer.optimizer.WeightDecay(args.training_params.weight_decay))
return optimizer

def prepare_dataset():
train_args = get_args('train')
# load dataset
train_mini_batch_loader = DatasetPreProcessor(train_args)
test_mini_batch_loader = DatasetPreProcessor(get_args('test'))
print("---set mini_batch----------")
train_it = chainer.iterators.SerialIterator( \
train_mini_batch_loader, \
train_args.training_params.batch_size, \
shuffle=train_args.shuffle)
val_it = chainer.iterators.SerialIterator( \
test_mini_batch_loader, \
1, repeat=False, shuffle=False)
return train_it, val_it, train_mini_batch_loader.__len__()

def main(args):
# load model
model, model_for_eval = prepare_model(args)
print("---set model----------")

# Setup optimizer
optimizer = prepare_optimizer(model, args)
print("---set optimzer----------")

# load data
train_it, val_it, train_data_length = prepare_dataset()
print("---set data----------")

updater = training.StandardUpdater(train_it, optimizer, device=args.gpu)
print("---set updater----------")

val_interval = args.training_params.report_epoch, 'epoch'
val_snapshot_interval = args.training_params.snapshot_epoch, 'epoch'
log_interval = args.training_params.report_epoch, 'epoch'

trainer = training.Trainer( \
updater, (args.training_params.epoch, 'epoch'), out=args.output_path)
trainer.extend( \
extensions.Evaluator(val_it, model_for_eval, device=args.gpu), \
trigger=val_interval)
trainer.extend(extensions.dump_graph('main/loss'))
trainer.extend(extensions.snapshot(), trigger=val_snapshot_interval)
trainer.extend(extensions.snapshot_object( \
model, 'model_iter_{.updater.iteration}'), \
trigger=val_snapshot_interval)
trainer.extend(extensions.ExponentialShift( \
'lr', args.training_params.decay_factor), \
trigger=(args.training_params.decay_epoch, 'epoch'))
trainer.extend(extensions.observe_lr(), trigger=log_interval)
trainer.extend(extensions.LogReport(trigger=log_interval))
trainer.extend(extensions.PrintReport([ \
'epoch', 'iteration', 'main/loss', 'validation/main/loss', \
'main/accuracy', 'validation/main/accuracy', \
]), trigger=log_interval)
trainer.extend(extensions.ProgressBar(update_interval=1))
print("---set trainer----------")

if os.path.exists(args.resume):
print('resume trainer:{}'.format(args.resume))
# Resume from a snapshot
serializers.load_npz(args.resume, trainer)
trainer.run()

if __name__ == '__main__':
args = get_args('train')
main(args)



可視化

最後に本題の可視化です。

可視化用のコードだけ載せようかと思ったんですけど、設定やモデルがわからないと再現出来ないと思い上のコードを載せています。

コードのメソッド名が上で書いた論文の手順とリンクしています(しているハズ)ので、説明は不要かと思います。


attention_visualizer.py

import sys, os

sys.path.append('./src/net')
sys.path.append('./experiment_settings')
from mini_batch_loader import DatasetPreProcessor
from visualization_settings import get_args
from model_loader import prepare_model

import chainer
from chainer import Variable, cuda
import numpy as np
import cv2
from itertools import product
from math import ceil, floor

class AttentionVisualizer(object):
"""
see OBJECT DETECTORS EMERGE IN DEEP SCENE CNNs, Zhou+, '14
https://arxiv.org/abs/1412.6856
"""

def __init__(self, args):
self.args = args
self.xp = cuda.cupy if args.gpu>=0 else np

def calculate_slice_idxs(self, size, x, y, h, w):
patch_harf_slide = ceil(size/2)
sl_strt_x = int(max(0, x - patch_harf_slide))
sl_end_x = int(min(h, x - patch_harf_slide + size))
sl_strt_y = int(max(0, y - patch_harf_slide))
sl_end_y = int(min(w, y - patch_harf_slide + size))
return sl_strt_x, sl_end_x, sl_strt_y, sl_end_y

def calculate_target_class(self, y, gt):
if self.args.view_type=='gt':
prob = y[0, gt]
target_class = gt
elif self.args.view_type=='infer':
max_class_idx = int(self.xp.argmax(y[0, :]))
prob = y[0, max_class_idx]
target_class = max_class_idx
return prob, target_class,

def crop_patches(self, image):
n_img, ch, h, w = image.shape

num_occluded_img = \
((h - 1)//self.args.stride+1) * ((w - 1)//self.args.stride+1)
patches = self.xp.zeros( \
(num_occluded_img, ch, self.args.size, self.args.size), \
dtype=np.float32)

window_pos = []
idx = 0

for x, y in product(range(self.args.size//2, h, self.args.stride), \
range(self.args.size//2, w, self.args.stride)):
_img = image.copy()
sl_strt_x, sl_end_x, sl_strt_y, sl_end_y = \
self.calculate_slice_idxs(self.args.size, x, y, h, w)

patch = image[:, :, sl_strt_x:sl_end_x, sl_strt_y:sl_end_y]
if min(patch.shape[2:])<self.args.size:
continue
patches[idx] = patch
window_pos.append([x, y])
idx += 1
window_pos = self.xp.array(window_pos, dtype=np.int32)
return patches[:idx], window_pos

def compute_one_batch_mask( \
self, mask, patches, prob, target_class, w_pos, idx):

n_img, ch, h, w = mask.shape
x_batch = Variable(patches[idx:idx+self.args.patch_batchsize])
self.args.net(x_batch, self.xp.array( \
[target_class]*len(x_batch.data), np.int32))
y_batch = self.args.net.prob.data
patches_prob = y_batch[:, target_class]

# attention score
diff = patches_prob - prob
if self.args.gpu>=0:
threshold = np.percentile(self.xp.asnumpy(diff), self.args.percentile)
else:
threshold = self.xp.percentile(diff, self.args.percentile)

batch_w_pos = w_pos[idx:idx+self.args.patch_batchsize]
crux_coordinate = self.xp.array([batch_w_pos[idx] for idx, flag in \
enumerate(diff > threshold) if flag], dtype=np.float32)

for x, y in crux_coordinate:
sl_strt_x, sl_end_x, sl_strt_y, sl_end_y = \
self.calculate_slice_idxs(self.args.size, x, y, h, w)
mask[:, :, sl_strt_x:sl_end_x, sl_strt_y:sl_end_y] = 1.
return mask

def compute_attention_mask(self, image, gt):
ch, h, w = image.shape
image = image.reshape(1, ch, h, w).astype(np.float32)

if self.args.gpu >= 0:
image = cuda.to_gpu(image, device=self.args.gpu)

x = Variable(image)
self.args.net(x, self.xp.array([gt], np.int32))
y = self.args.net.prob.data
prob, target_class = self.calculate_target_class(y, gt)

patches, w_pos = self.crop_patches(image)
if self.args.gpu>=0:
patches = cuda.to_gpu(patches, device=self.args.gpu)

mask = self.xp.zeros_like(image)
for idx in range(0, len(patches), self.args.patch_batchsize):
mask = self.compute_one_batch_mask( \
mask, patches, prob, target_class, w_pos, idx)
return mask, target_class

def visualize_attention(self, raw_image, preprocessed_image, gt):
h, w, _ = raw_image.shape

mask, target_class = \
self.compute_attention_mask(preprocessed_image, gt)
mask = mask[0].transpose(1,2,0).astype(np.uint8)

xp = cuda.get_array_module(mask)
if xp!=np:
mask = self.xp.asnumpy(mask)
mask = cv2.resize(mask, (w, h))
return raw_image * mask, target_class

if __name__=='__main__':
output_path_base = 'visualized_attention_images'
args = get_args('test')
mini_batch_loader = DatasetPreProcessor(args)

# patch config
args.size = 16 # stride=2のmax pooling×2個, 3階層のspp×1なのでモデルの出力を行う上で2*2*4のサイズが最低限必要
args.stride = 3

args.converse_gray = False
args.in_ch = 3
args.percentile = 95 # percentile value
args.patch_batchsize = 4096
# infer:モデルが出力したclassの分類根拠、gt:正解ラベルclassにおける注目点
# 正しく分類出来ていれば、inferでもgtでも可視化結果は同じ
args.view_type = 'infer' # select 'infer' or 'gt'.

visualizer = AttentionVisualizer(args)

_, model_eval = prepare_model(get_args('train'))
args.net = model_eval

for idx, (raw_image, label) in enumerate(mini_batch_loader.pairs):
preprocessed_image, _ = mini_batch_loader.get_example(idx)
attention_view, target_class = \
visualizer.visualize_attention(raw_image, preprocessed_image, label)

image_fname = 'image'+str(idx)+'_'+str(label)+'_'+str(target_class)+'.jpg'
output_path = os.path.join(args.output_path, output_path_base)
if not os.path.exists(output_path):
print("create directory:", output_path)
os.mkdir(output_path)
output_path = os.path.join(output_path, image_fname)
cv2.imwrite(output_path, attention_view)



可視化結果

結果は学習時間の都合上、training accuracyが0.8弱のモデルでtraining dataに対して可視化しています。

大量に画像を貼り付けてもしょうがないので、比較的目視で確認しやすかった画像を2枚貼り付けておきます。

Cifar10の元画像が32×32pixなので解像度が低くてわかり辛いですがご了承下さい。


可視化結果

bog

automobile

元画像が32×32でpatchサイズが16×16なので、かなり雑に見えてしまいますね。。

でも、まあま上手く可視化してくれていそうです。

お手数ですが間違いありましたらご指摘いただけますと助かります。