Help us understand the problem. What is going on with this article?

すべてがFになる

More than 1 year has passed since last update.

最初に

ネタ枠です。
一応、この記事のためのいくつかの準備はしていたのですが、夏休み最終日のごとくクリスマス・イブに大慌てで色々動かしたので、全然間に合ってないです。

概要

LSTMとAttentionによるネットワークと、それよりも強いTransformerを動かします。
Transformerで記事を書こうと思っていましたが、とてもわかりやすい記事がすでにあったので、慌てて修正しました。

すべてがFになる

上で挙げたネットワークは翻訳などの自然言語のタスクによく使われています。今回もそんな感じのタスクに挑戦します。

たぶん、説明するより、例を見たほうが早いですね。

入力例
入力 : 富士通を退職しました
出力 : 富士通に就職しました

入力 : 富士通が嫌いです
出力 : 富士通が好きです

詳しく説明すると後で叱られる可能性が高いのが、うーんって感じですが、つまりは特定の会社に対するネガティブな発言をポジティブに変えます。
これを以下では「Fになる」と表現します。マイルドになってとてもよいですね。

ネットワーク

LSTM + Attention

まずはLSTM+Attentionのネットワークで学習させてみます。

lstm_with_attention.py
import numpy as np
import tensorflow as tf
from tensorflow.python.framework import dtypes
from tensorflow.python.keras.models import Model
from tensorflow.python.keras.layers import Input, Lambda, Concatenate
from tensorflow.python.keras.layers import Embedding, Dense, LSTM, Activation
from tensorflow.python.keras import backend as K 
from tensorflow.python.keras.initializers import Initializer
from tensorflow.python.keras.losses import categorical_crossentropy

class MY_INIT(Initializer):
    def __init__(self, weight):
        self.weight = weight

    def __call__(self, shape, dtype=None, partition_info=None):
        return tf.convert_to_tensor(self.weight, dtype=tf.float32)

    def get_config(self):
        return {}

class Seq2Seq:
    def __init__(self, vocab_size, wordvec_size, hidden_size, input_encoder_length, input_decoder_length, embed_weight=None):
        self.vocab_size = vocab_size
        self.wordvec_size = wordvec_size
        self.hidden_size = hidden_size
        self.input_encoder_length = input_encoder_length
        self.input_decoder_length = input_decoder_length
        self.embed_weight = embed_weight
        self.model = self.build_model()
        self.model.compile(loss='categorical_crossentropy', optimizer='Adam')
        self.model.summary()

    def attention_weight(self, args):
        encoder_hiddens, decoder_hiddens = args
        enc_hs = K.reshape(encoder_hiddens, shape=(-1, 1, self.input_encoder_length, self.hidden_size))
        enc_hs = K.repeat_elements(enc_hs, self.input_decoder_length, axis=1)
        dec_hs = K.reshape(decoder_hiddens, shape=(-1, self.input_decoder_length, 1, self.hidden_size))
        dec_hs = K.repeat_elements(dec_hs, self.input_encoder_length, axis=2)

        t = enc_hs * dec_hs
        s = K.sum(t, axis=-1)
        a = K.softmax(s, axis=-1)
        return a

    def weight_sum(self, args):
        encoder_hiddens, attention_weights = args
        enc_hs = K.reshape(encoder_hiddens, shape=(-1, 1, self.input_encoder_length, self.hidden_size))
        enc_hs = K.repeat_elements(enc_hs, self.input_decoder_length, axis=1)
        attention_weights = K.reshape(attention_weights, shape=(-1, self.input_decoder_length, self.input_encoder_length, 1))
        attention_weights = K.repeat_elements(attention_weights, self.hidden_size, axis=3)

        t = enc_hs * attention_weights
        c = K.sum(t, axis=2)
        return c

    def build_model(self):
        _input = Input(shape=(self.input_encoder_length,))
        if self.embed_weight is None:
            embed = Embedding(self.vocab_size, self.wordvec_size, input_length=self.input_encoder_length)(_input)
        else:
            print("load pretrain model on embedding layer")
            embed = Embedding(self.vocab_size, self.wordvec_size, input_length=self.input_encoder_length, embeddings_initializer=MY_INIT(self.embed_weight))(_input)
        enc_hidden, state_h, state_c = LSTM(self.hidden_size, return_sequences=True, return_state=True)(embed)

        encoder_states = [state_h, state_c]
        _input2 = Input(shape=(self.input_decoder_length,))
        if self.embed_weight is None:
            embed = Embedding(self.vocab_size, self.wordvec_size, input_length=self.input_decoder_length)(_input2)
        else:
            print("load pretrain model on embedding layer")
            embed = Embedding(self.vocab_size, self.wordvec_size, input_length=self.input_decoder_length, embeddings_initializer=MY_INIT(self.embed_weight))(_input2)
        dec_hidden = LSTM(self.hidden_size, return_sequences=True)(embed, initial_state=encoder_states)

        attention_weights = Lambda(function=self.attention_weight)([enc_hidden, dec_hidden])
        c = Lambda(function=self.weight_sum)([enc_hidden, attention_weights])

        concat = Concatenate(axis=-1)([c, dec_hidden])

        #predict = Dense(self.vocab_size, activation='softmax')(concat)
        predict = Dense(118327, activation='softmax')(concat)

        return Model([_input, _input2], predict)

    def predict(self, xs, char2id, eos_mark='_'):
        ys = np.ones((xs.shape[0], self.input_decoder_length), dtype=np.int32)
        ys[:] = char2id[eos_mark]

        for i in range(self.input_decoder_length - 1):
            raw_preds = self.model.predict([xs, ys])
            preds = raw_preds.argmax(axis=-1)
            ys[:, i + 1] = preds[:, i]

        return ys

学習はEncoderの入力に「ネガティブ文」を入れて、Decoderの入力には先頭に”EOS”を加えた「ポジティブ文」が入ります。”EOS”がここから推論が始まるよの合図です。
推論時は”EOS”を入れたときのDecoderの出力を、次のステップにおけるDecoderの入力に入れます。

main.py
s2s = Seq2Seq(
    vocab_size=vocab_size,
    wordvec_size=wordvec_size,
    hidden_size=hidden_size,
    input_encoder_length=in_length,
    input_decoder_length=out_length,
    embed_weight=wv_vectors
    )

s2s.model.fit(
    x=[encoder_in_x_train, decoder_in_t_train],
    y=decoder_out_t_train,
    batch_size=batch_size,
    epochs=n_epochs,
    shuffle=True,
    validation_data=[[encoder_in_x_test, decoder_in_t_test], decoder_out_t_test]
    )

model.fit()の入力であるxには、Encoderへの入力とDecoderへの入力に対応するデータを指定します。
yにはDecoderの出力に対応するデータを指定します。
decoder_in_t_testとdecoder_out_t_testは同じものを入れるのではなく、decoder_in_t_testは、「ポジティブ文」を右に1つずつシフトさせて、空いた先頭に"EOS"に対応するidを入れたものである必要があります。
decoder_out_t_testは、「ポジティブ文」をそのままidで表現したものになります。
具体的には以下のとおりです。

decoder_in_t_test : <eos> 富士通 太郎 です
decoder_out_t_test : 富士通 太郎 です

Transformer

この記事を見た瞬間、やばいと思いましたね。すごく理解が深まりました。圧倒的感謝です。

作って理解する Transformer / Attention

Google Colaboratory上で気軽に試せるので、面倒なところをすっ飛ばした上で作って理解できるのが最高です!
今回実装したTransformerも基本的にはこの記事の中にあるコードを流用し、インターフェースの部分をKerasベースにしました。

transformer.py
class Encoder(Model):
    '''
    トークン列をベクトル列にエンコードする Encoder です。
    '''
    def __init__(self, vocab_size: int, hopping_num: int, head_num: int, hidden_dim: int, dropout_rate: float, max_length: int, embed_weight=None, *args, **kwargs) -> None:
        super().__init__()
        self.hopping_num = hopping_num
        self.head_num = head_num
        self.hidden_dim = hidden_dim
        self.dropout_rate = dropout_rate

        if embed_weight is None:
            self.token_embedding = Embedding(vocab_size, hidden_dim, input_length=max_length)
        else:
            self.token_embedding = Embedding(vocab_size, hidden_dim, input_length=max_length, embeddings_initializer=MY_INIT(embed_weight))

        self.add_position_embedding = AddPositionalEncoding()
        self.input_dropout_layer = Dropout(dropout_rate)

        self.attention_block_list: List[List[tf.keras.models.Model]] = []
        for _ in range(hopping_num):
            attention_layer = SelfAttention(hidden_dim, head_num, dropout_rate, name='self_attention')
            ffn_layer = FeedForwardNetwork(hidden_dim, dropout_rate, name='ffn')
            self.attention_block_list.append([
                ResidualNormalizationWrapper(attention_layer, dropout_rate, name='self_attention_wrapper'),
                ResidualNormalizationWrapper(ffn_layer, dropout_rate, name='ffn_wrapper'),
            ])
        self.output_normalization = LayerNormalization()

    def call(self, input, self_attention_mask, training):
        '''
        モデルを実行します

        :param input: shape = [batch_size, length]
        :param training: 学習時は True
        :return: shape = [batch_size, length, hidden_dim]
        '''
        # [batch_size, length, hidden_dim]
        embedded_input = self.token_embedding(input)
        embedded_input = self.add_position_embedding(embedded_input)
        query = self.input_dropout_layer(embedded_input, training=training)

        for i, layers in enumerate(self.attention_block_list):
            attention_layer, ffn_layer = tuple(layers)
            with tf.name_scope(f'hopping_{i}'):
                query = attention_layer(query, attention_mask=self_attention_mask, training=training)
                query = ffn_layer(query, training=training)
        # [batch_size, length, hidden_dim]
        return self.output_normalization(query)


class Decoder(Model):
    '''
    エンコードされたベクトル列からトークン列を生成する Decoder です。
    '''
    def __init__(self, vocab_size: int, hopping_num: int, head_num: int, hidden_dim: int, dropout_rate: float, max_length: int, embed_weight=None, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.hopping_num = hopping_num
        self.head_num = head_num
        self.hidden_dim = hidden_dim
        self.dropout_rate = dropout_rate

        if embed_weight is None:
            self.token_embedding = Embedding(vocab_size, hidden_dim, input_length=max_length)
        else:
            self.token_embedding = Embedding(vocab_size, hidden_dim, input_length=max_length, embeddings_initializer=MY_INIT(embed_weight))

        self.add_position_embedding = AddPositionalEncoding()
        self.input_dropout_layer = Dropout(dropout_rate)

        self.attention_block_list: List[List[Model]] = []
        for _ in range(hopping_num):
            self_attention_layer = SelfAttention(hidden_dim, head_num, dropout_rate, name='self_attention2')
            enc_dec_attention_layer = MultiheadAttention(hidden_dim, head_num, dropout_rate, name='enc_dec_attention2')
            ffn_layer = FeedForwardNetwork(hidden_dim, dropout_rate, name='ffn2')
            self.attention_block_list.append([
                ResidualNormalizationWrapper(self_attention_layer, dropout_rate, name='self_attention_wrapper2'),
                ResidualNormalizationWrapper(enc_dec_attention_layer, dropout_rate, name='enc_dec_attention_wrapper2'),
                ResidualNormalizationWrapper(ffn_layer, dropout_rate, name='ffn_wrapper2'),
            ])
        self.output_normalization = LayerNormalization()
        # 注:本家ではここは TokenEmbedding の重みを転置したものを使っている
        self.output_dense_layer = Dense(vocab_size, use_bias=False)

    def call(self, input, encoder_output, self_attention_mask, enc_dec_attention_mask, training):
        '''
        モデルを実行します
        :param input: shape = [batch_size, length]
        :param training: 学習時は True
        :return: shape = [batch_size, length, hidden_dim]
        '''
        # [batch_size, length, hidden_dim]
        embedded_input = self.token_embedding(input)
        embedded_input = self.add_position_embedding(embedded_input)
        query = self.input_dropout_layer(embedded_input, training=training)

        for i, layers in enumerate(self.attention_block_list):
            self_attention_layer, enc_dec_attention_layer, ffn_layer = tuple(layers)
            with tf.name_scope(f'hopping_{i}'):
                query = self_attention_layer(query, attention_mask=self_attention_mask, training=training)
                query = enc_dec_attention_layer(query, memory=encoder_output,
                                                attention_mask=enc_dec_attention_mask, training=training)
                query = ffn_layer(query, training=training)

        query = self.output_normalization(query)  # [batch_size, length, hidden_dim]
        return self.output_dense_layer(query)  # [batch_size, length, vocab_size]


class Transformer:
    def __init__(self, vocab_size: int, hidden_dim: int, enc_input_length: int, dec_input_length: int, embed_weight=None, training: bool = False) -> None:
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        self.enc_input_length = enc_input_length
        self.dec_input_length = dec_input_length
        self.embed_weight = embed_weight
        self.training = training
        self.model = self.build_model()
        self.model.compile(
            optimizer=Adam(lr=1e-04, beta_2=0.98), 
            loss='categorical_crossentropy')
        self.model.summary()

    def build_model(self):
        # encoderへの入力
        enc_input = Input(shape=(self.enc_input_length,))
        enc_mask = Input(shape=(self.enc_input_length, self.enc_input_length))

        # decoderへの入力
        dec_input = Input(shape=(self.dec_input_length,))
        dec_mask = Input(shape=(self.dec_input_length, self.dec_input_length))
        enc_dec_mask = Input(shape=(self.dec_input_length, self.enc_input_length))

        # reshape
        _enc_mask = Lambda(lambda x: tf.reshape(x, (-1, 1, self.enc_input_length, self.enc_input_length)))(enc_mask)
        _dec_mask = Lambda(lambda x: tf.reshape(x, (-1, 1, self.dec_input_length, self.dec_input_length)))(dec_mask)
        _enc_dec_mask = Lambda(lambda x: tf.reshape(x, (-1, 1, self.dec_input_length, self.enc_input_length)))(enc_dec_mask)

        encoder = Encoder(vocab_size=self.vocab_size, hopping_num=1, head_num=4, hidden_dim=self.hidden_dim, dropout_rate=0.1, max_length=self.enc_input_length, embed_weight=self.embed_weight)
        encoded = encoder(enc_input, _enc_mask, training=self.training)

        decoder = Decoder(vocab_size=self.vocab_size, hopping_num=1, head_num=4, hidden_dim=self.hidden_dim, dropout_rate=0.1, max_length=self.dec_input_length, embed_weight=self.embed_weight)
        decoded = decoder(dec_input, encoded, _dec_mask, _enc_dec_mask, training=self.training)

        predict = Softmax()(decoded)
        return Model([enc_input, enc_mask, dec_input, dec_mask, enc_dec_mask], predict)

    def predict(self, model_input, char2id, eos_mark='_'):
        enc_in_x, enc_mask, _, dec_mask, enc_dec_mask = model_input
        ys = np.ones((enc_in_x.shape[0], self.dec_input_length), dtype=np.int32)
        ys[:] = char2id[eos_mark]

        for i in range(self.dec_input_length - 1):
            raw_preds = self.model.predict([enc_in_x, enc_mask, ys, dec_mask, enc_dec_mask])
            preds = raw_preds.argmax(axis=-1)
            ys[:, i + 1] = preds[:, i]

        return ys

すべて貼り付けると他人のコード80%くらいになってしまうので、自分が書き換えたり、新たに実装した部分があるクラスのみ掲載します。
コメントアウトからにじみ出る僕じゃない誰かの指紋。本当にありがとうございます。

main.py
transformer = Transformer(
    vocab_size=vocab_size,
    hidden_dim=hidden_size,
    enc_input_length=in_length,
    dec_input_length=out_length,
    embed_weight=wv_vectors
    )

n_train = x_train.shape[0]
enc_mask_train, enc_dec_mask_train, dec_mask_train = make_masks(n_train, in_length, out_length)
enc_mask_test, enc_dec_mask_test, dec_mask_test = make_masks(n_test, in_length, out_length)

transformer.model.fit(
    x=[encoder_in_x_train, enc_mask_train, decoder_in_t_train, dec_mask_train, enc_dec_mask_train],
    y=decoder_out_t_train,
    batch_size=batch_size,
    epochs=n_epochs,
    shuffle=True,
    validation_data=[
        [encoder_in_x_test, enc_mask_test, decoder_in_t_test, dec_mask_test, enc_dec_mask_test],
        decoder_out_t_test
    ]
)

なお、word2vecの行列はとても大きいので、今回は語彙数を減らして実験しています。

共通点

Embedding層は学習済みモデルを使います。
モデルはこのサイトから”20170201.tar.bz2”をダウンロードしました。
ダウンロードしたモデルの読み込みにはgensimを使います。

from gensim.models import KeyedVectors

wv_model = KeyedVectors.load_word2vec_format("entity_vector/entity_vector.model.bin", binary=True)

# idと単語の相互変換
index2word = wv_model.index2word
word2index = {word: i for i, word in enumerate(index2word)}

# word2vecの重み行列。これをEmbedding層の初期重みにする
wv_model.vectors

重みはそのまま初期値にするのではなく、10とか100で割った値にすると学習が安定しやすくなりました。
(今回はデータセットが貧弱なので一般的に言えるかどうかはわかりません)

データセットの詳細

訓練データの文章数40程度。1文章につき5~8単語で構成されています。
1つの「ネガティブ文」に対して1つの「ポジティブ文」が教師信号として割り当てられます。
特定の会社のネガティブ文をポジティブにする文章や、同業他社のポジティブ文を書き換えて、特定の会社から見てポジティブな文章になるパターンなど各種取り揃えました。
30文章ほどエクセルに書き込んだあたりから、「イブの聖夜に何をやっているんだろう」と我に返りました。
この感覚はきっと間違っていないな、と思います。

学習結果

そんなことはさておき、まずは訓練データをそのまま入れてみたいと思います。
ここでうまく推論してくれないとアドベントカレンダーの記事が書けなくなるので大問題です。
学習のepoch数は両方とも500です。

条件 文章    
入力文 富士通には興味がない
出力文(正解) 富士通に興味がある
出力文(LSTM+Attention) 富士通に興味がある
出力文(Transformer) 富士通に興味がある
条件 文章    
入力文 富士通を辞めます
出力文(正解) 富士通に入ります
出力文(LSTM+Attention) 富士通に入ります
出力文(Transformer) 富士通に入ります

訓練データのすべてがFになることを確認しました。データ数も少ないので、たぶん過学習してるのだと思います。
とりあえずアルゴリズムが正しく機能してるっぽいので一安心です。

では、評価データで確認したいと思います。果たしてFになるのでしょうか。
とりあえず「ポジティブ文」になればいいので、たった1つの正解はないものとします。

条件 文章    
入力文 嫌いです、富士通
出力文(LSTM+Attention) 富士通に行きたい
出力文(Transformer) 蘇る富士通
条件 文章    
入力文 富士通は暗い
出力文(LSTM+Attention) 富士通しかない
出力文(Transformer) 富士通しかない
条件 文章    
入力文 富士通に行かない
出力文(LSTM+Attention) 富士通に行こう
出力文(Transformer) 富士通に行きたい
条件 文章    
入力文 富士通よりも富士山
出力文(LSTM+Attention) 富士通は生き残った
出力文(Transformer) 富士通に立ち上がる
条件 文章    
入力文 富士通はどうでもいい
出力文(LSTM+Attention) 富士通を候補に選ぶ
出力文(Transformer) 富士通は楽しいところだ
条件 文章    
入力文 どうでもいい富士通
出力文(LSTM+Attention) 富士通には頭が上がらない
出力文(Transformer) 蘇る富士通

どちらかがずば抜けてよいというわけではないですが、頑張ってFにしてくれている印象があります。
ただ、両モデルとも、データが少ないせいか、訓練データで与えられた「ポジティブ文」のどれかに無理やり合わせようとしている印象を感じます。
データを増やせば、あらゆる文を自然なかたちでFにしてくれるはずです。

今後について

近い将来、社内勉強会でLSTMとかAttention, Transformerについて述べる機会があって、そのための勉強とか資料とかを作っていく予定です。
それらはこの記事を読んでくださった皆様とも共有できたらと思っています。

「戻る」を押さずにここまで頑張って読んでくださったなんて、とても嬉しいです。

さいごに

この記事が消えた時は色々察してください。たぶん、大丈夫だと信じてます!

まあ、何があったとしても、プログラムは整理してgithubに公開する予定です。
この記事で用いたデータがアレなだけで、それ以外は普通ですからね。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away