LoginSignup
1
5

More than 3 years have passed since last update.

初学者によるチャットボットの作成

Posted at

1.はじめに

今回 Aidemyのpremium planで三か月の間、自然言語処理を学びその知識の定着、復習の意を込め成果物として深層学習によるチャットボットを作成してみました。

Qiitaによる投稿も初めてですので拙い文章だとは思いますが、それでも良い方はご覧ください!

2.使用環境

今回は処理するデータ量が多いためGoogle Colaboratory環境で、無料で使えるGPUを使用しました。
フレームワークのバージョンは以下の通りです。
・janome    0.4.1
・torchvision  0.7.0
・torchtext   0.7.0
・PyTorch    1.6.0

3.記事の内容

タイトルにもあるように、今回は会話の入力に対して、深層学習を用いて応答を予測し出力させるモデルを構築し、ノートブック上で繰り返し会話できるボットを作ることを目標にコーディングしました。

深層学習のフレームワークとしてKeras、Tensorflow等、いろいろなものがありますが今回は最近流行りのPyTorchを使いました。
PyTorchを選んだ理由は、以下のように考えたからです。

  ① コードが比較的短く、理解すれば便利である

  ② 入力と正解のペアをまとめて学習できるため、会話を成立させるという
    点で有利に働く

  ③ 今後の開発につながる

また、学習するにあたりデータの前処理、PyTorchによる深層学習はudemyの講座を参考にさせていただきました。

PyTorchによる学習は初めてだったので、かなりつまずきながら手探り状態でしたが、自分でコーディングすることでPyTorchの概要、実装方法などかなり実践的な知識が身についたと思います。

4.実装の手順

それではコードのほうに移っていきます。
工程は大まかに分けて3つです。

① データの前処理
今回訓練に使用した会話コーパスは、NTT ドコモが一般公開している雑談会話コーパスになります。ライセンスはダウンロードページ記載のものに準拠します。

こちらは約1100もの対話が収録されている会話コーパスで、雑談対話APIと人間との間で会話が行われているため、明らかに破綻している会話も収録されているという点も考慮して前処理を行います。

また、チャットボットの作成には約10万もの会話データが必要になるといわれているので今回は一つの入力文に対し、重要である単語を含んだ複数の応答文でペアを作るという方法を用いてデータの水増しを行っています。

以下が、コードになります。

qiita.py
!pip install janome==0.4.1
!pip install torchvision==0.7.0
!pip install torchtext==0.7.0
!pip install torch==1.6.0

# google driveをマウントします
from google.colab import drive
drive.mount("/content/drive/")

#先に必要なものをインポートしときます
import glob 
import json  
import re
from janome.tokenizer import Tokenizer
import csv
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
import dill


path = "/content/drive/My Drive/対話コーパス 前処理/NTT data/projectnextnlp-chat-dialogue-corpus/json"  

# 成り立っている会話を訓練用データとテストデータに分割して、保存します。

files = glob.glob(path + "/*/*.json")  
dialogues = []  
for file in files:
    with open(file, "r") as f:
        json_dic = json.load(f)
        dialogue = []  
        for turn in json_dic["turns"]:   #会話データの["turns"]には各項目が格納されている
            annotations = turn["annotations"]  #注釈
            speaker = turn["speaker"]          #話者
            utterance = turn["utterance"]      #会話内容

            utterance = utterance.replace(".", "。").replace(",", "、")  # 全角を統一
            utterance = utterance.replace(".", "。").replace(",", "、")  # 半角を統一
            utterance = utterance.split("。")[0] #ここでは"。"が文中にある長い文章を短くする



            if speaker=="U":  # "U"=人間
                dialogue.append(utterance) 
            else:  
                is_wrong = False
                for annotation in annotations:
                    breakdown = annotation["breakdown"]  #会話が破綻しているかどうか
                    if breakdown=="X":  
                        is_wrong = True
                        break
                if is_wrong:
                    dialogue.clear()  #破綻していたらデータをクリア
                else:
                    dialogue.append(utterance)  

            if len(dialogue) >= 2:    #会話成立で格納
                dialogues.append(dialogue.copy()) 
                dialogue.pop(0)  


#会話の分かち書きを行い、入力と応答それぞれのリストに格納します。
re_kanji = re.compile(r"^[\u4E00-\u9FD0]+$")  # 漢字の検出用
re_katakana = re.compile(r"[\u30A1-\u30F4]+")  # カタカナの検出用
janome_tk = Tokenizer()

def tokenizer(text):
    return [token for token in janome_tk.tokenize(text, wakati=True)]

wakati_input = []  #水増しのために入力文と応答文を分けます
wakati_reply = []
for dialogue in dialogues:
    wakati_input.append(tokenizer(dialogue[0])[:10])
    wakati_reply.append(tokenizer(dialogue[1])[:10])

#入力分を複数の応答分と組み合わせてデータの水増しを行います。
dialogues_mizumasi = []   #ここで水増しにより会話データ約10万個
for i, word_input in enumerate(wakati_input):
    inp_count = 0
    for j, word_reply in enumerate(wakati_reply):
        if i==j:
            dialogues_mizumasi.append(["".join(word_input), "".join(word_reply)])
            continue
        #入力と応答の類似度を判断し、類似度が1より大きいものを会話に追加します
        similarity = 0
        for word in word_input:   #カタカナや漢字を重要付けている
            if (word in word_reply) and (re_kanji.fullmatch(word) or re_katakana.fullmatch(word)):
                similarity += 1
        if similarity >= 1:
            dialogue_mizumasi = ["".join(word_input), "".join(word_reply)]
            if dialogue_mizumasi not in dialogues_mizumasi:
                dialogues_mizumasi.append(dialogue_mizumasi)
                inp_count += 1
                if inp_count >= 12:   #1つの入力に対し、対応する応答文は12まで
                    break

dialogues = dialogues_mizumasi

#作成した会話のリストを訓練用データとテストデータに分けて格納します。
dialogues_train, dialogues_test = train_test_split(dialogues, shuffle=True, test_size=0.05)
#入力分と応答分のデータセットの列を定義します
input_field = torchtext.data.Field(
    sequential=True,   #文章データのため可変にする
    tokenize=tokenizer, #分かち書き
    batch_first=True,
    lower=True
    )

reply_field = torchtext.data.Field(
    sequential=True,
    tokenize=tokenizer,
    init_token="<sos>",  #文のはじめに加える
    eos_token="<eos>",   #文の終わりに加える
    batch_first=True,
    lower=True
    )
path2 = "/content/drive/My Drive/対話コーパス 前処理/NTT data/"
#データセットを作成します
train_data, test_data = torchtext.data.TabularDataset.splits(
    path=path2,
    train=mizumasi_dialogues_train,
    test=mizumasi_dialogues_test,
    format="csv",
    fields=[("input_text", input_field), ("reply_text", reply_field)]
)

#辞書の作成をします(出現頻度が3以下のものは除く)
input_field.build_vocab(
    train_data,
    min_freq=3
)

reply_field.build_vocab(
    train_data,
    min_freq=3,
)
#データセットを保存します


torch.save(train_data.examples, path+"mizumasi_train_examples.pkl", pickle_module=dill)
torch.save(test_data.examples, path+"mizumasi_test_examples.pkl", pickle_module=dill)

torch.save(input_field, path2+"mizumasi_input.pkl", pickle_module=dill)
torch.save(reply_field, path2+"mizumasi_reply.pkl", pickle_module=dill) 

②訓練モデルを作成し、学習させる
次に、訓練モデルの作成と学習を行います。
流れとしては、

1.Encoder,Decoder,Seq2Seqのクラス作成
2.誤差を測定する評価関数を定義
3.これらを用いて、誤差が小さくなるようモデルにデータを学習させる

となります。
以下がコードになります。

qiita2.py
 #Batchの設定を行います
batch_size = 32

train_iterator = torchtext.data.Iterator(
    train_data,
    batch_size=batch_size,
    train=True
)

test_iterator = torchtext.data.Iterator(
    test_data,
    batch_size=batch_size,
    train=False,
    sort=False
)


 #Encoderクラスを作成します#

class Encoder(nn.Module):
    def __init__(self, n_h, n_vocab, n_emb, num_layers=1, bidirectional=False, dropout=0):
        super().__init__()

        self.n_h = n_h
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.dropout = dropout

        self.embedding = nn.Embedding(n_vocab, n_emb)
        self.embedding_dropout = nn.Dropout(self.dropout)

        self.gru = nn.GRU(
            input_size=n_emb,  #入力サイズ
            hidden_size=n_h,#隠れ層のサイズ(ニューロン数)
            batch_first=True,  #(バッチサイズ、時系列、入力数)
            num_layers=num_layers,
            bidirectional=bidirectional,
        )
    #順伝播の関数を定義
    def forward(self, x):
        index_pad = input_field.vocab.stoi["<pad>"] #"pad"のインデックス取得
        sentence_lengths = x.size()[1] - (x == index_pad).sum(dim=1)   #pad部分を引いて本来の文の長さを得る。

        y = self.embedding(x) #埋め込みベクトル化
        y = self.embedding_dropout(y)#過学習対策

        y = nn.utils.rnn.pack_padded_sequence(
            y,
            sentence_lengths,
            batch_first=True,
            enforce_sorted=False
        )                              #rnnに入れるためにpackedsequence型にする。

        y, h = self.gru(y)  #encoderでの出力と隠れ層の値を取得

        y, _ = nn.utils.rnn.pad_packed_sequence(y, batch_first=True)   #テンソルに戻す                  
        if self.bidirectional:
                #会話データにおける最後の重みが大きくなってしまうため
            y = y[:, :, :self.n_h] + y[:, :, self.n_h:]#ここでは出力と隠れ層の値は双方向の時間の値を足したものになる。
            h = h[:self.num_layers] + h[self.num_layers:]

        return y,h

#Decoderクラスを作成します

class Decoder(nn.Module):
    def __init__(self, n_h, n_out, n_vocab, n_emb, num_layers=1, dropout=0):
        super().__init__()

        self.n_h = n_h
        self.n_out = n_out
        self.num_layers = num_layers
        self.dropout = dropout

        self.embedding = nn.Embedding(n_vocab, n_emb)
        self.embedding_dropout = nn.Dropout(self.dropout)

        self.gru = nn.GRU(
            input_size=n_emb,
            hidden_size=n_h,
            batch_first=True,
            num_layers=num_layers,
        )

        self.fc = nn.Linear(n_h*2, self.n_out)  #全結合層の導入



    def forward(self, x, h_encoder, y_encoder):
        y = self.embedding(x)  # 単語をベクトルに変換
        y = self.embedding_dropout(y)
        y, h = self.gru(y, h_encoder) #ここでは出力と最後の時刻の隠れ層の値が渡される

        #  Attention
        y_tr = torch.transpose(y, 1, 2)  # 次元1と次元2を入れ替える
        ed_mat = torch.bmm(y_encoder, y_tr)  # バッチごとに行列積
        attn_weight = F.softmax(ed_mat, dim=1)  # attention weightの計算
        attn_weight_tr = torch.transpose(attn_weight, 1, 2)  # 次元1と次元2を入れ替える
        context = torch.bmm(attn_weight_tr, y_encoder)  # コンテキストベクトルの計算
        y = torch.cat([y, context], dim=2)  # 出力とコンテキストベクトルの合流  
        y = self.fc(y)
        y = F.softmax(y, dim=2)

        return y, h

#seq2seqクラスを作成します

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, is_gpu=True):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.is_gpu = is_gpu
        if self.is_gpu:
            self.encoder.cuda()
            self.decoder.cuda()

    def forward(self, x_encoder, x_decoder): #順伝播メソッドの設定 訓練時につかう
        if self.is_gpu:
            x_encoder = x_encoder.cuda()
            x_decoder = x_decoder.cuda()

        batch_size = x_decoder.shape[0]   #x_decoderは(バッチサイズ,時系列の数,ニューロン数)
        n_time = x_decoder.shape[1]
        y_encoder, h = self.encoder(x_encoder)

        y_decoder = torch.zeros(batch_size, n_time, self.decoder.n_out)   #バッチサイズ*時系列の数*出力数のすべて0のテンソル
        if self.is_gpu:
            y_decoder = y_decoder.cuda()

#教師強制と呼ばれる方法で各時刻の出力が次の入力に近くなるよう学習していきます。
        for t in range(0, n_time):  #各時刻で処理
            x = x_decoder[:, t:t+1]
            y, h = self.decoder(x, h, y_encoder)
            y_decoder[:, t:t+1, :] = y   #得られた出力yを各時刻ごとにy_encoderに格納

            return y_decoder

    def predict(self, x_encoder):    #予測に使用 文章の生成、評価するときに使う
        if self.is_gpu:
            x_encoder = x_encoder.cuda()

        batch_size = x_encoder.shape[0]
        n_time = x_encoder.shape[1]
        y_encoder, h = self.encoder(x_encoder)

        y_decoder = torch.zeros(batch_size, n_time, dtype=torch.long)  #ここではint型を用いるためtorch.long
        if self.is_gpu:
            y_decoder = y_decoder.cuda()

        y = torch.ones(batch_size, 1, dtype=torch.long) * input_field.vocab.stoi["<sos>"]  #予測用では各時刻で処理を行う必要がある。出力が次の入力になるから
        for t in range(0, n_time): 
            x = y     #ここでまえの出力を入力に
            if self.is_gpu:
                x = x.cuda()
            y, h = self.decoder(x, h, y_encoder)
            y = y.argmax(2)   #yの中で最も大きい数値のインデックス得ることで最適な文章が生成
            y_decoder[:, t:t+1] = y

        return y_decoder

#評価関数を定義します
def evaluate(model, iterator):
    model.eval()  # 評価モードにできる

    batch = next(iter(iterator))
    x = batch.inp_text
    y = model.predict(x)
    for i in range(x.size()[0]):
        inp_text = ""
        for j in range(x.size()[1]):
            word = input_field.vocab.itos[x[i][j]]
            if word=="<pad>":
                break
            inp_text += word

        rep_text = ""
        for j in range(y.size()[1]):
            word = reply_field.vocab.itos[y[i][j]]
            if word=="<eos>":
                break
            rep_text += word

        print("input:", inp_text)
        print("reply:", rep_text)
        print()




is_gpu = True  # GPUを使用するかどうか
n_h = 800     #隠れ層のニューロン数
n_vocab_inp = len(input_field.vocab.itos)  #入力文の長さ
n_vocab_rep = len(reply_field.vocab.itos)  #応答文の長さ
n_emb = 300    #埋め込みベクトルの要素数
n_out = n_vocab_rep     #出力の数
early_stop_patience = 5  # 早期終了のタイミング(誤差の最小値が何回更新されなかったら終了か)
num_layers = 1        #中間層の数
bidirectional = True
dropout = 0.1
clip = 100         #勾配の上限

# Seq2Seqのモデルを構築します
encoder = Encoder(n_h, n_vocab_inp, n_emb, num_layers, bidirectional, dropout=dropout)
decoder = Decoder(n_h, n_out, n_vocab_rep, n_emb, num_layers, dropout=dropout)
seq2seq = Seq2Seq(encoder, decoder, is_gpu=is_gpu)

# 誤差関数(今回は分類問題なのでクロスエントロピー誤差を使います)
loss_fnc = nn.CrossEntropyLoss(ignore_index=reply_field.vocab.stoi["<pad>"])

# 最適化アルゴリズム
optimizer_enc = optim.Adam(seq2seq.parameters(), lr=0.0001)
optimizer_dec = optim.Adam(seq2seq.parameters(), lr=0.0005)


record_loss_train = []
record_loss_test = []
min_losss_test = 0.0

# 学習
for i in range(1000):     #1000epoch 学習
    seq2seq.train()

    loss_train = 0
    for j, batch in enumerate(train_iterator):
        inp, rep = batch.inp_text, batch.rep_text
        x_enc = inp
        x_dec = rep[:, :-1]
        y_dec = seq2seq(x_enc, x_dec)

        t_dec = rep[:, 1:]
        t_dec = t_dec.cuda() if is_gpu else t_dec
        loss = loss_fnc(
            y_dec.view(-1, y_dec.size()[2]),
            t_dec.reshape(-1)
            )
        loss_train += loss.item()
        optimizer_enc.zero_grad()
        optimizer_dec.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(encoder.parameters(), clip)
        nn.utils.clip_grad_norm_(decoder.parameters(), clip)
        optimizer_enc.step()
        optimizer_dec.step()

        if j%1000==0:
            print("batch:", str(j)+"/"+str(len(train_data)//batch_size+1), "loss:", loss.item())
    loss_train /= j+1
    record_loss_train.append(loss_train)

    # 評価モードにします
    seq2seq.eval()

    loss_test = 0
    for j, batch in enumerate(test_iterator):
        inp, rep = batch.inp_text, batch.rep_text
        x_enc = inp
        x_dec = torch.ones(rep.size(), dtype=torch.long) * reply_field.vocab.stoi["<sos>"]
        x_dec[:, 1:] = rep[:, :-1]
        y_dec = seq2seq(x_enc, x_dec)

        t_dec = rep.cuda() if is_gpu else rep
        loss = loss_fnc(
            y_dec.view(-1, y_dec.size()[2]),
            t_dec.view(-1)
            )
        loss_test += loss.item()
    loss_test /= j+1
    record_loss_test.append(loss_test)

    if i%1 == 0:
        print("Epoch:", i, "Loss_Train:", loss_train, "Loss_Test:", loss_test)
        print()

    evaluate(seq2seq, test_iterator)

   #早期終了の設定をします
    latest_min = min(record_loss_test[-(early_stop_patience):])  # 直近の最小値
    if len(record_loss_test) >= early_stop_patience:
        if latest_min > min_loss_test:  # 直近で最小値が更新されていなければ
            print("Early stopping!")
            break
        min_loss_test = latest_min
    else:
        min_loss_test = latest_min



torch.save(seq2seq.state_dict(), path2+"chat_bot.pth")    #学習済みのモデルのステータスを保存

③学習済みモデルを使ってユーザーの入力に対し応答を予測し、出力
最後に、ユーザーの入力を解析し、応答を出力するといった簡易的なチャットボットをノートブック上で動かします。
今回はユーザーが"さようなら"と入力するまで会話を続けることにしました。
以下が、コードになります。

qiita3.py
#モデルの読み込み
import dill

path2 = "/content/drive/My Drive/対話コーパス 前処理/NTT data/"

input_field = torch.load(path2+"input_field.pkl", pickle_module=dill)  #インデックスと単語の対応付け
reply_field = torch.load(path2+"reply_field.pkl", pickle_module=dill)

is_gpu = True  
n_h = 800
n_vocab_inp = len(input_field.vocab.itos)
n_vocab_rep = len(reply_field.vocab.itos)
n_emb = 300
n_out = n_vocab_rep
early_stop_patience = 5  # 早期終了のタイミング(誤差の最小値が何回更新されなかったら終了か)
num_layers = 1
bidirectional = True
dropout = 0.0
clip = 100

encoder = Encoder(n_h, n_vocab_inp, n_emb, num_layers, bidirectional)
decoder = Decoder(n_h, n_out, n_vocab_rep, n_emb, num_layers, dropout=dropout)
seq2seq = Seq2Seq(encoder, decoder, is_gpu=is_gpu)

seq2seq.load_state_dict(torch.load(path2+"chat_bot.pth", map_location=torch.device("cpu")))    #パラメータ読み込んでCPU対応


janome_tk = Tokenizer()
#入力に対して応答文生成する関数 定義
def reply(inp_text, tokenizer, max_length=10):    
    wakati_list = [token for token in janome_tk.tokenize(inp_text, wakati=True)]

    word_index = []
    for word in wakati_list:
        index = input_field.vocab.stoi[word]
        word_index.append(index)

    x = torch.tensor(word_index)    #テンソル型に変換する
    x = x.view(1, -1)        #バッチサイズの1に調整
    y = seq2seq.predict(x, max_length)   #予測文がインデックスで出力

    reply_text = ""
    for j in range(y.size()[1]):
        word = reply_field.vocab.itos[y[0][j]]#インデックスを単語に変換
        if word=="<eos>":
            break
        reply_text += word

    reply_text = reply_text.replace("<sos>", "")    #不要なものを削除
    reply_text = reply_text.replace("<eos>", "")
    reply_text = reply_text.replace("<pad>", "")
    reply_text = reply_text.replace("<unk>", "")

    return reply_text


#ノートブック上でAIと会話してみる
bot_name = "chat AI bot"
your_name = input("あなたの名前を教えてください:")
print()

print(bot_name + ": " + "こんにちは!" + your_name + "さん" )
message = ""
while message != "さようなら":
    message = input(your_name+ ": ")

    response = reply(message, janome_tk
                     , max_length=10)
    print(bot_name + ": " + response)

print(bot_name + ": 会話が出来て嬉しかったです!")

5.結果

結果は以下のような感じになりました。

result.py
あなたの名前を教えてください:mrtts0622

chat AI bot: こんにちはmrtts0622さん
mrtts0622: こんにちは
chat AI bot: こんにちはこんにちは
mrtts0622: かなり暖かくなってきましたね
chat AI bot: 
mrtts0622: お元気ですか
chat AI bot: 今日はいいですね
mrtts0622: 夏の風物詩といえば
chat AI bot: 海水浴スイカも好きです
mrtts0622: 少しおなかがすきました
chat AI bot: 海水浴か
mrtts0622: いやなんでやねん
chat AI bot: が好きです
mrtts0622: さようなら
chat AI bot: 
chat AI bot: 会話が出来て嬉しかったです

話し手が相手の意図を読み取ることで何とか会話が成立していますね。さすがにツッコミにまでは対応できなかったようです( ´∀` )
また、会話のデータに偏りがあったのか夏の話題には強いようで会話の節々に夏好きがあふれている気がします。

6.まとめ

PyTorchによるチャットボットの作成はかなり複雑で一行一行なにを行っているか理解するのにかなり時間がかかってしまいました。

また、精度を上げることもなかなか骨が折れる作業だったのでやはりLINEやGoogleはとても偉大であるということが実感できる制作でもありました。

今後の課題としては、
  ・Bertをつかって前後の文脈も考慮したより自然なボットを作る。
  ・twitterなどで、より自然でかつ大量の会話データを用意する。
 などが挙げられそうです。
また、僕自身の課題として来月はKaggleにも挑戦しようと思っています。

最後までお付き合いいただきありがとうございました!

参考にさせてもらった文献

雑談会話コーパス-対話破綻検出チャレンジhttps://sites.google.com/site/dialoguebreakdowndetection/chat-dialogue-corpus
○ 雑談対話APIサイト
https://www.nttdocomo.co.jp/service/developer/smart_phone/analysis/chat/

udemyの講座:人工知能(AI)を搭載したTwitterボットを作ろう
https://www.udemy.com/
PyTorchチュートリアル(日本語翻訳版)
https://yutaroogawa.github.io/pytorch_tutorials_jp/

1
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
5