9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

論文の勉強15 Transformer(Encoderのみ)

Last updated at Posted at 2022-04-04

Transformerについて構造の説明と実装のメモ書きです。
ただし、論文すべてを見るわけでなく構造のところを中心に見ていきます。

transformerは画像認識分野でも使われるようになっています。
ここでは分類モデルを意識してEncoder部分のみの実装となります。

勉強のメモ書き程度でありあまり正確に実装されていませんので、ご了承ください。
自分の実力不足で読み解けなくなってきています。難しいです。

以下の論文について実装を行っていきます。

タイトル:Attentino Is All You Need

以下のサイトや本を参考にしています。

Attention

Encoder-Decoderモデルとなっています。
Encoderでは入力$(x_1,\cdots,x_n)$を中間表現$(z_1,\cdots,z_n)$に変換します。
Decoderでは、それを受け取って出力$(y_1,\cdots,y_n)$を生成します。

image.png

Encoder

$N=6$個の独立したレイヤで構成されます。
それぞれのレイヤは2つのサブレイヤを持ち、1つはmulti-head self-attentionであり、2つ目は全結合のfeed-forward networkとなります。
それぞれ残差接続(residual connection)を持ち、加算後はLayer Normalizationを適用します。
すべてのサブレイヤの出力の次元は$d_{model}=512$となります。

Decoder

$N=6$個の独立したレイヤで構成されます。
Encoder同様の2つのサブレイヤに加えて、3つ目のサブレイヤが挿入されています。
これはEncoderの出力を受け取るmulti-head attentionです。
Decoderにおいては、未来の情報を入れないようにmask処理をします。
この処理は、図でいうとmasked multi-head attentionで行われます。

Attention

attentionはqueryとkey-valueのセットのマッピングとして表されます。
query,key,value,そして出力はすべてベクトルとなります。
出力はvakueのwieghted sum(加重和)であり、重みはqueryとkeyから計算されます。

Scaled Dot-Product Attention

下の図で表されるScaled Dot-Product Attentionを考えます。
queryとkeyの次元は$d_k$、valueの次元は$d_v$です。
queryとkeyのドット積を$\sqrt{d_k}$で割ったものをsoftmax関数に入れることで重みを計算します。
ベクトルであったquery,key,valueをそれぞれまとめて行列$Q,K,V$と表します。
そしてScaled Dot-Product Attentionの出力を
$$
Attention(Q,K,V)=softmax\bigl(\frac{QK^T}{\sqrt{d_k}} \bigr)V
$$
と計算します。

Multi-Head Attention

query,key,valueがすべて$d_{model}$次元で処理するより、線形射影によりそれぞれ$h_k,h_k,h_v$次元にした方が効果的であることが分かりました。これらは$d_{model}$の$h$分の1となっています。
それぞれAttention層で並行して処理をして$d_v$次元の出力を得ます。これらを結合して最終的な出力とします。

$$
MultiHead(Q,K,V)=Concat(head_1,\cdots,head_h)W^O\
where\ head_i=Attention(QW_i^Q,KW_i^K,VW_i^V)
$$
ここで、$W_i^Q\in R^{d_{model}×d_k},W_i^K\in R^{d_{model}×d_k},W_i^V\in R^{d_{model}×d_v}$、そして$W^O=R^{hd_v×d_{model}}$です。
今回は$h=8$で、さらに$d_k=d_v=d_{model}/h=64$とします。

image.png

Position-wise Feed-Foward Networks

もう1つのサブレイヤであるFeed-Foward Networksを説明します。
このレイヤではすべての位置に対して独立に全結合層が適用されます。
つまり1×1のConvolution層が使用されます。
ここでは2つの全結合層を使用して、1つ目のConv層で$d_{ff}=2048$、2つ目のConv層で$d_{model}=512$へと次元を変換します。
また、1つ目のConv層ではReLU関数を使用します。
$$
FFN(x)=max(0,xW_1+b)W_2+b_2
$$

Embedding and Softmax

inputとoutputに対して、$d_{model}$次元へ変換する同じ学習済みの重みを使用します。
decoderの出力に対してはSoftmax関数により次の単語の出現確率を計算します。ここでもinput,outputと同じ重みを使用します。
また、embeddingレイヤでは重みに$\sqrt{d_{model}}$を掛けたものを使用します。

Positional Encoding

RNN・CNNがないため、単語の位置情報を与える必要があります。
input.outputのEmbedding層の後にpositional encodingを追加します。
これは$d_{model}$の次元を持ち、embeddingされたinputやoutputと足し合わせます。
様々なものがありますがここでは,
$$
PE_{(pos,2i)}=\sin{(pos/10000^{2i/d_{model}})}\
PE_{(pos,2i+1)}=\cos{(pos/10000^{2i/d_{model}})}
$$
を使用します。
$pos$は位置を表し、$i$は次元を表します。

学習

最適化手法としてAdamを使用し、$\beta_1=0.9,\beta_2=0.98,\epsilon=10^{-9}$です。
学習率は
$$
lrate=d_{model}^{-0.5}\min(step_num^{-0.5},step_num・warmup_steps^{-1.5})
$$
に従うものとします。
warmup_stepsでは学習率は増加し、その後減少していきます。
今回はwaramup_steps=4000としました。

正則化(dropout・label smoothing)

各サブレイヤの後にdropout層を入れました。また、encoder・decoderそれぞれでembedding・positionnal encodingのあとにもDropoutを入れています。
確率は$P_{drop}=0.1$とします。

label smoothingも採用しています。
$\epsilon_{ls}=0.1$とします。

keras

import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Layer, Input, Dense, Conv1D, Activation, Dropout, LayerNormalization, Reshape, Embedding, MultiHeadAttention
from tensorflow.keras import activations
from tensorflow.keras.preprocessing.sequence import pad_sequences
import numpy as np
import math

MultiHeadAttentionを実装します。
可視化のためにAttention weightの出力を選択できるようにします。

class MultiHeadAttention(Layer):
    '''
    Multi-Head Attentionレイヤ
    
    hidden_dim : Embeddingされた単語ベクトルの長さ
    heads_num : マルチヘッドAttentionのヘッド数
       ※hidden_numはheads_numで割り切れえる値とすること
    drop_rate : 出力のDropout率

    model = MultiheadAttention(
        hidden_dim = 512,
        head_num = 8,
        drop_rate = 0.5
    )
    '''
    def __init__(self, hidden_dim, heads_num, drop_rate=0.5):
        super(MultiHeadAttention, self).__init__()
        # 入力の線形変換
        # 重み行列は[hidden_dim, hidden_dim]
        self.query = Conv1D(hidden_dim, kernel_size=1)
        self.key   = Conv1D(hidden_dim, kernel_size=1)
        self.value = Conv1D(hidden_dim, kernel_size=1)
        
        # 出力の線形変換
        self.projection = Conv1D(hidden_dim, kernel_size=1)
        
        # 出力のDropout
        self.drop = Dropout(drop_rate)
        
        self.nf = hidden_dim
        self.nh = heads_num
    
    def atten(self, query, key, value, attention_mask, training):
        """
        Attention
        
        query, key, value : 入力
        attention_mask : attention weight に適用される mask
        """
        # 各値を取得
        shape = query.shape.as_list()
        batch_size = -1 if shape[0] is None else shape[0]
        token_num = shape[2] # トークン列数
        hidden_dim = shape[1]*shape[3] # 入力チャンネル数
        
        # ここで q と k の内積を取ることで、query と key の単語間の関連度のようなものを計算します。
        # tf.matmulで最後の2成分について積を計算(それ以外は形がそろっている必要あり)
        # transpose_bで転置
        # [token_num, hidden_dim/head_num] @ [hidden_dim/head_num, token_num] = [token_num, token_num]
        scores = tf.matmul(query, key, transpose_b=True)
        
        # scoreをhidden_dimの平方根割る
        scores = tf.multiply(scores, tf.math.rsqrt(tf.cast(hidden_dim, tf.float32)))
        
        # Attention Maskがあればscoreに加算
        # attention_mask: [batch_size, token_num, token_num] 
        # マスク(参照しない部分)の場所に1、使用する部分は0とする
        # 0の部分を -無限大にする(softmax(-無限大)=0となる)
        # 1. PADを無視
        # 2. DecoderのSelf-Attentionで未来の情報を参照できないようにする
        if attention_mask is not None:
            scores += attention_mask * -1e9
        # softmax を取ることで正規化します
        # input(query) の各単語に対して memory(key) の各単語のどこから情報を引いてくるかの重み
        atten_weight = tf.nn.softmax(scores)
        
        # 重みに従って value から情報を引いてきます
        # [token_num, token_num] @ [token_num, hidden_dim/head_num] = [token_num, hidden_dim/head_num]
        # input(query) の単語ごとに memory(value)の各単語 に attention_weight を掛け合わせて足し合わせた ベクトル(分散表現の重み付き和)を計算
        context = tf.matmul(atten_weight, value)
        
        # 各ヘッドの結合(reshape)
        # 入力と同じ形に変換する
        context = tf.transpose(context, [0, 2, 1, 3])
        context = tf.reshape(context, (batch_size, token_num, hidden_dim))
        
        # 線形変換
        context = self.projection(context, training=training)
        
        return self.drop(context, training=training), atten_weight

    def _split(self, x):
        """
        query, key, valueを分割する
        
        入力 shape: [batch_size, length, hidden_dim] の時
        出力 shape: [batch_size, head_num, length, hidden_dim//head_num]
        """
        # 各値を取得
        hidden_dim = self.nf
        heads_num = self.nh
        shape = x.shape.as_list()
        batch_size = -1 if shape[0] is None else shape[0]
        token_num = shape[1] # トークン列数
        
        # [batch_size, token_num, hidden_dim] -> [batch_size, token_num, head_num, hidden_dim/head_num]
        # splitだが実際は次元を拡張する処理
        x = tf.reshape(x, (batch_size, token_num, heads_num, int(hidden_dim/heads_num)))
        
        # [batch_size, token_num, head_num, hidden_dim/head_num] -> [batch_size, head_num, token_num, hidden_dim/head_num]
        x = tf.transpose(x, [0, 2, 1, 3])
        return x
    
    def call(self, x, training, memory=None, attention_mask=None, return_attention_scores=False):
        """
        モデルの実行
        
        input : 入力(query) [batch_size, token_num, hidden_dim]
        memory : 入力(key, value) [batch_size, token_num, hidden_dim]
        attention_mask : attention weight に適用される mask
            [batch_size, 1, q_length, k_length] 
            pad 等無視する部分が 1 となるようなもの(Decoderで使用)
        return_attention_scores : attention weightを出力するか
        """
        # memoryが入力されない場合、memory=input(Self Attention)とする
        if memory is None:
            memory = x
        
        # input -> query
        # memory -> key, value
        # [batch_size, token_num, hidden_dim] @ [hidden_dim, hidden_dim] -> [batch_size, token_num, hidden_dim] 
        query = self.query(x)
        key = self.key(memory)
        value = self.value(memory)
        
        # ヘッド数に分割する
        # 実際はreshapeで次数を1つ増やす
        # [batch_size, token_num, hidden_dim] -> [batch_size, head_num, token_num, hidden_dim/head_num]
        query = self._split(query)
        key = self._split(key)
        value = self._split(value)
        
        # attention
        # 入力と同じ形の出力
        # context: [batch_size, token_num, hidden_dim]
        # score_weightsはEncoderではNoneとする
        context, attn_weights = self.atten(query, key, value, attention_mask, training)
        if not return_attention_scores:
            return context
        else:
            return context, attn_weights

FeedForwardNetworkの実装をします。
2つのDense Layerからなります。

class FeedForwardNetwork(Layer):
    '''
    Position-wise Feedforward Neural Network
    transformer blockで使用される全結合層
    '''
    def __init__(self, hidden_dim, drop_rate):
        super().__init__()
        # 2層構造
        # 1層目:チャンネル数を増加させる
        self.filter_dense_layer = Dense(hidden_dim * 4, use_bias=True, activation='relu')
        
        # 2層目:元のチャンネル数に戻す
        self.output_dense_layer = Dense(hidden_dim, use_bias=True)
        self.drop = Dropout(drop_rate)

    def call(self, x, training):
        '''
        入力と出力で形が変わらない
        [batch_size, token_num, hidden_dim]
        '''
        
        # [batch_size, token_num, hidden_dim] -> [batch_size, token_num, 4*hidden_dim]
        x = self.filter_dense_layer(x)
        x = self.drop(x, training=training)
        
        # [batch_size, token_num, 4*hidden_dim] -> [batch_size, token_num, hidden_dim]
        return self.output_dense_layer(x)

残差接続のためのResidualNormalizationWrapperを実装します。

class ResidualNormalizationWrapper(Layer):
    '''
    残差接続
    output: input + SubLayer(input)
    '''
    def __init__(self, layer, drop_rate):
        super().__init__()
        self.layer = layer # SubLayer : ここではAttentionかFFN
        self.layer_normalization = LayerNormalization()
        self.drop = Dropout(drop_rate)

    def call(self, x, training, memory=None, attention_mask=None, return_attention_scores=None):
        """
        AttentionもFFNも入力と出力で形が変わらない
        [batch_size, token_num, hidden_dim]
        """
        
        params = {}
        if memory is not None:
            params['memory'] = memory
        if attention_mask is not None:
            params['attention_mask'] = attention_mask
        if return_attention_scores:
            params['return_attention_scores'] = return_attention_scores
        
        out = self.layer_normalization(x)
        if return_attention_scores:
            out, attn_weights = self.layer(out, training, **params)
            out = self.drop(out, training=training)
            return x + out, attn_weights
        else:
            out = self.layer(out, training, **params)
            out = self.drop(out, training=training)
            return x + out

PositionalEncodingを行うレイヤの実装をします。
いろいろ実装方法はあるようです。

class AddPositionalEncoding(Layer):
    '''
    入力テンソルに対し、位置の情報を付与して返すレイヤー
    see: https://arxiv.org/pdf/1706.03762.pdf

    PE_{pos, 2i}   = sin(pos / 10000^{2i / d_model})
    PE_{pos, 2i+1} = cos(pos / 10000^{2i / d_model})
    '''
    def call(self, inputs):
        fl_type = inputs.dtype
        batch_size, max_length, depth = tf.unstack(tf.shape(inputs))

        depth_counter = tf.range(depth) // 2 * 2  # 0, 0, 2, 2, 4, ...
        depth_matrix = tf.tile(tf.expand_dims(depth_counter, 0), [max_length, 1])  # [max_length, depth]
        depth_matrix = tf.pow(10000.0, tf.cast(depth_matrix / depth, fl_type))  # [max_length, depth]

        # cos(x) == sin(x + π/2)
        phase = tf.cast(tf.range(depth) % 2, fl_type) * math.pi / 2  # 0, π/2, 0, π/2, ...
        phase_matrix = tf.tile(tf.expand_dims(phase, 0), [max_length, 1])  # [max_length, depth]

        pos_counter = tf.range(max_length)
        pos_matrix = tf.cast(tf.tile(tf.expand_dims(pos_counter, 1), [1, depth]), fl_type)  # [max_length, depth]

        positional_encoding = tf.sin(pos_matrix / depth_matrix + phase_matrix)
        # [batch_size, max_length, depth]
        positional_encoding = tf.tile(tf.expand_dims(positional_encoding, 0), [batch_size, 1, 1])

        return inputs + positional_encoding

PositionalEncodingの出力を確認します。

import matplotlib.pyplot as plt

layer = AddPositionalEncoding()
test = tf.constant(np.zeros((32,128,512)), tf.float32)
layer(test).shape

plt.imshow(layer(test).numpy()[0]);

image.png

Embedding レイヤの実装をします。
学習済みの重みを指定できるようにしましたが、今回は使用しません。

class TokenEmbedding(Layer):
    def __init__(self, vocab_size, embedding_dim, embeddings=None, PAD_ID=0):
        # vocab_size: 単語の総数
        # embedding_dim: Embeddingの次数
        super().__init__()
        self.pad_id = PAD_ID
        self.embedding_dim = embedding_dim
        
        self.embedding = Embedding(vocab_size, embedding_dim)
        
        if embeddings is None:
            self.embedding = Embedding(input_dim=vocab_size,
                                       output_dim=embedding_dim,
                                       mask_zero=True,
                                       trainable=True)
        else:
            self.embedding = Embedding(input_dim=embeddings.shape[0],
                                       output_dim=embeddings.shape[1],
                                       mask_zero=True,
                                       trainable=True,
                                       weights=[embeddings])

    def call(self, x):
        embedding = self.embedding(x)
        return embedding * self.embedding_dim ** 0.5

TransformerBlockを実装します。
MultiHeadAttentionとFeedForwardNetworkから構成されます。

class TransformerBlock(Layer):
    """
    transformer block : before ->[attention -> FF]-> next
    それぞれ残差接続とLayerNormalizationの処理が含まれる
    """
    def __init__(self, hidden_dim, heads_num, drop_rate=0.1):
        """
        hidden_numはheads_numで割り切れえる値とすること
        """
        super().__init__()
        self.atten = ResidualNormalizationWrapper(
            layer = MultiHeadAttention(hidden_dim = hidden_dim, heads_num = heads_num, drop_rate = drop_rate),
            drop_rate = drop_rate)
        
        self.ffn = ResidualNormalizationWrapper(
            layer = FeedForwardNetwork(hidden_dim = hidden_dim, drop_rate = drop_rate),
            drop_rate = drop_rate)
    
    def call(self, input, training, memory=None, attention_mask=None, return_attention_scores=False):
        """
        入力と出力で形式が変わらない
        [batch_size, token_num, hidden_dim]
        """
        
        if return_attention_scores:
            x, attn_weights = self.atten(input,training, memory, attention_mask, return_attention_scores)
            x = self.ffn(x)
            return x, attn_weights
        else:
            x = self.atten(input, training, memory, attention_mask, return_attention_scores)
            x = self.ffn(x)
            return x

Encoderの実装をします。
TransformerBlockを繰り替えし適用します。

class Encoder(Layer):
    '''
    TransformerのEncoder
    '''
    def __init__(
            self,
            vocab_size, # 単語の総数
            hopping_num, # Multi-head Attentionの繰り返し数
            heads_num, # Multi-head Attentionのヘッド数
            hidden_dim, # Embeddingの次数
            token_num, # 系列長(文章中のトークン数)
            drop_rate, # ドロップアウトの確率
            embeddings=None
    ):
        super().__init__()
        self.hopping_num = hopping_num
        
        # Embedding層
        self.token_embedding = TokenEmbedding(vocab_size, hidden_dim, embeddings)
        # Position Embedding
        self.add_position_embedding = AddPositionalEncoding()
        self.input_dropout_layer = Dropout(drop_rate)

        # Multi-head Attentionの繰り返し(hopping)のリスト
        self.attention_block_list = [TransformerBlock(hidden_dim, heads_num) for _ in range(hopping_num)]
        self.output_normalization = LayerNormalization()

    def call(
            self,
            input,
            training,
            memory=None,
            attention_mask=None,
            return_attention_scores=False
    ):
        '''
        input: 入力 [batch_size, length]
        memory: 入力 [batch_size, length]
        attention_mask: attention weight に適用される mask
            [batch_size, 1, q_length, k_length] 
            pad 等無視する部分が 0 となるようなもの(Decoderで使用)
        return_attention_scores : attention weightを出力するか
        出力 [batch_size, length, hidden_dim]
        '''
        # [batch_size, token_num] -> [batch_size, token_num, hidden_dim]
        embedded_input = self.token_embedding(input)
        # Positional Embedding
        embedded_input = self.add_position_embedding(embedded_input)
        query = self.input_dropout_layer(embedded_input, training=training)
        
        
        if return_attention_scores:
            # MultiHead Attentionを繰り返し適用
            for i in range(self.hopping_num):
                query, atten_weights = self.attention_block_list[i](query, training, memory, attention_mask, return_attention_scores)

            # [batch_size, token_num, hidden_dim]
            return self.output_normalization(query), atten_weights
        else:
            # MultiHead Attentionを繰り返し適用
            for i in range(self.hopping_num):
                query = self.attention_block_list[i](query, training, memory, attention_mask, return_attention_scores)

            # [batch_size, token_num, hidden_dim]
            return self.output_normalization(query)

Encoderからの出力を全結合層に入力することで、分類モデルを作成します。

class AttentionClassifier(Model):
    def __init__(
            self,
            vocab_size, # 単語の総数
            hopping_num, # Multi-head Attentionの繰り返し数
            heads_num, # Multi-head Attentionのヘッド数
            hidden_dim, # Embeddingの次数
            token_num, # 系列長(文章中のトークン数)
            drop_rate, # ドロップアウトの確率
            NUMLABELS, # クラス数
            embeddings = None,
            PAD_ID = 0
    ):
        super().__init__()
        self.PAD_ID = PAD_ID
        
        self.encoder = Encoder(vocab_size, hopping_num, heads_num, hidden_dim, token_num, drop_rate, embeddings)
        self.dense1 = Dense(hidden_dim, activation='tanh')
        self.dropout1 = Dropout(drop_rate)   
        self.final_layer = Dense(NUMLABELS, activation='softmax')

    def call(self, x, training, return_attention_scores=False):
        self_attention_mask=self._create_enc_attention_mask(x)
        
        # [batch_size, token_num] -> [batch_size, token_num, hidden_dim]
        if return_attention_scores:
            enc_output, atten_weights = self.encoder(x, attention_mask=self_attention_mask,return_attention_scores=return_attention_scores)
        else:
            enc_output = self.encoder(x, attention_mask=self_attention_mask,return_attention_scores=return_attention_scores)
        
        # 文頭の重みを使用 [batch_size, 0, hidden_dim]
        # [batch_size, hidden_dim] -> [batch_size, hidden_dim]
        enc_output = self.dense1(enc_output[:, 0, :])
        enc_output = self.dropout1(enc_output)
        
        # [batch_size, hidden_dim] -> [batch_size, NUMLABELS]
        final_output = self.final_layer(enc_output)

        if return_attention_scores:
            return final_output, atten_weights
        else:
            return final_output
    
    def _create_enc_attention_mask(self, x):
        batch_size, length = tf.unstack(tf.shape(x))
        # マスクする部分を1とする
        pad_array = tf.cast(tf.equal(x, self.PAD_ID), tf.float32)  # [batch_size, token_num]
        
        # shape broadcasting で [batch_size, head_num, token_num, token_num] になる
        return tf.reshape(pad_array, [batch_size, 1, 1, length])

データの準備をします。
自分で集めた居酒屋の口コミデータです。
まずはボキャブラリの関数を定義します。

import re
import pandas as pd
from janome.tokenizer import Tokenizer
j_t = Tokenizer(wakati=True)

class Vocab(object):
    def __init__(self):
        self.w2i = {}
        self.i2w = {}
        self.special_chars = ['<pad>', '<s>', '</s>', '<unk>']
        self.bos_char = self.special_chars[1]
        self.eos_char = self.special_chars[2]
        self.oov_char = self.special_chars[3]

    def fit(self, sentences, path=None):
        self._words = set()

        #with open(path, 'r',encoding="utf-8") as f:
        #    sentences = f.read().splitlines()

        for sentence in sentences:
            #self._words.update(sentence.split())
            self._words.update(sentence)

        self.w2i = {w: (i + len(self.special_chars))
                    for i, w in enumerate(self._words)}

        for i, w in enumerate(self.special_chars):
            self.w2i[w] = i

        self.i2w = {i: w for w, i in self.w2i.items()}

    def transform(self, sentences, path=None, bos=False, eos=False):
        output = []

        #with open(path, 'r',encoding="utf-8") as f:
        #    sentences = f.read().splitlines()

        for sentence in sentences:
            #sentence = sentence.split()
            if bos:
                sentence = [self.bos_char] + sentence
            if eos:
                sentence = sentence + [self.eos_char]
            output.append(self.encode(sentence))

        return output

    def encode(self, sentence):
        output = []

        for w in sentence:
            if w not in self.w2i:
                idx = self.w2i[self.oov_char]
            else:
                idx = self.w2i[w]
            output.append(idx)

        return output

    def decode(self, sentence):
        return [self.i2w[id] for id in sentence]


def tokenizer_janome(text):
    return [tok for tok in j_t.tokenize(text, wakati=True)]

def preprocessing_text(text):
    text = re.sub('\r', '', text)
    text = re.sub('\n', '', text)
    text = re.sub(' ', '', text)
    #text = re.sub(' ', '', text)
    
    text = re.sub(r'[0-9 0-9]', '0', text)
    return text

def tokenizer_with_preprocessing(text):
    text = preprocessing_text(text)
    ret = tokenizer_janome(text)
    return ret

データの読み込みをします。

from sklearn.model_selection import train_test_split

path='reviews.csv'

df = pd.read_csv(path)
seq_row = df['Body']
y = df['Rating2']

seq = [tokenizer_with_preprocessing(text) for text in seq_row]

x_train, x_test, y_train, y_test = train_test_split(seq, y, stratify=y)

vocab = Vocab()
vocab.fit(x_train)

x_id_train = vocab.transform(x_train, bos=True)
x_id_test = vocab.transform(x_test, bos=True)

X_train = pad_sequences(x_id_train, padding='post', maxlen=64)
y_train_oht = tf.one_hot(y_train, depth=2, dtype=tf.float32)

ネットワークの定義と、学習の設定をします。

from tensorflow.keras import optimizers

model = AttentionClassifier(
            vocab_size = len(vocab.i2w), # 単語の総数
            hopping_num = 8, # Multi-head Attentionの繰り返し数
            heads_num = 6, # Multi-head Attentionのヘッド数
            hidden_dim = 300, # Embeddingの次数
            drop_rate = 0.1, # ドロップアウトの確率
            token_num = 64,
            NUMLABELS = 2
)

criterion = tf.keras.losses.CategoricalCrossentropy()
optimizer = optimizers.Adam(learning_rate=2e-4,
                           beta_1=0.9, beta_2=0.999, amsgrad=True)

model.compile(loss=criterion, optimizer=optimizer, metrics=['accuracy'])

学習を実行します。

history=model.fit(X_train, y_train_oht, batch_size=32, epochs=20)

image.png

結果の可視化をします。

from IPython.display import HTML
from IPython.display import HTML, display

def mk_html(index, batch, preds, normlized_weights, vocab, labels=["Negative", "Positive"]):
    "HTMLデータを作成する"

    # indexの結果を抽出
    sentence = batch[0][index]  # 文章
    label = batch[1][index] # ラベル
    pred = preds[index]  # 予測
    # ラベルと予測結果を文字に置き換え
    label_str = labels[label]
    pred_str = labels[pred]
    # 表示用のHTMLを作成する
    html = '正解ラベル:{}<br>推論ラベル:{}<br><br>'.format(label_str, pred_str)

    # 12種類のAttentionの平均を求める。最大値で規格化
    all_attens = normlized_weights[0, :, 0, :].sum(axis=0)*0  # all_attensという変数を作成する
    all_attens = np.sum(normlized_weights[index, :, 0, :], axis=0)
    all_attens = (all_attens -all_attens.min()) /  (all_attens.max()-all_attens.min())

    for word, attn in zip(sentence, all_attens):
        # 単語が[SEP]の場合は文章が終わりなのでbreak
        if vocab.decode([word.numpy().tolist()])[0] == "[SEP]":
            break

        # 関数highlightで色をつける、関数tokenizer_bert.convert_ids_to_tokensでIDを単語に戻す
        html += highlight(vocab.decode(
            [word.numpy().tolist()])[0], attn)
    html += "<br><br>"

    return html

def highlight(word, attn):
    "Attentionの値が大きいと文字の背景が濃い赤になるhtmlを出力させる関数"

    html_color = '#%02X%02X%02X' % (
        255, int(255*(1 - attn)), int(255*(1 - attn)))
    return '<span style="background-color: {}"> {}</span>'.format(html_color, word)
  
def make_html(preds, batch, labels=["Negative", "Positive"]):
  html_output = [mk_html(index=idx,
                         batch=batch, 
                         preds=np.argmax(preds, axis=1),
                         normlized_weights=atten.numpy(),
                         vocab=vocab,
                         labels=labels) for idx in np.arange(len(preds))]
  return  html_output
preds, atten = model(X_test, return_attention_scores=True)
html_results = make_html(preds, batch=(X_test, np.array(y_test)), labels=["Negative", "Positive"])

HTML(html_results[613])

文章の単語ごとにattention weightで色付けしたものです。

image.png

kerasではMultiHeadAttentionが実装されています。
引数名が変わることに注意して書き換えます。

MultiHeadAttention(key_dim = 2, num_heads = heads_num, dropout = drop_rate)

他にも引数名の違いによる書き換えがあります。
詳細の実装・処理結果は以下を参考してください。

pytorch

pytorchでも同様に実装をしていきます。
まずは必要なライブラリのインポートをします。

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optimizers

import numpy as np
import math

kerasの実装をできるだけそのままpytorchに移し替えたので内容は変わりません。

MultiHeadAttentionの実装をします。

class MultiHeadAttention(nn.Module):
    '''
    Multi-Head Attentionレイヤ
    
    hidden_dim : Embeddingされた単語ベクトルの長さ
    heads_num : マルチヘッドAttentionのヘッド数
       ※hidden_numはheads_numで割り切れえる値とすること
    drop_rate : 出力のDropout率

    model = MultiheadAttention(
        hidden_dim = 512,
        head_num = 8,
        drop_rate = 0.5
    )
    '''
    def __init__(self, token_num, hidden_dim, heads_num, drop_rate=0.5):
        super(MultiHeadAttention, self).__init__()
        # 入力の線形変換
        # 重み行列は[hidden_dim, hidden_dim]
        self.query = nn.Linear(hidden_dim, hidden_dim)
        self.key   = nn.Linear(hidden_dim, hidden_dim)
        self.value = nn.Linear(hidden_dim, hidden_dim)
        
        # 出力の線形変換
        self.projection = nn.Linear(hidden_dim, hidden_dim)
        
        # 出力のDropout
        self.drop = nn.Dropout(drop_rate)
        
        self.nf = hidden_dim
        self.nh = heads_num
    
    def atten(self, query, key, value, attention_mask):
        """
        Attention
        
        query, key, value : 入力
        attention_mask : attention weight に適用される mask
        """
        # 各値を取得
        shape = query.shape
        batch_size = -1 if shape[0] is None else shape[0]
        token_num = shape[2] # トークン列数
        hidden_dim = shape[1]*shape[3] # 入力チャンネル数
        
        # ここで q と k の内積を取ることで、query と key の単語間の関連度のようなものを計算します。
        # tf.matmulで最後の2成分について積を計算(それ以外は形がそろっている必要あり)
        # transpose_bで転置
        # [token_num, hidden_dim/head_num] @ [hidden_dim/head_num, token_num] = [token_num, token_num]
        scores = torch.matmul(query, key.transpose(-2, -1))
        
        # scoreをhidden_dimの平方根割る
        scores = scores / math.sqrt(hidden_dim)
        
        # Attention Maskがあればscoreに加算
        # attention_mask: [batch_size, token_num, token_num] 
        # マスク(参照しない部分)の場所に1、使用する部分は0とする
        # 0の部分を -無限大にする(softmax(-無限大)=0となる)
        # 1. PADを無視
        # 2. DecoderのSelf-Attentionで未来の情報を参照できないようにする
        if attention_mask is not None:
            scores = scores.masked_fill(attention_mask == 1, -1e9)

        # softmax を取ることで正規化します
        # input(query) の各単語に対して memory(key) の各単語のどこから情報を引いてくるかの重み
        atten_weight = F.softmax(scores, dim = -1)
        #atten_weight = scores / torch.sum(scores, dim=-1, keepdim=True)
        
        # 重みに従って value から情報を引いてきます
        # [token_num, token_num] @ [token_num, hidden_dim/head_num] = [token_num, hidden_dim/head_num]
        # input(query) の単語ごとに memory(value)の各単語 に attention_weight を掛け合わせて足し合わせた ベクトル(分散表現の重み付き和)を計算
        context = torch.matmul(atten_weight, value)
        
        # 各ヘッドの結合(reshape)
        # 入力と同じ形に変換する
        context = context.transpose(1, 2).contiguous()
        context = context.view(batch_size, token_num, hidden_dim)
        
        # 線形変換
        context = self.projection(context)
        
        return self.drop(context), atten_weight

    def _split(self, x):
        """
        query, key, valueを分割する
        
        入力 shape: [batch_size, length, hidden_dim] の時
        出力 shape: [batch_size, head_num, length, hidden_dim//head_num]
        """
        # 各値を取得
        hidden_dim = self.nf
        heads_num = self.nh
        shape = x.shape
        batch_size = -1 if shape[0] is None else shape[0]
        token_num = shape[1] # トークン列数
        
        # [batch_size, token_num, hidden_dim] -> [batch_size, token_num, head_num, hidden_dim/head_num]
        # splitだが実際は次元を拡張する処理
        x = x.view(batch_size, token_num, heads_num, int(hidden_dim/heads_num))
        
        # [batch_size, token_num, head_num, hidden_dim/head_num] -> [batch_size, head_num, token_num, hidden_dim/head_num]
        x = x.transpose(1, 2)
        return x
    
    def forward(self, x, memory=None, attention_mask=None, return_attention_scores=False):
        """
        モデルの実行
        
        input : 入力(query) [batch_size, token_num, hidden_dim]
        memory : 入力(key, value) [batch_size, token_num, hidden_dim]
        attention_mask : attention weight に適用される mask
            [batch_size, 1, q_length, k_length] 
            pad 等無視する部分が 1 となるようなもの(Decoderで使用)
        """
        # memoryが入力されない場合、memory=input(Self Attention)とする
        if memory is None:
            memory = x
        
        # input -> query
        # memory -> key, value
        # [batch_size, token_num, hidden_dim] @ [hidden_dim, hidden_dim] -> [batch_size, token_num, hidden_dim] 
        query = self.query(x)
        key = self.key(memory)
        value = self.value(memory)
        
        # ヘッド数に分割する
        # 実際はreshapeで次数を1つ増やす
        # [batch_size, token_num, hidden_dim] -> [batch_size, head_num, token_num, hidden_dim/head_num]
        query = self._split(query)
        key = self._split(key)
        value = self._split(value)
        
        # attention
        # 入力と同じ形の出力
        # context: [batch_size, token_num, hidden_dim]
        # score_weightsはEncoderではNoneとする
        context, atten_weight = self.atten(query, key, value, attention_mask)
        
        if return_attention_scores:
            return context, atten_weight
        else:
            return context

FeedForwardNetworkの実装をします。

class FeedForwardNetwork(nn.Module):
    '''
    Position-wise Feedforward Neural Network
    transformer blockで使用される全結合層
    '''
    def __init__(self, hidden_dim, drop_rate=0.1):
        super().__init__()
        # 2層構造
        # 1層目:チャンネル数を増加させる
        self.filter_dense_layer = nn.Linear(hidden_dim, hidden_dim * 4)
        self.relu1 = nn.ReLU()
        
        # 2層目:元のチャンネル数に戻す
        self.output_dense_layer = nn.Linear(hidden_dim * 4, hidden_dim)
        self.drop = nn.Dropout(drop_rate)

    def forward(self, x):
        '''
        入力と出力で形が変わらない
        [batch_size, token_num, hidden_dim]
        '''
        
        # [batch_size, token_num, hidden_dim] -> [batch_size, token_num, 4*hidden_dim]
        x = self.filter_dense_layer(x)
        x = self.relu1(x)
        x = self.drop(x)
        
        # [batch_size, token_num, 4*hidden_dim] -> [batch_size, token_num, hidden_dim]
        return self.output_dense_layer(x)

残差接続のためのResidualNormalizationWrapperを実装します。

class ResidualNormalizationWrapper(nn.Module):
    '''
    残差接続
    output: input + SubLayer(input)
    '''
    def __init__(self, hidden_dim, layer, drop_rate=0.1):
        super().__init__()
        self.layer = layer # SubLayer : ここではAttentionかFFN
        self.layer_normalization = nn.LayerNorm(hidden_dim)
        self.drop = nn.Dropout(drop_rate)

    def forward(self, x, memory=None, attention_mask=None, return_attention_scores=False):
        """
        AttentionもFFNも入力と出力で形が変わらない
        [batch_size, token_num, hidden_dim]
        """
        
        params = {}
        if memory is not None:
            params['memory'] = memory
        if attention_mask is not None:
            params['attention_mask'] = attention_mask
        if return_attention_scores:
            params['return_attention_scores'] = return_attention_scores
        
        out = self.layer_normalization(x)
        if return_attention_scores:
            out, attn_weights = self.layer(out,**params)
            out = self.drop(out)
            return x + out, attn_weights
        else:
            out = self.layer(out,**params)
            out = self.drop(out)
            return x + out

AddPositionalEncodingを実装します。
これはいろいろ実装の種類があるようです。

class AddPositionalEncoding(nn.Module):
    '''
    入力テンソルに対し、位置の情報を付与して返すレイヤー
    see: https://arxiv.org/pdf/1706.03762.pdf

    PE_{pos, 2i}   = sin(pos / 10000^{2i / d_model})
    PE_{pos, 2i+1} = cos(pos / 10000^{2i / d_model})
    '''
    def forward(self, inputs):
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

        fl_type = inputs.dtype
        batch_size, max_length, depth = inputs.shape
        
        depth_counter = torch.div(torch.arange(depth) ,2, rounding_mode='trunc')*2
 
        depth_matrix = torch.tile(torch.unsqueeze(depth_counter, 0), [max_length, 1])  # [max_length, depth]
        depth_matrix = torch.pow(10000.0, depth_matrix / depth)  # [max_length, depth]
        # cos(x) == sin(x + π/2)
        phase = torch.remainder(torch.arange(depth), 2) * math.pi / 2
        phase_matrix = torch.tile(torch.unsqueeze(phase, 0), [max_length, 1])  # [max_length, depth]

        pos_counter = torch.arange(max_length)
        pos_matrix = (torch.tile(torch.unsqueeze(pos_counter, 1), [1, depth]))  # [max_length, depth]

        positional_encoding = torch.sin(pos_matrix / depth_matrix + phase_matrix)
        # [batch_size, max_length, depth]
        positional_encoding = torch.tile(torch.unsqueeze(positional_encoding, 0), [batch_size, 1, 1])
        positional_encoding = positional_encoding.to(device)

        return inputs + positional_encoding

TokenEmbeddingを実装します。
学習済みの重みを使えるようにしていますが、今回は試していません。

class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size, embedding_dim, pretrained_weight=None):
        # vocab_size: 単語の総数
        # embedding_dim: Embeddingの次数
        super().__init__()
        self.embedding_dim = embedding_dim
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=1)
        self.embedding.weight.requires_grad = True
        
        if pretrained_weight is not None:
            self.embedding.weight.data.copy_(pretrained_weight)

    def forward(self, x):
        # inputのIDに対応したベクトルを持ってくる
        embedding = self.embedding(x)
        
        return embedding * (self.embedding_dim ** 0.5)

TransformerBlockを実装します。

class TransformerBlock(nn.Module):
    """
    transformer block : before ->[attention -> FF]-> next
    それぞれ残差接続とLayerNormalizationの処理が含まれる
    """
    def __init__(self, token_num, hidden_dim, heads_num, drop_rate=0.1):
        """
        hidden_numはheads_numで割り切れえる値とすること
        """
        super().__init__()
        self.atten = ResidualNormalizationWrapper(
            hidden_dim = hidden_dim,
            layer = MultiHeadAttention(token_num=token_num, hidden_dim = hidden_dim, heads_num = heads_num, drop_rate = drop_rate),
            drop_rate = drop_rate)
        
        self.ffn = ResidualNormalizationWrapper(
            hidden_dim = hidden_dim,
            layer = FeedForwardNetwork(hidden_dim = hidden_dim, drop_rate = drop_rate),
            drop_rate = drop_rate)
    
    def forward(self, input, memory=None, attention_mask=None, return_attention_scores=False):
        """
        入力と出力で形式が変わらない
        [batch_size, token_num, hidden_dim]
        """
        if return_attention_scores:
            x, attn_weights = self.atten(input, memory, attention_mask, return_attention_scores)
            x = self.ffn(x)
            return x, attn_weights
        else:
            x = self.atten(input, memory, attention_mask, return_attention_scores)
            x = self.ffn(x)
            return x

TransformerBlockを繰り返し実行してEncoderを定義します。

class Encoder(nn.Module):
    '''
    TransformerのEncoder
    '''
    def __init__(
            self,
            vocab_size, # 単語の総数
            hopping_num, # Multi-head Attentionの繰り返し数
            heads_num, # Multi-head Attentionのヘッド数
            hidden_dim, # Embeddingの次数
            token_num, # 系列長(文章中のトークン数)
            drop_rate, # ドロップアウトの確率
            pretrained_weight=None
    ):
        super().__init__()
        self.hopping_num = hopping_num
        
        # Embedding層
        self.token_embedding = TokenEmbedding(vocab_size, hidden_dim, pretrained_weight)
        # Position Embedding
        self.add_position_embedding = AddPositionalEncoding()
        self.input_dropout_layer = nn.Dropout(drop_rate)

        # Multi-head Attentionの繰り返し(hopping)のリスト
        self.attention_block_list = nn.ModuleList([TransformerBlock(token_num, hidden_dim, heads_num) for _ in range(hopping_num)])
        self.output_normalization = nn.LayerNorm(hidden_dim)

    def forward(
            self,
            input,
            memory=None,
            attention_mask=None,
            return_attention_scores=False
    ):
        '''
        input: 入力 [batch_size, length]
        memory: 入力 [batch_size, length]
        attention_mask: attention weight に適用される mask
            [batch_size, 1, q_length, k_length] 
            pad 等無視する部分が 0 となるようなもの(Decoderで使用)
        出力 [batch_size, length, hidden_dim]
        '''
        # [batch_size, token_num] -> [batch_size, token_num, hidden_dim]
        embedded_input = self.token_embedding(input)
        # Positional Embedding
        embedded_input = self.add_position_embedding(embedded_input)
        query = self.input_dropout_layer(embedded_input)
        
        if return_attention_scores:
            # MultiHead Attentionを繰り返し適用
            for i in range(self.hopping_num):
                query, atten_weights = self.attention_block_list[i](query, memory, attention_mask, return_attention_scores)

            # [batch_size, token_num, hidden_dim]
            return self.output_normalization(query), atten_weights
        else:
            # MultiHead Attentionを繰り返し適用
            for i in range(self.hopping_num):
                query = self.attention_block_list[i](query, memory, attention_mask, return_attention_scores)

            # [batch_size, token_num, hidden_dim]
            return self.output_normalization(query)

TransformerBlockの出力を全結合層に入力して分類モデルとします。

class AttentionClassifier(nn.Module):
    def __init__(
            self,
            vocab_size, # 単語の総数
            hopping_num, # Multi-head Attentionの繰り返し数
            heads_num, # Multi-head Attentionのヘッド数
            hidden_dim, # Embeddingの次数
            token_num, # 系列長(文章中のトークン数)
            drop_rate, # ドロップアウトの確率
            NUMLABELS, # クラス数
            pretrained_weight=None,
            PAD_ID = 1
    ):
        super().__init__()
        self.PAD_ID = PAD_ID
        
        self.encoder = Encoder(vocab_size, hopping_num, heads_num, hidden_dim, token_num, drop_rate, pretrained_weight)
        self.dense1 = nn.Linear(hidden_dim, hidden_dim)
        self.act1 = nn.Tanh()
        self.dropout1 = nn.Dropout(drop_rate)   
        self.final_layer = nn.Linear(hidden_dim, NUMLABELS)
        
        nn.init.normal_(self.dense1.weight, std=0.02)
        nn.init.normal_(self.dense1.bias, std=0)
        nn.init.normal_(self.final_layer.weight, std=0.02)
        nn.init.normal_(self.final_layer.bias, std=0)

    def forward(self, x, return_attention_scores=False):
        self_attention_mask=self._create_enc_attention_mask(x)
        
        # [batch_size, token_num] -> [batch_size, token_num, hidden_dim]
        if return_attention_scores:
            enc_output, atten_weights = self.encoder(x, attention_mask=self_attention_mask,return_attention_scores=return_attention_scores)
        else:
            enc_output = self.encoder(x, attention_mask=self_attention_mask,return_attention_scores=return_attention_scores)
        
        # 文頭の重みを使用 [batch_size, 0, hidden_dim]
        # [batch_size, hidden_dim] -> [batch_size, hidden_dim]
        enc_output = self.dense1(enc_output[:, 0, :])
        enc_output = self.act1(enc_output)
        enc_output = self.dropout1(enc_output)
        
        # [batch_size, hidden_dim] -> [batch_size, NUMLABELS]
        final_output = self.final_layer(enc_output)

        if return_attention_scores:
            return final_output, atten_weights
        else:
            return final_output
    
    def _create_enc_attention_mask(self, x):
        batch_size, length = x.shape
        # マスクする部分を1とする
        pad_array = torch.eq(x, self.PAD_ID).to(dtype=torch.int8)  # [batch_size, token_num]
        
        # shape broadcasting で [batch_size, head_num, token_num, token_num] になる
        return pad_array.view([batch_size, 1, 1, length])

データの準備を行います。
自分で集めた居酒屋の口コミデータです。
まずは、前処理を定義します。
今回は、簡単な処理のみとします。

!pip install janome
import re
from janome.tokenizer import Tokenizer
j_t = Tokenizer(wakati=True)

def tokenizer_janome(text):
    return [tok for tok in j_t.tokenize(text, wakati=True)]

def preprocessing_text(text):
    text = re.sub('\r', '', text)
    text = re.sub('\n', '', text)
    text = re.sub(' ', '', text)
    text = re.sub(' ', '', text)
    
    text = re.sub(r'[0-9 0-9]', '0', text)
    return text

def tokenizer_with_preprocessing(text):
    text = preprocessing_text(text)
    ret = tokenizer_janome(text)
    return ret

データの読み込みをします。

import torchtext
#from torchtext import data, datasets
from torchtext.legacy import data

max_length = 64
TEXT = data.Field(sequential=True, tokenize=tokenizer_with_preprocessing,
                  use_vocab=True, lower=True, include_lengths=True,
                  batch_first=True, fix_length=max_length,init_token="<eos>",eos_token="<cls>")
LABEL = data.Field(sequential=False, use_vocab=False, preprocessing=None)

dataset = data.TabularDataset(
        path='reviews.csv', format='csv',
        skip_header=True,
        fields=[('Text', TEXT), ('Label', LABEL), ('Label2', LABEL)])

train_dataset, test_dataset = dataset.split(split_ratio=0.7)
train_dataset, val_dataset = train_dataset.split(split_ratio=0.7)

Iteratorの定義をします。

train_iter = data.Iterator(
    train_dataset, batch_size=32, 
    train=True  # train=Trueならシャッフルソートは有効
)
val_iter = data.Iterator(
    val_dataset, batch_size=32, 
    train=False, sort=False
)
test_iter = data.Iterator(
    test_dataset, batch_size=32, 
    train=False, sort=False
)

dataloaders_dict = {"train":train_iter, "val":val_iter}

ネットワークの定義や学習の設定をします。

TEXT.build_vocab(train_dataset)
vocab = TEXT.vocab

net = AttentionClassifier(
            vocab_size = len(vocab), # 単語の総数
            hopping_num = 8, # Multi-head Attentionの繰り返し数
            heads_num = 6, # Multi-head Attentionのヘッド数
            hidden_dim = 300, # Embeddingの次数
            drop_rate = 0.1, # ドロップアウトの確率
            token_num = 64,
            
    pretrained_weight=None,
    NUMLABELS=2
    )

def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Linear') != -1:
        nn.init.kaiming_normal_(m.weight)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0.0)
net.train()
net.apply(weights_init)

criterion = nn.CrossEntropyLoss()

learning_rate = 2e-4
optimizer = optimizers.Adam(net.parameters(), lr=learning_rate, amsgrad=True, eps=1e-07)

訓練用の関数を定義します。

def train_model(net, dataloaders_dict, criterion, optimizer, num_epochs):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("使用デバイス:", device)
    print('--------start--------')
    net.to(device)
    
    torch.backends.cudnn.benchmark = True
    
    for epoch in range(num_epochs):
        for phase in ['train', 'val']:
            if phase == 'train':
                net.train()
            else:
                net.eval()
            
            epoch_loss = 0.0
            epoch_corrects = 0
            
            for batch in (dataloaders_dict[phase]):
                inputs = batch.Text[0].to(device)
                labels = batch.Label2.to(device)
                
                optimizer.zero_grad()
                
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = net(inputs)
                    loss = criterion(outputs, labels)
                    
                    _, preds = torch.max(outputs, 1)
                    
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                    
                    epoch_loss += loss.item() * inputs.size(0)
                    epoch_corrects += torch.sum(preds == labels.data)
            
            epoch_loss = epoch_loss / len(dataloaders_dict[phase].dataset)
            epoch_acc = epoch_corrects.double() / len(dataloaders_dict[phase].dataset)
            
            print('Epoch {}/{} | {:.^5} | Loss: {:.4f} Acc: {:.4f}'.format(epoch+1,
                                                                           num_epochs,
                                                                           phase,
                                                                           epoch_loss,
                                                                           epoch_acc))
        
    return net

訓練を実行します。

num_epochs = 20

net_trained = train_model(net, dataloaders_dict, criterion, optimizer, num_epochs=num_epochs)

image.png

結果の可視化を行います。

from IPython.display import HTML
from IPython.display import HTML, display

def mk_html(index, batch, preds, normlized_weights, vocab, labels=["Negative", "Positive"]):
    "HTMLデータを作成する"

    # indexの結果を抽出
    sentence = batch[0][index]  # 文章
    label = batch[1][index] # ラベル
    pred = preds[index]  # 予測
    # ラベルと予測結果を文字に置き換え
    label_str = labels[label]
    pred_str = labels[pred]
    # 表示用のHTMLを作成する
    html = '正解ラベル:{}<br>推論ラベル:{}<br><br>'.format(label_str, pred_str)

    # 12種類のAttentionの平均を求める。最大値で規格化
    all_attens = normlized_weights[0, :, 0, :].sum(axis=0)*0  # all_attensという変数を作成する
    all_attens = np.sum(normlized_weights[index, :, 0, :], axis=0)
    all_attens = (all_attens -all_attens.min()) /  (all_attens.max()-all_attens.min())

    for word, attn in zip(sentence, all_attens):
        # 単語が[SEP]の場合は文章が終わりなのでbreak
        if TEXT.vocab.itos[word] == "<cls>":
            break

        # 関数highlightで色をつける、関数tokenizer_bert.convert_ids_to_tokensでIDを単語に戻す
        html += highlight(TEXT.vocab.itos[word], attn)
    html += "<br><br>"

    return html

def highlight(word, attn):
    "Attentionの値が大きいと文字の背景が濃い赤になるhtmlを出力させる関数"

    html_color = '#%02X%02X%02X' % (
        255, int(255*(1 - attn)), int(255*(1 - attn)))
    return '<span style="background-color: {}"> {}</span>'.format(html_color, word)
  
def make_html(preds, batch, labels=["Negative", "Positive"]):
  html_output = [mk_html(index=idx,
                         batch=batch, 
                         preds=np.argmax(preds, axis=1),
                         normlized_weights=atten,
                         vocab=vocab,
                         labels=labels) for idx in np.arange(len(preds))]
  return  html_output
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
batch = next(iter(test_iter))

preds, atten = net(batch.Text[0].to(device), return_attention_scores=True)
atten = atten.to('cpu').detach().numpy()
preds = preds.to('cpu').detach().numpy()

html_results = make_html(preds, batch=(np.array(batch.Text[0]), np.array(batch.Label2)), labels=["Negative", "Positive"])
HTML(html_results[1])

image.png

pytorchではnn.TransformerEncoderLayerおよびnn.TransformerEncoderが実装されており、これを使えばよりシンプルに実装ができます。
書き換えるのはEncoder部分です。

class Encoder(nn.Module):
    '''
    TransformerのEncoder
    '''
    def __init__(
            self,
            vocab_size, # 単語の総数
            hopping_num, # Multi-head Attentionの繰り返し数
            heads_num, # Multi-head Attentionのヘッド数
            hidden_dim, # Embeddingの次数
            token_num, # 系列長(文章中のトークン数)
            drop_rate, # ドロップアウトの確率
            pretrained_weight=None
    ):
        super().__init__()
        self.hopping_num = hopping_num
        
        # Embedding層
        self.token_embedding = TokenEmbedding(vocab_size, hidden_dim, pretrained_weight)
        # Position Embedding
        self.add_position_embedding = AddPositionalEncoding()
        self.input_dropout_layer = nn.Dropout(drop_rate)

        # Multi-head Attentionの繰り返し(hopping)のリスト
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=heads_num, dropout=drop_rate, batch_first=True)
        self.attention_block = nn.TransformerEncoder(encoder_layer, num_layers=hopping_num)
        
        self.output_normalization = nn.LayerNorm(hidden_dim)

    def forward(
            self,
            input,
            memory=None,
            attention_mask=None
    ):
        '''
        input: 入力 [batch_size, length]
        memory: 入力 [batch_size, length]
        attention_mask: attention weight に適用される mask
            [batch_size, 1, q_length, k_length] 
            pad 等無視する部分が 0 となるようなもの(Decoderで使用)
        出力 [batch_size, length, hidden_dim]
        '''
        # [batch_size, token_num] -> [batch_size, token_num, hidden_dim]
        embedded_input = self.token_embedding(input)
        # Positional Embedding
        embedded_input = self.add_position_embedding(embedded_input)
        query = self.input_dropout_layer(embedded_input)
        
        query = self.attention_block(query, src_key_padding_mask=attention_mask)

        # [batch_size, token_num, hidden_dim]
        return self.output_normalization(query)

実装・処理結果は以下を参考してください。

以上で、Transformer(Encoderのみ)の実装を終わります。

9
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?