1. 強化学習
1.1 要約
強化学習とは、長期的に報酬を最大化できるように環境の中で行動を選択できるエージェントを作ることを目標とする機械学習の分野であり、優れた方策を見つけることが目標である。
強化学習には冬の時代があったが、関数近似法とQ学習を組み合わせる手法が登場した。
Q学習とは、行動価値関数を、行動する毎に更新することにより学習を進める方法であり、
関数近似法とは、価値関数や方策関数を関数近似する手法である。
価値関数とは価値を表す関数であり、ある状態の価値に注目する場合は状態価値関数、状態と価値を組み合わせた価値に注目する場合は行動価値関数と呼ぶ。
方策勾配法
平均報酬、割引報酬和の定義に対応して行動価値関数$Q(s,a)$の定義を行う。
このとき、方策勾配定理が成り立つ。
\Delta_{\theta}J(\theta)=E_{\pi_{\theta}}[\Delta_{\theta}(\log{\pi_{\theta}(a|s)Q^{\pi}(s,a)})]
1.2 実装
OpenAI Gymを使ってQ-learningで迷路を解かせる。
まずは必要なライブラリをインポートする。
from collections import defaultdict
import gym
from el_agent import ELAgent
from gym.envs.registration import register
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
環境の設定と行動価値を可視化する関数の定義を行う。
from gym.envs.registration import register
register(id="FrozenLakeEasy-v1", entry_point="gym.envs.toy_text:FrozenLakeEnv",
kwargs={"is_slippery": False})
def show_q_value(Q):
"""
Show Q-values for FrozenLake-v0.
To show each action's evaluation,
a state is shown as 3 x 3 matrix like following.
+---+---+---+
| | u | | u: up value
| l | m | r | l: left value, r: right value, m: mean value
| | d | | d: down value
+---+---+---+
"""
env = gym.make("FrozenLake-v1")
nrow = env.unwrapped.nrow
ncol = env.unwrapped.ncol
state_size = 3
q_nrow = nrow * state_size
q_ncol = ncol * state_size
reward_map = np.zeros((q_nrow, q_ncol))
for r in range(nrow):
for c in range(ncol):
s = r * ncol + c
state_exist = False
if isinstance(Q, dict) and s in Q:
state_exist = True
elif isinstance(Q, (np.ndarray, np.generic)) and s < Q.shape[0]:
state_exist = True
if state_exist:
# At the display map, the vertical index is reversed.
_r = 1 + (nrow - 1 - r) * state_size
_c = 1 + c * state_size
reward_map[_r][_c - 1] = Q[s][0] # LEFT = 0
reward_map[_r - 1][_c] = Q[s][1] # DOWN = 1
reward_map[_r][_c + 1] = Q[s][2] # RIGHT = 2
reward_map[_r + 1][_c] = Q[s][3] # UP = 3
reward_map[_r][_c] = np.mean(Q[s]) # Center
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
plt.imshow(reward_map, cmap=cm.RdYlGn, interpolation="bilinear",
vmax=abs(reward_map).max(), vmin=-abs(reward_map).max())
ax.set_xlim(-0.5, q_ncol - 0.5)
ax.set_ylim(-0.5, q_nrow - 0.5)
ax.set_xticks(np.arange(-0.5, q_ncol, state_size))
ax.set_yticks(np.arange(-0.5, q_nrow, state_size))
ax.set_xticklabels(range(ncol + 1))
ax.set_yticklabels(range(nrow + 1))
ax.grid(which="both")
plt.show()
今回の迷路は次の図の通りである。
紫がスタート、黄色がゴール、緑が落とし穴となっている。
青が通路で落とし穴を避けてゴールを目指す。
Q-learningの実装を行う。
class QLearningAgent(ELAgent):
def __init__(self, epsilon=0.1):
super().__init__(epsilon)
def learn(self, env, episode_count=1000, gamma=0.9,
learning_rate=0.1, render=False, report_interval=50):
self.init_log()
actions = list(range(env.action_space.n))
self.Q = defaultdict(lambda: [0] * len(actions))
for e in range(episode_count):
s = env.reset()
done = False
while not done:
if render:
env.render()
a = self.policy(s, actions)
n_state, reward, done, info = env.step(a)
gain = reward + gamma * max(self.Q[n_state])
estimated = self.Q[s][a]
self.Q[s][a] += learning_rate * (gain - estimated)
s = n_state
else:
self.log(reward)
if e != 0 and e % report_interval == 0:
self.show_reward_log(episode=e)
def train():
agent = QLearningAgent()
env = gym.make("FrozenLakeEasy-v1")
agent.learn(env, episode_count=500)
show_q_value(agent.Q)
agent.show_reward_log()
学習を行い。結果を確認する。
色が濃いほど価値が高いことを表す。ゴールに向かっていることが分かる。
train()
報酬も学習が進むごとに大きくなっていることが分かる。
2. Alpha Go
2.1 要約
Alpha Go
PolicyNet
出力は19×19マスの着手予想確率が出力される
ValueNet
出力は現局面の勝率を-1~1で表したものが出力される
RollOutPolicy
NNではなく線形の方策関数であり、探索中に高速に着手確率を出すために使用される
出力はそのマスの着手確率となる
Alpha Goの学習
以下の手順で行われる
- 教師あり学習によるRollOutPlicyとPolicyNetの学習
- 強化学習によるPolicyNetの学習
- 強化学習によるValueNetの学習
- PolicyNetの教師あり学習
3000万局面分の教師を用意し、教師と同じ着手を予測できるように学習を行った。
具体的には、教師が着手した手を1とし残りを0とした19×19次元の配列を教師とし、それを分類問題として学習した。
- PolicyNetの強化学習
現状のPolicyNetとPlicyPoolからランダムに選択されたPlicyNetと対局シミュレーションを行い、その結果を用いて方策勾配法で学習を行った。
PlicyPoolとは、PolicyNetの強化学習の過程を500Iterationごとに記録して保存しておいたものである。
- ValueNetの強化学習
PolicyNetを使用して対局シミュレーションを行い、その結果を勝敗の教師として学習した。
教師データの作成基準は、
- SL PolicyNet(教師あり学習で作成したPlicyNet)でN手まで打つ
- N+1手目の手をランダムに選択し、その手で進めた局面をS(N+1)とする
- S(N+1)からRL PolicyNet(強化学習で作成したPolicyNet)で終局まで打ち、その勝敗報酬をRとする
S(N+1)とRを教師データ対とし、損失関数を平均二乗誤差とし、回帰問題として学習した。
モンテカルロ木探索
盤面評価値に頼らず末端評価値、つまり勝敗のみを使う探索法。囲碁の場合は、最大手数はマスの数でほぼ限定されるため、末端局面に到達しやすい。
現局面から末端局面までのPlayOutと呼ばれるランダムシミュレーションを多数回行い、その勝敗を集計して着手の優劣を決定する。
また、該当手のシミュレーション回数が一定数を超えたら、その手を着手したあとの局面をシミュレーション開始局面とするよう、探索木を成長させる。
Alpha Goのモンテカルロ木探索は4ステップで構成される。
- 選択
- 評価
- バックアップ
- 成長
Alpah Go Zero
Alpha Go ZeroのAlpha Goとの違いは、
- 教師あり学習を一切行わず、強化学習のみで作成
- 特徴入力からヒューリスティックな要素を排除し、石の配置のみにした
- PolicyNetとValueNetを1つのネットワークに統合した
- Residual Netを導入した
- モンテカルロ木探索からRollOutシミュレーションをなくした
PolicyValueNet(Alpha Go Zero)
Residual Net
ネットワークにショートカット構造を追加して、勾配爆発や勾配消失を抑える効果を狙ったもの
Residual Netの派生形として、
-
Residual Blockの工夫:
- Bottleneck
- PreActivation -
Network構造の工夫:
- WideResNet
- PyramidNet
などがある。
Alpha Go Zeroのモンテカルロ木探索
3つのステップで構成される。
- 選択
- 評価及び成長(RollOutは行わない)
- バックアップ
Alpha Go Zeroの学習法
3つのステップで構成される。
- 自己対極による教師データの作成
- 学習
- ネットワークの更新
3. 軽量化・高速化技術
分散深層学習
複数の計算資源を使用し、並列的にニューラルネットワークを構成することで効率の良い学習を行う。
高速化技術として
- データ並列化
- モデル並列化
- GPU
データ並列化
- 親モデルを各ワーカーに子モデルとしてコピー
- データを分割し、各ワーカーに計算させる
- 同期型: 各ワーカーが計算が終わるのを待ち、親モデルのパラメータを更新する
- 非同期型: 各ワーカーはお互いの計算を待たず、各子モデルごとに更新を行う。
処理のスピードは非同期型の方が早いが、学習が不安定になりやすく精度の面でも同期型に劣る。
モデル並列化
- 親モデルを各ワーカーに分割し、それぞれのモデルを学習させる。すべてのデータで学習が終わった後で、一つのモデルに復元。
- モデルが大きいときはモデル並列化を、データが大きいときはデータ並列化を行うとよい。
GPU
- GPGPU: 元々の使用目的であるグラフィック以外の用途で使用されるGPUの総称
- CUDA
- GPU上で並列コンピューティングを行うためのプラットフォーム
- OpenCL
- オープンな並列コンピューティングのプラットフォーム - CPU
- 高性能なコアが少数
- 複雑で連続的な処理が得意 - GPU
- 比較的低性能なコアが多数
- 簡単な並列処理が得意
- ニューラルネットワークの学習は単純な計算が多いので、高速化が可能
モデルの軽量化
モデルの精度を維持しつつパラメータや円演回数を提言する手法の総称
- 高メモリ 負荷の高い演算性能が求められる
- 低メモリ 低演算性能での利用が必要とされるIoTなど
モデルの軽量化は計算の高速化と省メモリ化を行うためモバイル、IoT機器において有効な手法
計量化の手法としては
- 量子化
- 蒸留
- プルーニング
量子化
通常のパラメータの64 bit浮動小数点を32 bitなど下位の精度に落とすことでメモリと演算処理の削減を行う。
利点としては、計算の高速化・省メモリ化が挙げられるが、精度が低下するという欠点がある。
- 計算の高速化
64 bitと32 bitは演算性能が大きく違うため、量子化により精度を落とすことによりより多くの計算をすることができる。
「FLOPS」はコンピュータの処理性能を示す単位である。
- 省メモリ化
ニューロンの重みをbit数を少なくして有効桁数を下げることで、ニューロンのメモリサイズを小さくすることができ、メモリ消費量を抑えることができる。
-
精度の低下
ニューロンで表現できる少数の桁数が小さくなるため、モデルの表現力が低下する。
実際の問題では倍精度を単精度にしてもほぼ精度は変わらない。 -
極端な量子化
量子化する際は極端に精度が落ちない程度に量子化をしなければならない
速度の比較
計量化されたものの方が計算が速いことが確認できる
精度の比較
検出されたBBを見ると、オブジェクト以外の領域が多くなっている。
蒸留
精度の高いモデルはニューロンの規模が大きくなっている。
規模の大きなモデルの知識を使い軽量なモデルの作成を行うことを蒸留という。
- モデルの簡約化
学習済みの精度の高いモデルの知識を計量なモデルへ継承させる
- 教師モデルと生徒モデル
蒸留は教師モデルと生徒モデルの2つで構成される
教師モデル 予測精度の高い、複雑なモデルやアンサンブルされたモデル
生徒モデル 教師モデルをもとに作られる軽量なモデル
教師モデルの重みを固定し生徒モデルの重みを更新していく
誤差は教師モデルと生徒モデルのそれぞれの誤差を使い重みを更新していく
プルーニング
大きなネットワークの大量のパラメータがすべて制度に寄与しているわけではないことから、寄与が少ないニューロンを削除することでモデルの軽量化・高速化を行うこと。
寄与の少ないニューロンの削除を行いモデルの圧縮を行う。
ニューロンの削減は重みが閾値以下の場合ニューロンを削減し、再学習を行う。
下図は閾値が0.1の場合の例である。
3.2 実装
pythonでGPUを使う方法としてcupyがあり、ほぼnumpyと同じように書くことができる。
(GPUがないため実行はできない)
import cupy as cp
A = cp.arange(9).reshape(3, 3).astype('f') #cupy上で3*3の行列を生成
B = cp.arange(9).reshape(3, 3).astype('f') #cupy上で3*3の行列を生成
print('A = \n', A)
print('B = \n', B)
A = [[0., 1., 2.],
[3., 4., 5.],
[6., 7., 8.]]
B = [[0., 1., 2.],
[3., 4., 5.],
[6., 7., 8.]]
蒸留
実装は行わないが、実装の際は、
- 学習済みモデルを教師モデルとして重みの学習をさせないように設定する
- 新たにモデルを生徒モデルとして作成し、教師モデルと出力の形式を合わせる
- 教師モデルの出力を教師データとして生徒モデルの学習を行う
といったような流れとなる.
参考URL
https://qiita.com/M_Hiro/items/0ba24788c78540046bcd
4. 応用技術
4.1 要約
MobileNet
ディープラーニングモデルの軽量化・高速化・高精度化を実現
通常の畳み込み層
計算量を考えると、
出力マップでの1つの領域に対応する計算では、面積(カーネルサイズ×カーネルサイズ)×チャンネル数×フィルタ数と計算できる。
$$
(K×K×C)×M
$$
出力マップ全体では上式の面積倍となるから
$$
(H×W)×(K×K×C)×M
$$
となる。
Depthwise Convolution
- 入力マップのチャネルごとに畳み込みを実施
- 出力マップをそれらと結合
チャンネル間での計算はされない(チャネル数は変えない)ので計算量は、
$$
(H×W)×C×(K×K)
$$
となる。
Pointwise Convolution
- 1×1 Conv
- 入力マップのポイントごとに畳み込みを実施
- 出力マップはフィルタ数分だけ作成可能
計算量は、通常の畳み込みのカーネルサイズが1のものであることから、
$$
(H×W)×C×M
$$
となる。
MobileNet
Depthwise Separable ConvolutionではそれらをDepthwise ConvolutionとPointwise Convolutionと呼ばれる演算によって個別に行うことで計算量を削減する。
DenseNet
ニューラルネットワークでは層が深くなるにつれて学習が難しくなるが、前方の層から後方の層へアイデンティティ接続を介してパスを作ることで問題に対処した。
- 出力層に前の層の入力を足し合わせる
入力に対し、Batch正規化-ReLU-3×3畳み込み処理を行い、入力特徴に足し合わせる
第l層の出力は次のようになる。
$$
x_l=H_l([x_0,x_1,\cdots,x_{l-1}])
$$
growth rate(成長率)
ネットワークの大きさを表すパラメータ$k$をgrowth rateと呼ぶ。
$k$個ずつ特徴マップのチャネル数が増加していく。
Transition Layer
Dense blockをつなぐ層で、特徴マップのサイズを変更する。
Batch Norm
ミニバッチに含まれるsampleの同一チャネルが同一分布に従うように正規化
Layer Norm
それぞれのsampleの全てのpixelが同一分布に従うよう正規化
バッチ数に依存しない
入力データのスケール、重み行列のスケールやシフトに関してロバスト
Instance Norm
さらにchannelも同一分布に従うように正規化
(各サンプルの各チャンネルごとに正規化)
WaveNet
生の音声波形を生成するモデル
時系列データに対して畳み込みを適用
Dilated convolution
層が深くなるにつれて畳み込むリンクを話す
4.2 実装
mobile netの実装を行う。
必要なライブラリのインポートを行う。
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.models import Sequential
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, BatchNormalization, Input, Reshape, Add, ReLU, Reshape, Lambda, Multiply
from tensorflow.keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, AveragePooling2D, Flatten, DepthwiseConv2D
from tensorflow.keras import backend as K
import tensorflow.keras as keras
from keras.layers.merge import concatenate
import tensorflow as tf
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.callbacks import LearningRateScheduler
import math
import cv2
import numpy as np
データの準備
'''データセットの読み込み'''
(x_train, y_train), (x_test, y_test) = cifar10.load_data()
'''バッチサイズ、クラス数、エポック数の設定'''
batch_size=64
num_classes=10
epochs=100
'''データリサイズ'''
img_rows=224
img_cols=224
x_train = np.array([cv2.resize(img, (img_rows,img_cols)) for img in x_train[::5,:,:,:]])
x_test = np.array([cv2.resize(img, (img_rows,img_cols)) for img in x_test[::5,:,:,:]])
'''データ正規化'''
x_train=x_train.astype('float32')
x_train/=255
x_test=x_test.astype('float32')
x_test/=255
'''one-hotベクトル化'''
y_train = y_train[::5]
y_test = y_test[::5]
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)
'''shape表示'''
print("x_train : ", x_train.shape)
print("y_train : ", y_train.shape)
print("x_test : ", x_test.shape)
print("y_test : ", y_test.shape)
必要なモジュールの定義等を行う。
# mobile net V3
def hard_sigmoid(x):
return ReLU(6.)(x + 3.) * (1. / 6.)
def hard_swish(x):
return Multiply()([layers.Activation(hard_sigmoid)(x), x])
MyGlobalAverage2D = Lambda(lambda t4d: K.mean(t4d, axis=(1,2), keepdims=True), name='GlobalAverage2D' )
def se_block(inputs, filters, se_ratio):
x = MyGlobalAverage2D(inputs)
x = Conv2D(int(filters * se_ratio),
kernel_size=1,
padding='same')(x)
x = ReLU()(x)
x = Conv2D(filters,
kernel_size=1,
padding='same')(x)
x = Activation(hard_sigmoid)(x)
x = Multiply()([inputs, x])
return x
def MobileNetV3_Block(input_layer, output_ch, s, exp_size, use_se, se_ratio, activation):
"""Inverted Residual block"""
k = K.int_shape(input_layer)[3]
# 1×1の通常の畳み込み
# ここでチャネル数を変える
x = Conv2D(filters = exp_size, kernel_size = (1,1), padding = 'same')(input_layer)
x = BatchNormalization()(x)
x = Activation(activation)(x)
# チャネルごとに畳み込み処理を行う
# チャネル数はここでは変わらない
x = DepthwiseConv2D(kernel_size = (3,3), strides=s, padding = 'same')(x)
x = BatchNormalization()(x)
x = Activation(activation)(x)
# Squeeze and Excitation
if use_se:
x = se_block(x, filters=exp_size, se_ratio=se_ratio)
# 1×1の通常の畳み込み
# ここでチャネル数を変える
x = Conv2D(filters = output_ch, kernel_size = (1,1), padding = 'same')(x)
x = BatchNormalization()(x)
if K.int_shape(input_layer) == K.int_shape(x):
x = Add()([input_layer, x])
return x
ネットワークの定義
output_dims = [16, 24, 24, 40, 40, 40, 80, 80, 80, 80, 112, 112, 160, 160, 160]
exp_sizes = [16, 64, 72, 72, 120, 120, 240, 200, 184, 184, 480, 672, 672, 960]
use_se = [False, False, False, True, True, True, False, False, False, False, True, True, True, True, True]
activations = ['RE', 'RE', 'RE', 'RE', 'RE', 'RE', 'HS', 'HS', 'HS', 'HS', 'HS', 'HS', 'HS', 'HS', 'HS']
strides = [1, 2, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1, 2, 1, 1]
input_layer = Input(shape = (224, 224, 3))
x = Conv2D(filters = 16, kernel_size = (1,1), strides=2, padding = 'same')(input_layer)
x = BatchNormalization()(x)
x = Activation(hard_sigmoid)(x)
for o, es, se, s, activation in zip(output_dims, exp_sizes, use_se, strides, activations):
ac = hard_sigmoid if activation == 'HS' else ReLU()
x = MobileNetV3_Block(x, output_ch=o, exp_size=es, s=s, use_se=se, se_ratio=0.25, activation=ac)
x = Conv2D(filters = 960, kernel_size = (1,1), padding = 'same')(x)
x = BatchNormalization()(x)
x = Activation(hard_sigmoid)(x)
# 全結合層
x = GlobalAveragePooling2D()(x)
x = Dense(1280)(x)
x = Activation(hard_sigmoid)(x)
outputs = Dense(10, activation='softmax')(x)
model = Model(input_layer, outputs)
学習
model.compile(
optimizer='sgd',
loss='categorical_crossentropy',
metrics=['accuracy']
)
history=model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs,
validation_split=0.2)
#validation_data=(x_test, y_test))
model.save('my_model')
4.3 演習
(い) $(H×W)×C×(D_K×D_K)$
(う) $(H×W)×C×M$
(あ) Dilated convolution
(い) 受容野を簡単に増やすことができる
5. Transformer
要約
- Attention
翻訳先の各単語を選択する際に、翻訳元の文中の各単語の隠れ状態を利用
翻訳元の各単語の隠れ状態の加重平均
$$
c_i=\sum_{j=1}^{T_x}\alpha_{ij}h_{j}
$$
重みは
$$
\alpha_{ij}=\frac{\exp{e_{ij}}}{\sum_{k=1}^{T_x}\exp{e_{ik}}}\
e_{ij}=a(s_{i-1},h_j)
$$
- Transformer
RNNを使わない、Attentionのみを用いたモデル
Transformer Encoder
6層で構成される
Self-Attention
Attentionには
- Source Target Attention
- Self-Attention
の2種類がある。
TransformerではSelf-Attentionを使用する。
Position-Wise Feed-Forward Networks
位置情報を保持したまま順伝搬させる
各Attention層の出力を決定する
- ReLUを挟んだ2層の全結合NN
Scaled dot product attention
全単語に関するAttentionをまとめて計算する
$$
Attention(Q,K,V)=softmax\biggl(\frac{QK^T}{\sqrt{d_k}} \biggr)V
$$
Multi Head Attention
8個のScaled Dot-ProductによるAttentionの出力を結合
Transformer Decoder
- 6層で構成される
- 未来の情報をみないようにマスク
- Encoderの出力へのAttention
Add & Norm
- Add(Residual Connection)
入出力の差分を学習させる
実装上は出力に入力をそのまま加算させるだけ
- Norm(Layer Normalization)
学習を高速化する効果がある
Position Encoding
単語列の語順情報を追加する
$$
PE_{(pos,2i)}=\sin\biggl(\frac{pos}{10000^{2i/512}} \biggr)\
PE_{(pos,2i+1)}=\cos\biggl(\frac{pos}{10000^{2i/512}} \biggr)
$$
5.2 実装
必要なライブラリとデータの準備を行う。
import time
import numpy as np
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
from nltk import bleu_score
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from utils import Vocab
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(1)
random_state = 42
PAD = 0
UNK = 1
BOS = 2
EOS = 3
PAD_TOKEN = '<PAD>'
UNK_TOKEN = '<UNK>'
BOS_TOKEN = '<S>'
EOS_TOKEN = '</S>'
データの準備
データを読み込みます。
データは日本語とそれに対応する英語のデータです。
def load_data(file_path):
"""
テキストファイルからデータを読み込む
:param file_path: str, テキストファイルのパス
:return data: list, 文章(単語のリスト)のリスト
"""
data = []
for line in open(file_path, encoding='utf-8'):
words = line.strip().split() # スペースで単語を分割
data.append(words)
return data
train_X = load_data('./data/train.en')
train_Y = load_data('./data/train.ja')
# 訓練データと検証データに分割
train_X, valid_X, train_Y, valid_Y = train_test_split(train_X, train_Y, test_size=0.2, random_state=random_state)
MIN_COUNT = 2 # 語彙に含める単語の最低出現回数
word2id = {
PAD_TOKEN: PAD,
BOS_TOKEN: BOS,
EOS_TOKEN: EOS,
UNK_TOKEN: UNK,
}
vocab_X = Vocab(word2id=word2id)
vocab_Y = Vocab(word2id=word2id)
vocab_X.build_vocab(train_X, min_count=MIN_COUNT)
vocab_Y.build_vocab(train_Y, min_count=MIN_COUNT)
vocab_size_X = len(vocab_X.id2word)
vocab_size_Y = len(vocab_Y.id2word)
def sentence_to_ids(vocab, sentence):
"""
単語のリストをインデックスのリストに変換する
:param vocab: Vocabのインスタンス
:param sentence: list of str
:return indices: list of int
"""
ids = [vocab.word2id.get(word, UNK) for word in sentence]
ids = [BOS] + ids + [EOS] # EOSを末尾に加える
return ids
train_X = [sentence_to_ids(vocab_X, sentence) for sentence in train_X]
train_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in train_Y]
valid_X = [sentence_to_ids(vocab_X, sentence) for sentence in valid_X]
valid_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in valid_Y]
pytorchでの学習ではDataLoaderを定義して、データの読み込みを行います。
class DataLoader(object):
def __init__(self, src_insts, tgt_insts, batch_size, shuffle=True):
"""
:param src_insts: list, 入力言語の文章(単語IDのリスト)のリスト
:param tgt_insts: list, 出力言語の文章(単語IDのリスト)のリスト
:param batch_size: int, バッチサイズ
:param shuffle: bool, サンプルの順番をシャッフルするか否か
"""
self.data = list(zip(src_insts, tgt_insts))
self.batch_size = batch_size
self.shuffle = shuffle
self.start_index = 0
self.reset()
def reset(self):
if self.shuffle:
self.data = shuffle(self.data, random_state=random_state)
self.start_index = 0
def __iter__(self):
return self
def __next__(self):
def preprocess_seqs(seqs):
# パディング
max_length = max([len(s) for s in seqs])
data = [s + [PAD] * (max_length - len(s)) for s in seqs]
# 単語の位置を表現するベクトルを作成
positions = [[pos+1 if w != PAD else 0 for pos, w in enumerate(seq)] for seq in data]
# テンソルに変換
data_tensor = torch.tensor(data, dtype=torch.long, device=device)
position_tensor = torch.tensor(positions, dtype=torch.long, device=device)
return data_tensor, position_tensor
# ポインタが最後まで到達したら初期化する
if self.start_index >= len(self.data):
self.reset()
raise StopIteration()
# バッチを取得して前処理
src_seqs, tgt_seqs = zip(*self.data[self.start_index:self.start_index+self.batch_size])
src_data, src_pos = preprocess_seqs(src_seqs)
tgt_data, tgt_pos = preprocess_seqs(tgt_seqs)
# ポインタを更新する
self.start_index += self.batch_size
return (src_data, src_pos), (tgt_data, tgt_pos)
各モジュールの定義
1. Position Encoding
def position_encoding_init(n_position, d_pos_vec):
"""
Positional Encodingのための行列の初期化を行う
:param n_position: int, 系列長
:param d_pos_vec: int, 隠れ層の次元数
:return torch.tensor, size=(n_position, d_pos_vec)
"""
# PADがある単語の位置はpos=0にしておき、position_encも0にする
position_enc = np.array([
[pos / np.power(10000, 2 * (j // 2) / d_pos_vec) for j in range(d_pos_vec)]
if pos != 0 else np.zeros(d_pos_vec) for pos in range(n_position)])
position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2]) # dim 2i
position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2]) # dim 2i+1
return torch.tensor(position_enc, dtype=torch.float)
pe = position_encoding_init(50, 256).numpy()
plt.figure(figsize=(16,8))
sns.heatmap(pe, cmap='Blues')
plt.show()
Position Encodingを可視化する。
縦軸が単語の位置を、横軸が成分の次元を表している。
2. Multihead Attention
まず、Scaled Dot-Product Attentinoの実装を行う。
class ScaledDotProductAttention(nn.Module):
def __init__(self, d_model, attn_dropout=0.1):
"""
:param d_model: int, 隠れ層の次元数
:param attn_dropout: float, ドロップアウト率
"""
super(ScaledDotProductAttention, self).__init__()
self.temper = np.power(d_model, 0.5) # スケーリング因子
self.dropout = nn.Dropout(attn_dropout)
self.softmax = nn.Softmax(dim=-1)
def forward(self, q, k, v, attn_mask):
"""
:param q: torch.tensor, queryベクトル,
size=(n_head*batch_size, len_q, d_model/n_head)
:param k: torch.tensor, key,
size=(n_head*batch_size, len_k, d_model/n_head)
:param v: torch.tensor, valueベクトル,
size=(n_head*batch_size, len_v, d_model/n_head)
:param attn_mask: torch.tensor, Attentionに適用するマスク,
size=(n_head*batch_size, len_q, len_k)
:return output: 出力ベクトル,
size=(n_head*batch_size, len_q, d_model/n_head)
:return attn: Attention
size=(n_head*batch_size, len_q, len_k)
"""
# QとKの内積でAttentionの重みを求め、スケーリングする
attn = torch.bmm(q, k.transpose(1, 2)) / self.temper # (n_head*batch_size, len_q, len_k)
# Attentionをかけたくない部分がある場合は、その部分を負の無限大に飛ばしてSoftmaxの値が0になるようにする
attn.data.masked_fill_(attn_mask, -float('inf'))
attn = self.softmax(attn)
attn = self.dropout(attn)
output = torch.bmm(attn, v)
return output, attn
Multi-Head Attentinoの実装を行う。
class MultiHeadAttention(nn.Module):
def __init__(self, n_head, d_model, d_k, d_v, dropout=0.1):
"""
:param n_head: int, ヘッド数
:param d_model: int, 隠れ層の次元数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param dropout: float, ドロップアウト率
"""
super(MultiHeadAttention, self).__init__()
self.n_head = n_head
self.d_k = d_k
self.d_v = d_v
# 各ヘッドごとに異なる重みで線形変換を行うための重み
# nn.Parameterを使うことで、Moduleのパラメータとして登録できる. TFでは更新が必要な変数はtf.Variableでラップするのでわかりやすい
self.w_qs = nn.Parameter(torch.empty([n_head, d_model, d_k], dtype=torch.float))
self.w_ks = nn.Parameter(torch.empty([n_head, d_model, d_k], dtype=torch.float))
self.w_vs = nn.Parameter(torch.empty([n_head, d_model, d_v], dtype=torch.float))
# nn.init.xavier_normal_で重みの値を初期化
nn.init.xavier_normal_(self.w_qs)
nn.init.xavier_normal_(self.w_ks)
nn.init.xavier_normal_(self.w_vs)
self.attention = ScaledDotProductAttention(d_model)
self.layer_norm = nn.LayerNorm(d_model) # 各層においてバイアスを除く活性化関数への入力を平均0、分散1に正則化
self.proj = nn.Linear(n_head*d_v, d_model) # 複数ヘッド分のAttentionの結果を元のサイズに写像するための線形層
# nn.init.xavier_normal_で重みの値を初期化
nn.init.xavier_normal_(self.proj.weight)
self.dropout = nn.Dropout(dropout)
def forward(self, q, k, v, attn_mask=None):
"""
:param q: torch.tensor, queryベクトル,
size=(batch_size, len_q, d_model)
:param k: torch.tensor, key,
size=(batch_size, len_k, d_model)
:param v: torch.tensor, valueベクトル,
size=(batch_size, len_v, d_model)
:param attn_mask: torch.tensor, Attentionに適用するマスク,
size=(batch_size, len_q, len_k)
:return outputs: 出力ベクトル,
size=(batch_size, len_q, d_model)
:return attns: Attention
size=(n_head*batch_size, len_q, len_k)
"""
d_k, d_v = self.d_k, self.d_v
n_head = self.n_head
# residual connectionのための入力 出力に入力をそのまま加算する
residual = q
batch_size, len_q, d_model = q.size()
batch_size, len_k, d_model = k.size()
batch_size, len_v, d_model = v.size()
# 複数ヘッド化
# torch.repeat または .repeatで指定したdimに沿って同じテンソルを作成
q_s = q.repeat(n_head, 1, 1) # (n_head*batch_size, len_q, d_model)
k_s = k.repeat(n_head, 1, 1) # (n_head*batch_size, len_k, d_model)
v_s = v.repeat(n_head, 1, 1) # (n_head*batch_size, len_v, d_model)
# ヘッドごとに並列計算させるために、n_headをdim=0に、batch_sizeをdim=1に寄せる
q_s = q_s.view(n_head, -1, d_model) # (n_head, batch_size*len_q, d_model)
k_s = k_s.view(n_head, -1, d_model) # (n_head, batch_size*len_k, d_model)
v_s = v_s.view(n_head, -1, d_model) # (n_head, batch_size*len_v, d_model)
# 各ヘッドで線形変換を並列計算(p16左側`Linear`)
q_s = torch.bmm(q_s, self.w_qs) # (n_head, batch_size*len_q, d_k)
k_s = torch.bmm(k_s, self.w_ks) # (n_head, batch_size*len_k, d_k)
v_s = torch.bmm(v_s, self.w_vs) # (n_head, batch_size*len_v, d_v)
# Attentionは各バッチ各ヘッドごとに計算させるためにbatch_sizeをdim=0に寄せる
q_s = q_s.view(-1, len_q, d_k) # (n_head*batch_size, len_q, d_k)
k_s = k_s.view(-1, len_k, d_k) # (n_head*batch_size, len_k, d_k)
v_s = v_s.view(-1, len_v, d_v) # (n_head*batch_size, len_v, d_v)
# Attentionを計算(p16.左側`Scaled Dot-Product Attention * h`)
outputs, attns = self.attention(q_s, k_s, v_s, attn_mask=attn_mask.repeat(n_head, 1, 1))
# 各ヘッドの結果を連結(p16左側`Concat`)
# torch.splitでbatch_sizeごとのn_head個のテンソルに分割
outputs = torch.split(outputs, batch_size, dim=0) # (batch_size, len_q, d_model) * n_head
# dim=-1で連結
outputs = torch.cat(outputs, dim=-1) # (batch_size, len_q, d_model*n_head)
# residual connectionのために元の大きさに写像(p16左側`Linear`)
outputs = self.proj(outputs) # (batch_size, len_q, d_model)
outputs = self.dropout(outputs)
outputs = self.layer_norm(outputs + residual)
return outputs, attns
3. Position-Wise Feed Forward Network
class PositionwiseFeedForward(nn.Module):
"""
:param d_hid: int, 隠れ層1層目の次元数
:param d_inner_hid: int, 隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
"""
def __init__(self, d_hid, d_inner_hid, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
# window size 1のconv層を定義することでPosition wiseな全結合層を実現する.
self.w_1 = nn.Conv1d(d_hid, d_inner_hid, 1)
self.w_2 = nn.Conv1d(d_inner_hid, d_hid, 1)
self.layer_norm = nn.LayerNorm(d_hid)
self.dropout = nn.Dropout(dropout)
self.relu = nn.ReLU()
def forward(self, x):
"""
:param x: torch.tensor,
size=(batch_size, max_length, d_hid)
:return: torch.tensor,
size=(batch_size, max_length, d_hid)
"""
residual = x
output = self.relu(self.w_1(x.transpose(1, 2)))
output = self.w_2(output).transpose(2, 1)
output = self.dropout(output)
return self.layer_norm(output + residual)
4. Masking
def get_attn_padding_mask(seq_q, seq_k):
"""
keyのPADに対するattentionを0にするためのマスクを作成する
:param seq_q: tensor, queryの系列, size=(batch_size, len_q)
:param seq_k: tensor, keyの系列, size=(batch_size, len_k)
:return pad_attn_mask: tensor, size=(batch_size, len_q, len_k)
"""
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
pad_attn_mask = seq_k.data.eq(PAD).unsqueeze(1) # (N, 1, len_k) PAD以外のidを全て0にする
pad_attn_mask = pad_attn_mask.expand(batch_size, len_q, len_k) # (N, len_q, len_k)
return pad_attn_mask
def get_attn_subsequent_mask(seq):
"""
未来の情報に対するattentionを0にするためのマスクを作成する
:param seq: tensor, size=(batch_size, length)
:return subsequent_mask: tensor, size=(batch_size, length, length)
"""
attn_shape = (seq.size(1), seq.size(1))
# 上三角行列(diagonal=1: 対角線より上が1で下が0)
subsequent_mask = torch.triu(torch.ones(attn_shape, dtype=torch.uint8, device=device), diagonal=1)
subsequent_mask = subsequent_mask.repeat(seq.size(0), 1, 1)
return subsequent_mask
モデルの定義
Encoder
class EncoderLayer(nn.Module):
"""Encoderのブロックのクラス"""
def __init__(self, d_model, d_inner_hid, n_head, d_k, d_v, dropout=0.1):
"""
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param dropout: float, ドロップアウト率
"""
super(EncoderLayer, self).__init__()
# Encoder内のSelf-Attention
self.slf_attn = MultiHeadAttention(
n_head, d_model, d_k, d_v, dropout=dropout)
# Postionwise FFN
self.pos_ffn = PositionwiseFeedForward(d_model, d_inner_hid, dropout=dropout)
def forward(self, enc_input, slf_attn_mask=None):
"""
:param enc_input: tensor, Encoderの入力,
size=(batch_size, max_length, d_model)
:param slf_attn_mask: tensor, Self Attentionの行列にかけるマスク,
size=(batch_size, len_q, len_k)
:return enc_output: tensor, Encoderの出力,
size=(batch_size, max_length, d_model)
:return enc_slf_attn: tensor, EncoderのSelf Attentionの行列,
size=(n_head*batch_size, len_q, len_k)
"""
# Self-Attentionのquery, key, valueにはすべてEncoderの入力(enc_input)が入る
enc_output, enc_slf_attn = self.slf_attn(
enc_input, enc_input, enc_input, attn_mask=slf_attn_mask)
enc_output = self.pos_ffn(enc_output)
return enc_output, enc_slf_attn
class Encoder(nn.Module):
"""EncoderLayerブロックからなるEncoderのクラス"""
def __init__(
self, n_src_vocab, max_length, n_layers=6, n_head=8, d_k=64, d_v=64,
d_word_vec=512, d_model=512, d_inner_hid=1024, dropout=0.1):
"""
:param n_src_vocab: int, 入力言語の語彙数
:param max_length: int, 最大系列長
:param n_layers: int, レイヤー数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param d_word_vec: int, 単語の埋め込みの次元数
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
"""
super(Encoder, self).__init__()
n_position = max_length + 1
self.max_length = max_length
self.d_model = d_model
# Positional Encodingを用いたEmbedding
self.position_enc = nn.Embedding(n_position, d_word_vec, padding_idx=PAD)
self.position_enc.weight.data = position_encoding_init(n_position, d_word_vec)
# 一般的なEmbedding
self.src_word_emb = nn.Embedding(n_src_vocab, d_word_vec, padding_idx=PAD)
# EncoderLayerをn_layers個積み重ねる
self.layer_stack = nn.ModuleList([
EncoderLayer(d_model, d_inner_hid, n_head, d_k, d_v, dropout=dropout)
for _ in range(n_layers)])
def forward(self, src_seq, src_pos):
"""
:param src_seq: tensor, 入力系列,
size=(batch_size, max_length)
:param src_pos: tensor, 入力系列の各単語の位置情報,
size=(batch_size, max_length)
:return enc_output: tensor, Encoderの最終出力,
size=(batch_size, max_length, d_model)
:return enc_slf_attns: list, EncoderのSelf Attentionの行列のリスト
"""
# 一般的な単語のEmbeddingを行う
enc_input = self.src_word_emb(src_seq)
# Positional EncodingのEmbeddingを加算する
enc_input += self.position_enc(src_pos)
enc_slf_attns = []
enc_output = enc_input
# key(=enc_input)のPADに対応する部分のみ1のマスクを作成
enc_slf_attn_mask = get_attn_padding_mask(src_seq, src_seq)
# n_layers個のEncoderLayerに入力を通す
for enc_layer in self.layer_stack:
enc_output, enc_slf_attn = enc_layer(
enc_output, slf_attn_mask=enc_slf_attn_mask)
enc_slf_attns += [enc_slf_attn]
return enc_output, enc_slf_attns
Decoder
class DecoderLayer(nn.Module):
"""Decoderのブロックのクラス"""
def __init__(self, d_model, d_inner_hid, n_head, d_k, d_v, dropout=0.1):
"""
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param dropout: float, ドロップアウト率
"""
super(DecoderLayer, self).__init__()
# Decoder内のSelf-Attention
self.slf_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
# Encoder-Decoder間のSource-Target Attention
self.enc_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
# Positionwise FFN
self.pos_ffn = PositionwiseFeedForward(d_model, d_inner_hid, dropout=dropout)
def forward(self, dec_input, enc_output, slf_attn_mask=None, dec_enc_attn_mask=None):
"""
:param dec_input: tensor, Decoderの入力,
size=(batch_size, max_length, d_model)
:param enc_output: tensor, Encoderの出力,
size=(batch_size, max_length, d_model)
:param slf_attn_mask: tensor, Self Attentionの行列にかけるマスク,
size=(batch_size, len_q, len_k)
:param dec_enc_attn_mask: tensor, Soutce-Target Attentionの行列にかけるマスク,
size=(batch_size, len_q, len_k)
:return dec_output: tensor, Decoderの出力,
size=(batch_size, max_length, d_model)
:return dec_slf_attn: tensor, DecoderのSelf Attentionの行列,
size=(n_head*batch_size, len_q, len_k)
:return dec_enc_attn: tensor, DecoderのSoutce-Target Attentionの行列,
size=(n_head*batch_size, len_q, len_k)
"""
# Self-Attentionのquery, key, valueにはすべてDecoderの入力(dec_input)が入る
dec_output, dec_slf_attn = self.slf_attn(
dec_input, dec_input, dec_input, attn_mask=slf_attn_mask)
# Source-Target-AttentionのqueryにはDecoderの出力(dec_output), key, valueにはEncoderの出力(enc_output)が入る
dec_output, dec_enc_attn = self.enc_attn(
dec_output, enc_output, enc_output, attn_mask=dec_enc_attn_mask)
dec_output = self.pos_ffn(dec_output)
return dec_output, dec_slf_attn, dec_enc_attn
class Decoder(nn.Module):
"""DecoderLayerブロックからなるDecoderのクラス"""
def __init__(
self, n_tgt_vocab, max_length, n_layers=6, n_head=8, d_k=64, d_v=64,
d_word_vec=512, d_model=512, d_inner_hid=1024, dropout=0.1):
"""
:param n_tgt_vocab: int, 出力言語の語彙数
:param max_length: int, 最大系列長
:param n_layers: int, レイヤー数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param d_word_vec: int, 単語の埋め込みの次元数
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
"""
super(Decoder, self).__init__()
n_position = max_length + 1
self.max_length = max_length
self.d_model = d_model
# Positional Encodingを用いたEmbedding
self.position_enc = nn.Embedding(
n_position, d_word_vec, padding_idx=PAD)
self.position_enc.weight.data = position_encoding_init(n_position, d_word_vec)
# 一般的なEmbedding
self.tgt_word_emb = nn.Embedding(
n_tgt_vocab, d_word_vec, padding_idx=PAD)
self.dropout = nn.Dropout(dropout)
# DecoderLayerをn_layers個積み重ねる
self.layer_stack = nn.ModuleList([
DecoderLayer(d_model, d_inner_hid, n_head, d_k, d_v, dropout=dropout)
for _ in range(n_layers)])
def forward(self, tgt_seq, tgt_pos, src_seq, enc_output):
"""
:param tgt_seq: tensor, 出力系列,
size=(batch_size, max_length)
:param tgt_pos: tensor, 出力系列の各単語の位置情報,
size=(batch_size, max_length)
:param src_seq: tensor, 入力系列,
size=(batch_size, n_src_vocab)
:param enc_output: tensor, Encoderの出力,
size=(batch_size, max_length, d_model)
:return dec_output: tensor, Decoderの最終出力,
size=(batch_size, max_length, d_model)
:return dec_slf_attns: list, DecoderのSelf Attentionの行列のリスト
:return dec_slf_attns: list, DecoderのSelf Attentionの行列のリスト
"""
# 一般的な単語のEmbeddingを行う
dec_input = self.tgt_word_emb(tgt_seq)
# Positional EncodingのEmbeddingを加算する
dec_input += self.position_enc(tgt_pos)
# Self-Attention用のマスクを作成
# key(=dec_input)のPADに対応する部分が1のマスクと、queryから見たkeyの未来の情報に対応する部分が1のマスクのORをとる
dec_slf_attn_pad_mask = get_attn_padding_mask(tgt_seq, tgt_seq) # (N, max_length, max_length)
dec_slf_attn_sub_mask = get_attn_subsequent_mask(tgt_seq) # (N, max_length, max_length)
dec_slf_attn_mask = torch.gt(dec_slf_attn_pad_mask + dec_slf_attn_sub_mask, 0) # ORをとる
# key(=dec_input)のPADに対応する部分のみ1のマスクを作成
dec_enc_attn_pad_mask = get_attn_padding_mask(tgt_seq, src_seq) # (N, max_length, max_length)
dec_slf_attns, dec_enc_attns = [], []
dec_output = dec_input
# n_layers個のDecoderLayerに入力を通す
for dec_layer in self.layer_stack:
dec_output, dec_slf_attn, dec_enc_attn = dec_layer(
dec_output, enc_output,
slf_attn_mask=dec_slf_attn_mask,
dec_enc_attn_mask=dec_enc_attn_pad_mask)
dec_slf_attns += [dec_slf_attn]
dec_enc_attns += [dec_enc_attn]
return dec_output, dec_slf_attns, dec_enc_attns
最後に、Transformerモジュールの実装を行う。
class Transformer(nn.Module):
"""Transformerのモデル全体のクラス"""
def __init__(
self, n_src_vocab, n_tgt_vocab, max_length, n_layers=6, n_head=8,
d_word_vec=512, d_model=512, d_inner_hid=1024, d_k=64, d_v=64,
dropout=0.1, proj_share_weight=True):
"""
:param n_src_vocab: int, 入力言語の語彙数
:param n_tgt_vocab: int, 出力言語の語彙数
:param max_length: int, 最大系列長
:param n_layers: int, レイヤー数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param d_word_vec: int, 単語の埋め込みの次元数
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
:param proj_share_weight: bool, 出力言語の単語のEmbeddingと出力の写像で重みを共有する
"""
super(Transformer, self).__init__()
self.encoder = Encoder(
n_src_vocab, max_length, n_layers=n_layers, n_head=n_head,
d_word_vec=d_word_vec, d_model=d_model,
d_inner_hid=d_inner_hid, dropout=dropout)
self.decoder = Decoder(
n_tgt_vocab, max_length, n_layers=n_layers, n_head=n_head,
d_word_vec=d_word_vec, d_model=d_model,
d_inner_hid=d_inner_hid, dropout=dropout)
self.tgt_word_proj = nn.Linear(d_model, n_tgt_vocab, bias=False)
nn.init.xavier_normal_(self.tgt_word_proj.weight)
self.dropout = nn.Dropout(dropout)
assert d_model == d_word_vec # 各モジュールの出力のサイズは揃える
if proj_share_weight:
# 出力言語の単語のEmbeddingと出力の写像で重みを共有する
assert d_model == d_word_vec
self.tgt_word_proj.weight = self.decoder.tgt_word_emb.weight
def get_trainable_parameters(self):
# Positional Encoding以外のパラメータを更新する
enc_freezed_param_ids = set(map(id, self.encoder.position_enc.parameters()))
dec_freezed_param_ids = set(map(id, self.decoder.position_enc.parameters()))
freezed_param_ids = enc_freezed_param_ids | dec_freezed_param_ids
return (p for p in self.parameters() if id(p) not in freezed_param_ids)
def forward(self, src, tgt):
src_seq, src_pos = src
tgt_seq, tgt_pos = tgt
src_seq = src_seq[:, 1:]
src_pos = src_pos[:, 1:]
tgt_seq = tgt_seq[:, :-1]
tgt_pos = tgt_pos[:, :-1]
enc_output, *_ = self.encoder(src_seq, src_pos)
dec_output, *_ = self.decoder(tgt_seq, tgt_pos, src_seq, enc_output)
seq_logit = self.tgt_word_proj(dec_output)
return seq_logit
学習
def compute_loss(batch_X, batch_Y, model, criterion, optimizer=None, is_train=True):
# バッチの損失を計算
model.train(is_train)
pred_Y = model(batch_X, batch_Y)
gold = batch_Y[0][:, 1:].contiguous()
# gold = batch_Y[0].contiguous()
loss = criterion(pred_Y.view(-1, pred_Y.size(2)), gold.view(-1))
if is_train: # 訓練時はパラメータを更新
optimizer.zero_grad()
loss.backward()
optimizer.step()
gold = gold.data.cpu().numpy().tolist()
pred = pred_Y.max(dim=-1)[1].data.cpu().numpy().tolist()
return loss.item(), gold, pred
MAX_LENGTH = 20
batch_size = 64
num_epochs = 15
lr = 0.001
ckpt_path = 'transformer.pth'
max_length = MAX_LENGTH + 2
model_args = {
'n_src_vocab': vocab_size_X,
'n_tgt_vocab': vocab_size_Y,
'max_length': max_length,
'proj_share_weight': True,
'd_k': 32,
'd_v': 32,
'd_model': 128,
'd_word_vec': 128,
'd_inner_hid': 256,
'n_layers': 3,
'n_head': 6,
'dropout': 0.1,
}
# DataLoaderやモデルを定義
train_dataloader = DataLoader(
train_X, train_Y, batch_size
)
valid_dataloader = DataLoader(
valid_X, valid_Y, batch_size,
shuffle=False
)
model = Transformer(**model_args).to(device)
optimizer = optim.Adam(model.get_trainable_parameters(), lr=lr)
criterion = nn.CrossEntropyLoss(ignore_index=PAD, size_average=False).to(device)
BLEUスコアを計算する関数を定義する。
BLEUは機械翻訳などで使われる評価指標である。
def calc_bleu(refs, hyps):
"""
BLEUスコアを計算する関数
:param refs: list, 参照訳。単語のリストのリスト (例: [['I', 'have', 'a', 'pen'], ...])
:param hyps: list, モデルの生成した訳。単語のリストのリスト (例: [['I', 'have', 'a', 'pen'], ...])
:return: float, BLEUスコア(0~100)
"""
refs = [[ref[:ref.index(EOS)]] for ref in refs]
hyps = [hyp[:hyp.index(EOS)] if EOS in hyp else hyp for hyp in hyps]
return 100 * bleu_score.corpus_bleu(refs, hyps)
# 訓練
best_valid_bleu = 0.
for epoch in range(1, num_epochs+1):
start = time.time()
train_loss = 0.
train_refs = []
train_hyps = []
valid_loss = 0.
valid_refs = []
valid_hyps = []
# train
for batch in train_dataloader:
batch_X, batch_Y = batch
loss, gold, pred = compute_loss(
batch_X, batch_Y, model, criterion, optimizer, is_train=True
)
train_loss += loss
train_refs += gold
train_hyps += pred
# valid
for batch in valid_dataloader:
batch_X, batch_Y = batch
loss, gold, pred = compute_loss(
batch_X, batch_Y, model, criterion, is_train=False
)
valid_loss += loss
valid_refs += gold
valid_hyps += pred
# 損失をサンプル数で割って正規化
train_loss /= len(train_dataloader.data)
valid_loss /= len(valid_dataloader.data)
# BLEUを計算
train_bleu = calc_bleu(train_refs, train_hyps)
valid_bleu = calc_bleu(valid_refs, valid_hyps)
# validationデータでBLEUが改善した場合にはモデルを保存
if valid_bleu > best_valid_bleu:
ckpt = model.state_dict()
torch.save(ckpt, ckpt_path)
best_valid_bleu = valid_bleu
elapsed_time = (time.time()-start) / 60
print('Epoch {} [{:.1f}min]: train_loss: {:5.2f} train_bleu: {:2.2f} valid_loss: {:5.2f} valid_bleu: {:2.2f}'.format(
epoch, elapsed_time, train_loss, train_bleu, valid_loss, valid_bleu))
print('-'*80)
評価
def test(model, src, max_length=20):
# 学習済みモデルで系列を生成する
model.eval()
src_seq, src_pos = src
batch_size = src_seq.size(0)
enc_output, enc_slf_attns = model.encoder(src_seq, src_pos)
tgt_seq = torch.full([batch_size, 1], BOS, dtype=torch.long, device=device)
tgt_pos = torch.arange(1, dtype=torch.long, device=device)
tgt_pos = tgt_pos.unsqueeze(0).repeat(batch_size, 1)
# 時刻ごとに処理
for t in range(1, max_length+1):
dec_output, dec_slf_attns, dec_enc_attns = model.decoder(
tgt_seq, tgt_pos, src_seq, enc_output)
dec_output = model.tgt_word_proj(dec_output)
out = dec_output[:, -1, :].max(dim=-1)[1].unsqueeze(1)
# 自身の出力を次の時刻の入力にする
tgt_seq = torch.cat([tgt_seq, out], dim=-1)
tgt_pos = torch.arange(t+1, dtype=torch.long, device=device)
tgt_pos = tgt_pos.unsqueeze(0).repeat(batch_size, 1)
return tgt_seq[:, 1:], enc_slf_attns, dec_slf_attns, dec_enc_attns
def ids_to_sentence(vocab, ids):
# IDのリストを単語のリストに変換する
return [vocab.id2word[_id] for _id in ids]
def trim_eos(ids):
# IDのリストからEOS以降の単語を除外する
if EOS in ids:
return ids[:ids.index(EOS)]
else:
return ids
# 学習済みモデルの読み込み
model = Transformer(**model_args).to(device)
ckpt = torch.load(ckpt_path)
model.load_state_dict(ckpt)
# テストデータの読み込み
test_X = load_data('./data/dev.en')
test_Y = load_data('./data/dev.ja')
test_X = [sentence_to_ids(vocab_X, sentence) for sentence in test_X]
test_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in test_Y]
生成
test_dataloader = DataLoader(
test_X, test_Y, 1,
shuffle=False
)
src, tgt = next(test_dataloader)
src_ids = src[0][0].cpu().numpy()
tgt_ids = tgt[0][0].cpu().numpy()
print('src: {}'.format(' '.join(ids_to_sentence(vocab_X, src_ids[1:-1]))))
print('tgt: {}'.format(' '.join(ids_to_sentence(vocab_Y, tgt_ids[1:-1]))))
preds, enc_slf_attns, dec_slf_attns, dec_enc_attns = test(model, src)
pred_ids = preds[0].data.cpu().numpy().tolist()
print('out: {}'.format(' '.join(ids_to_sentence(vocab_Y, trim_eos(pred_ids)))))
BLEUの評価
# BLEUの評価
test_dataloader = DataLoader(
test_X, test_Y, 128,
shuffle=False
)
refs_list = []
hyp_list = []
for batch in test_dataloader:
batch_X, batch_Y = batch
preds, *_ = test(model, batch_X)
preds = preds.data.cpu().numpy().tolist()
refs = batch_Y[0].data.cpu().numpy()[:, 1:].tolist()
refs_list += refs
hyp_list += preds
bleu = calc_bleu(refs_list, hyp_list)
print(bleu)
6. 物体検知・セグメンテーション
6.1 要約
IoU
物体検出においては、クラスラベルだけでなく物体位置の予測精度も評価する。
位置の評価にはIoU(Intersection over Union、別名Jaccard係数)を使う。
出力結果が以下の場合だったとき、
confが0.5以上のものに対し、IoUが0.5以上であるかどうかでTPかどうか判断する。
あるクラスラベルですでに検出されているものは、FPとする。
4つの人が画像中に移っておりそれを検出する場合を考える。
confが0.5以上のものに対し、IoUが0.5以上であるかどうかでTPかどうか判断する。
クラスが固定されているので、検出した数を数える。
Precisionの分母は予測した数(ここでは6)、Recallの分母は正解の数(ここでは4である。)
AP:Average Precision
conf.の閾値を$\beta$としたとき、$Recall=R(\beta),Precision=P(\beta)$とすると、
AP(Average Precision)は
$$
AP=\int_{0}^1P(R)dR
$$
mAP: mean Average Precision
各クラスで計算したAPの平均をmAPと呼び クラス数が$C$のときのmAPは
$$
mAP=\frac{1}{C}\sum_{i=1}^CAP_i
$$
と計算できる。
FPS: Flames per Second
検出精度だけでなく検出測度も問題となる。
物体検知のフレームワーク
-
2段階検出器
- 候補領域ごとの検出とクラス推定を別々に行う
- 相対的に精度が高い傾向
- 相対的に計算量が大きく推論も遅い傾向 -
1段階検出器
- 候補領域ごとの検出とクラス推定を同時に行う
- 相対的に精度が低い傾向
- 相対的に計算量が小さく推論も早い傾向
SSD(Single Shot Detector)
- Default BOXを用意
- Default BOXを変形し、conf.を出力
アーキテクチャ
VGGネットワークを使用している。
特徴マップからの出力
- マップ中の1つの特徴量における1つのDefault Boxにつite
クラス数にオフセット項$(\delta x,\delta y,\delta w,\delta h)$が4つの成分をもつので、出力サイズは、
$$
class数+4
$$
となる。
ここで、Default Boxの角の座標は、
$$
x=x_0+0.1×\delta x × w_0\
y=y_0+0.1×\delta y × h_0\
$$
幅・高さは、
$$
w=w_0×\exp{(0.2×\delta w)}\
h=h_0×\exp{(0.2×\delta h)}
$$
と書ける。
- マップ中の各特徴量に$k$個のDefault Boxを用意するとき出力サイズは、
$$
k(class数+4)
$$
となる。
- 特徴マップのサイズが$m×n$であるとすれば、出力サイズは
$$
k(class数+4)mn
$$
となる。
SSDのデフォルトボックスの数
青い部分が各特徴量に対し4つのDefault Boxを持ち、赤い部分では6つのDefault Boxを持つ。
出力される情報は、8732×(21+4)個となる。
Non-Maximum Suppression
1つの物体に対して、1つのBBoxのみを残す処理を行う処理。
BBox同士の被っている面積が閾値以上である場合には、同じ物体への冗長なBBoxと判定する。
そして、確信度が一番大きなBBoxのみを残す処理を行う。
Hard Negative Mining
Hard Negative Miningは、Negative DBoxに分類されたDBoxのうち、学習に使用するDBoxの数を絞る操作である。
8732個のDBoxの大半はNegative DBoxに分類されるため、ラベル0ばかり学習することになるため調整を行う。
そこでNegative DBoxの数をPositive DBoxの一定数倍に制限する。
損失関数
検出位置に関してはmoothL1Loss、クラス分類に関しては交差エントロピー誤差関数を用いる。
Semantic segmentation
- FCN(Fully Convolutional Network)
- Deconvolution/Transposed convolution
処理の手順は、
- 特徴マップのpixel間隔をstrideだけ空ける
- 特徴マップのまわりに(kernel size - 1) - paddingだけ余白を作る
- 畳み込み演算を行う
- 輪郭情報の補完
低レイヤーPooling層の出力をelement-wise additionすることでローカルな情報を補完してからup-sampling
U-Net
DeconvNet&SegNet
Dilated Convolution
6.2 実装
SSDの実装を行う。
まずは、必要なライブラリのインポートを行う。
import numpy as np
import pandas as pd
from itertools import product as product
from math import sqrt as sqrt
from utils.match import match
import os.path as osp
import random
import xml.etree.ElementTree as ET
import cv2
import matplotlib.pyplot as plt
import torch
from torch import nn
import torch.nn.init as init
import torch.nn.functional as F
from torch.autograd import Function
import torch.utils.data as data
データの準備
def make_datapath_list(rootpath):
imgpath_template = osp.join(rootpath, 'JPEGImages', '%s.jpg')
annopath_template = osp.join(rootpath, 'Annotations', '%s.xml')
train_id_names = osp.join(rootpath + 'ImageSets/Main/train.txt')
val_id_names = osp.join(rootpath + 'ImageSets/Main/val.txt')
train_img_list = list()
train_anno_list = list()
for line in open(train_id_names):
file_id = line.strip()
img_path = (imgpath_template % file_id)
anno_path = (annopath_template % file_id)
train_img_list.append(img_path)
train_anno_list.append(anno_path)
val_img_list = list()
val_anno_list = list()
for line in open(val_id_names):
file_id = line.strip()
img_path = (imgpath_template % file_id)
anno_path = (annopath_template % file_id)
val_img_list.append(img_path)
val_anno_list.append(anno_path)
return train_img_list, train_anno_list, val_img_list, val_anno_list
rootpath = "./data/VOCdevkit/VOC2012/"
train_img_list, train_anno_list, val_img_list, val_anno_list = make_datapath_list(rootpath)
xml形式のアノテーションデータをリストに変換
class Anno_xml2list(object):
def __init__(self, classes):
self.classes = classes
def __call__(self, xml_path, width, height):
ret = []
xml = ET.parse(xml_path).getroot()
for obj in xml.iter('object'):
difficult = int(obj.find('difficult').text)
if difficult == 1:
continue
bndbox = []
name = obj.find('name').text.lower().strip()
bbox = obj.find('bndbox')
pts = ['xmin', 'ymin', 'xmax', 'ymax']
for pt in (pts):
cur_pixel = int(bbox.find(pt).text) - 1
if pt == 'xmin' or pt == 'xmax':
cur_pixel /= width
else:
cur_pixel /= height
bndbox.append(cur_pixel)
label_idx = self.classes.index(name)
bndbox.append(label_idx)
ret += [bndbox]
return np.array(ret)
voc_classes = ['aeroplane', 'bicycle', 'bird', 'boat',
'bottle', 'bus', 'car', 'cat', 'chair',
'cow', 'diningtable', 'dog', 'horse',
'motorbike', 'person', 'pottedplant',
'sheep', 'sofa', 'train', 'tvmonitor']
transform_anno = Anno_xml2list(voc_classes)
ind = 1
image_file_path = val_img_list[ind]
img = cv2.imread(image_file_path)
height, width, channels = img.shape
transform_anno(val_anno_list[ind], width, height)
画像とアノテーションの前処理
学習時にデータオーギュメンテーションを行うようにする。
class DataTransform():
def __init__(self, input_size, color_mean):
self.deta_transform = {
"train": Compose([
ConvertFromInts(),
ToAbsoluteCoords(),
PhotometricDistort(),
Expand(color_mean),
RandomSampleCrop(),
RandomMirror(),
ToPercentCoords(),
Resize(input_size),
SubtractMeans(color_mean)
]),
"val": Compose([
ConvertFromInts(),
Resize(input_size),
SubtractMeans(color_mean)
])
}
def __call__(self, img, phase, boxes, labels):
return self.deta_transform[phase](img, boxes, labels)
class VOCDataset(data.Dataset):
def __init__(self, img_list, anno_list, phase, transform, transform_anno):
self.img_list = img_list
self.anno_list = anno_list
self.phase = phase
self.transform = transform
self.transform_anno = transform_anno
def __len__(self):
return len(self.img_list)
def __getitem__(self, index):
im, gt, h, w = self.pull_item(index)
return im, gt
def pull_item(self, index):
image_file_path = self.img_list[index]
img = cv2.imread(image_file_path)
height, width, channels = img.shape
anno_file_path = self.anno_list[index]
anno_list = self.transform_anno(anno_file_path, width, height)
img, boxes, labels = self.transform(img, self.phase, anno_list[:, :4], anno_list[:, 4])
img = torch.from_numpy(img[:, :, (2,1,0)]).permute(2,0,1)
gt = np.hstack((boxes, np.expand_dims(labels, axis=1)))
return img, gt, height, width
color = (104, 117, 123)
input_size = 300
train_dataset = VOCDataset(train_img_list, train_anno_list, phase='train',
transform = DataTransform(input_size, color_mean),
transform_anno = Anno_xml2list(voc_classes))
val_dataset = VOCDataset(val_img_list, val_anno_list, phase='val',
transform = DataTransform(input_size, color_mean),
transform_anno = Anno_xml2list(voc_classes))
Dataloader
アノテーションの情報、変数gtのサイズ(物体数)が画像データごとに異なるため、それに対応する必要がある。
def od_collate_fn(batch):
targets = []
imgs = []
for sample in batch:
imgs.append(sample[0])
targets.append(torch.FloatTensor(sample[1]))
imgs = torch.stack(imgs, dim=0)
return imgs, targets
rootpath = "./data/VOCdevkit/VOC2012/"
train_img_list, train_anno_list, val_img_list, val_anno_list = make_datapath_list(rootpath)
voc_classes = ['aeroplane', 'bicycle', 'bird', 'boat',
'bottle', 'bus', 'car', 'cat', 'chair',
'cow', 'diningtable', 'dog', 'horse',
'motorbike', 'person', 'pottedplant',
'sheep', 'sofa', 'train', 'tvmonitor']
color = (104, 117, 123)
input_size = 300
train_dataset = VOCDataset(train_img_list, train_anno_list, phase='train',
transform = DataTransform(input_size, color_mean),
transform_anno = Anno_xml2list(voc_classes))
val_dataset = VOCDataset(val_img_list, val_anno_list, phase='val',
transform = DataTransform(input_size, color_mean),
transform_anno = Anno_xml2list(voc_classes))
batch_size = 32
train_dataloader = data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=od_collate_fn)
val_dataloader = data.DataLoader(val_dataset, batch_size=batch_size, shuffle=True, collate_fn=od_collate_fn)
dataloaders_dicts = {'train':train_dataloader, 'val':val_dataloader}
各モジュールの定義
1. VGG
def make_vgg():
layers = []
in_channels = 3
cfg = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'MC',
512, 512, 512, 'M', 512, 512, 512]
for v in cfg:
if v == 'M':
layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
elif v == 'MC':
layers += [nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)]
else:
conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)
layers += [conv2d, nn.ReLU(inplace=True)]
in_channels = v
pool5 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
conv6 = nn.Conv2d(512, 1024, kernel_size=3, padding=6, dilation=6)
conv7 = nn.Conv2d(1024, 1024, kernel_size=1)
layers += [pool5, conv6, nn.ReLU(inplace=True),
conv7, nn.ReLU(inplace=True)]
return nn.ModuleList(layers)
2. extra
畳み込みを8回行う。2回畳み込むごとにそれを出力する(合計4つ)。
各特徴マップの大きさが異なり、様々な大きさの物体を想定した特徴量を得ることができる。
特徴量マップの大きい(細かい)ものは、畳み込みの回数が少ないので、SSDでは小さな領域の特徴量抽出・物体検出が苦手であり、
画像内の小さな物体の検出精度が画像内の大きな物体の検出精度よりも低くなる傾向がある。
def make_extras():
layers = []
in_channels = 1024
cfg = [256, 512, 128, 256, 128, 256, 128, 256]
layers += [nn.Conv2d(in_channels, cfg[0], kernel_size=(1))]
layers += [nn.Conv2d(cfg[0], cfg[1], kernel_size=(3), stride=2, padding=1)]
layers += [nn.Conv2d(cfg[1], cfg[2], kernel_size=(1))]
layers += [nn.Conv2d(cfg[2], cfg[3], kernel_size=(3), stride=2, padding=1)]
layers += [nn.Conv2d(cfg[3], cfg[4], kernel_size=(1))]
layers += [nn.Conv2d(cfg[4], cfg[5], kernel_size=(3))]
layers += [nn.Conv2d(cfg[5], cfg[6], kernel_size=(1))]
layers += [nn.Conv2d(cfg[6], cfg[7], kernel_size=(3))]
return nn.ModuleList(layers)
3. loc、conf
vgg、extraで6つの出力が得られたが、
locでは各出力に1回ずつ畳み込み処理を実施し、オフセット情報を出力する。
また、confでは各出力に1回ずつ畳み込み処理を実施し、各クラスの信頼度を出力する。
def make_loc_conf(num_classes=21, bbox_aspect_num=[4,6,6,6,4,4]):
loc_layers = []
conf_layers = []
# VGG22層目 conv4_3(source1)に対する畳み込み
loc_layers += [nn.Conv2d(512, bbox_aspect_num[0] * 4, kernel_size=3, padding=1)]
conf_layers += [nn.Conv2d(512, bbox_aspect_num[0] * num_classes, kernel_size=3, padding=1)]
# VGG最終層(source2)に対する畳み込み
loc_layers += [nn.Conv2d(1024, bbox_aspect_num[1] * 4, kernel_size=3, padding=1)]
conf_layers += [nn.Conv2d(1024, bbox_aspect_num[1] * num_classes, kernel_size=3, padding=1)]
# extraのsource3に対する畳み込み
loc_layers += [nn.Conv2d(512, bbox_aspect_num[2] * 4, kernel_size=3, padding=1)]
conf_layers += [nn.Conv2d(512, bbox_aspect_num[2] * num_classes, kernel_size=3, padding=1)]
# extraのsource4に対する畳み込み
loc_layers += [nn.Conv2d(256, bbox_aspect_num[3] * 4, kernel_size=3, padding=1)]
conf_layers += [nn.Conv2d(256, bbox_aspect_num[3] * num_classes, kernel_size=3, padding=1)]
# extraのsource5に対する畳み込み
loc_layers += [nn.Conv2d(256, bbox_aspect_num[4] * 4, kernel_size=3, padding=1)]
conf_layers += [nn.Conv2d(256, bbox_aspect_num[4] * num_classes, kernel_size=3, padding=1)]
# extraのsource6に対する畳み込み
loc_layers += [nn.Conv2d(256, bbox_aspect_num[5] * 4, kernel_size=3, padding=1)]
conf_layers += [nn.Conv2d(256, bbox_aspect_num[5] * num_classes, kernel_size=3, padding=1)]
return nn.ModuleList(loc_layers), nn.ModuleList(conf_layers)
4. L2Norm
チャネル方向に対して正規化を行う。
今回の場合は、L2Norm層への入力は512チャネル×38×38であるので、38×38=1444のセルについて512チャネルに渡って正規化を行う。
また、正規化した512チャネルのテンソルに対して、チャネルごとに係数を掛け合わせる。この係数も学習させるパラメータである。
class L2Norm(nn.Module):
def __init__(self, input_channels=512, scale=20):
super(L2Norm, self).__init__()
self.weight = nn.Parameter(torch.Tensor(input_channels))
self.scale = scale
self.reset_parameters()
self.eps = 1e-10
def reset_parameters(self):
init.constant_(self.weight, self.scale)
def forward(self, x):
# 正規化
norm = x.pow(2).sum(dim=1, keepdim=True).sqrt()+self.eps
x = torch.div(x, norm)
# 係数をかける
weights = self.weight.unsqueeze(0).unsqueeze(2).unsqueeze(3).expand_as(x)
out = weights * x
return out
デフォルトボックス
それぞれの特徴マップに4または6種類のDBoxを用意する。
4種類の場合は小さい正方形、大きい正方形、1:2または2:1の長方形、そして6種類の場合はこれに加え1:3または3:1の長方形を用意する。
class DBox(object):
def __init__(self, cfg):
super(DBox, self).__init__()
self.image_size = cfg['input_size']
self.feature_maps = cfg['feature_maps']
self.num_priors = cfg['feature_maps']
self.steps = cfg['steps']
self.min_sizes = cfg['min_sizes']
self.max_sizes = cfg['max_sizes']
self.aspect_ratios = cfg['aspect_ratios']
def make_dbox_list(self):
mean = []
for k, f in enumerate(self.feature_maps):
for i, j in product(range(f), repeat=2):
f_k = self.image_size / self.steps[k]
cx = (j + 0.5) / f_k
cy = (i + 0.5) / f_k
# アスペクト比1 小さい正方形
s_k = self.min_sizes[k]/self.image_size
mean += [cx, cy, s_k, s_k]
# アスペクト比1 大きい正方形
s_k_prime = sqrt(s_k * (self.max_sizes[k]/self.image_size))
mean += [cx, cy, s_k_prime, s_k_prime]
for ar in self.aspect_ratios[k]:
mean += [cx, cy, s_k*sqrt(ar), s_k/sqrt(ar)]
mean += [cx, cy, s_k/sqrt(ar), s_k+sqrt(ar)]
output = torch.Tensor(mean).view(-1, 4)
output.clamp_(max=1, min=0)
return output
順伝搬関数の実装
decode関数
DBOX=$(cx_d,cy_d,w_d,h_d)$と、SSDモデルから求めたオフセット情報loc=$(\Delta cx, \Delta cy, \Delta w, \Delta h)$を使用し、
バウンディングボックス(BBox)の座標を作成する関数
BBOXの情報は、
$$
cx=cx_d+0.1\Delta cx×w_d\
cx=cy_d+0.1\Delta cy×h_d\
w=w_d×\exp{(0.2\Delta w)}\
h=h_d×\exp{(0.2\Delta h)}
$$
と計算される。
def decode(loc, dbox_list):
boxes = torch.cat((
dbox_list[:, :2] + loc[:, :2] * 0.1 * dbox_list[:, 2:],
dbox_list[:, 2:] * torch.exp(loc[:, 2:] * 0.2)), dim=1)
# [cx ,cy, width, height] -> [xmin, ymin, xmax, ymax]
boxes[:, :2] -= boxes[:, 2:] / 2
boxes[:, 2:] += boxes[:, :2]
return boxes
Non-Maximum Suppression
1つの物体に対して、1つのBBoxのみを残す処理を行う処理。
BBox同士の被っている面積が閾値以上である場合には、同じ物体への冗長なBBoxと判定する。
そして、確信度が一番大きなBBoxのみを残す処理を行う。
def nm_supression(boxes, scores, overlap=0.45, top_k=200):
count = 0
keep = scores.new(scores.size(0)).zero_().long()
x1 = boxes[:, 0]
y1 = boxes[:, 1]
x2 = boxes[:, 2]
y2 = boxes[:, 3]
area = torch.mul(x2 - x1, y2 - y1)
tmp_x1 = boxes.new()
tmp_y1 = boxes.new()
tmp_x2 = boxes.new()
tmp_y2 = boxes.new()
tmp_w = boxes.new()
tmp_h = boxes.new()
v, idx = scores.sort(0)
idx = idx[-top_k:]
while idx.numel() > 0:
i = idx[-1]
keep[count] = i
count += 1
if idx.size(0) == 1:
break
idx = idx[:-1]
# keepに格納したBBoxと被りの大きいBBoxを抽出し除去する
torch.index_select(x1, 0, idx, out=tmp_x1)
torch.index_select(y1, 0, idx, out=tmp_y1)
torch.index_select(x2, 0, idx, out=tmp_x2)
torch.index_select(y2, 0, idx, out=tmp_y2)
tmp_x1 = torch.clamp(tmp_x1, min=x1[i])
tmp_y1 = torch.clamp(tmp_y1, min=y1[i])
tmp_x2 = torch.clamp(tmp_x2, min=x2[i])
tmp_y2 = torch.clamp(tmp_y2, min=y2[i])
tmp_w.resize_as_(tmp_x2)
tmp_h.resize_as_(tmp_y2)
tmp_w = tmp_x2 - tmp_x1
tmp_h = tmp_y2 - tmp_y1
tmp_w = torch.clamp(tmp_w, min=0.0)
tmp_h = torch.clamp(tmp_h, min=0.0)
inter = tmp_w*tmp_h
rem_areas = torch.index_select(area, 0, idx)
union = (rem_areas - inter) + area[i]
IoU = inter / union
idx = idx[IoU.le(overlap)]
return keep, count
クラスDetect
class Detect(Function):
def __init__(self, conf_thresh=0.01, top_k=200, nms_thresh=0.45):
self.softmax = nn.Softmax(dim=-1)
self.conf_thresh = conf_thresh
self.top_k = top_k
self.nms_thresh = nms_thresh
def forward(self, loc_data, conf_data, dbox_list):
num_batch = loc_data.size(0)
num_dbox = loc_data.size(1)
num_classes = conf_data.size(2)
conf_data = self.softmax(conf_data)
output = torch.zeros(num_batch, num_classes, self.top_k, 5)
conf_preds = conf_data.transpose(2, 1)
for i in range(num_batch):
decoded_boxes = decode(loc_data[i], dbox_list)
conf_scores = conf_preds[i].clone()
for cl in range(1, num_classes):
c_mask = conf_scores[cl].gt(self.conf_thresh)
scores = conf_scores[cl][c_mask]
if scores.nelement() == 0:
continue
l_mask = c_mask.unsqueeze(1).expand_as(decoded_boxes)
boxes = decoded_boxes[l_mask].view(-1, 4)
ids, count = nm_suppression(boxes, scores, self.nms_thresh, self.top_k)
output[i, cl, :count] = torch.cat((scores[ids[:count]].unsqueeze(1), boxes[ids[:count]]), 1)
return output
クラスSSD
# SSDクラスを作成する
class SSD(nn.Module):
def __init__(self, phase, cfg):
super(SSD, self).__init__()
self.phase = phase # train or inferenceを指定
self.num_classes = cfg["num_classes"] # クラス数=21
# SSDのネットワークを作る
self.vgg = make_vgg()
self.extras = make_extras()
self.L2Norm = L2Norm()
self.loc, self.conf = make_loc_conf(
cfg["num_classes"], cfg["bbox_aspect_num"])
# DBox作成
dbox = DBox(cfg)
self.dbox_list = dbox.make_dbox_list()
# 推論時はクラス「Detect」を用意します
if phase == 'inference':
self.detect = Detect()
def forward(self, x):
sources = list() # locとconfへの入力source1~6を格納
loc = list() # locの出力を格納
conf = list() # confの出力を格納
# vggのconv4_3まで計算する
for k in range(23):
x = self.vgg[k](x)
# conv4_3の出力をL2Normに入力し、source1を作成、sourcesに追加
source1 = self.L2Norm(x)
sources.append(source1)
# vggを最後まで計算し、source2を作成、sourcesに追加
for k in range(23, len(self.vgg)):
x = self.vgg[k](x)
sources.append(x)
# extrasのconvとReLUを計算
# source3~6を、sourcesに追加
for k, v in enumerate(self.extras):
x = F.relu(v(x), inplace=True)
if k % 2 == 1: # conv→ReLU→cov→ReLUをしたらsourceに入れる
sources.append(x)
# source1~6に、それぞれ対応する畳み込みを1回ずつ適用する
# zipでforループの複数のリストの要素を取得
# source1~6まであるので、6回ループが回る
for (x, l, c) in zip(sources, self.loc, self.conf):
# Permuteは要素の順番を入れ替え
loc.append(l(x).permute(0, 2, 3, 1).contiguous())
conf.append(c(x).permute(0, 2, 3, 1).contiguous())
# l(x)とc(x)で畳み込みを実行
# l(x)とc(x)の出力サイズは[batch_num, 4*アスペクト比の種類数, featuremapの高さ, featuremap幅]
# sourceによって、アスペクト比の種類数が異なり、面倒なので順番入れ替えて整える
# permuteで要素の順番を入れ替え、
# [minibatch数, featuremap数, featuremap数,4*アスペクト比の種類数]へ
# (注釈)
# torch.contiguous()はメモリ上で要素を連続的に配置し直す命令です。
# あとでview関数を使用します。
# このviewを行うためには、対象の変数がメモリ上で連続配置されている必要があります。
# さらにlocとconfの形を変形
# locのサイズは、torch.Size([batch_num, 34928])
# confのサイズはtorch.Size([batch_num, 183372])になる
loc = torch.cat([o.view(o.size(0), -1) for o in loc], 1)
conf = torch.cat([o.view(o.size(0), -1) for o in conf], 1)
# さらにlocとconfの形を整える
# locのサイズは、torch.Size([batch_num, 8732, 4])
# confのサイズは、torch.Size([batch_num, 8732, 21])
loc = loc.view(loc.size(0), -1, 4)
conf = conf.view(conf.size(0), -1, self.num_classes)
# 最後に出力する
output = (loc, conf, self.dbox_list)
if self.phase == "inference": # 推論時
# クラス「Detect」のforwardを実行
# 返り値のサイズは torch.Size([batch_num, 21, 200, 5])
return self.detect(output[0], output[1], output[2])
else: # 学習時
return output
# 返り値は(loc, conf, dbox_list)のタプル
損失関数
class MultiBoxLoss(nn.Module):
"""SSDの損失関数のクラスです。"""
def __init__(self, jaccard_thresh=0.5, neg_pos=3, device='cpu'):
super(MultiBoxLoss, self).__init__()
self.jaccard_thresh = jaccard_thresh # 0.5 関数matchのjaccard係数の閾値
self.negpos_ratio = neg_pos # 3:1 Hard Negative Miningの負と正の比率
self.device = device # CPUとGPUのいずれで計算するのか
def forward(self, predictions, targets):
"""
損失関数の計算。
Parameters
----------
predictions : SSD netの訓練時の出力(tuple)
(loc=torch.Size([num_batch, 8732, 4]), conf=torch.Size([num_batch, 8732, 21]), dbox_list=torch.Size [8732,4])。
targets : [num_batch, num_objs, 5]
5は正解のアノテーション情報[xmin, ymin, xmax, ymax, label_ind]を示す
Returns
-------
loss_l : テンソル
locの損失の値
loss_c : テンソル
confの損失の値
"""
# SSDモデルの出力がタプルになっているので、個々にばらす
loc_data, conf_data, dbox_list = predictions
# 要素数を把握
num_batch = loc_data.size(0) # ミニバッチのサイズ
num_dbox = loc_data.size(1) # DBoxの数 = 8732
num_classes = conf_data.size(2) # クラス数 = 21
# 損失の計算に使用するものを格納する変数を作成
# conf_t_label:各DBoxに一番近い正解のBBoxのラベルを格納させる
# loc_t:各DBoxに一番近い正解のBBoxの位置情報を格納させる
conf_t_label = torch.LongTensor(num_batch, num_dbox).to(self.device)
loc_t = torch.Tensor(num_batch, num_dbox, 4).to(self.device)
# loc_tとconf_t_labelに、
# DBoxと正解アノテーションtargetsをmatchさせた結果を上書きする
for idx in range(num_batch): # ミニバッチでループ
# 現在のミニバッチの正解アノテーションのBBoxとラベルを取得
truths = targets[idx][:, :-1].to(self.device) # BBox
# ラベル [物体1のラベル, 物体2のラベル, …]
labels = targets[idx][:, -1].to(self.device)
# デフォルトボックスを新たな変数で用意
dbox = dbox_list.to(self.device)
# 関数matchを実行し、loc_tとconf_t_labelの内容を更新する
# (詳細)
# loc_t:各DBoxに一番近い正解のBBoxの位置情報が上書きされる
# conf_t_label:各DBoxに一番近いBBoxのラベルが上書きされる
# ただし、一番近いBBoxとのjaccard overlapが0.5より小さい場合は
# 正解BBoxのラベルconf_t_labelは背景クラスの0とする
variance = [0.1, 0.2]
# このvarianceはDBoxからBBoxに補正計算する際に使用する式の係数です
match(self.jaccard_thresh, truths, dbox,
variance, labels, loc_t, conf_t_label, idx)
# ----------
# 位置の損失:loss_lを計算
# Smooth L1関数で損失を計算する。ただし、物体を発見したDBoxのオフセットのみを計算する
# ----------
# 物体を検出したBBoxを取り出すマスクを作成
pos_mask = conf_t_label > 0 # torch.Size([num_batch, 8732])
# pos_maskをloc_dataのサイズに変形
pos_idx = pos_mask.unsqueeze(pos_mask.dim()).expand_as(loc_data)
# Positive DBoxのloc_dataと、教師データloc_tを取得
loc_p = loc_data[pos_idx].view(-1, 4)
loc_t = loc_t[pos_idx].view(-1, 4)
# 物体を発見したPositive DBoxのオフセット情報loc_tの損失(誤差)を計算
loss_l = F.smooth_l1_loss(loc_p, loc_t, reduction='sum')
# ----------
# クラス予測の損失:loss_cを計算
# 交差エントロピー誤差関数で損失を計算する。ただし、背景クラスが正解であるDBoxが圧倒的に多いので、
# Hard Negative Miningを実施し、物体発見DBoxと背景クラスDBoxの比が1:3になるようにする。
# そこで背景クラスDBoxと予想したもののうち、損失が小さいものは、クラス予測の損失から除く
# ----------
batch_conf = conf_data.view(-1, num_classes)
# クラス予測の損失を関数を計算(reduction='none'にして、和をとらず、次元をつぶさない)
loss_c = F.cross_entropy(
batch_conf, conf_t_label.view(-1), reduction='none')
# -----------------
# これからNegative DBoxのうち、Hard Negative Miningで抽出するものを求めるマスクを作成します
# -----------------
# 物体発見したPositive DBoxの損失を0にする
# (注意)物体はlabelが1以上になっている。ラベル0は背景。
num_pos = pos_mask.long().sum(1, keepdim=True) # ミニバッチごとの物体クラス予測の数
loss_c = loss_c.view(num_batch, -1) # torch.Size([num_batch, 8732])
loss_c[pos_mask] = 0 # 物体を発見したDBoxは損失0とする
# Hard Negative Miningを実施する
# 各DBoxの損失の大きさloss_cの順位であるidx_rankを求める
_, loss_idx = loss_c.sort(1, descending=True)
_, idx_rank = loss_idx.sort(1)
# (注釈)
# 実装コードがかなり特殊で直感的ではないです。
# 上記2行は、要は各DBoxに対して、損失の大きさが何番目なのかの情報を
# 変数idx_rankとして高速に取得したいというコードです。
#
# DBOXの損失値の大きい方から降順に並べ、DBoxの降順のindexをloss_idxに格納。
# 損失の大きさloss_cの順位であるidx_rankを求める。
# ここで、
# 降順になった配列indexであるloss_idxを、0から8732まで昇順に並べ直すためには、
# 何番目のloss_idxのインデックスをとってきたら良いのかを示すのが、idx_rankである。
# 例えば、
# idx_rankの要素0番目 = idx_rank[0]を求めるには、loss_idxの値が0の要素、
# つまりloss_idx[?}=0 の、?は何番かを求めることになる。ここで、? = idx_rank[0]である。
# いま、loss_idx[?]=0の0は、元のloss_cの要素の0番目という意味である。
# つまり?は、元のloss_cの要素0番目は、降順に並び替えられたloss_idxの何番目ですか
# を求めていることになり、 結果、
# ? = idx_rank[0] はloss_cの要素0番目が、降順の何番目かを示すことになる。
# 背景のDBoxの数num_negを決める。HardNegative Miningにより、
# 物体発見のDBoxの数num_posの3倍(self.negpos_ratio倍)とする。
# ただし、万が一、DBoxの数を超える場合は、DBoxの数を上限とする
num_neg = torch.clamp(num_pos*self.negpos_ratio, max=num_dbox)
# idx_rankは各DBoxの損失の大きさが上から何番目なのかが入っている
# 背景のDBoxの数num_negよりも、順位が低い(すなわち損失が大きい)DBoxを取るマスク作成
# torch.Size([num_batch, 8732])
neg_mask = idx_rank < (num_neg).expand_as(idx_rank)
# -----------------
# (終了)これからNegative DBoxのうち、Hard Negative Miningで抽出するものを求めるマスクを作成します
# -----------------
# マスクの形を整形し、conf_dataに合わせる
# pos_idx_maskはPositive DBoxのconfを取り出すマスクです
# neg_idx_maskはHard Negative Miningで抽出したNegative DBoxのconfを取り出すマスクです
# pos_mask:torch.Size([num_batch, 8732])→pos_idx_mask:torch.Size([num_batch, 8732, 21])
pos_idx_mask = pos_mask.unsqueeze(2).expand_as(conf_data)
neg_idx_mask = neg_mask.unsqueeze(2).expand_as(conf_data)
# conf_dataからposとnegだけを取り出してconf_hnmにする。形はtorch.Size([num_pos+num_neg, 21])
conf_hnm = conf_data[(pos_idx_mask+neg_idx_mask).gt(0)
].view(-1, num_classes)
# (注釈)gtは greater than (>)の略称。これでmaskが1のindexを取り出す。
# pos_idx_mask+neg_idx_maskは足し算だが、indexへのmaskをまとめているだけである。
# つまり、posであろうがnegであろうが、マスクが1のものを足し算で一つのリストにし、それをgtで取得
# 同様に教師データであるconf_t_labelからposとnegだけを取り出してconf_t_label_hnmに
# 形はtorch.Size([pos+neg])になる
conf_t_label_hnm = conf_t_label[(pos_mask+neg_mask).gt(0)]
# confidenceの損失関数を計算(要素の合計=sumを求める)
loss_c = F.cross_entropy(conf_hnm, conf_t_label_hnm, reduction='sum')
# 物体を発見したBBoxの数N(全ミニバッチの合計)で損失を割り算
N = num_pos.sum()
loss_l /= N
loss_c /= N
return loss_l, loss_c
モデルの作成
ssd_cfg = {
'num_classes': 21,
'input_size': 300,
'bbox_aspect_num': [4,6,6,6,4,4],
'feature_maps': [38,19,10,5,3,1],
'steps': [8,16,32,64,100,300],
'min_sizes': [30,60,111,162,213,264],
'max_sizes': [60,111,162,213,264,315],
'aspect_ratios': [[2],[2,3],[2,3],[2,3],[2],[2]]
}
net = SSD(phase='train', cfg=ssd_cfg)
vgg_weights = torch.load('./weights/vgg16_reducedfc.pth')
net.vgg.load_state_dict(vgg_weights)
def weights_init(m):
if isinstance(m, nn.Conv2d):
init.kaiming_normal_(m.weight.data)
if m.bias is not None:
nn.init.constant_(m.bias, 0.0)
net.extras.apply(weights_init)
net.loc.apply(weights_init)
net.conf.apply(weights_init)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("使用デバイス:", device)
import torch.optim as optim
criterion = MultiBoxLoss(jaccard_thresh=0.5, neg_pos=3, device=device)
optimizer = optim.SGD(net.parameters(), lr=1e-3, momentum=0.9, weight_decay=5e-4)
学習
import time
def train_model(net, dataloaders_dict, criterion, optimizer, num_epochs):
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("使用デバイス:", device)
net.to(device)
torch.backends.cudnn.benchmark = True
iteration = 1
epoch_train_loss = 0.0
epoch_val_loss = 0.0
logs = []
for epoch in range(num_epochs+1):
t_epoch_start = time.time()
t_iter_start = time.time()
print('-'*20)
print('Epoch {}/{}'.format(epoch+1, num_epochs))
print('-'*20)
for phase in ['train', 'val']:
if phase == 'train':
net.train()
print(' (train) ')
else:
if((epoch+1)%10==0):
net.eval()
print('-'*20)
print(' (val) ')
else:
continue
for images, targets in dataloaders_dict[phase]:
images = images.to(device)
targets = [ann.to(device) for ann in targets]
optimizer.zero_grad()
with torch.set_grad_enabled(phase == 'train'):
outputs = net(images)
loss_l, loss_c = criterion(outputs, targets)
loss = loss_l + loss_c
if phase == 'train':
loss.backward()
nn.utils.clip_grad_value_(net.parameters(), clip_value=2.0)
optimizer.step()
if (iteration % 10 == 0):
t_iter_finish = time.time()
duration = t_iter_finish - t_iter_start
print('イテレーション {} || Loss: {:.4f} || 10iter: {:.4f} sec.'.format(
iteration, loss.item(), duration))
t_iter_start = time.time()
iteration += 1
else:
epoch_val_loss += loss.item()
t_epoch_finish = time.time()
print('-'*20)
print('epoch {} || Epoch_TRAIN_Loss:{:.4f} ||Epoch_VAL_Loss:{:.4f}'.format(
epoch+1, epoch_train_loss, epoch_val_loss))
print('timer: {:.4f} sec.'.format(t_epoch_finish - t_epoch_start))
t_epoch_start = time.time()
log_epoch = {'epoch':epoch+1,
'train_loss': epoch_train_loss, 'val_loss': epoch_val_loss}
logs.append(log_epoch)
df = pd.DataFrame(logs)
df.to_csv('log_output.csv')
epoch_train_loss = 0.0
epoch_val_loss = 0.0
if ((epoch+1) % 10 == 0):
torch.save(net.state_dict(), 'weights/ssd300_'+str(epoch+1)+'.pth')
num_epochs = 2
train_model(net, dataloaders_dicts, criterion, optimizer, num_epochs=num_epochs)