#0 はじめに
ラビットチャレンジを受講した際に提出が必要となるレポート記事である。
#1 深層学習Day3
#1.1 再帰型ニューラルネットワークの概念
#1.1.1 RNN全体像
#1.1.1.1 RNNとは
再帰型ニューラルネットワーク(Reccurent Neural Network)とは、時系列データに対応できるニューラルネットワークの事である。
#1.1.1.2 時系列データ
時系列データとは、時間的順序を追って一定の間隔ごとに観察され、相互に統計的な依存関係が認められるようなデータのことである。
例としては、音声データやテキストデータ、株価等が挙げられる。
#1.1.1.3 RNNについて
時系列データを扱うために、初期の状態や、過去の状態を保持し、次の状態を求める再帰的構造が必要となる。下図のような構造で実現している
u^t = W_{(in)}x^t + Wz^{t-1} + b\\
x^t = f(W_{(in)}x^t + Wz^{t-1} + b)\\
v^t = W_{out}z^t + c\\
y^t = g(W_{(out)}z^t + c)
#1.1.2 BPTT
#1.1.2.1 BPTTとは
BPTT(Back Propagation Trough Time)とは、RNNにおいて誤差逆伝搬を適用したものである。
#1.1.2.2 BPTTの数学的記述
\frac{\partial E}{\partial W_{in}} = \sigma^t\begin{bmatrix} x^t \end{bmatrix} ^T\\
\frac{\partial E}{\partial W_{out}} = \sigma^{out,t}\begin{bmatrix} Z^t \end{bmatrix} ^T\\
\frac{\partial E}{\partial W} = \sigma^t\begin{bmatrix} z^{t-1} \end{bmatrix} ^T\\
\frac{\partial E}{\partial b} = \sigma^t\\
\frac{\partial E}{\partial c} = \sigma^{out,t}\\
#1.1.2.3 BPTTの全体像
BPTTでは、時系列を遡れば遡るほど勾配が消失してしまう問題がある。解決策としては以下がある。
・正則化
・勾配消失に強い活性化関数
・構造自体を変更して解決(LSTM)
#1.2 LSTM
LSTM(Long short-term memory)とはRNNの勾配消失問題を解決した時系列データに対応するモデルのことである。入力ゲート、忘却ゲート、出力ゲートの3つのゲートから作られている。
#1.2.1 CEC
記憶機能のみをもつ。勾配消失、爆発の解決方法として、勾配が1であれば解決できる。CECでは覚えるだけなので勾配の操作をせず、1となる。
CECの課題として、入力データの時間依存度が関係なく重みが一律であるため、ニューラルネットワークの学習特性がないことが挙げられる。
#1.2.2 入力ゲートと出力ゲート
CECの課題を解決するために設けられたゲートである。CECに何を覚えさせるか、何を出力させるかの役割をもつ。
#1.2.3 忘却ゲート
入力ゲートと出力ゲートを設けただけでは、過去の情報が全て保管されており、必要ない情報を削除することができない。この問題を解決するために、過去の情報を削除する忘却ゲートが誕生した。
#1.2.4 覗き穴結合
CECに保存されている過去の情報を、任意のタイミングで他のノードに伝播させたり、任意のタイミングで忘却させるための構造のこと。
#1.3 GRU
LSTMではパラメータが多数存在しており、計算負荷が大きい。この問題を解決するためにパラメータを大幅に削減し、精度が同等、又はそれ以上が望めるようになった構造のこと。計算負荷が低いことがメリットとして挙げられる。
#1.4 双方向RNN
過去の情報だけでなく、未来の情報を加味することで、精度を向上させるためのモデルのことである。文章の推敲や、機械翻訳に用いられる。
#1.5 Seq2Seq
Encoder-Decoderモデルの一種。機械対話や機械翻訳等に使用されている。
#1.5.1 Encorder RNN
分の意味を集約する。テキストデータを単語等のトークンを区切って渡す構造のこと。
・Taiking:文章を単語等のトークン毎に分割し、トークン毎のIDに分割する。
・Embedding:IDから、そのトークンを表す分散表現ベクトル(one-hot-vector→embedding)に変換する。
・Encorder RNN:ベクトルを順番にRNNに入力する。
#1.5.2 Decorder RNN
入力から次の文字を予測する構造のこと。
・Sampling:生成確立に基づいてtokenをランダムに選ぶ。
・Embedding:tokenをEmbeddingしてDecorder RNNへの入力とする。
・Detokenize:繰り返し得られたtokenを文字列に直す。
#1.5.3 HRED
seq2seqで文脈の意味ベクトルを追加することで意味をくみ取れるようにした構造のこと。確率的な多様性が字面にしかなく、会話の流れのような多用性がないことが課題として挙げられる。
#1.5.4 VHRED
HREDにVAEの潜在変数の概念を追加し、HREDの課題を解決した構造のこと。
#1.5.5 VAE
#1.5.5.1 オートエンコーダ
オートエンコーダとは、教師なし学習の一つである。入力データから潜在変数zに変換するNNをEncorderとして用い、潜在変数をインプットとし、元画像を復元するNNをDecorderとして用いた構造のこと。メリットとして、次元削減が挙げられる。
#1.5.5.2 VAE
通常のオートエンコーダの場合、潜在変数zにデータを押し込めているが、構造の状態がわからない。この潜在変数を生起確率分布と仮定した構造のこと。
#1.6 Word2vec
word2vecはNNを通して、単語の分散表現を獲得するための手法である。
RNNでは、単語のような可変長の文字列をNNに与えることができないため、固定長形式で単語を表す必要がある。
学習データからボキャブラリを作成し、辞書の単語の数だけone-hot-vectorが作成される。
メリットとして、大規模データの分散表現の学習が、現実的な計算速度とメモリ量で実現可能になったことが挙げられる。
#1.7 Attention Mechanism
seq2seqでは、単語の数に関わらず固定次元ベクトルの中に入力しなければならず、長い文章への対応が難しかった。そのため、文章が長くなれば長くなるほどシーケンスの内部表現の次元も大きくなっていく仕組みが必要であった。
Attention Mechanismとは、入力と出力のどの単語が関連しているかの関連戸を学習する仕組みである。
#2 深層学習Day4
#2.1 強化学習
#2.1.1 強化学習とは
長期的に報酬を最大化できるように環境の中で行動を選択できるエージェントを作ることを目標とする機械学習の一分野である。
行動の結果から与えられる報酬をもとに行動を決定する原理を改善する仕組みである。
#2.1.2 強化学習の応用例
エージェント:プロフィールと購入履歴に基づいてキャンペーンメールを送る顧客を決めるソフトウェア
行動:顧客ごとに送信、非送信のふたつの行動
報酬:キャンペーンのコスト(負の報酬)
キャンペーンで生み出されると推測される売り上げ(正の報酬)
#2.1.3 探索と利用のトレードオフ
過去データからベストな行動をとり続けるともっとベストな行動をとれず、未知な行動のみをとり続けると過去のデータを生かせないという関係がある。
#2.1.5 強化学習の差分
強化学習と、教師あり教師なし学習の違いはについて述べる。
教師あり、教師なし学習ではデータに含まれるパターン(特徴量)を見つけ出し、予測するのが目標であるが、強化学習では、優れた方策を見つけるのが目標となる。
#2.1.6 行動価値関数
状態価値関数と行動価値関数の2種類がある。
・状態価値関数
状態の価値に注目したもの。
・行動価値関数
状態と価値を組み合わせた価値に注目したもの。
#2.1.7 方策関数
方策ベースの強化学習において、エージェントがどんな行動をとるのかを決める関数のこと。
#2.1.8 方策勾配法
方策をモデル化し、最適化する手法のこと。Jは方策の良さであり、定義しなければならない。定義方法は以下の方法がある。
・平均報酬
・割引報酬和
上記の定義に対応して、行動価値関数を定義することで方策勾配定理うが成り立つ。
\theta ^{t+1} = \theta^t + \epsilon \nabla J(\theta)\\
\nabla J(\theta) = E_{\pi \theta}\begin{bmatrix}(\nabla _{\theta} log \pi_{\theta}(a|s)Q^{\pi}(s, a)) \end{bmatrix}
#2.2 Alpha Go
Alpha GoにはAlphaGo(Lee)とAlpha Go Zeroの2種類がある。Alpha Goでは状況認識にCNN、次の手の選択にモンテカルロ木を用いている。
#2.2.1 Alpha Go (Lee)
Alpha Goでは以下の手順で学習が行われる。
1.教師あり学習によるRollOutPolicyとPolicyNetの学習
2.強化学習によるPolicyNetの学習
3.強化学習によるValueNetの学習
#2.2.1.1 Policy Netの教師あり学習
棋譜データ3000万局面分の教師を用意し、同じ着手を予測できるように学習を行っている。精度は57%程度である。
#2.2.1.2 Policy Netの強化学習
現状のPolicy NetとPolicy Poolからランダムに選択されたPolicy Netと対局シミュレーションを行い、方策勾配法で学習を行った。Policy Poolに保存されているものと対局する理由は、過学習を防ぐためである。
#2.2.1.3 Value Netの学習
Policy Netを使用し、対局シミュレーションを行い結果の勝敗を教師として学習させている。以下の手順で教師を作成した。
1.Policy Net(教師あり学習)でN手まで打つ。
2.N+1手目の手をランダムに選択し、その手で進めた局面をS(N+1)とする。
3.SからRL Policy Netで終局まで打ち、勝敗報酬をRとする。
#2.2.1.4 モンテカルロ木探索
盤面評価値に頼らず末端評価値のみで探索を行う探索法のこと。該当手のシミュレーション回数が一定回数を超えたら、その手を着手したあとの局面をシミュレーション開始局面とするように探索木を成長させる。
#2.2.2 Alpha Go Zero
#2.2.2.1 Alpha GoとAlpha Go Zeroの違い
1.教師あり学習を行わない。
2.特徴入力からヒューリスティックな要素を排除し、石の配置のみにしている。
3.Policy NetとValue Netを1つのネットワークに統合している。
4.Residual Netを導入している。
5.モンテカルロ木探索からRoll Outシミュレーションをなくしている。
#2.2.2.2 Residual Network
ネットワークにショートカット構造を追加し、勾配爆発、焼失を抑える効果をもたせたもの。基本構造はConvolution → Batch Norm → ReLU → Convolution → Batch Norm → Add → ReLUのブロックを単位として積み重ねている。
#2.3 軽量化・高速化技術
深層学習には大量にデータを使用するため、パラメータの調整に多くの時間を使用するため、高速な計算が求められる。
毎年10倍のペースで機械学習の計算量が増えているが、18か月で2倍の性能のPCしかでない。
そのため、複数のPCを使用し、並列的にニューラルネットを構成し、効率の良い学習を行う。
#2.3.1 データ並列化
親モデルをワーカー(PC)に子モデルとしてコピーし、データを分割して、各ワーカー毎に計算させること。
#2.3.1.1 同期型
各ワーカーが計算し終わるのを待ち、全ワーカーの勾配が出たところで勾配の平均を計算し、親モデルのパラメータを更新する。
#2.3.1.2 非同期型
計算し終わるのを待たず、各子モデル毎にパラメータを更新する。
学習が終わった子モデルはパラメータサーバにPushされ、新たに学習を始めるときはパラメータサーバから新しいモデルをPopし、学習する。
#2.3.1.3 比較
非同期型の方が処理が速い。ベストのモデルパラメータを利用できないため、同期型の方が精度がいい。主流は同期型だが、場面で利用される手法が変わる。
自分たちでサーバーを用意する場合は同期型。
世界中のものを使って学習する場合は非同期型。
#2.3.2 モデル並列化
親モデルを各ワーカーに分割し、それぞれのモデルを学習させ、すべてのデータで学習が終わった後に復元すること。モデルのパラメータ数が多いほど効率が良くなる。
モデルが大きい場合、モデル並列化を利用し、データが大きい場合はデータ並列化を利用するとよい。
#2.3.3 GPUによる高速化
#2.3.3.1 GPGPU
元々の使用用途であるグラフィック以外の用途に用いられるGPUの総称のこと。利用する際は下記の開発環境を利用する。
#2.3.3.1.1 CUDA
NVIDIA社製のGPUでのみ使用可能である。また、Deep Learning用であるため、使いやすい。
#2.3.3.1.2 OpenCL
NVIDIA社製以外のGPUでも使用可能であるが、Deep Learningに特化していない。
#2.3.3.2 CPU
高性能なコア少数からなるプロセッサのことである。簡単な処理だけでなく、複雑で連続的な処理をすることができる。
#2.3.3.3 GPU
GPUは画像処理に特化したプロセッサである。
CPUに比べ、比較的低性能なコアが多数存在し、簡単な並列処理が得意である。ニューラルネットの学習は単純な行列計算が多いため、GPUを用いると高速な学習を行うことができる。
#2.3.4 モデルの軽量化
モデルの精度を維持しつつ、計算回数を低減する手法の総称である。軽量化を行うことで近年大きく普及しているモバイル端末や、IoT機器との相性が良い。
#2.3.4.1 量子化
ネットワークが大きくなると、大量のパラメータが必要になり、学習、推論に多くのメモリ、演算処理が必要となる。パラメータのbit数を落とすことでメモリの演算処理と削減を実現する手法のこと。
(64bit→8byte、32bit→4byte)
メリット
・高速化
・省メモリ化
欠点
・精度の低下(実際は64bitを32bitにしてもあまり精度は変わらない。)
#2.3.4.2 蒸留
精度の高いモデルはニューロンの規模が大規模になっているため、推論に多くのメモリと演算処理必要となる。規模の大きなモデルの知識を継承し、軽量なモデルの作成を行う手法のこと。
#2.3.4.3 プルーニング
ネットワークが大きくなると大量のパラメータが全てのニューロンの計算が精度に寄与しているわけではない。モデルの精度に寄与が少ないニューロンを削減し、モデルの軽量化、高速化を行う手法のこと。
#2.4 応用モデル
#2.4.1 Mobile Net
画像認識モデルの一種である。ディープラーニングモデルは精度が良いが、ネットワークが深くなり計算量が増えてしまう。そのため、多くの計算リソースが必要であり、お金がかかる。こういった問題を解決し、軽量化、高速化、高精度化を実現したモデルである。
Depthwise Convolution(フィルタの数を1に固定)とPointwise Convolution(1*1でフィルタ)を組み合わせたDepthwise Separatable Convolutionを用いて実現した。
#2.4.2 Dense Net
画像認識モデルの一種である。Dense Blockというモジュールを用いてディープラーニングモデルの問題を解決した。
Dense Blockとは、出力層に前の層の結果を付け加えていくブロックのことである。層が進むにつれてチャンネル数が増えていき、増える量が成長率といい、ハイパーパラメータとなる。
増えたチャンネルはTransition Layerで削減することができる。
#2.4.3 Layer正規化/ Instance正規化
#2.4.3.1 Layer Norm
1つの画像に対してH、W、C全てを正規化する手法のこと。
#2.4.3.2 Instance Norm
1つの画像に対してC(チャンネル)のみを正規化する手法のこと。
#2.4.4 Wave Net
音声生成モデルの一種である。Pixel CNNを音声に適用したものである。時系列データに対して畳み込み演算を行っているが、層が深くあるにつれて畳み込むリンクを話すことで受容野を増やしている。
#2.5 Transformer
Transformerとは従来のEncoder-Decoderモデルの問題点である、翻訳元の分の内容を一つのベクトルで表現しているため、文が長いと表現力が足りなくなってしまうという点に着目し、解決するためにAttention(注意機構)を導入し、RNNを使用しないモデルである。
#2.5.1 Attention
Attentionとは辞書オブジェクトのことであり、検索クエリに一致するkeyを索引し、対応するvalueを取り出す操作の機能である。RNNでは文長が長くなっても翻訳精度が落ちてしまうが、Attentionを用いたものでは精度が落ちない。
Attentionには大きく分けて以下の2種類が存在する。
・ソース・ターゲット注意機構
・自己注意機構
#2.5.1.1 Transformer-Encoder
自己注意機構により文脈を考慮して各単語をエンコードするEncoderである。
#2.5.1.2 Self-Attention
検索クエリに全ての入力を与え、学習的に注意箇所を決めていく。
#2.5.1.3 Position-Wise Feed-Forward Networks
位置情報を保持したまま順伝搬をさせるネットワークである。
2層の全結合NNになっており、線形変換、ReLu、線形変換と行っていく。
#2.5.1.4 Scaled dot product attention
全単語に関するAttentionをまとめて計算する機構である。
#2.5.1.5 Multi-Head attention
重みパラメータの異なる8個のヘッドを使用する機構である。
8個のScaled Dot-Product Attentionの出力をConcatする。それぞれのヘッドが異なる種類の情報を収集してくれる。
#2.5.1.6 DecoderはEncoderと同じ6層であり、仕組みは殆どEncoderと同じである。
#2.6 物体検知・セグメンテーション
物体検知には以下の4つのタスクがある。下のタスクの方が難しい。
・Classification 分類
・Object Detection 物体検知
・Semantic Segmentation 意味領域分割
・Instance Segmentation 個体領域分割
#2.6.1 Data Set
#2.6.1.1 VOC12 (Visual Object Classes)
クラス数:20
Train+Val:11,540
Box/画像:2.4
Instance Annotation:あり
#2.6.1.2 ILSVRC17(ImageNet Scale Visual Recognition Challenge)
クラス数:20
Train+Val:476,668
Box/画像:1.1
Instance Annotation:なし
#2.6.1.3 MS COCO18(COmmon Object in Context)
クラス数:80
Train+Val:123,287
Box/画像:7.3
Instance Annotation:あり
#2.6.1.4 OICOD18(Open Images Challenge Object Detection)
クラス数:500
Train+Val:1,743,042
Box/画像:7.0
Instance Annotation:あり
TP:真値と予測値の一致している箇所
FP:真値内で予測値に含まれていない箇所
FN:予測値内で真値に含まれていない箇所
TN:真値が全く予測されていない箇所
IoU(Intersection over Union)は物体位置の予測精度も評価できる。confidenceとIoUを用いて正しく検出できているかを評価する。
Precision = \frac{TP}{TP+FP}\\
Recall = \frac{TP}{TP+FN}\\
IoU = \frac{TP}{TP + FP + FN}
#2.6.2.1 mAP(mean Average Precision)
IoUの閾値を0.5に固定し、各クラスに対してPrecisionを計算し、平均した値のこと。
#2.6.2.2 FPS(Flame per Second)
検出速度のこと。
#2.6.3 物体検知の大枠
2012年にAlexNetが登場し、時代がSIFTからDCNNに移行した。
SIFT:Scale Invariant Feature Transform
DCNN:Deep Convolutional Neural Network
#2.6.3.1 物体検知のフレームワーク
#2.6.3.1.1 2段階検出器(Two-stage detector)
領域検出とクラス推定を別々に行う検出器のこと。
・1段階に比べ、精度が高い
・1段階に比べ、計算量が多く、推論も遅い
#2.6.3.1.2 1段階検出器(one-stage detector)
領域検出とクラス推定を同時に行う検出器のこと。
・2段階に比べ、精度が低い
・1段階に比べ、計算量が少なく、推論も早い
#2.6.4 SSD(Single Shot Detector)
1段階検出器の一つであり、VGG16の構造を基本としてる。
デフォルトボックスを用意し、変形することで各クラスに対するconfidenceを出力する。
多数のデフォルトボックスを用意することで生じる問題への対処法を以下に示す。
・Non-Maximum Suppression
複数のデフォルトボックスからConfidenceが最大のものみを残す。
・Hard Negative Mining
背景を切り出すボックスの数を背景以外のボックス数の3倍までルールを設け、情報量を減らす。
#2.6.5 Semantic Segmentation
convolutionやpoolingを行うと入力画像の解像度が落ちて行ってしまう問題がある。Semantic Segmentationでは入力サイズと同じサイズの画像でクラス分類が行われる。そのため落ちた解像度を戻すためにUp-Samplingが行われる。
#2.6.5.1 Up-Sampling
convolutionやpoolnigで落ちた解像度を元に戻す手法である。正しく認識するためにある程度の受容野が必要となるため必要となる。
#2.6.5.2 Deconcolution
Up-Sampling手法の一つである。以下の手順で解像度を元に戻す。
1.特徴マップのpixel感覚をstrideだけ空ける。
2.特徴マップの周りに(kernelsize -1) - paddingだけ余白を作る。
3. 畳み込み演算を行う。
poolingで失われた情報が復元されるわけではない。
#2.6.5.3 Dilated Convolution
Convolutionの段階で受容野を広げるための工夫のこと。kernelの隙間を広げることで受容野を広げる。
#3 深層学習Day3の実装演習
#3.1 再帰型ニューラルネットワークの概念の実装
from google.colab import drive
drive.mount('/content/drive')
import sys
sys.path.append('/content/drive/MyDrive/DNN3-4')
import numpy as np
from common import functions
import matplotlib.pyplot as plt
# データを用意
# 2進数の桁数
binary_dim = 8
# 最大値 + 1
largest_number = pow(2, binary_dim)
# largest_numberまで2進数を用意
binary = np.unpackbits(np.array([range(largest_number)],dtype=np.uint8).T,axis=1)
input_layer_size = 2
hidden_layer_size = 16
output_layer_size = 1
weight_init_std = 1
learning_rate = 0.1
iters_num = 10000
plot_interval = 100
# ウェイト初期化 (バイアスは簡単のため省略)
W_in = weight_init_std * np.random.randn(input_layer_size, hidden_layer_size)
W_out = weight_init_std * np.random.randn(hidden_layer_size, output_layer_size)
W = weight_init_std * np.random.randn(hidden_layer_size, hidden_layer_size)
# 勾配
W_in_grad = np.zeros_like(W_in)
W_out_grad = np.zeros_like(W_out)
W_grad = np.zeros_like(W)
u = np.zeros((hidden_layer_size, binary_dim + 1))
z = np.zeros((hidden_layer_size, binary_dim + 1))
y = np.zeros((output_layer_size, binary_dim))
delta_out = np.zeros((output_layer_size, binary_dim))
delta = np.zeros((hidden_layer_size, binary_dim + 1))
all_losses = []
for i in range(iters_num):
# A, B初期化 (a + b = d)
a_int = np.random.randint(largest_number/2)
a_bin = binary[a_int] # binary encoding
b_int = np.random.randint(largest_number/2)
b_bin = binary[b_int] # binary encoding
# 正解データ
d_int = a_int + b_int
d_bin = binary[d_int]
# 出力バイナリ
out_bin = np.zeros_like(d_bin)
# 時系列全体の誤差
all_loss = 0
# 時系列ループ
for t in range(binary_dim):
# 入力値
X = np.array([a_bin[ - t - 1], b_bin[ - t - 1]]).reshape(1, -1)
# 時刻tにおける正解データ
dd = np.array([d_bin[binary_dim - t - 1]])
u[:,t+1] = np.dot(X, W_in) + np.dot(z[:,t].reshape(1, -1), W)
z[:,t+1] = functions.sigmoid(u[:,t+1])
y[:,t] = functions.sigmoid(np.dot(z[:,t+1].reshape(1, -1), W_out))
#誤差
loss = functions.mean_squared_error(dd, y[:,t])
delta_out[:,t] = functions.d_mean_squared_error(dd, y[:,t]) * functions.d_sigmoid(y[:,t])
all_loss += loss
out_bin[binary_dim - t - 1] = np.round(y[:,t])
for t in range(binary_dim)[::-1]:
X = np.array([a_bin[-t-1],b_bin[-t-1]]).reshape(1, -1)
delta[:,t] = (np.dot(delta[:,t+1].T, W.T) + np.dot(delta_out[:,t].T, W_out.T)) * functions.d_sigmoid(u[:,t+1])
# 勾配更新
W_out_grad += np.dot(z[:,t+1].reshape(-1,1), delta_out[:,t].reshape(-1,1))
W_grad += np.dot(z[:,t].reshape(-1,1), delta[:,t].reshape(1,-1))
W_in_grad += np.dot(X.T, delta[:,t].reshape(1,-1))
# 勾配適用
W_in -= learning_rate * W_in_grad
W_out -= learning_rate * W_out_grad
W -= learning_rate * W_grad
W_in_grad *= 0
W_out_grad *= 0
W_grad *= 0
if(i % plot_interval == 0):
all_losses.append(all_loss)
print("iters:" + str(i))
print("Loss:" + str(all_loss))
print("Pred:" + str(out_bin))
print("True:" + str(d_bin))
out_int = 0
for index,x in enumerate(reversed(out_bin)):
out_int += x * pow(2, index)
print(str(a_int) + " + " + str(b_int) + " = " + str(out_int))
print("------------")
lists = range(0, iters_num, plot_interval)
plt.plot(lists, all_losses, label="loss")
plt.show()
イテレーション5000程度からlossが殆ど0になっているため、学習が成功していることがわかる。
#3.2 LSTMの実装
from google.colab import drive
drive.mount('/content/drive')
import sys
sys.path.append('/content/drive/MyDrive/DNN3-4')
import numpy as np
from common import functions
import matplotlib.pyplot as plt
def d_tanh(x):
return 1/(np.cosh(x) ** 2)
# データを用意
# 2進数の桁数
binary_dim = 8
# 最大値 + 1
largest_number = pow(2, binary_dim)
# largest_numberまで2進数を用意
binary = np.unpackbits(np.array([range(largest_number)],dtype=np.uint8).T,axis=1)
input_layer_size = 2
hidden_layer_size = 16
output_layer_size = 1
weight_init_std = 1
learning_rate = 0.1
iters_num = 10000
plot_interval = 100
# ウェイト初期化 (バイアスは簡単のため省略)
W_in = weight_init_std * np.random.randn(input_layer_size, hidden_layer_size)
W_out = weight_init_std * np.random.randn(hidden_layer_size, output_layer_size)
W = weight_init_std * np.random.randn(hidden_layer_size, hidden_layer_size)
# Xavier
# W_in = np.random.randn(input_layer_size, hidden_layer_size) / (np.sqrt(input_layer_size))
# W_out = np.random.randn(hidden_layer_size, output_layer_size) / (np.sqrt(hidden_layer_size))
# W = np.random.randn(hidden_layer_size, hidden_layer_size) / (np.sqrt(hidden_layer_size))
# He
# W_in = np.random.randn(input_layer_size, hidden_layer_size) / (np.sqrt(input_layer_size)) * np.sqrt(2)
# W_out = np.random.randn(hidden_layer_size, output_layer_size) / (np.sqrt(hidden_layer_size)) * np.sqrt(2)
# W = np.random.randn(hidden_layer_size, hidden_layer_size) / (np.sqrt(hidden_layer_size)) * np.sqrt(2)
# 勾配
W_in_grad = np.zeros_like(W_in)
W_out_grad = np.zeros_like(W_out)
W_grad = np.zeros_like(W)
u = np.zeros((hidden_layer_size, binary_dim + 1))
z = np.zeros((hidden_layer_size, binary_dim + 1))
y = np.zeros((output_layer_size, binary_dim))
delta_out = np.zeros((output_layer_size, binary_dim))
delta = np.zeros((hidden_layer_size, binary_dim + 1))
all_losses = []
for i in range(iters_num):
# A, B初期化 (a + b = d)
a_int = np.random.randint(largest_number/2)
a_bin = binary[a_int] # binary encoding
b_int = np.random.randint(largest_number/2)
b_bin = binary[b_int] # binary encoding
# 正解データ
d_int = a_int + b_int
d_bin = binary[d_int]
# 出力バイナリ
out_bin = np.zeros_like(d_bin)
# 時系列全体の誤差
all_loss = 0
# 時系列ループ
for t in range(binary_dim):
# 入力値
X = np.array([a_bin[ - t - 1], b_bin[ - t - 1]]).reshape(1, -1)
# 時刻tにおける正解データ
dd = np.array([d_bin[binary_dim - t - 1]])
u[:,t+1] = np.dot(X, W_in) + np.dot(z[:,t].reshape(1, -1), W)
z[:,t+1] = functions.sigmoid(u[:,t+1])
# z[:,t+1] = functions.relu(u[:,t+1])
# z[:,t+1] = np.tanh(u[:,t+1])
y[:,t] = functions.sigmoid(np.dot(z[:,t+1].reshape(1, -1), W_out))
#誤差
loss = functions.mean_squared_error(dd, y[:,t])
delta_out[:,t] = functions.d_mean_squared_error(dd, y[:,t]) * functions.d_sigmoid(y[:,t])
all_loss += loss
out_bin[binary_dim - t - 1] = np.round(y[:,t])
for t in range(binary_dim)[::-1]:
X = np.array([a_bin[-t-1],b_bin[-t-1]]).reshape(1, -1)
delta[:,t] = (np.dot(delta[:,t+1].T, W.T) + np.dot(delta_out[:,t].T, W_out.T)) * functions.d_sigmoid(u[:,t+1])
# delta[:,t] = (np.dot(delta[:,t+1].T, W.T) + np.dot(delta_out[:,t].T, W_out.T)) * functions.d_relu(u[:,t+1])
# delta[:,t] = (np.dot(delta[:,t+1].T, W.T) + np.dot(delta_out[:,t].T, W_out.T)) * d_tanh(u[:,t+1])
# 勾配更新
W_out_grad += np.dot(z[:,t+1].reshape(-1,1), delta_out[:,t].reshape(-1,1))
W_grad += np.dot(z[:,t].reshape(-1,1), delta[:,t].reshape(1,-1))
W_in_grad += np.dot(X.T, delta[:,t].reshape(1,-1))
# 勾配適用
W_in -= learning_rate * W_in_grad
W_out -= learning_rate * W_out_grad
W -= learning_rate * W_grad
W_in_grad *= 0
W_out_grad *= 0
W_grad *= 0
if(i % plot_interval == 0):
all_losses.append(all_loss)
print("iters:" + str(i))
print("Loss:" + str(all_loss))
print("Pred:" + str(out_bin))
print("True:" + str(d_bin))
out_int = 0
for index,x in enumerate(reversed(out_bin)):
out_int += x * pow(2, index)
print(str(a_int) + " + " + str(b_int) + " = " + str(out_int))
print("------------")
lists = range(0, iters_num, plot_interval)
plt.plot(lists, all_losses, label="loss")
plt.show()
同様に学習できていることがわかる。
#3.3 GRUの実装
%tensorflow_version 1.x
from google.colab import drive
drive.mount('/content/drive')
import os
os.chdir('drive/My Drive/DNN3-4/lesson_3/3_2_tf_languagemodel/')
import tensorflow as tf
import numpy as np
import re
import glob
import collections
import random
import pickle
import time
import datetime
import os
# logging levelを変更
tf.logging.set_verbosity(tf.logging.ERROR)
class Corpus:
def __init__(self):
self.unknown_word_symbol = "<???>" # 出現回数の少ない単語は未知語として定義しておく
self.unknown_word_threshold = 3 # 未知語と定義する単語の出現回数の閾値
self.corpus_file = "./corpus/**/*.txt"
self.corpus_encoding = "utf-8"
self.dictionary_filename = "./data_for_predict/word_dict.dic"
self.chunk_size = 5
self.load_dict()
words = []
for filename in glob.glob(self.corpus_file, recursive=True):
with open(filename, "r", encoding=self.corpus_encoding) as f:
# word breaking
text = f.read()
# 全ての文字を小文字に統一し、改行をスペースに変換
text = text.lower().replace("\n", " ")
# 特定の文字以外の文字を空文字に置換する
text = re.sub(r"[^a-z '\-]", "", text)
# 複数のスペースはスペース一文字に変換
text = re.sub(r"[ ]+", " ", text)
# 前処理: '-' で始まる単語は無視する
words = [ word for word in text.split() if not word.startswith("-")]
self.data_n = len(words) - self.chunk_size
self.data = self.seq_to_matrix(words)
def prepare_data(self):
"""
訓練データとテストデータを準備する。
data_n = ( text データの総単語数 ) - chunk_size
input: (data_n, chunk_size, vocabulary_size)
output: (data_n, vocabulary_size)
"""
# 入力と出力の次元テンソルを準備
all_input = np.zeros([self.chunk_size, self.vocabulary_size, self.data_n])
all_output = np.zeros([self.vocabulary_size, self.data_n])
# 準備したテンソルに、コーパスの one-hot 表現(self.data) のデータを埋めていく
# i 番目から ( i + chunk_size - 1 ) 番目までの単語が1組の入力となる
# このときの出力は ( i + chunk_size ) 番目の単語
for i in range(self.data_n):
all_output[:, i] = self.data[:, i + self.chunk_size] # (i + chunk_size) 番目の単語の one-hot ベクトル
for j in range(self.chunk_size):
all_input[j, :, i] = self.data[:, i + self.chunk_size - j - 1]
# 後に使うデータ形式に合わせるために転置を取る
all_input = all_input.transpose([2, 0, 1])
all_output = all_output.transpose()
# 訓練データ:テストデータを 4 : 1 に分割する
training_num = ( self.data_n * 4 ) // 5
return all_input[:training_num], all_output[:training_num], all_input[training_num:], all_output[training_num:]
def build_dict(self):
# コーパス全体を見て、単語の出現回数をカウントする
counter = collections.Counter()
for filename in glob.glob(self.corpus_file, recursive=True):
with open(filename, "r", encoding=self.corpus_encoding) as f:
# word breaking
text = f.read()
# 全ての文字を小文字に統一し、改行をスペースに変換
text = text.lower().replace("\n", " ")
# 特定の文字以外の文字を空文字に置換する
text = re.sub(r"[^a-z '\-]", "", text)
# 複数のスペースはスペース一文字に変換
text = re.sub(r"[ ]+", " ", text)
# 前処理: '-' で始まる単語は無視する
words = [word for word in text.split() if not word.startswith("-")]
counter.update(words)
# 出現頻度の低い単語を一つの記号にまとめる
word_id = 0
dictionary = {}
for word, count in counter.items():
if count <= self.unknown_word_threshold:
continue
dictionary[word] = word_id
word_id += 1
dictionary[self.unknown_word_symbol] = word_id
print("総単語数:", len(dictionary))
# 辞書を pickle を使って保存しておく
with open(self.dictionary_filename, "wb") as f:
pickle.dump(dictionary, f)
print("Dictionary is saved to", self.dictionary_filename)
self.dictionary = dictionary
print(self.dictionary)
def load_dict(self):
with open(self.dictionary_filename, "rb") as f:
self.dictionary = pickle.load(f)
self.vocabulary_size = len(self.dictionary)
self.input_layer_size = len(self.dictionary)
self.output_layer_size = len(self.dictionary)
print("総単語数: ", self.input_layer_size)
def get_word_id(self, word):
# print(word)
# print(self.dictionary)
# print(self.unknown_word_symbol)
# print(self.dictionary[self.unknown_word_symbol])
# print(self.dictionary.get(word, self.dictionary[self.unknown_word_symbol]))
return self.dictionary.get(word, self.dictionary[self.unknown_word_symbol])
# 入力された単語を one-hot ベクトルにする
def to_one_hot(self, word):
index = self.get_word_id(word)
data = np.zeros(self.vocabulary_size)
data[index] = 1
return data
def seq_to_matrix(self, seq):
print(seq)
data = np.array([self.to_one_hot(word) for word in seq]) # (data_n, vocabulary_size)
return data.transpose() # (vocabulary_size, data_n)
class Language:
"""
input layer: self.vocabulary_size
hidden layer: rnn_size = 30
output layer: self.vocabulary_size
"""
def __init__(self):
self.corpus = Corpus()
self.dictionary = self.corpus.dictionary
self.vocabulary_size = len(self.dictionary) # 単語数
self.input_layer_size = self.vocabulary_size # 入力層の数
self.hidden_layer_size = 30 # 隠れ層の RNN ユニットの数
self.output_layer_size = self.vocabulary_size # 出力層の数
self.batch_size = 128 # バッチサイズ
self.chunk_size = 5 # 展開するシーケンスの数。c_0, c_1, ..., c_(chunk_size - 1) を入力し、c_(chunk_size) 番目の単語の確率が出力される。
self.learning_rate = 0.005 # 学習率
self.epochs = 1000 # 学習するエポック数
self.forget_bias = 1.0 # LSTM における忘却ゲートのバイアス
self.model_filename = "./data_for_predict/predict_model.ckpt"
self.unknown_word_symbol = self.corpus.unknown_word_symbol
def inference(self, input_data, initial_state):
"""
:param input_data: (batch_size, chunk_size, vocabulary_size) 次元のテンソル
:param initial_state: (batch_size, hidden_layer_size) 次元の行列
:return:
"""
# 重みとバイアスの初期化
hidden_w = tf.Variable(tf.truncated_normal([self.input_layer_size, self.hidden_layer_size], stddev=0.01))
hidden_b = tf.Variable(tf.ones([self.hidden_layer_size]))
output_w = tf.Variable(tf.truncated_normal([self.hidden_layer_size, self.output_layer_size], stddev=0.01))
output_b = tf.Variable(tf.ones([self.output_layer_size]))
# BasicLSTMCell, BasicRNNCell は (batch_size, hidden_layer_size) が chunk_size 数ぶんつながったリストを入力とする。
# 現時点での入力データは (batch_size, chunk_size, input_layer_size) という3次元のテンソルなので
# tf.transpose や tf.reshape などを駆使してテンソルのサイズを調整する。
input_data = tf.transpose(input_data, [1, 0, 2]) # 転置。(chunk_size, batch_size, vocabulary_size)
input_data = tf.reshape(input_data, [-1, self.input_layer_size]) # 変形。(chunk_size * batch_size, input_layer_size)
input_data = tf.matmul(input_data, hidden_w) + hidden_b # 重みWとバイアスBを適用。 (chunk_size, batch_size, hidden_layer_size)
input_data = tf.split(input_data, self.chunk_size, 0) # リストに分割。chunk_size * (batch_size, hidden_layer_size)
# RNN のセルを定義する。RNN Cell の他に LSTM のセルや GRU のセルなどが利用できる。
cell = tf.nn.rnn_cell.BasicRNNCell(self.hidden_layer_size)
outputs, states = tf.nn.static_rnn(cell, input_data, initial_state=initial_state)
# 最後に隠れ層から出力層につながる重みとバイアスを処理する
# 最終的に softmax 関数で処理し、確率として解釈される。
# softmax 関数はこの関数の外で定義する。
output = tf.matmul(outputs[-1], output_w) + output_b
return output
def loss(self, logits, labels):
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels))
return cost
def training(self, cost):
# 今回は最適化手法として Adam を選択する。
# ここの AdamOptimizer の部分を変えることで、Adagrad, Adadelta などの他の最適化手法を選択することができる
optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(cost)
return optimizer
def train(self):
# 変数などの用意
input_data = tf.placeholder("float", [None, self.chunk_size, self.input_layer_size])
actual_labels = tf.placeholder("float", [None, self.output_layer_size])
initial_state = tf.placeholder("float", [None, self.hidden_layer_size])
prediction = self.inference(input_data, initial_state)
cost = self.loss(prediction, actual_labels)
optimizer = self.training(cost)
correct = tf.equal(tf.argmax(prediction, 1), tf.argmax(actual_labels, 1))
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
# TensorBoard で可視化するため、クロスエントロピーをサマリーに追加
tf.summary.scalar("Cross entropy: ", cost)
summary = tf.summary.merge_all()
# 訓練・テストデータの用意
# corpus = Corpus()
trX, trY, teX, teY = self.corpus.prepare_data()
training_num = trX.shape[0]
# ログを保存するためのディレクトリ
timestamp = time.time()
dirname = datetime.datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S")
# ここから実際に学習を走らせる
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
summary_writer = tf.summary.FileWriter("./log/" + dirname, sess.graph)
# エポックを回す
for epoch in range(self.epochs):
step = 0
epoch_loss = 0
epoch_acc = 0
# 訓練データをバッチサイズごとに分けて学習させる (= optimizer を走らせる)
# エポックごとの損失関数の合計値や(訓練データに対する)精度も計算しておく
while (step + 1) * self.batch_size < training_num:
start_idx = step * self.batch_size
end_idx = (step + 1) * self.batch_size
batch_xs = trX[start_idx:end_idx, :, :]
batch_ys = trY[start_idx:end_idx, :]
_, c, a = sess.run([optimizer, cost, accuracy],
feed_dict={input_data: batch_xs,
actual_labels: batch_ys,
initial_state: np.zeros([self.batch_size, self.hidden_layer_size])
}
)
epoch_loss += c
epoch_acc += a
step += 1
# コンソールに損失関数の値や精度を出力しておく
print("Epoch", epoch, "completed ouf of", self.epochs, "-- loss:", epoch_loss, " -- accuracy:",
epoch_acc / step)
# Epochが終わるごとにTensorBoard用に値を保存
summary_str = sess.run(summary, feed_dict={input_data: trX,
actual_labels: trY,
initial_state: np.zeros(
[trX.shape[0],
self.hidden_layer_size]
)
}
)
summary_writer.add_summary(summary_str, epoch)
summary_writer.flush()
# 学習したモデルも保存しておく
saver = tf.train.Saver()
saver.save(sess, self.model_filename)
# 最後にテストデータでの精度を計算して表示する
a = sess.run(accuracy, feed_dict={input_data: teX, actual_labels: teY,
initial_state: np.zeros([teX.shape[0], self.hidden_layer_size])})
print("Accuracy on test:", a)
def predict(self, seq):
"""
文章を入力したときに次に来る単語を予測する
:param seq: 予測したい単語の直前の文字列。chunk_size 以上の単語数が必要。
:return:
"""
# 最初に復元したい変数をすべて定義してしまいます
tf.reset_default_graph()
input_data = tf.placeholder("float", [None, self.chunk_size, self.input_layer_size])
initial_state = tf.placeholder("float", [None, self.hidden_layer_size])
prediction = tf.nn.softmax(self.inference(input_data, initial_state))
predicted_labels = tf.argmax(prediction, 1)
# 入力データの作成
# seq を one-hot 表現に変換する。
words = [word for word in seq.split() if not word.startswith("-")]
x = np.zeros([1, self.chunk_size, self.input_layer_size])
for i in range(self.chunk_size):
word = seq[len(words) - self.chunk_size + i]
index = self.dictionary.get(word, self.dictionary[self.unknown_word_symbol])
x[0][i][index] = 1
feed_dict = {
input_data: x, # (1, chunk_size, vocabulary_size)
initial_state: np.zeros([1, self.hidden_layer_size])
}
# tf.Session()を用意
with tf.Session() as sess:
# 保存したモデルをロードする。ロード前にすべての変数を用意しておく必要がある。
saver = tf.train.Saver()
saver.restore(sess, self.model_filename)
# ロードしたモデルを使って予測結果を計算
u, v = sess.run([prediction, predicted_labels], feed_dict=feed_dict)
keys = list(self.dictionary.keys())
# コンソールに文字ごとの確率を表示
for i in range(self.vocabulary_size):
c = self.unknown_word_symbol if i == (self.vocabulary_size - 1) else keys[i]
print(c, ":", u[0][i])
print("Prediction:", seq + " " + ("<???>" if v[0] == (self.vocabulary_size - 1) else keys[v[0]]))
return u[0]
def build_dict():
cp = Corpus()
cp.build_dict()
if __name__ == "__main__":
#build_dict()
ln = Language()
# 学習するときに呼び出す
#ln.train()
# 保存したモデルを使って単語の予測をする
ln.predict("some of them looks like")
結果は長いため省略する。
#3.4 双方向RNNの実装
#Bidirectional(双方向RNN)
import keras
from tensorflow.keras.layers import Embedding, Dense, Bidirectional, LSTM
model = keras.Sequential()
#mask_zero = True(0を0埋め用の数値として扱ってくれる)
model.add(Embedding(17781, 64, mask_zero = True))
#LSTM層(return_sequences=True:完全な系列を返す(Flase:最後の出力を返す(LSTMを多層でできる)))
model.add(Bidirectional(LSTM(64, return_sequences=True)))
model.add(Bidirectional(LSTM(32)))
model.add(Dense(5, activation = 'softmax'))
#3.5
#4 深層学習Day4の実装演習
#4.1 Transformerの実装
! wget https://www.dropbox.com/s/9narw5x4uizmehh/utils.py
! mkdir images data
# data取得
! wget https://www.dropbox.com/s/o4kyc52a8we25wy/dev.en -P data/
! wget https://www.dropbox.com/s/kdgskm5hzg6znuc/dev.ja -P data/
! wget https://www.dropbox.com/s/gyyx4gohv9v65uh/test.en -P data/
! wget https://www.dropbox.com/s/hotxwbgoe2n013k/test.ja -P data/
! wget https://www.dropbox.com/s/5lsftkmb20ay9e1/train.en -P data/
! wget https://www.dropbox.com/s/ak53qirssci6f1j/train.ja -P data/
import time
import numpy as np
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
from nltk import bleu_score
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from utils import Vocab
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(1)
random_state = 42
print(torch.__version__)
PAD = 0
UNK = 1
BOS = 2
EOS = 3
PAD_TOKEN = '<PAD>'
UNK_TOKEN = '<UNK>'
BOS_TOKEN = '<S>'
EOS_TOKEN = '</S>'
def load_data(file_path):
"""
テキストファイルからデータを読み込む
:param file_path: str, テキストファイルのパス
:return data: list, 文章(単語のリスト)のリスト
"""
data = []
for line in open(file_path, encoding='utf-8'):
words = line.strip().split() # スペースで単語を分割
data.append(words)
return data
train_X = load_data('./data/train.en')
train_Y = load_data('./data/train.ja')
# 訓練データと検証データに分割
train_X, valid_X, train_Y, valid_Y = train_test_split(train_X, train_Y, test_size=0.2, random_state=random_state)
# データセットの中身を確認
print('train_X:', train_X[:5])
print('train_Y:', train_Y[:5])
MIN_COUNT = 2 # 語彙に含める単語の最低出現回数
word2id = {
PAD_TOKEN: PAD,
BOS_TOKEN: BOS,
EOS_TOKEN: EOS,
UNK_TOKEN: UNK,
}
vocab_X = Vocab(word2id=word2id)
vocab_Y = Vocab(word2id=word2id)
vocab_X.build_vocab(train_X, min_count=MIN_COUNT)
vocab_Y.build_vocab(train_Y, min_count=MIN_COUNT)
vocab_size_X = len(vocab_X.id2word)
vocab_size_Y = len(vocab_Y.id2word)
def sentence_to_ids(vocab, sentence):
"""
単語のリストをインデックスのリストに変換する
:param vocab: Vocabのインスタンス
:param sentence: list of str
:return indices: list of int
"""
ids = [vocab.word2id.get(word, UNK) for word in sentence]
ids = [BOS] + ids + [EOS] # EOSを末尾に加える
return ids
train_X = [sentence_to_ids(vocab_X, sentence) for sentence in train_X]
train_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in train_Y]
valid_X = [sentence_to_ids(vocab_X, sentence) for sentence in valid_X]
valid_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in valid_Y]
class DataLoader(object):
def __init__(self, src_insts, tgt_insts, batch_size, shuffle=True):
"""
:param src_insts: list, 入力言語の文章(単語IDのリスト)のリスト
:param tgt_insts: list, 出力言語の文章(単語IDのリスト)のリスト
:param batch_size: int, バッチサイズ
:param shuffle: bool, サンプルの順番をシャッフルするか否か
"""
self.data = list(zip(src_insts, tgt_insts))
self.batch_size = batch_size
self.shuffle = shuffle
self.start_index = 0
self.reset()
def reset(self):
if self.shuffle:
self.data = shuffle(self.data, random_state=random_state)
self.start_index = 0
def __iter__(self):
return self
def __next__(self):
def preprocess_seqs(seqs):
# パディング
max_length = max([len(s) for s in seqs])
data = [s + [PAD] * (max_length - len(s)) for s in seqs]
# 単語の位置を表現するベクトルを作成
positions = [[pos+1 if w != PAD else 0 for pos, w in enumerate(seq)] for seq in data]
# テンソルに変換
data_tensor = torch.tensor(data, dtype=torch.long, device=device)
position_tensor = torch.tensor(positions, dtype=torch.long, device=device)
return data_tensor, position_tensor
# ポインタが最後まで到達したら初期化する
if self.start_index >= len(self.data):
self.reset()
raise StopIteration()
# バッチを取得して前処理
src_seqs, tgt_seqs = zip(*self.data[self.start_index:self.start_index+self.batch_size])
src_data, src_pos = preprocess_seqs(src_seqs)
tgt_data, tgt_pos = preprocess_seqs(tgt_seqs)
# ポインタを更新する
self.start_index += self.batch_size
return (src_data, src_pos), (tgt_data, tgt_pos)
def position_encoding_init(n_position, d_pos_vec):
"""
Positional Encodingのための行列の初期化を行う
:param n_position: int, 系列長
:param d_pos_vec: int, 隠れ層の次元数
:return torch.tensor, size=(n_position, d_pos_vec)
"""
# PADがある単語の位置はpos=0にしておき、position_encも0にする
position_enc = np.array([
[pos / np.power(10000, 2 * (j // 2) / d_pos_vec) for j in range(d_pos_vec)]
if pos != 0 else np.zeros(d_pos_vec) for pos in range(n_position)])
position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2]) # dim 2i
position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2]) # dim 2i+1
return torch.tensor(position_enc, dtype=torch.float)
pe = position_encoding_init(50, 256).numpy()
plt.figure(figsize=(16,8))
sns.heatmap(pe, cmap='Blues')
plt.show()
class ScaledDotProductAttention(nn.Module):
def __init__(self, d_model, attn_dropout=0.1):
"""
:param d_model: int, 隠れ層の次元数
:param attn_dropout: float, ドロップアウト率
"""
super(ScaledDotProductAttention, self).__init__()
self.temper = np.power(d_model, 0.5) # スケーリング因子
self.dropout = nn.Dropout(attn_dropout)
self.softmax = nn.Softmax(dim=-1)
def forward(self, q, k, v, attn_mask):
"""
:param q: torch.tensor, queryベクトル,
size=(n_head*batch_size, len_q, d_model/n_head)
:param k: torch.tensor, key,
size=(n_head*batch_size, len_k, d_model/n_head)
:param v: torch.tensor, valueベクトル,
size=(n_head*batch_size, len_v, d_model/n_head)
:param attn_mask: torch.tensor, Attentionに適用するマスク,
size=(n_head*batch_size, len_q, len_k)
:return output: 出力ベクトル,
size=(n_head*batch_size, len_q, d_model/n_head)
:return attn: Attention
size=(n_head*batch_size, len_q, len_k)
"""
# QとKの内積でAttentionの重みを求め、スケーリングする
attn = torch.bmm(q, k.transpose(1, 2)) / self.temper # (n_head*batch_size, len_q, len_k)
# Attentionをかけたくない部分がある場合は、その部分を負の無限大に飛ばしてSoftmaxの値が0になるようにする
attn.data.masked_fill_(attn_mask, -float('inf'))
attn = self.softmax(attn)
attn = self.dropout(attn)
output = torch.bmm(attn, v)
return output, attn
class MultiHeadAttention(nn.Module):
def __init__(self, n_head, d_model, d_k, d_v, dropout=0.1):
"""
:param n_head: int, ヘッド数
:param d_model: int, 隠れ層の次元数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param dropout: float, ドロップアウト率
"""
super(MultiHeadAttention, self).__init__()
self.n_head = n_head
self.d_k = d_k
self.d_v = d_v
# 各ヘッドごとに異なる重みで線形変換を行うための重み
# nn.Parameterを使うことで、Moduleのパラメータとして登録できる. TFでは更新が必要な変数はtf.Variableでラップするのでわかりやすい
self.w_qs = nn.Parameter(torch.empty([n_head, d_model, d_k], dtype=torch.float))
self.w_ks = nn.Parameter(torch.empty([n_head, d_model, d_k], dtype=torch.float))
self.w_vs = nn.Parameter(torch.empty([n_head, d_model, d_v], dtype=torch.float))
# nn.init.xavier_normal_で重みの値を初期化
nn.init.xavier_normal_(self.w_qs)
nn.init.xavier_normal_(self.w_ks)
nn.init.xavier_normal_(self.w_vs)
self.attention = ScaledDotProductAttention(d_model)
self.layer_norm = nn.LayerNorm(d_model) # 各層においてバイアスを除く活性化関数への入力を平均0、分散1に正則化
self.proj = nn.Linear(n_head*d_v, d_model) # 複数ヘッド分のAttentionの結果を元のサイズに写像するための線形層
# nn.init.xavier_normal_で重みの値を初期化
nn.init.xavier_normal_(self.proj.weight)
self.dropout = nn.Dropout(dropout)
def forward(self, q, k, v, attn_mask=None):
"""
:param q: torch.tensor, queryベクトル,
size=(batch_size, len_q, d_model)
:param k: torch.tensor, key,
size=(batch_size, len_k, d_model)
:param v: torch.tensor, valueベクトル,
size=(batch_size, len_v, d_model)
:param attn_mask: torch.tensor, Attentionに適用するマスク,
size=(batch_size, len_q, len_k)
:return outputs: 出力ベクトル,
size=(batch_size, len_q, d_model)
:return attns: Attention
size=(n_head*batch_size, len_q, len_k)
"""
d_k, d_v = self.d_k, self.d_v
n_head = self.n_head
# residual connectionのための入力 出力に入力をそのまま加算する
residual = q
batch_size, len_q, d_model = q.size()
batch_size, len_k, d_model = k.size()
batch_size, len_v, d_model = v.size()
# 複数ヘッド化
# torch.repeat または .repeatで指定したdimに沿って同じテンソルを作成
q_s = q.repeat(n_head, 1, 1) # (n_head*batch_size, len_q, d_model)
k_s = k.repeat(n_head, 1, 1) # (n_head*batch_size, len_k, d_model)
v_s = v.repeat(n_head, 1, 1) # (n_head*batch_size, len_v, d_model)
# ヘッドごとに並列計算させるために、n_headをdim=0に、batch_sizeをdim=1に寄せる
q_s = q_s.view(n_head, -1, d_model) # (n_head, batch_size*len_q, d_model)
k_s = k_s.view(n_head, -1, d_model) # (n_head, batch_size*len_k, d_model)
v_s = v_s.view(n_head, -1, d_model) # (n_head, batch_size*len_v, d_model)
# 各ヘッドで線形変換を並列計算(p16左側`Linear`)
q_s = torch.bmm(q_s, self.w_qs) # (n_head, batch_size*len_q, d_k)
k_s = torch.bmm(k_s, self.w_ks) # (n_head, batch_size*len_k, d_k)
v_s = torch.bmm(v_s, self.w_vs) # (n_head, batch_size*len_v, d_v)
# Attentionは各バッチ各ヘッドごとに計算させるためにbatch_sizeをdim=0に寄せる
q_s = q_s.view(-1, len_q, d_k) # (n_head*batch_size, len_q, d_k)
k_s = k_s.view(-1, len_k, d_k) # (n_head*batch_size, len_k, d_k)
v_s = v_s.view(-1, len_v, d_v) # (n_head*batch_size, len_v, d_v)
# Attentionを計算(p16.左側`Scaled Dot-Product Attention * h`)
outputs, attns = self.attention(q_s, k_s, v_s, attn_mask=attn_mask.repeat(n_head, 1, 1))
# 各ヘッドの結果を連結(p16左側`Concat`)
# torch.splitでbatch_sizeごとのn_head個のテンソルに分割
outputs = torch.split(outputs, batch_size, dim=0) # (batch_size, len_q, d_model) * n_head
# dim=-1で連結
outputs = torch.cat(outputs, dim=-1) # (batch_size, len_q, d_model*n_head)
# residual connectionのために元の大きさに写像(p16左側`Linear`)
outputs = self.proj(outputs) # (batch_size, len_q, d_model)
outputs = self.dropout(outputs)
outputs = self.layer_norm(outputs + residual)
return outputs, attns
class PositionwiseFeedForward(nn.Module):
"""
:param d_hid: int, 隠れ層1層目の次元数
:param d_inner_hid: int, 隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
"""
def __init__(self, d_hid, d_inner_hid, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
# window size 1のconv層を定義することでPosition wiseな全結合層を実現する.
self.w_1 = nn.Conv1d(d_hid, d_inner_hid, 1)
self.w_2 = nn.Conv1d(d_inner_hid, d_hid, 1)
self.layer_norm = nn.LayerNorm(d_hid)
self.dropout = nn.Dropout(dropout)
self.relu = nn.ReLU()
def forward(self, x):
"""
:param x: torch.tensor,
size=(batch_size, max_length, d_hid)
:return: torch.tensor,
size=(batch_size, max_length, d_hid)
"""
residual = x
output = self.relu(self.w_1(x.transpose(1, 2)))
output = self.w_2(output).transpose(2, 1)
output = self.dropout(output)
return self.layer_norm(output + residual)
def get_attn_padding_mask(seq_q, seq_k):
"""
keyのPADに対するattentionを0にするためのマスクを作成する
:param seq_q: tensor, queryの系列, size=(batch_size, len_q)
:param seq_k: tensor, keyの系列, size=(batch_size, len_k)
:return pad_attn_mask: tensor, size=(batch_size, len_q, len_k)
"""
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
pad_attn_mask = seq_k.data.eq(PAD).unsqueeze(1) # (N, 1, len_k) PAD以外のidを全て0にする
pad_attn_mask = pad_attn_mask.expand(batch_size, len_q, len_k) # (N, len_q, len_k)
return pad_attn_mask
_seq_q = torch.tensor([[1, 2, 3]])
_seq_k = torch.tensor([[4, 5, 6, 7, PAD]])
_mask = get_attn_padding_mask(_seq_q, _seq_k) # 行がquery、列がkeyに対応し、key側がPAD(=0)の時刻だけ1で他が0の行列ができる
print('query:\n', _seq_q)
print('key:\n', _seq_k)
print('mask:\n', _mask)
def get_attn_subsequent_mask(seq):
"""
未来の情報に対するattentionを0にするためのマスクを作成する
:param seq: tensor, size=(batch_size, length)
:return subsequent_mask: tensor, size=(batch_size, length, length)
"""
attn_shape = (seq.size(1), seq.size(1))
# 上三角行列(diagonal=1: 対角線より上が1で下が0)
subsequent_mask = torch.triu(torch.ones(attn_shape, dtype=torch.uint8, device=device), diagonal=1)
subsequent_mask = subsequent_mask.repeat(seq.size(0), 1, 1)
return subsequent_mask
_seq = torch.tensor([[1,2,3,4]])
_mask = get_attn_subsequent_mask(_seq) # 行がquery、列がkeyに対応し、queryより未来のkeyの値が1で他は0の行列ができいる
print('seq:\n', _seq)
print('mask:\n', _mask)
class EncoderLayer(nn.Module):
"""Encoderのブロックのクラス"""
def __init__(self, d_model, d_inner_hid, n_head, d_k, d_v, dropout=0.1):
"""
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param dropout: float, ドロップアウト率
"""
super(EncoderLayer, self).__init__()
# Encoder内のSelf-Attention
self.slf_attn = MultiHeadAttention(
n_head, d_model, d_k, d_v, dropout=dropout)
# Postionwise FFN
self.pos_ffn = PositionwiseFeedForward(d_model, d_inner_hid, dropout=dropout)
def forward(self, enc_input, slf_attn_mask=None):
"""
:param enc_input: tensor, Encoderの入力,
size=(batch_size, max_length, d_model)
:param slf_attn_mask: tensor, Self Attentionの行列にかけるマスク,
size=(batch_size, len_q, len_k)
:return enc_output: tensor, Encoderの出力,
size=(batch_size, max_length, d_model)
:return enc_slf_attn: tensor, EncoderのSelf Attentionの行列,
size=(n_head*batch_size, len_q, len_k)
"""
# Self-Attentionのquery, key, valueにはすべてEncoderの入力(enc_input)が入る
enc_output, enc_slf_attn = self.slf_attn(
enc_input, enc_input, enc_input, attn_mask=slf_attn_mask)
enc_output = self.pos_ffn(enc_output)
return enc_output, enc_slf_attn
class Encoder(nn.Module):
"""EncoderLayerブロックからなるEncoderのクラス"""
def __init__(
self, n_src_vocab, max_length, n_layers=6, n_head=8, d_k=64, d_v=64,
d_word_vec=512, d_model=512, d_inner_hid=1024, dropout=0.1):
"""
:param n_src_vocab: int, 入力言語の語彙数
:param max_length: int, 最大系列長
:param n_layers: int, レイヤー数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param d_word_vec: int, 単語の埋め込みの次元数
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
"""
super(Encoder, self).__init__()
n_position = max_length + 1
self.max_length = max_length
self.d_model = d_model
# Positional Encodingを用いたEmbedding
self.position_enc = nn.Embedding(n_position, d_word_vec, padding_idx=PAD)
self.position_enc.weight.data = position_encoding_init(n_position, d_word_vec)
# 一般的なEmbedding
self.src_word_emb = nn.Embedding(n_src_vocab, d_word_vec, padding_idx=PAD)
# EncoderLayerをn_layers個積み重ねる
self.layer_stack = nn.ModuleList([
EncoderLayer(d_model, d_inner_hid, n_head, d_k, d_v, dropout=dropout)
for _ in range(n_layers)])
def forward(self, src_seq, src_pos):
"""
:param src_seq: tensor, 入力系列,
size=(batch_size, max_length)
:param src_pos: tensor, 入力系列の各単語の位置情報,
size=(batch_size, max_length)
:return enc_output: tensor, Encoderの最終出力,
size=(batch_size, max_length, d_model)
:return enc_slf_attns: list, EncoderのSelf Attentionの行列のリスト
"""
# 一般的な単語のEmbeddingを行う
enc_input = self.src_word_emb(src_seq)
# Positional EncodingのEmbeddingを加算する
enc_input += self.position_enc(src_pos)
enc_slf_attns = []
enc_output = enc_input
# key(=enc_input)のPADに対応する部分のみ1のマスクを作成
enc_slf_attn_mask = get_attn_padding_mask(src_seq, src_seq)
# n_layers個のEncoderLayerに入力を通す
for enc_layer in self.layer_stack:
enc_output, enc_slf_attn = enc_layer(
enc_output, slf_attn_mask=enc_slf_attn_mask)
enc_slf_attns += [enc_slf_attn]
return enc_output, enc_slf_attns
class DecoderLayer(nn.Module):
"""Decoderのブロックのクラス"""
def __init__(self, d_model, d_inner_hid, n_head, d_k, d_v, dropout=0.1):
"""
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param dropout: float, ドロップアウト率
"""
super(DecoderLayer, self).__init__()
# Decoder内のSelf-Attention
self.slf_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
# Encoder-Decoder間のSource-Target Attention
self.enc_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
# Positionwise FFN
self.pos_ffn = PositionwiseFeedForward(d_model, d_inner_hid, dropout=dropout)
def forward(self, dec_input, enc_output, slf_attn_mask=None, dec_enc_attn_mask=None):
"""
:param dec_input: tensor, Decoderの入力,
size=(batch_size, max_length, d_model)
:param enc_output: tensor, Encoderの出力,
size=(batch_size, max_length, d_model)
:param slf_attn_mask: tensor, Self Attentionの行列にかけるマスク,
size=(batch_size, len_q, len_k)
:param dec_enc_attn_mask: tensor, Soutce-Target Attentionの行列にかけるマスク,
size=(batch_size, len_q, len_k)
:return dec_output: tensor, Decoderの出力,
size=(batch_size, max_length, d_model)
:return dec_slf_attn: tensor, DecoderのSelf Attentionの行列,
size=(n_head*batch_size, len_q, len_k)
:return dec_enc_attn: tensor, DecoderのSoutce-Target Attentionの行列,
size=(n_head*batch_size, len_q, len_k)
"""
# Self-Attentionのquery, key, valueにはすべてDecoderの入力(dec_input)が入る
dec_output, dec_slf_attn = self.slf_attn(
dec_input, dec_input, dec_input, attn_mask=slf_attn_mask)
# Source-Target-AttentionのqueryにはDecoderの出力(dec_output), key, valueにはEncoderの出力(enc_output)が入る
dec_output, dec_enc_attn = self.enc_attn(
dec_output, enc_output, enc_output, attn_mask=dec_enc_attn_mask)
dec_output = self.pos_ffn(dec_output)
return dec_output, dec_slf_attn, dec_enc_attn
class Decoder(nn.Module):
"""DecoderLayerブロックからなるDecoderのクラス"""
def __init__(
self, n_tgt_vocab, max_length, n_layers=6, n_head=8, d_k=64, d_v=64,
d_word_vec=512, d_model=512, d_inner_hid=1024, dropout=0.1):
"""
:param n_tgt_vocab: int, 出力言語の語彙数
:param max_length: int, 最大系列長
:param n_layers: int, レイヤー数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param d_word_vec: int, 単語の埋め込みの次元数
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
"""
super(Decoder, self).__init__()
n_position = max_length + 1
self.max_length = max_length
self.d_model = d_model
# Positional Encodingを用いたEmbedding
self.position_enc = nn.Embedding(
n_position, d_word_vec, padding_idx=PAD)
self.position_enc.weight.data = position_encoding_init(n_position, d_word_vec)
# 一般的なEmbedding
self.tgt_word_emb = nn.Embedding(
n_tgt_vocab, d_word_vec, padding_idx=PAD)
self.dropout = nn.Dropout(dropout)
# DecoderLayerをn_layers個積み重ねる
self.layer_stack = nn.ModuleList([
DecoderLayer(d_model, d_inner_hid, n_head, d_k, d_v, dropout=dropout)
for _ in range(n_layers)])
def forward(self, tgt_seq, tgt_pos, src_seq, enc_output):
"""
:param tgt_seq: tensor, 出力系列,
size=(batch_size, max_length)
:param tgt_pos: tensor, 出力系列の各単語の位置情報,
size=(batch_size, max_length)
:param src_seq: tensor, 入力系列,
size=(batch_size, n_src_vocab)
:param enc_output: tensor, Encoderの出力,
size=(batch_size, max_length, d_model)
:return dec_output: tensor, Decoderの最終出力,
size=(batch_size, max_length, d_model)
:return dec_slf_attns: list, DecoderのSelf Attentionの行列のリスト
:return dec_slf_attns: list, DecoderのSelf Attentionの行列のリスト
"""
# 一般的な単語のEmbeddingを行う
dec_input = self.tgt_word_emb(tgt_seq)
# Positional EncodingのEmbeddingを加算する
dec_input += self.position_enc(tgt_pos)
# Self-Attention用のマスクを作成
# key(=dec_input)のPADに対応する部分が1のマスクと、queryから見たkeyの未来の情報に対応する部分が1のマスクのORをとる
dec_slf_attn_pad_mask = get_attn_padding_mask(tgt_seq, tgt_seq) # (N, max_length, max_length)
dec_slf_attn_sub_mask = get_attn_subsequent_mask(tgt_seq) # (N, max_length, max_length)
dec_slf_attn_mask = torch.gt(dec_slf_attn_pad_mask + dec_slf_attn_sub_mask, 0) # ORをとる
# key(=dec_input)のPADに対応する部分のみ1のマスクを作成
dec_enc_attn_pad_mask = get_attn_padding_mask(tgt_seq, src_seq) # (N, max_length, max_length)
dec_slf_attns, dec_enc_attns = [], []
dec_output = dec_input
# n_layers個のDecoderLayerに入力を通す
for dec_layer in self.layer_stack:
dec_output, dec_slf_attn, dec_enc_attn = dec_layer(
dec_output, enc_output,
slf_attn_mask=dec_slf_attn_mask,
dec_enc_attn_mask=dec_enc_attn_pad_mask)
dec_slf_attns += [dec_slf_attn]
dec_enc_attns += [dec_enc_attn]
return dec_output, dec_slf_attns, dec_enc_attns
class Transformer(nn.Module):
"""Transformerのモデル全体のクラス"""
def __init__(
self, n_src_vocab, n_tgt_vocab, max_length, n_layers=6, n_head=8,
d_word_vec=512, d_model=512, d_inner_hid=1024, d_k=64, d_v=64,
dropout=0.1, proj_share_weight=True):
"""
:param n_src_vocab: int, 入力言語の語彙数
:param n_tgt_vocab: int, 出力言語の語彙数
:param max_length: int, 最大系列長
:param n_layers: int, レイヤー数
:param n_head: int, ヘッド数
:param d_k: int, keyベクトルの次元数
:param d_v: int, valueベクトルの次元数
:param d_word_vec: int, 単語の埋め込みの次元数
:param d_model: int, 隠れ層の次元数
:param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数
:param dropout: float, ドロップアウト率
:param proj_share_weight: bool, 出力言語の単語のEmbeddingと出力の写像で重みを共有する
"""
super(Transformer, self).__init__()
self.encoder = Encoder(
n_src_vocab, max_length, n_layers=n_layers, n_head=n_head,
d_word_vec=d_word_vec, d_model=d_model,
d_inner_hid=d_inner_hid, dropout=dropout)
self.decoder = Decoder(
n_tgt_vocab, max_length, n_layers=n_layers, n_head=n_head,
d_word_vec=d_word_vec, d_model=d_model,
d_inner_hid=d_inner_hid, dropout=dropout)
self.tgt_word_proj = nn.Linear(d_model, n_tgt_vocab, bias=False)
nn.init.xavier_normal_(self.tgt_word_proj.weight)
self.dropout = nn.Dropout(dropout)
assert d_model == d_word_vec # 各モジュールの出力のサイズは揃える
if proj_share_weight:
# 出力言語の単語のEmbeddingと出力の写像で重みを共有する
assert d_model == d_word_vec
self.tgt_word_proj.weight = self.decoder.tgt_word_emb.weight
def get_trainable_parameters(self):
# Positional Encoding以外のパラメータを更新する
enc_freezed_param_ids = set(map(id, self.encoder.position_enc.parameters()))
dec_freezed_param_ids = set(map(id, self.decoder.position_enc.parameters()))
freezed_param_ids = enc_freezed_param_ids | dec_freezed_param_ids
return (p for p in self.parameters() if id(p) not in freezed_param_ids)
def forward(self, src, tgt):
src_seq, src_pos = src
tgt_seq, tgt_pos = tgt
src_seq = src_seq[:, 1:]
src_pos = src_pos[:, 1:]
tgt_seq = tgt_seq[:, :-1]
tgt_pos = tgt_pos[:, :-1]
enc_output, *_ = self.encoder(src_seq, src_pos)
dec_output, *_ = self.decoder(tgt_seq, tgt_pos, src_seq, enc_output)
seq_logit = self.tgt_word_proj(dec_output)
return seq_logit
def compute_loss(batch_X, batch_Y, model, criterion, optimizer=None, is_train=True):
# バッチの損失を計算
model.train(is_train)
pred_Y = model(batch_X, batch_Y)
gold = batch_Y[0][:, 1:].contiguous()
# gold = batch_Y[0].contiguous()
loss = criterion(pred_Y.view(-1, pred_Y.size(2)), gold.view(-1))
if is_train: # 訓練時はパラメータを更新
optimizer.zero_grad()
loss.backward()
optimizer.step()
gold = gold.data.cpu().numpy().tolist()
pred = pred_Y.max(dim=-1)[1].data.cpu().numpy().tolist()
return loss.item(), gold, pred
MAX_LENGTH = 20
batch_size = 64
num_epochs = 15
lr = 0.001
ckpt_path = 'transformer.pth'
max_length = MAX_LENGTH + 2
model_args = {
'n_src_vocab': vocab_size_X,
'n_tgt_vocab': vocab_size_Y,
'max_length': max_length,
'proj_share_weight': True,
'd_k': 32,
'd_v': 32,
'd_model': 128,
'd_word_vec': 128,
'd_inner_hid': 256,
'n_layers': 3,
'n_head': 6,
'dropout': 0.1,
}
# DataLoaderやモデルを定義
train_dataloader = DataLoader(
train_X, train_Y, batch_size
)
valid_dataloader = DataLoader(
valid_X, valid_Y, batch_size,
shuffle=False
)
model = Transformer(**model_args).to(device)
optimizer = optim.Adam(model.get_trainable_parameters(), lr=lr)
criterion = nn.CrossEntropyLoss(ignore_index=PAD, size_average=False).to(device)
def calc_bleu(refs, hyps):
"""
BLEUスコアを計算する関数
:param refs: list, 参照訳。単語のリストのリスト (例: [['I', 'have', 'a', 'pen'], ...])
:param hyps: list, モデルの生成した訳。単語のリストのリスト (例: [['I', 'have', 'a', 'pen'], ...])
:return: float, BLEUスコア(0~100)
"""
refs = [[ref[:ref.index(EOS)]] for ref in refs]
hyps = [hyp[:hyp.index(EOS)] if EOS in hyp else hyp for hyp in hyps]
return 100 * bleu_score.corpus_bleu(refs, hyps)
# 訓練
best_valid_bleu = 0.
for epoch in range(1, num_epochs+1):
start = time.time()
train_loss = 0.
train_refs = []
train_hyps = []
valid_loss = 0.
valid_refs = []
valid_hyps = []
# train
for batch in train_dataloader:
batch_X, batch_Y = batch
loss, gold, pred = compute_loss(
batch_X, batch_Y, model, criterion, optimizer, is_train=True
)
train_loss += loss
train_refs += gold
train_hyps += pred
# valid
for batch in valid_dataloader:
batch_X, batch_Y = batch
loss, gold, pred = compute_loss(
batch_X, batch_Y, model, criterion, is_train=False
)
valid_loss += loss
valid_refs += gold
valid_hyps += pred
# 損失をサンプル数で割って正規化
train_loss /= len(train_dataloader.data)
valid_loss /= len(valid_dataloader.data)
# BLEUを計算
train_bleu = calc_bleu(train_refs, train_hyps)
valid_bleu = calc_bleu(valid_refs, valid_hyps)
# validationデータでBLEUが改善した場合にはモデルを保存
if valid_bleu > best_valid_bleu:
ckpt = model.state_dict()
torch.save(ckpt, ckpt_path)
best_valid_bleu = valid_bleu
elapsed_time = (time.time()-start) / 60
print('Epoch {} [{:.1f}min]: train_loss: {:5.2f} train_bleu: {:2.2f} valid_loss: {:5.2f} valid_bleu: {:2.2f}'.format(
epoch, elapsed_time, train_loss, train_bleu, valid_loss, valid_bleu))
print('-'*80)
def test(model, src, max_length=20):
# 学習済みモデルで系列を生成する
model.eval()
src_seq, src_pos = src
batch_size = src_seq.size(0)
enc_output, enc_slf_attns = model.encoder(src_seq, src_pos)
tgt_seq = torch.full([batch_size, 1], BOS, dtype=torch.long, device=device)
tgt_pos = torch.arange(1, dtype=torch.long, device=device)
tgt_pos = tgt_pos.unsqueeze(0).repeat(batch_size, 1)
# 時刻ごとに処理
for t in range(1, max_length+1):
dec_output, dec_slf_attns, dec_enc_attns = model.decoder(
tgt_seq, tgt_pos, src_seq, enc_output)
dec_output = model.tgt_word_proj(dec_output)
out = dec_output[:, -1, :].max(dim=-1)[1].unsqueeze(1)
# 自身の出力を次の時刻の入力にする
tgt_seq = torch.cat([tgt_seq, out], dim=-1)
tgt_pos = torch.arange(t+1, dtype=torch.long, device=device)
tgt_pos = tgt_pos.unsqueeze(0).repeat(batch_size, 1)
return tgt_seq[:, 1:], enc_slf_attns, dec_slf_attns, dec_enc_attns
def ids_to_sentence(vocab, ids):
# IDのリストを単語のリストに変換する
return [vocab.id2word[_id] for _id in ids]
def trim_eos(ids):
# IDのリストからEOS以降の単語を除外する
if EOS in ids:
return ids[:ids.index(EOS)]
else:
return ids
# 学習済みモデルの読み込み
model = Transformer(**model_args).to(device)
ckpt = torch.load(ckpt_path)
model.load_state_dict(ckpt)
# テストデータの読み込み
test_X = load_data('./data/dev.en')
test_Y = load_data('./data/dev.ja')
test_X = [sentence_to_ids(vocab_X, sentence) for sentence in test_X]
test_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in test_Y]
test_dataloader = DataLoader(
test_X, test_Y, 1,
shuffle=False
)
src, tgt = next(test_dataloader)
src_ids = src[0][0].cpu().numpy()
tgt_ids = tgt[0][0].cpu().numpy()
print('src: {}'.format(' '.join(ids_to_sentence(vocab_X, src_ids[1:-1]))))
print('tgt: {}'.format(' '.join(ids_to_sentence(vocab_Y, tgt_ids[1:-1]))))
preds, enc_slf_attns, dec_slf_attns, dec_enc_attns = test(model, src)
pred_ids = preds[0].data.cpu().numpy().tolist()
print('out: {}'.format(' '.join(ids_to_sentence(vocab_Y, trim_eos(pred_ids)))))
# BLEUの評価
test_dataloader = DataLoader(
test_X, test_Y, 128,
shuffle=False
)
refs_list = []
hyp_list = []
for batch in test_dataloader:
batch_X, batch_Y = batch
preds, *_ = test(model, batch_X)
preds = preds.data.cpu().numpy().tolist()
refs = batch_Y[0].data.cpu().numpy()[:, 1:].tolist()
refs_list += refs
hyp_list += preds
bleu = calc_bleu(refs_list, hyp_list)
print(bleu)
#5 深層学習Day3の確認テスト
#5.1 確認テスト1
サイズ55の入力画像を、サイズ33のフィルタで畳み込んだ時の出力画像のサイズを答えよ。なおストライドは2、パディングは1とする。
3*3
#5.2 確認テスト2
RNNのネットワークには大きくわけて3つの重みがある。1つは入力から現在の中間層を定義する際にかけられる重み、1つは中間層から出力を定義する際にかけられる重みである。残り1つの重みについて説明せよ。
前の中間層の重み。
#5.3 確認テスト3
微分の連鎖率の原理を使い、dz/dxを求めよ。
z = t^2\\
t = x + y\\
\frac{dz}{dx} = \frac{dz}{dt} \frac{dt}{dx} = 2t * 1 = 2(x+y)
#5.4 確認テスト4
下図のy1をx・s0・s1・win・w・woutを用いて数式で表せ。※バイアスは任意の文字で定義せよ。※また中間層の出力にシグモイド関数g(x)を作用させよ。
y_1 = g(w_{out} z_1 + b)\\
z_1 = f(wz_0 + w_{in}x_1 + c)
#5.5 確認テスト5
シグモイド関数を微分した時、入力値が0の時に最大値をとる。その値として正しいものを選択肢から選べ。
(2) 0.25
#5.6 確認テスト6
以下の文章をLSTMに入力し空欄に当てはまる単語を予測したいとする。文中の「とても」という言葉は空欄の予測においてなくなっても影響を及ぼさないと考えられる。このような場合、どのゲートが作用すると考えられるか。
「映画おもしろかったね。ところで、とてもお腹が空いたから何か____。」
忘却ゲート
#5.7 確認テスト7
LSTMとCECが抱える課題について、それぞれ簡潔に述べよ。
CECには学習機能がない。学習能力を持たせるためにLSTMでは、入力ゲート、出力ゲート、忘却ゲートがあり、パラメータが多いため、計算負荷が高い。
#5.8 確認テスト8
LSTMとGRUの違いを簡潔に述べよ。
GRUはLSTMにあるCECがなく、更新ゲート、とリセットゲートがあり、ゲート数等からパラメータが少なく計算負荷が低い。
#5.9 確認テスト9
下記の選択肢から、seq2seqについて説明しているものを選べ。
(1)時刻に関して順方向と逆方向のRNNを構成し、それら2つの中間層表現を特徴量として利用するものである。
(2)RNNを用いたEncoder-Decoderモデルの一種であり、機械翻訳などのモデルに使われる。
(3)構文木などの木構造に対して、隣接単語から表現ベクトル(フレーズ)を作るという演算を再帰的に行い(重みは共通)、文全体の表現ベクトルを得るニューラルネットワークである。
(4)RNNの一種であり、単純なRNNにおいて問題となる勾配消失問題をCECとゲートの概念を導入することで解決したものである。
(2)
#5.10 確認テスト10
seq2seqとHRED、HREDとVHREDの違いを簡潔に述べよ。
HREDはseq2seqと違い、過去の文脈を考慮できる。
HREDは相槌程度の答えしか出せないのに対し、VHREDはバリエーションをもった応対ができる。
#5.11 確認テスト11
VAEに関する下記の説明文中の空欄に当てはまる言葉を答えよ。
自己符号化器の潜在変数に____を導入したもの。
確率分布
#5.12 確認テスト12
RNNとword2vec、seq2seqとAttentionの違いを簡潔に述べよ。
RNNは時系列データのニューラルネット、word2vecは単語の分散表現ベクトルを得る手法。
seq2seqは1つの時系列データから別の時系列データを得るネットワーク、Attentionは時系列データの中身に対し、関連性に重みを付ける手法。
#6 深層学習Day4の確認テスト
#6.1 確認テスト1
![image.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/1591221/66eba308-9df7-9d46-ed89-
596e19291904.png)
(い)H * W * C * K * K
(う)H * W * C * M
Dilated casual convolution
パラメータに対する受容野が広い