JDLA E資格の受験用に、深層学習day4を勉強した備忘録
#Section1:強化学習
##強化学習とは
長期的に報酬を最大化できるように環境のなかで行動を選択できるエージェントを
作ることを目標とする機械学習の一分野
⇒行動の結果として与えられる利益(報酬)をもとに、行動を決定する原理を改善していく仕組み
##強化学習の応用例
<マーケティングの場合>
環境:会社の販売促進部エージェント:プロフィールと購入履歴に基づいて、キャンペーンメールを送る顧客を決めるソフトウェア
行動:顧客ごとに送信、非送信のふたつの行動を選ぶ
報酬:キャンペーンのコストという負の報酬とキャンペーンで生み出されると推測される売上という正の報酬を受ける
##探索と利用のトレードオフ
環境について事前に完璧な知識があれば、最適な行動を予測し決定することは可能
・過去のデータで、ベストとされる行動のみを常に取り続ければ他にもっとベストな行動を見つけることはできない。
・未知の行動のみを常に取り続ければ、過去の経験が活かせない。
上記の二つがトレードオフの関係となる
##強化学習の差分
強化学習と通常の教師あり、教師なし学習との違い
結論:目標が違う・教師なし、あり学習では、データに含まれるパターンを見つけ出す
およびそのデータから予測することが目標・強化学習では、優れた方策を見つけることが目標
##強化学習の歴史
<強化学習について>
冬の時代があったが、計算速度の進展により大規模な状態をもつ場合の、強化学習を可能としつつある。
・関数近似法と、Q学習を組み合わせる手法の登場
<Q学習>
・行動価値関数を、行動する毎に更新することにより学習を進める方法
<関数近似法>
・価値関数や方策関数を関数近似する手法のこと
##価値関数
価値関数とは
価値を表す関数としては、状態価値関数と行動価値関数の2種類がある
・ある状態の価値に注目する場合は、状態価値関数
・状態と価値を組み合わせた価値に注目する場合は、行動価値関数
##方策関数
方策ベースの強化学習手法において、ある状態でどのような行動を採るのかの確率を与える関数
##方策勾配法について
方策をモデルにすることで最適化(学習)する手法
\theta^{(t+1)}=\theta^{(t)}+\epsilon\nabla J(\theta)
$J$とは、方策の良さで、定義しなければならない
定義方法は、以下の2つ
・平均報酬
・割引報酬和
上記の定義に対応して、行動価値関数$Qπ(s,a)$の定義を行い、方策勾配定理が成り立つ
\begin{eqnarray} \nabla_{\theta}J(\theta)&=&\nabla_{\theta}\sum_{a\in A}{\pi_{\theta}(a|s)Q^{\pi}(s,a)} \\ \nabla_{\theta}J(\theta)&=&\mathbb{E}_{\pi_{\theta}}[(\nabla_{\theta}log\pi_{\theta}(a|s)Q^{\pi}(s,a))] \end{eqnarray}
#Section2:Alpha Go
AlphaGoの学習は以下のステップで行われる
1.教師あり学習によるRollOutPolicyとPolicyNetの学習
2.強化学習によるPolicyNetの学習(方策関数の学習)
3.強化学習によるValueNetの学習(価値関数の学習)
PolicyNet、ValueNetは、いずれも畳み込みニューラルネットワーク
<PolicyNetの教師あり学習>
KGS Go Server(ネット囲碁対局サイト)の棋譜データから3000万局面分の教師を用意し、教師と同じ着手を予測できるよう学習を行った。具体的には、教師が着手した手を1とし残りを0とした19×19次元の配列を教師とし、それを分類問題として学習した。この学習で作成したPolicyNetは57%ほどの精度である
<PolicyNetの強化学習>
現状のPolicyNetとPolicyPoolからランダムに選択されたPolicyNetと対局シミュレーションを行い、その結果を用いて方策勾配法で学習を行った。PolicyPoolとは、PolicyNetの強化学習の過程を500Iteraionごとに記録し保存しておいたものである。現状のPolicyNet同士の対局ではなく、PolicyPoolに保存されているものとの対局を使用する理由は、対局に幅を持たせて過学習を防ごうというのが主である。この学習をminibatch size 128で1万回行った
<ValueNetの学習>
PolicyNetを使用して対局シミュレーションを行い、その結果の勝敗を教師として学習した。教師データ作成の手順は1、まずSL PolicyNet(教師あり学習で作成したPolicyNet)でN手まで打つ。2、N+1手目の手をランダムに選択し、その手で進めた局面をS(N+1)とする。3、S(N+1)からRLPolicyNet(強化学習で作成したPolicyNet)で終局まで打ち、その勝敗報酬をRとする。S(N+1)とRを教師データ対とし、損失関数を平均二乗誤差とし、回帰問題として学習した。この学習をminibatch size 32で5000万回行ったN手までとN+1手からのPolicyNetを別々にしてある理由は、過学習を防ぐためであると論文では説明されている
<モンテカルロ木探索>
コンピュータ囲碁ソフトでは現在もっとも有効とされている探索法。
モンテカルロ木探索はこの木の成長を行うことによって、一定条件下において探索結果は
最善手を返すということが理論的に証明されている。
<AlphaGoZeroの違い>
1.教師あり学習を一切行わず、強化学習のみで作成
2.特徴入力からヒューリスティックな要素を排除し、石の配置のみにした
3.PolicyNetとValueNetを1つのネットワークに統合した
4.Residual Net(後述)を導入した
5.モンテカルロ木探索からRollOutシミュレーションをなくした
#Section3:軽量化・高速化技術
##分散深層学習
・深層学習は多くのデータを使用したり、パラメータ調整のために多くの時間を使用したりするため、高速な計算が求められる。
・複数の計算資源(ワーカー)を使用し、並列的にニューラルネットを構成することで、効率の良い学習を行いたい。
・データ並列化、モデル並列化、GPUによる高速技術は不可欠である。
##高速化
<データ並列化>
・親モデルを各ワーカーに子モデルとしてコピー
・データを分割し、各ワーカーごとに計算させる
<データ並列化: 同期型>
・データ並列化は各モデルのパラメータの合わせ方で、同期型か非同期型か決まる。
<同期型と非同期型の比較>
・処理のスピードは、お互いのワーカーの計算を待たない非同期型の方が早い。
・非同期型は最新のモデルのパラメータを利用できないので、学習が不安定になりやすい。-> Stale Gradient Problem
・現在は同期型の方が精度が良いことが多いので、主流となっている。
<モデル並列化>
・親モデルを各ワーカーに分割し、それぞれのモデルを学習させる。全てのデータで学習が終わった後で、一つのモデルに復元。
・モデルが大きい時はモデル並列化を、データが大きい時はデータ並列化をすると良い
・モデルのパラメータ数が多いほど、スピードアップの効率も向上する
##GPU
GPUによる高速化
・GPGPU (General-purpose on GPU)
⇒元々の使用目的であるグラフィック以外の用途で使用されるGPUの総称
<CPUとGPUの違い>
・CPU
⇒高性能なコアが少数
⇒複雑で連続的な処理が得意
・GPU
⇒比較的低性能なコアが多数
⇒簡単な並列処理が得意
⇒ニューラルネットの学習は単純な行列演算が多いので、高速化が可能
<CUDA>
・GPU上で並列コンピューティングを行うためのプラットフォーム
・NVIDIA社が開発しているGPUのみで使用可能。
・Deep Learning用に提供されているので、使いやすい
<OpenCL>
・オープンな並列コンピューティングのプラットフォーム
・NVIDIA社以外の会社(Intel, AMD, ARMなど)のGPUからでも使用可能。
・Deep Learning用の計算に特化しているわけではない。
※Deep Learningフレームワーク(Tensorflow, Pytorch)内で実装されているので、
使用する際は指定すれば良い
##軽量化
<量子化>
ネットワークが大きくなると大量のパラメータが必要なり学習や推論に多くのメモリと演算処理が必要
通常のパラメータの64 bit 浮動小数点を32 bit など下位の精度に落とすことでメモリと演算処理の削減を行う
量子化の利点と欠点
・利点⇒計算の高速化、省メモリ化
・欠点⇒精度の低下
※bit数を減らすと浮動小数点数で表現できる小数点数の精度が落ちます
⇒ただし、実際の問題では倍精度を単精度にしてもほぼ精度は変わらない
<蒸留(Distillation)>
精度の高いモデルはニューロンの規模が大きくなっているため推論に多くのメモリと演算処理が必要
そのため、規模の大きなモデルの知識を使い、軽量なモデルの作成を行います。これが蒸留
蒸留は教師モデルと生徒モデルの2つで構成される
<教師モデル>
予測精度の高い、複雑なモデルやアンサンブルされたモデル
<生徒モデル>
教師モデルをもとに作られる軽量なモデル
<プルーニング(Pruning)>
ネットワークが大きくなると大量のパラメータになりますが、すべてのニューロンが計算の精度に寄与しているわけではありません。
モデルの精度に寄与が少ないニューロンを削減することで、モデルの軽量化、高速化が見込まれます。
<まとめ>
量子化:重みの精度を下げることにより計算の高速化と省メモリ化を行う技術
蒸留:複雑で精度の良い教師モデルから軽量な生徒モデルを効率よく学習を行う技術
プルーニング:寄与の少ないニューロンをモデルから削減し高速化と省メモリ化を行う技術
#Section4:応用技術
##MobileNet
・ディープラーニングモデルは精度は良いが、その分ネットワークが深くなり計算量が増える。
・計算量が増えると、多くの計算リソースが必要で、お金がかかってしまう。
・ディープラーニングモデルの軽量化・高速化・高精度化を実現(その名の通りモバイルなネットワーク)
⇒MobileNetはDepthwise ConvolutionとPointwise Convolutionの組み合わせで軽量化を実現
##DenseNet
・Dense Convolutional Network(以下、DenseNet)は、畳込みニューラルネットワーク(以下、CNN)アーキテクチャの一種である。
ニューラルネットワークでは層が深くなるにつれて、学習が難しくなるという問題があったが、Residual Network(以下、ResNet)などのCNNアーキテクチャでは前方の層から後方の層へアイデンティティ接続を介してパスを作ることで問題を対処した。
DenseBlockと呼ばれるモジュールを用いた、DenseNetもそのようなアーキテクチャの一つである。
<Dense Block>
Dense Blockは複数の層で構成されており、各層を通り抜けるとチャンネル(前の層で処理された内容)が増えていきます。
ResNetのように前の層からのスキップコネクションがあります。
ResNetでは前1層の入力のみ使われていましたが、Dense Blockでは前の各層全てが使われます。
特徴マップの入力に対してバッチ正規化、ReLU関数による変換、3×3による畳み込みを行います
<Transition Layer>
Dense Blockをつなぐ層のことです。
Dense Blockでチャンネル数が増えていくため、ダウンサンプリングしていきます。
これによって特徴量を抽出しつつ、元のチャンネル数に戻します。
<DenseNetとResNetの違い>
・DenseBlockでは前方の各層からの出力全てが後方の層への入力として用いられる
・RessidualBlockでは前1層の入力のみ後方の層へ入力
<成長率>
・DenseNet内で使用されるDenseBlockと呼ばれるモジュールでは成⻑率(Growth Rate)と呼ばれるハイパーパラメータが存在する。
・DenseBlock内の各ブロック毎にk個ずつ特徴マップのチャネル数が増加していく時、kを成⻑率と呼ぶ
<正規化>
●Batch Norm:ミニバッチに含まれるsampleの同一チャネルが同一分布に従うよう正規化
●Layer Norm:それぞれのsampleの全てのpixelsが同一分布に従うよう正規化
●Instance Nrom:さらにchannelも同一分布に従うよう正規化
<WaveNet>
生の音声波形を生成する深層学習モデル
Pixel CNN(高解像度の画像を精密に生成できる手法)を音声に応用したもの
時系列データに対して畳み込み(Dilated convolution)を適用する
層が深くなるにつれて畳み込むリンクを離すDilated causal convolutionを提案した。
(い)⇒HxWxCxKxK
(う)⇒HxWxCxM
⇒答え:Dilated causal convolution
⇒答え:パラメータ数に対する受容野が広い
#Section5:Transformer
##Seq2Seqとは
系列(Sequence)を入力として、系列を出力するもの
・Encoder-Decoderモデルとも呼ばれる
・入力系列がEncode(内部状態に変換)され、内部状態からDecode(系列に変換)する
・ 実応用上も、入力・出力共に系列情報なものは多い
例)
翻訳 (英語→日本語)
音声認識 (波形→テキスト)
チャットボット (テキスト→テキスト)
RNN x 言語モデル
・RNNは系列情報を内部状態に変換することができる
・文章の各単語が現れる際の同時確率は、事後確率で分解できる
・したがって、事後確率を求めることがRNNの目標になる
・言語モデルを再現するようにRNNの重みが学習されていれば、ある時点の次の単語を予測することができる
・先頭単語を与えれば文章を生成することも可能
##Transformerとは
ニューラル機械翻訳の問題点
長さに弱い
RNNを使用せず、Attentionのみ使用しているモデルです。
機械翻訳のみならず、様々な自然言語処理で使用されています。
Decoderは次の単語を予測しますが、RNNを使っていないため、系列全ての単語が一度に与えられます。
そのため、未来の単語が見えないようになり、Decoderでは未来の単語をマスクします。
Source Target-Attentionに加えてSelf-Attentionも用いられ、正規化としてLayer Normalizationが用いられます。
##BERT(Bidirectional Encoder Representations from Transformers)
Googleによって開発された自然言語処理(NLP)の事前学習用のためのTransformerのEncoderを使った機械学習手法である。
BERTの詳細説明は、省略。
以下は、Seq2seqとTransformerの実装演習
<実装:Seq2Seq>
英語-日本語の対訳コーパスである、Tanaka Corpusを使用
フレームワークはPytorchを使用
def load_data(file_path):
# テキストファイルからデータを読み込むメソッド
data = []
for line in open(file_path, encoding='utf-8'):
words = line.strip().split() # スペースで単語を分割
data.append(words)
return data
train_X = load_data('./data/train.en')
train_Y = load_data('./data/train.ja')
# 訓練データと検証データに分割
train_X, valid_X, train_Y, valid_Y = train_test_split(train_X, train_Y, test_size=0.2, random_state=random_state)
・単語辞書の作成
# まず特殊トークンを定義しておく
PAD_TOKEN = '<PAD>' # バッチ処理の際に、短い系列の末尾を埋めるために使う (Padding)
BOS_TOKEN = '<S>' # 系列の始まりを表す (Beggining of sentence)
EOS_TOKEN = '</S>' # 系列の終わりを表す (End of sentence)
UNK_TOKEN = '<UNK>' # 語彙に存在しない単語を表す (Unknown)
PAD = 0
BOS = 1
EOS = 2
UNK = 3
MIN_COUNT = 2 # 語彙に含める単語の最低出現回数 再提出現回数に満たない単語はUNKに置き換えられる
# 単語をIDに変換する辞書の初期値を設定
word2id = {
PAD_TOKEN: PAD,
BOS_TOKEN: BOS,
EOS_TOKEN: EOS,
UNK_TOKEN: UNK,
}
# 単語辞書を作成
vocab_X = Vocab(word2id=word2id)
vocab_Y = Vocab(word2id=word2id)
vocab_X.build_vocab(train_X, min_count=MIN_COUNT)
vocab_Y.build_vocab(train_Y, min_count=MIN_COUNT)
vocab_size_X = len(vocab_X.id2word)
vocab_size_Y = len(vocab_Y.id2word)
print('入力言語の語彙数:', vocab_size_X)
print('出力言語の語彙数:', vocab_size_Y)
入力言語の語彙数: 3725
出力言語の語彙数: 4405
・テンソルへの変換
def sentence_to_ids(vocab, sentence):
# 単語(str)のリストをID(int)のリストに変換する関数
ids = [vocab.word2id.get(word, UNK) for word in sentence]
ids += [EOS] # EOSを加える
return ids
train_X = [sentence_to_ids(vocab_X, sentence) for sentence in train_X]
train_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in train_Y]
valid_X = [sentence_to_ids(vocab_X, sentence) for sentence in valid_X]
valid_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in valid_Y]
def pad_seq(seq, max_length):
# 系列(seq)が指定の文長(max_length)になるように末尾をパディングする
res = seq + [PAD for i in range(max_length - len(seq))]
return res
class DataLoader(object):
def __init__(self, X, Y, batch_size, shuffle=False):
"""
:param X: list, 入力言語の文章(単語IDのリスト)のリスト
:param Y: list, 出力言語の文章(単語IDのリスト)のリスト
:param batch_size: int, バッチサイズ
:param shuffle: bool, サンプルの順番をシャッフルするか否か
"""
self.data = list(zip(X, Y))
self.batch_size = batch_size
self.shuffle = shuffle
self.start_index = 0
self.reset()
def reset(self):
if self.shuffle: # サンプルの順番をシャッフルする
self.data = shuffle(self.data, random_state=random_state)
self.start_index = 0 # ポインタの位置を初期化する
def __iter__(self):
return self
def __next__(self):
# ポインタが最後まで到達したら初期化する
if self.start_index >= len(self.data):
self.reset()
raise StopIteration()
# バッチを取得
seqs_X, seqs_Y = zip(*self.data[self.start_index:self.start_index+self.batch_size])
# 入力系列seqs_Xの文章の長さ順(降順)に系列ペアをソートする
seq_pairs = sorted(zip(seqs_X, seqs_Y), key=lambda p: len(p[0]), reverse=True)
seqs_X, seqs_Y = zip(*seq_pairs)
# 短い系列の末尾をパディングする
lengths_X = [len(s) for s in seqs_X] # 後述のEncoderのpack_padded_sequenceでも用いる
lengths_Y = [len(s) for s in seqs_Y]
max_length_X = max(lengths_X)
max_length_Y = max(lengths_Y)
padded_X = [pad_seq(s, max_length_X) for s in seqs_X]
padded_Y = [pad_seq(s, max_length_Y) for s in seqs_Y]
# tensorに変換し、転置する
batch_X = torch.tensor(padded_X, dtype=torch.long, device=device).transpose(0, 1)
batch_Y = torch.tensor(padded_Y, dtype=torch.long, device=device).transpose(0, 1)
# ポインタを更新する
self.start_index += self.batch_size
return batch_X, batch_Y, lengths_X
・モデルの構築(出力は省略)
# 系列長がそれぞれ4,3,2の3つのサンプルからなるバッチを作成
batch = [[1,2,3,4], [5,6,7], [8,9]]
lengths = [len(sample) for sample in batch]
print('各サンプルの系列長:', lengths)
print()
# 最大系列長に合うように各サンプルをpadding
_max_length = max(lengths)
padded = torch.tensor([pad_seq(sample, _max_length) for sample in batch])
print('paddingされたテンソル:\n', padded)
padded = padded.transpose(0,1) # (max_length, batch_size)に転置
print('padding & 転置されたテンソル:\n', padded)
print('padding & 転置されたテンソルのサイズ:\n', padded.size())
print()
# PackedSequenceに変換(テンソルをRNNに入力する前に適用する)
packed = pack_padded_sequence(padded, lengths=lengths) # 各サンプルの系列長も与える
print('PackedSequenceのインスタンス:\n', packed) # テンソルのPAD以外の値(data)と各時刻で計算が必要な(=PADに到達していない)バッチの数(batch_sizes)を有するインスタンス
print()
# PackedSequenceのインスタンスをRNNに入力する(ここでは省略)
output = packed
# テンソルに戻す(RNNの出力に対して適用する)
output, _length = pad_packed_sequence(output) # PADを含む元のテンソルと各サンプルの系列長を返す
print('PADを含む元のテンソル:\n', output)
print('各サンプルの系列長:', _length)
・Encoder
class Encoder(nn.Module):
def __init__(self, input_size, hidden_size):
"""
:param input_size: int, 入力言語の語彙数
:param hidden_size: int, 隠れ層のユニット数
"""
super(Encoder, self).__init__()
self.hidden_size = hidden_size
self.embedding = nn.Embedding(input_size, hidden_size, padding_idx=PAD)
self.gru = nn.GRU(hidden_size, hidden_size)
def forward(self, seqs, input_lengths, hidden=None):
"""
:param seqs: tensor, 入力のバッチ, size=(max_length, batch_size)
:param input_lengths: 入力のバッチの各サンプルの文長
:param hidden: tensor, 隠れ状態の初期値, Noneの場合は0で初期化される
:return output: tensor, Encoderの出力, size=(max_length, batch_size, hidden_size)
:return hidden: tensor, Encoderの隠れ状態, size=(1, batch_size, hidden_size)
"""
emb = self.embedding(seqs) # seqsはパディング済み
packed = pack_padded_sequence(emb, input_lengths) # PackedSequenceオブジェクトに変換
output, hidden = self.gru(packed, hidden)
output, _ = pad_packed_sequence(output)
return output, hidden
・Decoder
class Decoder(nn.Module):
def __init__(self, hidden_size, output_size):
"""
:param hidden_size: int, 隠れ層のユニット数
:param output_size: int, 出力言語の語彙数
:param dropout: float, ドロップアウト率
"""
super(Decoder, self).__init__()
self.hidden_size = hidden_size
self.output_size = output_size
self.embedding = nn.Embedding(output_size, hidden_size, padding_idx=PAD)
self.gru = nn.GRU(hidden_size, hidden_size)
self.out = nn.Linear(hidden_size, output_size)
def forward(self, seqs, hidden):
"""
:param seqs: tensor, 入力のバッチ, size=(1, batch_size)
:param hidden: tensor, 隠れ状態の初期値, Noneの場合は0で初期化される
:return output: tensor, Decoderの出力, size=(1, batch_size, output_size)
:return hidden: tensor, Decoderの隠れ状態, size=(1, batch_size, hidden_size)
"""
emb = self.embedding(seqs)
output, hidden = self.gru(emb, hidden)
output = self.out(output)
return output, hidden
・EncoderDecoder
class EncoderDecoder(nn.Module):
"""EncoderとDecoderの処理をまとめる"""
def __init__(self, input_size, output_size, hidden_size):
"""
:param input_size: int, 入力言語の語彙数
:param output_size: int, 出力言語の語彙数
:param hidden_size: int, 隠れ層のユニット数
"""
super(EncoderDecoder, self).__init__()
self.encoder = Encoder(input_size, hidden_size)
self.decoder = Decoder(hidden_size, output_size)
def forward(self, batch_X, lengths_X, max_length, batch_Y=None, use_teacher_forcing=False):
"""
:param batch_X: tensor, 入力系列のバッチ, size=(max_length, batch_size)
:param lengths_X: list, 入力系列のバッチ内の各サンプルの文長
:param max_length: int, Decoderの最大文長
:param batch_Y: tensor, Decoderで用いるターゲット系列
:param use_teacher_forcing: Decoderでターゲット系列を入力とするフラグ
:return decoder_outputs: tensor, Decoderの出力,
size=(max_length, batch_size, self.decoder.output_size)
"""
# encoderに系列を入力(複数時刻をまとめて処理)
_, encoder_hidden = self.encoder(batch_X, lengths_X)
_batch_size = batch_X.size(1)
# decoderの入力と隠れ層の初期状態を定義
decoder_input = torch.tensor([BOS] * _batch_size, dtype=torch.long, device=device) # 最初の入力にはBOSを使用する
decoder_input = decoder_input.unsqueeze(0) # (1, batch_size)
decoder_hidden = encoder_hidden # Encoderの最終隠れ状態を取得
# decoderの出力のホルダーを定義
decoder_outputs = torch.zeros(max_length, _batch_size, self.decoder.output_size, device=device) # max_length分の固定長
# 各時刻ごとに処理
for t in range(max_length):
decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
decoder_outputs[t] = decoder_output
# 次の時刻のdecoderの入力を決定
if use_teacher_forcing and batch_Y is not None: # teacher forceの場合、ターゲット系列を用いる
decoder_input = batch_Y[t].unsqueeze(0)
else: # teacher forceでない場合、自身の出力を用いる
decoder_input = decoder_output.max(-1)[1]
return decoder_outputs
・訓練
mce = nn.CrossEntropyLoss(size_average=False, ignore_index=PAD) # PADを無視する
def masked_cross_entropy(logits, target):
logits_flat = logits.view(-1, logits.size(-1)) # (max_seq_len * batch_size, output_size)
target_flat = target.view(-1) # (max_seq_len * batch_size, 1)
return mce(logits_flat, target_flat)
# ハイパーパラメータの設定
num_epochs = 20
batch_size = 64
lr = 1e-3 # 学習率
teacher_forcing_rate = 0.2 # Teacher Forcingを行う確率
ckpt_path = 'model.pth' # 学習済みのモデルを保存するパス
model_args = {
'input_size': vocab_size_X,
'output_size': vocab_size_Y,
'hidden_size': 256,
}
# データローダを定義
train_dataloader = DataLoader(train_X, train_Y, batch_size=batch_size, shuffle=True)
valid_dataloader = DataLoader(valid_X, valid_Y, batch_size=batch_size, shuffle=False)
# モデルとOptimizerを定義
model = EncoderDecoder(**model_args).to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
def compute_loss(batch_X, batch_Y, lengths_X, model, optimizer=None, is_train=True):
# 損失を計算する関数
model.train(is_train) # train/evalモードの切替え
# 一定確率でTeacher Forcingを行う
use_teacher_forcing = is_train and (random.random() < teacher_forcing_rate)
max_length = batch_Y.size(0)
# 推論
pred_Y = model(batch_X, lengths_X, max_length, batch_Y, use_teacher_forcing)
# 損失関数を計算
loss = masked_cross_entropy(pred_Y.contiguous(), batch_Y.contiguous())
if is_train: # 訓練時はパラメータを更新
optimizer.zero_grad()
loss.backward()
optimizer.step()
batch_Y = batch_Y.transpose(0, 1).contiguous().data.cpu().tolist()
pred = pred_Y.max(dim=-1)[1].data.cpu().numpy().T.tolist()
return loss.item(), batch_Y, pred
def calc_bleu(refs, hyps):
"""
BLEUスコアを計算する関数
:param refs: list, 参照訳。単語のリストのリスト (例: [['I', 'have', 'a', 'pen'], ...])
:param hyps: list, モデルの生成した訳。単語のリストのリスト (例: ['I', 'have', 'a', 'pen'])
:return: float, BLEUスコア(0~100)
"""
refs = [[ref[:ref.index(EOS)]] for ref in refs] # EOSは評価しないで良いので切り捨てる, refsのほうは複数なのでlistが一個多くかかっている
hyps = [hyp[:hyp.index(EOS)] if EOS in hyp else hyp for hyp in hyps]
return 100 * bleu_score.corpus_bleu(refs, hyps)
# 訓練
best_valid_bleu = 0.
for epoch in range(1, num_epochs+1):
train_loss = 0.
train_refs = []
train_hyps = []
valid_loss = 0.
valid_refs = []
valid_hyps = []
# train
for batch in train_dataloader:
batch_X, batch_Y, lengths_X = batch
loss, gold, pred = compute_loss(
batch_X, batch_Y, lengths_X, model, optimizer,
is_train=True
)
train_loss += loss
train_refs += gold
train_hyps += pred
# valid
for batch in valid_dataloader:
batch_X, batch_Y, lengths_X = batch
loss, gold, pred = compute_loss(
batch_X, batch_Y, lengths_X, model,
is_train=False
)
valid_loss += loss
valid_refs += gold
valid_hyps += pred
# 損失をサンプル数で割って正規化
train_loss = np.sum(train_loss) / len(train_dataloader.data)
valid_loss = np.sum(valid_loss) / len(valid_dataloader.data)
# BLEUを計算
train_bleu = calc_bleu(train_refs, train_hyps)
valid_bleu = calc_bleu(valid_refs, valid_hyps)
# validationデータでBLEUが改善した場合にはモデルを保存
if valid_bleu > best_valid_bleu:
ckpt = model.state_dict()
torch.save(ckpt, ckpt_path)
best_valid_bleu = valid_bleu
print('Epoch {}: train_loss: {:5.2f} train_bleu: {:2.2f} valid_loss: {:5.2f} valid_bleu: {:2.2f}'.format(
epoch, train_loss, train_bleu, valid_loss, valid_bleu))
print('-'*80)
・評価
# 学習済みモデルの読み込み
ckpt = torch.load(ckpt_path) # cpuで処理する場合はmap_locationで指定する必要があります。
model.load_state_dict(ckpt)
model.eval()
def ids_to_sentence(vocab, ids):
# IDのリストを単語のリストに変換する
return [vocab.id2word[_id] for _id in ids]
def trim_eos(ids):
# IDのリストからEOS以降の単語を除外する
if EOS in ids:
return ids[:ids.index(EOS)]
else:
return ids
# テストデータの読み込み
test_X = load_data('./data/dev.en')
test_Y = load_data('./data/dev.ja')
test_X = [sentence_to_ids(vocab_X, sentence) for sentence in test_X]
test_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in test_Y]
test_dataloader = DataLoader(test_X, test_Y, batch_size=1, shuffle=False)
# 生成
batch_X, batch_Y, lengths_X = next(test_dataloader)
sentence_X = ' '.join(ids_to_sentence(vocab_X, batch_X.data.cpu().numpy()[:-1, 0]))
sentence_Y = ' '.join(ids_to_sentence(vocab_Y, batch_Y.data.cpu().numpy()[:-1, 0]))
print('src: {}'.format(sentence_X))
print('tgt: {}'.format(sentence_Y))
output = model(batch_X, lengths_X, max_length=20)
output = output.max(dim=-1)[1].view(-1).data.cpu().tolist()
output_sentence = ' '.join(ids_to_sentence(vocab_Y, trim_eos(output)))
output_sentence_without_trim = ' '.join(ids_to_sentence(vocab_Y, output))
print('out: {}'.format(output_sentence))
print('without trim: {}'.format(output_sentence_without_trim))
出力:
src: show your own business .
tgt: 自分 の 事 を しろ 。
out: 自分 の 仕事 は 与え な い 。
without trim: 自分 の 仕事 は 与え な い 。
・BLEUの計算
# BLEUの計算
test_dataloader = DataLoader(test_X, test_Y, batch_size=1, shuffle=False)
refs_list = []
hyp_list = []
for batch in test_dataloader:
batch_X, batch_Y, lengths_X = batch
pred_Y = model(batch_X, lengths_X, max_length=20)
pred = pred_Y.max(dim=-1)[1].view(-1).data.cpu().tolist()
refs = batch_Y.view(-1).data.cpu().tolist()
refs_list.append(refs)
hyp_list.append(pred)
bleu = calc_bleu(refs_list, hyp_list)
print(bleu)
出力:20.128962623532413
※翻訳の結果は、いまいちの結果であった
<実装:Transformer>
・データセット
def load_data(file_path):
"""
テキストファイルからデータを読み込む
:param file_path: str, テキストファイルのパス
:return data: list, 文章(単語のリスト)のリスト
"""
data = []
for line in open(file_path, encoding='utf-8'):
words = line.strip().split() # スペースで単語を分割
data.append(words)
return data
train_X = load_data('./data/train.en')
train_Y = load_data('./data/train.ja')
# 訓練データと検証データに分割
train_X, valid_X, train_Y, valid_Y = train_test_split(train_X, train_Y, test_size=0.2, random_state=random_state)
MIN_COUNT = 2 # 語彙に含める単語の最低出現回数
word2id = {
PAD_TOKEN: PAD,
BOS_TOKEN: BOS,
EOS_TOKEN: EOS,
UNK_TOKEN: UNK,
}
vocab_X = Vocab(word2id=word2id)
vocab_Y = Vocab(word2id=word2id)
vocab_X.build_vocab(train_X, min_count=MIN_COUNT)
vocab_Y.build_vocab(train_Y, min_count=MIN_COUNT)
vocab_size_X = len(vocab_X.id2word)
vocab_size_Y = len(vocab_Y.id2word)
def sentence_to_ids(vocab, sentence):
"""
単語のリストをインデックスのリストに変換する
:param vocab: Vocabのインスタンス
:param sentence: list of str
:return indices: list of int
"""
ids = [vocab.word2id.get(word, UNK) for word in sentence]
ids = [BOS] + ids + [EOS] # EOSを末尾に加える
return ids
train_X = [sentence_to_ids(vocab_X, sentence) for sentence in train_X]
train_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in train_Y]
valid_X = [sentence_to_ids(vocab_X, sentence) for sentence in valid_X]
valid_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in valid_Y]
class DataLoader(object):
def __init__(self, src_insts, tgt_insts, batch_size, shuffle=True):
"""
:param src_insts: list, 入力言語の文章(単語IDのリスト)のリスト
:param tgt_insts: list, 出力言語の文章(単語IDのリスト)のリスト
:param batch_size: int, バッチサイズ
:param shuffle: bool, サンプルの順番をシャッフルするか否か
"""
self.data = list(zip(src_insts, tgt_insts))
self.batch_size = batch_size
self.shuffle = shuffle
self.start_index = 0
self.reset()
def reset(self):
if self.shuffle:
self.data = shuffle(self.data, random_state=random_state)
self.start_index = 0
def __iter__(self):
return self
def __next__(self):
def preprocess_seqs(seqs):
# パディング
max_length = max([len(s) for s in seqs])
data = [s + [PAD] * (max_length - len(s)) for s in seqs]
# 単語の位置を表現するベクトルを作成
positions = [[pos+1 if w != PAD else 0 for pos, w in enumerate(seq)] for seq in data]
# テンソルに変換
data_tensor = torch.tensor(data, dtype=torch.long, device=device)
position_tensor = torch.tensor(positions, dtype=torch.long, device=device)
return data_tensor, position_tensor
# ポインタが最後まで到達したら初期化する
if self.start_index >= len(self.data):
self.reset()
raise StopIteration()
# バッチを取得して前処理
src_seqs, tgt_seqs = zip(*self.data[self.start_index:self.start_index+self.batch_size])
src_data, src_pos = preprocess_seqs(src_seqs)
tgt_data, tgt_pos = preprocess_seqs(tgt_seqs)
# ポインタを更新する
self.start_index += self.batch_size
return (src_data, src_pos), (tgt_data, tgt_pos)
・各モジュールの定義
def position_encoding_init(n_position, d_pos_vec):
"""
Positional Encodingのための行列の初期化を行う
:param n_position: int, 系列長
:param d_pos_vec: int, 隠れ層の次元数
:return torch.tensor, size=(n_position, d_pos_vec)
"""
# PADがある単語の位置はpos=0にしておき、position_encも0にする
position_enc = np.array([
[pos / np.power(10000, 2 * (j // 2) / d_pos_vec) for j in range(d_pos_vec)]
if pos != 0 else np.zeros(d_pos_vec) for pos in range(n_position)])
position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2]) # dim 2i
position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2]) # dim 2i+1
return torch.tensor(position_enc, dtype=torch.float)
class ScaledDotProductAttention(nn.Module):
def __init__(self, d_model, attn_dropout=0.1):
"""
:param d_model: int, 隠れ層の次元数
:param attn_dropout: float, ドロップアウト率
"""
super(ScaledDotProductAttention, self).__init__()
self.temper = np.power(d_model, 0.5) # スケーリング因子
self.dropout = nn.Dropout(attn_dropout)
self.softmax = nn.Softmax(dim=-1)
def forward(self, q, k, v, attn_mask):
"""
:param q: torch.tensor, queryベクトル,
size=(n_head*batch_size, len_q, d_model/n_head)
:param k: torch.tensor, key,
size=(n_head*batch_size, len_k, d_model/n_head)
:param v: torch.tensor, valueベクトル,
size=(n_head*batch_size, len_v, d_model/n_head)
:param attn_mask: torch.tensor, Attentionに適用するマスク,
size=(n_head*batch_size, len_q, len_k)
:return output: 出力ベクトル,
size=(n_head*batch_size, len_q, d_model/n_head)
:return attn: Attention
size=(n_head*batch_size, len_q, len_k)
"""
# QとKの内積でAttentionの重みを求め、スケーリングする
attn = torch.bmm(q, k.transpose(1, 2)) / self.temper # (n_head*batch_size, len_q, len_k)
# Attentionをかけたくない部分がある場合は、その部分を負の無限大に飛ばしてSoftmaxの値が0になるようにする
attn.data.masked_fill_(attn_mask, -float('inf'))
attn = self.softmax(attn)
attn = self.dropout(attn)
output = torch.bmm(attn, v)
return output, attn
class MultiHeadAttention(nn.Module):
def __init__(self, n_head, d_model, d_k, d_v, dropout=0.1):
"""
:param n_head: int, ヘッド数
:param d_model: int, 隠れ層の次元数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param dropout: float, ドロップアウト率
"""
super(MultiHeadAttention, self).__init__()
self.n_head = n_head
self.d_k = d_k
self.d_v = d_v
# 各ヘッドごとに異なる重みで線形変換を行うための重み
# nn.Parameterを使うことで、Moduleのパラメータとして登録できる. TFでは更新が必要な変数はtf.Variableでラップするのでわかりやすい
self.w_qs = nn.Parameter(torch.empty([n_head, d_model, d_k], dtype=torch.float))
self.w_ks = nn.Parameter(torch.empty([n_head, d_model, d_k], dtype=torch.float))
self.w_vs = nn.Parameter(torch.empty([n_head, d_model, d_v], dtype=torch.float))
# nn.init.xavier_normal_で重みの値を初期化
nn.init.xavier_normal_(self.w_qs)
nn.init.xavier_normal_(self.w_ks)
nn.init.xavier_normal_(self.w_vs)
self.attention = ScaledDotProductAttention(d_model)
self.layer_norm = nn.LayerNorm(d_model) # 各層においてバイアスを除く活性化関数への入力を平均0、分散1に正則化
self.proj = nn.Linear(n_head*d_v, d_model) # 複数ヘッド分のAttentionの結果を元のサイズに写像するための線形層
# nn.init.xavier_normal_で重みの値を初期化
nn.init.xavier_normal_(self.proj.weight)
self.dropout = nn.Dropout(dropout)
def forward(self, q, k, v, attn_mask=None):
"""
:param q: torch.tensor, queryベクトル,
size=(batch_size, len_q, d_model)
:param k: torch.tensor, key,
size=(batch_size, len_k, d_model)
:param v: torch.tensor, valueベクトル,
size=(batch_size, len_v, d_model)
:param attn_mask: torch.tensor, Attentionに適用するマスク,
size=(batch_size, len_q, len_k)
:return outputs: 出力ベクトル,
size=(batch_size, len_q, d_model)
:return attns: Attention
size=(n_head*batch_size, len_q, len_k)
"""
d_k, d_v = self.d_k, self.d_v
n_head = self.n_head
# residual connectionのための入力 出力に入力をそのまま加算する
residual = q
batch_size, len_q, d_model = q.size()
batch_size, len_k, d_model = k.size()
batch_size, len_v, d_model = v.size()
# 複数ヘッド化
# torch.repeat または .repeatで指定したdimに沿って同じテンソルを作成
q_s = q.repeat(n_head, 1, 1) # (n_head*batch_size, len_q, d_model)
k_s = k.repeat(n_head, 1, 1) # (n_head*batch_size, len_k, d_model)
v_s = v.repeat(n_head, 1, 1) # (n_head*batch_size, len_v, d_model)
# ヘッドごとに並列計算させるために、n_headをdim=0に、batch_sizeをdim=1に寄せる
q_s = q_s.view(n_head, -1, d_model) # (n_head, batch_size*len_q, d_model)
k_s = k_s.view(n_head, -1, d_model) # (n_head, batch_size*len_k, d_model)
v_s = v_s.view(n_head, -1, d_model) # (n_head, batch_size*len_v, d_model)
# 各ヘッドで線形変換を並列計算(p16左側`Linear`)
q_s = torch.bmm(q_s, self.w_qs) # (n_head, batch_size*len_q, d_k)
k_s = torch.bmm(k_s, self.w_ks) # (n_head, batch_size*len_k, d_k)
v_s = torch.bmm(v_s, self.w_vs) # (n_head, batch_size*len_v, d_v)
# Attentionは各バッチ各ヘッドごとに計算させるためにbatch_sizeをdim=0に寄せる
q_s = q_s.view(-1, len_q, d_k) # (n_head*batch_size, len_q, d_k)
k_s = k_s.view(-1, len_k, d_k) # (n_head*batch_size, len_k, d_k)
v_s = v_s.view(-1, len_v, d_v) # (n_head*batch_size, len_v, d_v)
# Attentionを計算(p16.左側`Scaled Dot-Product Attention * h`)
outputs, attns = self.attention(q_s, k_s, v_s, attn_mask=attn_mask.repeat(n_head, 1, 1))
# 各ヘッドの結果を連結(p16左側`Concat`)
# torch.splitでbatch_sizeごとのn_head個のテンソルに分割
outputs = torch.split(outputs, batch_size, dim=0) # (batch_size, len_q, d_model) * n_head
# dim=-1で連結
outputs = torch.cat(outputs, dim=-1) # (batch_size, len_q, d_model*n_head)
# residual connectionのために元の大きさに写像(p16左側`Linear`)
outputs = self.proj(outputs) # (batch_size, len_q, d_model)
outputs = self.dropout(outputs)
outputs = self.layer_norm(outputs + residual)
return outputs, attns
class PositionwiseFeedForward(nn.Module):
"""
:param d_hid: int, 隠れ層1層目の次元数
:param d_inner_hid: int, 隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
"""
def __init__(self, d_hid, d_inner_hid, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
# window size 1のconv層を定義することでPosition wiseな全結合層を実現する.
self.w_1 = nn.Conv1d(d_hid, d_inner_hid, 1)
self.w_2 = nn.Conv1d(d_inner_hid, d_hid, 1)
self.layer_norm = nn.LayerNorm(d_hid)
self.dropout = nn.Dropout(dropout)
self.relu = nn.ReLU()
def forward(self, x):
"""
:param x: torch.tensor,
size=(batch_size, max_length, d_hid)
:return: torch.tensor,
size=(batch_size, max_length, d_hid)
"""
residual = x
output = self.relu(self.w_1(x.transpose(1, 2)))
output = self.w_2(output).transpose(2, 1)
output = self.dropout(output)
return self.layer_norm(output + residual)
def get_attn_padding_mask(seq_q, seq_k):
"""
keyのPADに対するattentionを0にするためのマスクを作成する
:param seq_q: tensor, queryの系列, size=(batch_size, len_q)
:param seq_k: tensor, keyの系列, size=(batch_size, len_k)
:return pad_attn_mask: tensor, size=(batch_size, len_q, len_k)
"""
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
pad_attn_mask = seq_k.data.eq(PAD).unsqueeze(1) # (N, 1, len_k) PAD以外のidを全て0にする
pad_attn_mask = pad_attn_mask.expand(batch_size, len_q, len_k) # (N, len_q, len_k)
return pad_attn_mask
_seq_q = torch.tensor([[1, 2, 3]])
_seq_k = torch.tensor([[4, 5, 6, 7, PAD]])
_mask = get_attn_padding_mask(_seq_q, _seq_k) # 行がquery、列がkeyに対応し、key側がPAD(=0)の時刻だけ1で他が0の行列ができる
print('query:\n', _seq_q)
print('key:\n', _seq_k)
print('mask:\n', _mask)
def get_attn_subsequent_mask(seq):
"""
未来の情報に対するattentionを0にするためのマスクを作成する
:param seq: tensor, size=(batch_size, length)
:return subsequent_mask: tensor, size=(batch_size, length, length)
"""
attn_shape = (seq.size(1), seq.size(1))
# 上三角行列(diagonal=1: 対角線より上が1で下が0)
subsequent_mask = torch.triu(torch.ones(attn_shape, dtype=torch.uint8, device=device), diagonal=1)
subsequent_mask = subsequent_mask.repeat(seq.size(0), 1, 1)
return subsequent_mask
_seq = torch.tensor([[1,2,3,4]])
_mask = get_attn_subsequent_mask(_seq) # 行がquery、列がkeyに対応し、queryより未来のkeyの値が1で他は0の行列ができいる
print('seq:\n', _seq)
print('mask:\n', _mask)
・モデルの定義
class EncoderLayer(nn.Module):
"""Encoderのブロックのクラス"""
def __init__(self, d_model, d_inner_hid, n_head, d_k, d_v, dropout=0.1):
"""
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param dropout: float, ドロップアウト率
"""
super(EncoderLayer, self).__init__()
# Encoder内のSelf-Attention
self.slf_attn = MultiHeadAttention(
n_head, d_model, d_k, d_v, dropout=dropout)
# Postionwise FFN
self.pos_ffn = PositionwiseFeedForward(d_model, d_inner_hid, dropout=dropout)
def forward(self, enc_input, slf_attn_mask=None):
"""
:param enc_input: tensor, Encoderの入力,
size=(batch_size, max_length, d_model)
:param slf_attn_mask: tensor, Self Attentionの行列にかけるマスク,
size=(batch_size, len_q, len_k)
:return enc_output: tensor, Encoderの出力,
size=(batch_size, max_length, d_model)
:return enc_slf_attn: tensor, EncoderのSelf Attentionの行列,
size=(n_head*batch_size, len_q, len_k)
"""
# Self-Attentionのquery, key, valueにはすべてEncoderの入力(enc_input)が入る
enc_output, enc_slf_attn = self.slf_attn(
enc_input, enc_input, enc_input, attn_mask=slf_attn_mask)
enc_output = self.pos_ffn(enc_output)
return enc_output, enc_slf_attn
class Encoder(nn.Module):
"""EncoderLayerブロックからなるEncoderのクラス"""
def __init__(
self, n_src_vocab, max_length, n_layers=6, n_head=8, d_k=64, d_v=64,
d_word_vec=512, d_model=512, d_inner_hid=1024, dropout=0.1):
"""
:param n_src_vocab: int, 入力言語の語彙数
:param max_length: int, 最大系列長
:param n_layers: int, レイヤー数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param d_word_vec: int, 単語の埋め込みの次元数
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
"""
super(Encoder, self).__init__()
n_position = max_length + 1
self.max_length = max_length
self.d_model = d_model
# Positional Encodingを用いたEmbedding
self.position_enc = nn.Embedding(n_position, d_word_vec, padding_idx=PAD)
self.position_enc.weight.data = position_encoding_init(n_position, d_word_vec)
# 一般的なEmbedding
self.src_word_emb = nn.Embedding(n_src_vocab, d_word_vec, padding_idx=PAD)
# EncoderLayerをn_layers個積み重ねる
self.layer_stack = nn.ModuleList([
EncoderLayer(d_model, d_inner_hid, n_head, d_k, d_v, dropout=dropout)
for _ in range(n_layers)])
def forward(self, src_seq, src_pos):
"""
:param src_seq: tensor, 入力系列,
size=(batch_size, max_length)
:param src_pos: tensor, 入力系列の各単語の位置情報,
size=(batch_size, max_length)
:return enc_output: tensor, Encoderの最終出力,
size=(batch_size, max_length, d_model)
:return enc_slf_attns: list, EncoderのSelf Attentionの行列のリスト
"""
# 一般的な単語のEmbeddingを行う
enc_input = self.src_word_emb(src_seq)
# Positional EncodingのEmbeddingを加算する
enc_input += self.position_enc(src_pos)
enc_slf_attns = []
enc_output = enc_input
# key(=enc_input)のPADに対応する部分のみ1のマスクを作成
enc_slf_attn_mask = get_attn_padding_mask(src_seq, src_seq)
# n_layers個のEncoderLayerに入力を通す
for enc_layer in self.layer_stack:
enc_output, enc_slf_attn = enc_layer(
enc_output, slf_attn_mask=enc_slf_attn_mask)
enc_slf_attns += [enc_slf_attn]
return enc_output, enc_slf_attns
class DecoderLayer(nn.Module):
"""Decoderのブロックのクラス"""
def __init__(self, d_model, d_inner_hid, n_head, d_k, d_v, dropout=0.1):
"""
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param dropout: float, ドロップアウト率
"""
super(DecoderLayer, self).__init__()
# Decoder内のSelf-Attention
self.slf_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
# Encoder-Decoder間のSource-Target Attention
self.enc_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
# Positionwise FFN
self.pos_ffn = PositionwiseFeedForward(d_model, d_inner_hid, dropout=dropout)
def forward(self, dec_input, enc_output, slf_attn_mask=None, dec_enc_attn_mask=None):
"""
:param dec_input: tensor, Decoderの入力,
size=(batch_size, max_length, d_model)
:param enc_output: tensor, Encoderの出力,
size=(batch_size, max_length, d_model)
:param slf_attn_mask: tensor, Self Attentionの行列にかけるマスク,
size=(batch_size, len_q, len_k)
:param dec_enc_attn_mask: tensor, Soutce-Target Attentionの行列にかけるマスク,
size=(batch_size, len_q, len_k)
:return dec_output: tensor, Decoderの出力,
size=(batch_size, max_length, d_model)
:return dec_slf_attn: tensor, DecoderのSelf Attentionの行列,
size=(n_head*batch_size, len_q, len_k)
:return dec_enc_attn: tensor, DecoderのSoutce-Target Attentionの行列,
size=(n_head*batch_size, len_q, len_k)
"""
# Self-Attentionのquery, key, valueにはすべてDecoderの入力(dec_input)が入る
dec_output, dec_slf_attn = self.slf_attn(
dec_input, dec_input, dec_input, attn_mask=slf_attn_mask)
# Source-Target-AttentionのqueryにはDecoderの出力(dec_output), key, valueにはEncoderの出力(enc_output)が入る
dec_output, dec_enc_attn = self.enc_attn(
dec_output, enc_output, enc_output, attn_mask=dec_enc_attn_mask)
dec_output = self.pos_ffn(dec_output)
return dec_output, dec_slf_attn, dec_enc_attn
class Decoder(nn.Module):
"""DecoderLayerブロックからなるDecoderのクラス"""
def __init__(
self, n_tgt_vocab, max_length, n_layers=6, n_head=8, d_k=64, d_v=64,
d_word_vec=512, d_model=512, d_inner_hid=1024, dropout=0.1):
"""
:param n_tgt_vocab: int, 出力言語の語彙数
:param max_length: int, 最大系列長
:param n_layers: int, レイヤー数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param d_word_vec: int, 単語の埋め込みの次元数
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
"""
super(Decoder, self).__init__()
n_position = max_length + 1
self.max_length = max_length
self.d_model = d_model
# Positional Encodingを用いたEmbedding
self.position_enc = nn.Embedding(
n_position, d_word_vec, padding_idx=PAD)
self.position_enc.weight.data = position_encoding_init(n_position, d_word_vec)
# 一般的なEmbedding
self.tgt_word_emb = nn.Embedding(
n_tgt_vocab, d_word_vec, padding_idx=PAD)
self.dropout = nn.Dropout(dropout)
# DecoderLayerをn_layers個積み重ねる
self.layer_stack = nn.ModuleList([
DecoderLayer(d_model, d_inner_hid, n_head, d_k, d_v, dropout=dropout)
for _ in range(n_layers)])
def forward(self, tgt_seq, tgt_pos, src_seq, enc_output):
"""
:param tgt_seq: tensor, 出力系列,
size=(batch_size, max_length)
:param tgt_pos: tensor, 出力系列の各単語の位置情報,
size=(batch_size, max_length)
:param src_seq: tensor, 入力系列,
size=(batch_size, n_src_vocab)
:param enc_output: tensor, Encoderの出力,
size=(batch_size, max_length, d_model)
:return dec_output: tensor, Decoderの最終出力,
size=(batch_size, max_length, d_model)
:return dec_slf_attns: list, DecoderのSelf Attentionの行列のリスト
:return dec_slf_attns: list, DecoderのSelf Attentionの行列のリスト
"""
# 一般的な単語のEmbeddingを行う
dec_input = self.tgt_word_emb(tgt_seq)
# Positional EncodingのEmbeddingを加算する
dec_input += self.position_enc(tgt_pos)
# Self-Attention用のマスクを作成
# key(=dec_input)のPADに対応する部分が1のマスクと、queryから見たkeyの未来の情報に対応する部分が1のマスクのORをとる
dec_slf_attn_pad_mask = get_attn_padding_mask(tgt_seq, tgt_seq) # (N, max_length, max_length)
dec_slf_attn_sub_mask = get_attn_subsequent_mask(tgt_seq) # (N, max_length, max_length)
dec_slf_attn_mask = torch.gt(dec_slf_attn_pad_mask + dec_slf_attn_sub_mask, 0) # ORをとる
# key(=dec_input)のPADに対応する部分のみ1のマスクを作成
dec_enc_attn_pad_mask = get_attn_padding_mask(tgt_seq, src_seq) # (N, max_length, max_length)
dec_slf_attns, dec_enc_attns = [], []
dec_output = dec_input
# n_layers個のDecoderLayerに入力を通す
for dec_layer in self.layer_stack:
dec_output, dec_slf_attn, dec_enc_attn = dec_layer(
dec_output, enc_output,
slf_attn_mask=dec_slf_attn_mask,
dec_enc_attn_mask=dec_enc_attn_pad_mask)
dec_slf_attns += [dec_slf_attn]
dec_enc_attns += [dec_enc_attn]
return dec_output, dec_slf_attns, dec_enc_attns
class Transformer(nn.Module):
"""Transformerのモデル全体のクラス"""
def __init__(
self, n_src_vocab, n_tgt_vocab, max_length, n_layers=6, n_head=8,
d_word_vec=512, d_model=512, d_inner_hid=1024, d_k=64, d_v=64,
dropout=0.1, proj_share_weight=True):
"""
:param n_src_vocab: int, 入力言語の語彙数
:param n_tgt_vocab: int, 出力言語の語彙数
:param max_length: int, 最大系列長
:param n_layers: int, レイヤー数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param d_word_vec: int, 単語の埋め込みの次元数
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
:param proj_share_weight: bool, 出力言語の単語のEmbeddingと出力の写像で重みを共有する
"""
super(Transformer, self).__init__()
self.encoder = Encoder(
n_src_vocab, max_length, n_layers=n_layers, n_head=n_head,
d_word_vec=d_word_vec, d_model=d_model,
d_inner_hid=d_inner_hid, dropout=dropout)
self.decoder = Decoder(
n_tgt_vocab, max_length, n_layers=n_layers, n_head=n_head,
d_word_vec=d_word_vec, d_model=d_model,
d_inner_hid=d_inner_hid, dropout=dropout)
self.tgt_word_proj = nn.Linear(d_model, n_tgt_vocab, bias=False)
nn.init.xavier_normal_(self.tgt_word_proj.weight)
self.dropout = nn.Dropout(dropout)
assert d_model == d_word_vec # 各モジュールの出力のサイズは揃える
if proj_share_weight:
# 出力言語の単語のEmbeddingと出力の写像で重みを共有する
assert d_model == d_word_vec
self.tgt_word_proj.weight = self.decoder.tgt_word_emb.weight
def get_trainable_parameters(self):
# Positional Encoding以外のパラメータを更新する
enc_freezed_param_ids = set(map(id, self.encoder.position_enc.parameters()))
dec_freezed_param_ids = set(map(id, self.decoder.position_enc.parameters()))
freezed_param_ids = enc_freezed_param_ids | dec_freezed_param_ids
return (p for p in self.parameters() if id(p) not in freezed_param_ids)
def forward(self, src, tgt):
src_seq, src_pos = src
tgt_seq, tgt_pos = tgt
src_seq = src_seq[:, 1:]
src_pos = src_pos[:, 1:]
tgt_seq = tgt_seq[:, :-1]
tgt_pos = tgt_pos[:, :-1]
enc_output, *_ = self.encoder(src_seq, src_pos)
dec_output, *_ = self.decoder(tgt_seq, tgt_pos, src_seq, enc_output)
seq_logit = self.tgt_word_proj(dec_output)
return seq_logit
・学習
def compute_loss(batch_X, batch_Y, model, criterion, optimizer=None, is_train=True):
# バッチの損失を計算
model.train(is_train)
pred_Y = model(batch_X, batch_Y)
gold = batch_Y[0][:, 1:].contiguous()
# gold = batch_Y[0].contiguous()
loss = criterion(pred_Y.view(-1, pred_Y.size(2)), gold.view(-1))
if is_train: # 訓練時はパラメータを更新
optimizer.zero_grad()
loss.backward()
optimizer.step()
gold = gold.data.cpu().numpy().tolist()
pred = pred_Y.max(dim=-1)[1].data.cpu().numpy().tolist()
return loss.item(), gold, pred
MAX_LENGTH = 20
batch_size = 64
num_epochs = 15
lr = 0.001
ckpt_path = 'transformer.pth'
max_length = MAX_LENGTH + 2
model_args = {
'n_src_vocab': vocab_size_X,
'n_tgt_vocab': vocab_size_Y,
'max_length': max_length,
'proj_share_weight': True,
'd_k': 32,
'd_v': 32,
'd_model': 128,
'd_word_vec': 128,
'd_inner_hid': 256,
'n_layers': 3,
'n_head': 6,
'dropout': 0.1,
}
# DataLoaderやモデルを定義
train_dataloader = DataLoader(
train_X, train_Y, batch_size
)
valid_dataloader = DataLoader(
valid_X, valid_Y, batch_size,
shuffle=False
)
model = Transformer(**model_args).to(device)
optimizer = optim.Adam(model.get_trainable_parameters(), lr=lr)
criterion = nn.CrossEntropyLoss(ignore_index=PAD, size_average=False).to(device)
def calc_bleu(refs, hyps):
"""
BLEUスコアを計算する関数
:param refs: list, 参照訳。単語のリストのリスト (例: [['I', 'have', 'a', 'pen'], ...])
:param hyps: list, モデルの生成した訳。単語のリストのリスト (例: [['I', 'have', 'a', 'pen'], ...])
:return: float, BLEUスコア(0~100)
"""
refs = [[ref[:ref.index(EOS)]] for ref in refs]
hyps = [hyp[:hyp.index(EOS)] if EOS in hyp else hyp for hyp in hyps]
return 100 * bleu_score.corpus_bleu(refs, hyps)
# 訓練
best_valid_bleu = 0.
for epoch in range(1, num_epochs+1):
start = time.time()
train_loss = 0.
train_refs = []
train_hyps = []
valid_loss = 0.
valid_refs = []
valid_hyps = []
# train
for batch in train_dataloader:
batch_X, batch_Y = batch
loss, gold, pred = compute_loss(
batch_X, batch_Y, model, criterion, optimizer, is_train=True
)
train_loss += loss
train_refs += gold
train_hyps += pred
# valid
for batch in valid_dataloader:
batch_X, batch_Y = batch
loss, gold, pred = compute_loss(
batch_X, batch_Y, model, criterion, is_train=False
)
valid_loss += loss
valid_refs += gold
valid_hyps += pred
# 損失をサンプル数で割って正規化
train_loss /= len(train_dataloader.data)
valid_loss /= len(valid_dataloader.data)
# BLEUを計算
train_bleu = calc_bleu(train_refs, train_hyps)
valid_bleu = calc_bleu(valid_refs, valid_hyps)
# validationデータでBLEUが改善した場合にはモデルを保存
if valid_bleu > best_valid_bleu:
ckpt = model.state_dict()
torch.save(ckpt, ckpt_path)
best_valid_bleu = valid_bleu
elapsed_time = (time.time()-start) / 60
print('Epoch {} [{:.1f}min]: train_loss: {:5.2f} train_bleu: {:2.2f} valid_loss: {:5.2f} valid_bleu: {:2.2f}'.format(
epoch, elapsed_time, train_loss, train_bleu, valid_loss, valid_bleu))
print('-'*80)
・評価
def test(model, src, max_length=20):
# 学習済みモデルで系列を生成する
model.eval()
src_seq, src_pos = src
batch_size = src_seq.size(0)
enc_output, enc_slf_attns = model.encoder(src_seq, src_pos)
tgt_seq = torch.full([batch_size, 1], BOS, dtype=torch.long, device=device)
tgt_pos = torch.arange(1, dtype=torch.long, device=device)
tgt_pos = tgt_pos.unsqueeze(0).repeat(batch_size, 1)
# 時刻ごとに処理
for t in range(1, max_length+1):
dec_output, dec_slf_attns, dec_enc_attns = model.decoder(
tgt_seq, tgt_pos, src_seq, enc_output)
dec_output = model.tgt_word_proj(dec_output)
out = dec_output[:, -1, :].max(dim=-1)[1].unsqueeze(1)
# 自身の出力を次の時刻の入力にする
tgt_seq = torch.cat([tgt_seq, out], dim=-1)
tgt_pos = torch.arange(t+1, dtype=torch.long, device=device)
tgt_pos = tgt_pos.unsqueeze(0).repeat(batch_size, 1)
return tgt_seq[:, 1:], enc_slf_attns, dec_slf_attns, dec_enc_attns
def ids_to_sentence(vocab, ids):
# IDのリストを単語のリストに変換する
return [vocab.id2word[_id] for _id in ids]
def trim_eos(ids):
# IDのリストからEOS以降の単語を除外する
if EOS in ids:
return ids[:ids.index(EOS)]
else:
return ids
# 学習済みモデルの読み込み
model = Transformer(**model_args).to(device)
ckpt = torch.load(ckpt_path)
model.load_state_dict(ckpt)
# テストデータの読み込み
test_X = load_data('./data/dev.en')
test_Y = load_data('./data/dev.ja')
test_X = [sentence_to_ids(vocab_X, sentence) for sentence in test_X]
test_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in test_Y]
・生成
test_dataloader = DataLoader(
test_X, test_Y, 1,
shuffle=False
)
src, tgt = next(test_dataloader)
src_ids = src[0][0].cpu().numpy()
tgt_ids = tgt[0][0].cpu().numpy()
print('src: {}'.format(' '.join(ids_to_sentence(vocab_X, src_ids[1:-1]))))
print('tgt: {}'.format(' '.join(ids_to_sentence(vocab_Y, tgt_ids[1:-1]))))
preds, enc_slf_attns, dec_slf_attns, dec_enc_attns = test(model, src)
pred_ids = preds[0].data.cpu().numpy().tolist()
print('out: {}'.format(' '.join(ids_to_sentence(vocab_Y, trim_eos(pred_ids)))))
出力:
src: show your own business .
tgt: 自分 の 事 を しろ 。
out: 自分 の 問題 を 教え て くれ る よう に し て くれ た 。
# BLEUの評価
test_dataloader = DataLoader(
test_X, test_Y, 128,
shuffle=False
)
refs_list = []
hyp_list = []
for batch in test_dataloader:
batch_X, batch_Y = batch
preds, *_ = test(model, batch_X)
preds = preds.data.cpu().numpy().tolist()
refs = batch_Y[0].data.cpu().numpy()[:, 1:].tolist()
refs_list += refs
hyp_list += preds
bleu = calc_bleu(refs_list, hyp_list)
print(bleu)
出力:25.94844231217106
※Seq2Seqよりも高い値となった(訳は微妙な結果であるが、、、)
#Section6:物体検知・セグメンテーション
<物体認識タスク>
分類:(画像に対し単一または複数の)クラスラベル
物体検知:Bounding Box
セマンティックセグメンテーション(意味領域分割):(各ピクセルに対し単一の) クラスラベル
インスタンスセグメンテーション(個体領域分割):(各ピクセルに対し単一の) クラスラベル
<代表的なデータセット>
VOC12(Visual Object Classes)
ILSVRC12: ImageNetのサブセット
MS COCO18 (Microsoft Common Object in Context)
OICOD18 (Open Images Challenges Object Detection)
<評価指標>
IOU(Intersection over Union:Jaccard係数)
物体位置の予測精度を評価する指標です。
予測したBounding Boxに対する専有面積を表します
<物体検知の大枠>
物体検知のフレームワークの2段階検出と1段階検出の2種類に大別される。
2段階検出とは、候補領域の検出とクラス推定を別々に行う(RCNN系)
1段階検出は、候補領域の検出とクラス推定を同時に行う(SSD、YOLO、RetinaNetなど)
<SSDの特徴>
・畳み込みニューラルネットワークの途中で、各層での大きいサイズの特徴マップ
も利用する事で、比較的小さな物体の検出も可能にしている。
#Section7:GAN
受験資格認定テストにGAN(DCGAN)に関する設問が出ていたので下記にまとめる
・DiscriminatorとGeneratorの更新に関するソースコード穴埋め問題があった。
Discriminatorは、Real_lossとfake_lossのバイナリクロスエントロピーを計算するため
下記のようになる(Discriminatorは、realは1に、fakeは0にしたい)
real_loss = tf.keras.losses.binary_crossentropy(tf.ones_like(real_pred), real_pred)
fake_loss = tf.keras.losses.binary_crossentropy(tf.zeros_like(fake_pred), fake_pred)
Generatorrは、fake_lossのバイナリクロスエントロピーを計算するため下記のようになる
(Generatorは、fakeを0にしたい。バイナリクロスエントロピは
マイナスなので、打ち消すため頭にマイナスを入れている。)
fake_loss = -tf.keras.losses.binary_crossentropy(tf.zeros_like(fake_pred), fake_pred)
2.GANとDCGANの違い
⇒
・隠れ層に全結合を用いず、畳込みとする。
・プーリングの代わりにストライドの畳み込みを用いる。
・バッチ正規化をすべての層に対して行う。
・Generatorの活性化関数は出力層はtanh、それ以外はReLuを用いる。
・Discriminatorのすべての層の活性関数にLeakyReLuを用いる