1.はじめに
今回 Aidemyのpremium planで三か月の間、自然言語処理を学びその知識の定着、復習の意を込め成果物として深層学習によるチャットボットを作成してみました。
Qiitaによる投稿も初めてですので拙い文章だとは思いますが、それでも良い方はご覧ください!
2.使用環境
今回は処理するデータ量が多いためGoogle Colaboratory環境で、無料で使えるGPUを使用しました。
フレームワークのバージョンは以下の通りです。
・janome 0.4.1
・torchvision 0.7.0
・torchtext 0.7.0
・PyTorch 1.6.0
3.記事の内容
タイトルにもあるように、今回は会話の入力に対して、深層学習を用いて応答を予測し出力させるモデルを構築し、ノートブック上で繰り返し会話できるボットを作ることを目標にコーディングしました。
深層学習のフレームワークとしてKeras、Tensorflow等、いろいろなものがありますが今回は最近流行りのPyTorchを使いました。
PyTorchを選んだ理由は、以下のように考えたからです。
① コードが比較的短く、理解すれば便利である
② 入力と正解のペアをまとめて学習できるため、会話を成立させるという
点で有利に働く
③ 今後の開発につながる
また、学習するにあたりデータの前処理、PyTorchによる深層学習はudemyの講座を参考にさせていただきました。
PyTorchによる学習は初めてだったので、かなりつまずきながら手探り状態でしたが、自分でコーディングすることでPyTorchの概要、実装方法などかなり実践的な知識が身についたと思います。
4.実装の手順
それではコードのほうに移っていきます。
工程は大まかに分けて3つです。
① データの前処理
今回訓練に使用した会話コーパスは、NTT ドコモが一般公開している雑談会話コーパスになります。ライセンスはダウンロードページ記載のものに準拠します。
こちらは約1100もの対話が収録されている会話コーパスで、雑談対話APIと人間との間で会話が行われているため、明らかに破綻している会話も収録されているという点も考慮して前処理を行います。
また、チャットボットの作成には約10万もの会話データが必要になるといわれているので今回は一つの入力文に対し、重要である単語を含んだ複数の応答文でペアを作るという方法を用いてデータの水増しを行っています。
以下が、コードになります。
!pip install janome==0.4.1
!pip install torchvision==0.7.0
!pip install torchtext==0.7.0
!pip install torch==1.6.0
# google driveをマウントします
from google.colab import drive
drive.mount("/content/drive/")
# 先に必要なものをインポートしときます
import glob
import json
import re
from janome.tokenizer import Tokenizer
import csv
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
import dill
path = "/content/drive/My Drive/対話コーパス 前処理/NTT data/projectnextnlp-chat-dialogue-corpus/json"
# 成り立っている会話を訓練用データとテストデータに分割して、保存します。
files = glob.glob(path + "/*/*.json")
dialogues = []
for file in files:
with open(file, "r") as f:
json_dic = json.load(f)
dialogue = []
for turn in json_dic["turns"]: #会話データの["turns"]には各項目が格納されている
annotations = turn["annotations"] #注釈
speaker = turn["speaker"] #話者
utterance = turn["utterance"] #会話内容
utterance = utterance.replace(".", "。").replace(",", "、") # 全角を統一
utterance = utterance.replace(".", "。").replace(",", "、") # 半角を統一
utterance = utterance.split("。")[0] #ここでは"。"が文中にある長い文章を短くする
if speaker=="U": # "U"=人間
dialogue.append(utterance)
else:
is_wrong = False
for annotation in annotations:
breakdown = annotation["breakdown"] #会話が破綻しているかどうか
if breakdown=="X":
is_wrong = True
break
if is_wrong:
dialogue.clear() #破綻していたらデータをクリア
else:
dialogue.append(utterance)
if len(dialogue) >= 2: #会話成立で格納
dialogues.append(dialogue.copy())
dialogue.pop(0)
# 会話の分かち書きを行い、入力と応答それぞれのリストに格納します。
re_kanji = re.compile(r"^[\u4E00-\u9FD0]+$") # 漢字の検出用
re_katakana = re.compile(r"[\u30A1-\u30F4]+") # カタカナの検出用
janome_tk = Tokenizer()
def tokenizer(text):
return [token for token in janome_tk.tokenize(text, wakati=True)]
wakati_input = [] #水増しのために入力文と応答文を分けます
wakati_reply = []
for dialogue in dialogues:
wakati_input.append(tokenizer(dialogue[0])[:10])
wakati_reply.append(tokenizer(dialogue[1])[:10])
# 入力分を複数の応答分と組み合わせてデータの水増しを行います。
dialogues_mizumasi = [] #ここで水増しにより会話データ約10万個
for i, word_input in enumerate(wakati_input):
inp_count = 0
for j, word_reply in enumerate(wakati_reply):
if i==j:
dialogues_mizumasi.append(["".join(word_input), "".join(word_reply)])
continue
#入力と応答の類似度を判断し、類似度が1より大きいものを会話に追加します
similarity = 0
for word in word_input: #カタカナや漢字を重要付けている
if (word in word_reply) and (re_kanji.fullmatch(word) or re_katakana.fullmatch(word)):
similarity += 1
if similarity >= 1:
dialogue_mizumasi = ["".join(word_input), "".join(word_reply)]
if dialogue_mizumasi not in dialogues_mizumasi:
dialogues_mizumasi.append(dialogue_mizumasi)
inp_count += 1
if inp_count >= 12: #1つの入力に対し、対応する応答文は12まで
break
dialogues = dialogues_mizumasi
# 作成した会話のリストを訓練用データとテストデータに分けて格納します。
dialogues_train, dialogues_test = train_test_split(dialogues, shuffle=True, test_size=0.05)
# 入力分と応答分のデータセットの列を定義します
input_field = torchtext.data.Field(
sequential=True, #文章データのため可変にする
tokenize=tokenizer, #分かち書き
batch_first=True,
lower=True
)
reply_field = torchtext.data.Field(
sequential=True,
tokenize=tokenizer,
init_token="<sos>", #文のはじめに加える
eos_token="<eos>", #文の終わりに加える
batch_first=True,
lower=True
)
path2 = "/content/drive/My Drive/対話コーパス 前処理/NTT data/"
# データセットを作成します
train_data, test_data = torchtext.data.TabularDataset.splits(
path=path2,
train=mizumasi_dialogues_train,
test=mizumasi_dialogues_test,
format="csv",
fields=[("input_text", input_field), ("reply_text", reply_field)]
)
# 辞書の作成をします(出現頻度が3以下のものは除く)
input_field.build_vocab(
train_data,
min_freq=3
)
reply_field.build_vocab(
train_data,
min_freq=3,
)
# データセットを保存します
torch.save(train_data.examples, path+"mizumasi_train_examples.pkl", pickle_module=dill)
torch.save(test_data.examples, path+"mizumasi_test_examples.pkl", pickle_module=dill)
torch.save(input_field, path2+"mizumasi_input.pkl", pickle_module=dill)
torch.save(reply_field, path2+"mizumasi_reply.pkl", pickle_module=dill)
②訓練モデルを作成し、学習させる
次に、訓練モデルの作成と学習を行います。
流れとしては、
1.Encoder,Decoder,Seq2Seqのクラス作成
2.誤差を測定する評価関数を定義
3.これらを用いて、誤差が小さくなるようモデルにデータを学習させる
となります。
以下がコードになります。
#Batchの設定を行います
batch_size = 32
train_iterator = torchtext.data.Iterator(
train_data,
batch_size=batch_size,
train=True
)
test_iterator = torchtext.data.Iterator(
test_data,
batch_size=batch_size,
train=False,
sort=False
)
#Encoderクラスを作成します#
class Encoder(nn.Module):
def __init__(self, n_h, n_vocab, n_emb, num_layers=1, bidirectional=False, dropout=0):
super().__init__()
self.n_h = n_h
self.num_layers = num_layers
self.bidirectional = bidirectional
self.dropout = dropout
self.embedding = nn.Embedding(n_vocab, n_emb)
self.embedding_dropout = nn.Dropout(self.dropout)
self.gru = nn.GRU(
input_size=n_emb, #入力サイズ
hidden_size=n_h,#隠れ層のサイズ(ニューロン数)
batch_first=True, #(バッチサイズ、時系列、入力数)
num_layers=num_layers,
bidirectional=bidirectional,
)
#順伝播の関数を定義
def forward(self, x):
index_pad = input_field.vocab.stoi["<pad>"] #"pad"のインデックス取得
sentence_lengths = x.size()[1] - (x == index_pad).sum(dim=1) #pad部分を引いて本来の文の長さを得る。
y = self.embedding(x) #埋め込みベクトル化
y = self.embedding_dropout(y)#過学習対策
y = nn.utils.rnn.pack_padded_sequence(
y,
sentence_lengths,
batch_first=True,
enforce_sorted=False
) #rnnに入れるためにpackedsequence型にする。
y, h = self.gru(y) #encoderでの出力と隠れ層の値を取得
y, _ = nn.utils.rnn.pad_packed_sequence(y, batch_first=True) #テンソルに戻す
if self.bidirectional:
#会話データにおける最後の重みが大きくなってしまうため
y = y[:, :, :self.n_h] + y[:, :, self.n_h:]#ここでは出力と隠れ層の値は双方向の時間の値を足したものになる。
h = h[:self.num_layers] + h[self.num_layers:]
return y,h
# Decoderクラスを作成します
class Decoder(nn.Module):
def __init__(self, n_h, n_out, n_vocab, n_emb, num_layers=1, dropout=0):
super().__init__()
self.n_h = n_h
self.n_out = n_out
self.num_layers = num_layers
self.dropout = dropout
self.embedding = nn.Embedding(n_vocab, n_emb)
self.embedding_dropout = nn.Dropout(self.dropout)
self.gru = nn.GRU(
input_size=n_emb,
hidden_size=n_h,
batch_first=True,
num_layers=num_layers,
)
self.fc = nn.Linear(n_h*2, self.n_out) #全結合層の導入
def forward(self, x, h_encoder, y_encoder):
y = self.embedding(x) # 単語をベクトルに変換
y = self.embedding_dropout(y)
y, h = self.gru(y, h_encoder) #ここでは出力と最後の時刻の隠れ層の値が渡される
# Attention
y_tr = torch.transpose(y, 1, 2) # 次元1と次元2を入れ替える
ed_mat = torch.bmm(y_encoder, y_tr) # バッチごとに行列積
attn_weight = F.softmax(ed_mat, dim=1) # attention weightの計算
attn_weight_tr = torch.transpose(attn_weight, 1, 2) # 次元1と次元2を入れ替える
context = torch.bmm(attn_weight_tr, y_encoder) # コンテキストベクトルの計算
y = torch.cat([y, context], dim=2) # 出力とコンテキストベクトルの合流
y = self.fc(y)
y = F.softmax(y, dim=2)
return y, h
# seq2seqクラスを作成します
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder, is_gpu=True):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.is_gpu = is_gpu
if self.is_gpu:
self.encoder.cuda()
self.decoder.cuda()
def forward(self, x_encoder, x_decoder): #順伝播メソッドの設定 訓練時につかう
if self.is_gpu:
x_encoder = x_encoder.cuda()
x_decoder = x_decoder.cuda()
batch_size = x_decoder.shape[0] #x_decoderは(バッチサイズ,時系列の数,ニューロン数)
n_time = x_decoder.shape[1]
y_encoder, h = self.encoder(x_encoder)
y_decoder = torch.zeros(batch_size, n_time, self.decoder.n_out) #バッチサイズ*時系列の数*出力数のすべて0のテンソル
if self.is_gpu:
y_decoder = y_decoder.cuda()
# 教師強制と呼ばれる方法で各時刻の出力が次の入力に近くなるよう学習していきます。
for t in range(0, n_time): #各時刻で処理
x = x_decoder[:, t:t+1]
y, h = self.decoder(x, h, y_encoder)
y_decoder[:, t:t+1, :] = y #得られた出力yを各時刻ごとにy_encoderに格納
return y_decoder
def predict(self, x_encoder): #予測に使用 文章の生成、評価するときに使う
if self.is_gpu:
x_encoder = x_encoder.cuda()
batch_size = x_encoder.shape[0]
n_time = x_encoder.shape[1]
y_encoder, h = self.encoder(x_encoder)
y_decoder = torch.zeros(batch_size, n_time, dtype=torch.long) #ここではint型を用いるためtorch.long
if self.is_gpu:
y_decoder = y_decoder.cuda()
y = torch.ones(batch_size, 1, dtype=torch.long) * input_field.vocab.stoi["<sos>"] #予測用では各時刻で処理を行う必要がある。出力が次の入力になるから
for t in range(0, n_time):
x = y #ここでまえの出力を入力に
if self.is_gpu:
x = x.cuda()
y, h = self.decoder(x, h, y_encoder)
y = y.argmax(2) #yの中で最も大きい数値のインデックス得ることで最適な文章が生成
y_decoder[:, t:t+1] = y
return y_decoder
# 評価関数を定義します
def evaluate(model, iterator):
model.eval() # 評価モードにできる
batch = next(iter(iterator))
x = batch.inp_text
y = model.predict(x)
for i in range(x.size()[0]):
inp_text = ""
for j in range(x.size()[1]):
word = input_field.vocab.itos[x[i][j]]
if word=="<pad>":
break
inp_text += word
rep_text = ""
for j in range(y.size()[1]):
word = reply_field.vocab.itos[y[i][j]]
if word=="<eos>":
break
rep_text += word
print("input:", inp_text)
print("reply:", rep_text)
print()
is_gpu = True # GPUを使用するかどうか
n_h = 800 #隠れ層のニューロン数
n_vocab_inp = len(input_field.vocab.itos) #入力文の長さ
n_vocab_rep = len(reply_field.vocab.itos) #応答文の長さ
n_emb = 300 #埋め込みベクトルの要素数
n_out = n_vocab_rep #出力の数
early_stop_patience = 5 # 早期終了のタイミング(誤差の最小値が何回更新されなかったら終了か)
num_layers = 1 #中間層の数
bidirectional = True
dropout = 0.1
clip = 100 #勾配の上限
# Seq2Seqのモデルを構築します
encoder = Encoder(n_h, n_vocab_inp, n_emb, num_layers, bidirectional, dropout=dropout)
decoder = Decoder(n_h, n_out, n_vocab_rep, n_emb, num_layers, dropout=dropout)
seq2seq = Seq2Seq(encoder, decoder, is_gpu=is_gpu)
# 誤差関数(今回は分類問題なのでクロスエントロピー誤差を使います)
loss_fnc = nn.CrossEntropyLoss(ignore_index=reply_field.vocab.stoi["<pad>"])
# 最適化アルゴリズム
optimizer_enc = optim.Adam(seq2seq.parameters(), lr=0.0001)
optimizer_dec = optim.Adam(seq2seq.parameters(), lr=0.0005)
record_loss_train = []
record_loss_test = []
min_losss_test = 0.0
# 学習
for i in range(1000): #1000epoch 学習
seq2seq.train()
loss_train = 0
for j, batch in enumerate(train_iterator):
inp, rep = batch.inp_text, batch.rep_text
x_enc = inp
x_dec = rep[:, :-1]
y_dec = seq2seq(x_enc, x_dec)
t_dec = rep[:, 1:]
t_dec = t_dec.cuda() if is_gpu else t_dec
loss = loss_fnc(
y_dec.view(-1, y_dec.size()[2]),
t_dec.reshape(-1)
)
loss_train += loss.item()
optimizer_enc.zero_grad()
optimizer_dec.zero_grad()
loss.backward()
nn.utils.clip_grad_norm_(encoder.parameters(), clip)
nn.utils.clip_grad_norm_(decoder.parameters(), clip)
optimizer_enc.step()
optimizer_dec.step()
if j%1000==0:
print("batch:", str(j)+"/"+str(len(train_data)//batch_size+1), "loss:", loss.item())
loss_train /= j+1
record_loss_train.append(loss_train)
# 評価モードにします
seq2seq.eval()
loss_test = 0
for j, batch in enumerate(test_iterator):
inp, rep = batch.inp_text, batch.rep_text
x_enc = inp
x_dec = torch.ones(rep.size(), dtype=torch.long) * reply_field.vocab.stoi["<sos>"]
x_dec[:, 1:] = rep[:, :-1]
y_dec = seq2seq(x_enc, x_dec)
t_dec = rep.cuda() if is_gpu else rep
loss = loss_fnc(
y_dec.view(-1, y_dec.size()[2]),
t_dec.view(-1)
)
loss_test += loss.item()
loss_test /= j+1
record_loss_test.append(loss_test)
if i%1 == 0:
print("Epoch:", i, "Loss_Train:", loss_train, "Loss_Test:", loss_test)
print()
evaluate(seq2seq, test_iterator)
#早期終了の設定をします
latest_min = min(record_loss_test[-(early_stop_patience):]) # 直近の最小値
if len(record_loss_test) >= early_stop_patience:
if latest_min > min_loss_test: # 直近で最小値が更新されていなければ
print("Early stopping!")
break
min_loss_test = latest_min
else:
min_loss_test = latest_min
torch.save(seq2seq.state_dict(), path2+"chat_bot.pth") #学習済みのモデルのステータスを保存
③学習済みモデルを使ってユーザーの入力に対し応答を予測し、出力
最後に、ユーザーの入力を解析し、応答を出力するといった簡易的なチャットボットをノートブック上で動かします。
今回はユーザーが"さようなら"と入力するまで会話を続けることにしました。
以下が、コードになります。
# モデルの読み込み
import dill
path2 = "/content/drive/My Drive/対話コーパス 前処理/NTT data/"
input_field = torch.load(path2+"input_field.pkl", pickle_module=dill) #インデックスと単語の対応付け
reply_field = torch.load(path2+"reply_field.pkl", pickle_module=dill)
is_gpu = True
n_h = 800
n_vocab_inp = len(input_field.vocab.itos)
n_vocab_rep = len(reply_field.vocab.itos)
n_emb = 300
n_out = n_vocab_rep
early_stop_patience = 5 # 早期終了のタイミング(誤差の最小値が何回更新されなかったら終了か)
num_layers = 1
bidirectional = True
dropout = 0.0
clip = 100
encoder = Encoder(n_h, n_vocab_inp, n_emb, num_layers, bidirectional)
decoder = Decoder(n_h, n_out, n_vocab_rep, n_emb, num_layers, dropout=dropout)
seq2seq = Seq2Seq(encoder, decoder, is_gpu=is_gpu)
seq2seq.load_state_dict(torch.load(path2+"chat_bot.pth", map_location=torch.device("cpu"))) #パラメータ読み込んでCPU対応
janome_tk = Tokenizer()
# 入力に対して応答文生成する関数 定義
def reply(inp_text, tokenizer, max_length=10):
wakati_list = [token for token in janome_tk.tokenize(inp_text, wakati=True)]
word_index = []
for word in wakati_list:
index = input_field.vocab.stoi[word]
word_index.append(index)
x = torch.tensor(word_index) #テンソル型に変換する
x = x.view(1, -1) #バッチサイズの1に調整
y = seq2seq.predict(x, max_length) #予測文がインデックスで出力
reply_text = ""
for j in range(y.size()[1]):
word = reply_field.vocab.itos[y[0][j]]#インデックスを単語に変換
if word=="<eos>":
break
reply_text += word
reply_text = reply_text.replace("<sos>", "") #不要なものを削除
reply_text = reply_text.replace("<eos>", "")
reply_text = reply_text.replace("<pad>", "")
reply_text = reply_text.replace("<unk>", "")
return reply_text
# ノートブック上でAIと会話してみる
bot_name = "chat AI bot"
your_name = input("あなたの名前を教えてください:")
print()
print(bot_name + ": " + "こんにちは!" + your_name + "さん" )
message = ""
while message != "さようなら":
message = input(your_name+ ": ")
response = reply(message, janome_tk
, max_length=10)
print(bot_name + ": " + response)
print(bot_name + ": 会話が出来て嬉しかったです!")
5.結果
結果は以下のような感じになりました。
あなたの名前を教えてください:mrtts0622
chat AI bot: こんにちは!mrtts0622さん
mrtts0622: こんにちは
chat AI bot: こんにちはこんにちは
mrtts0622: かなり暖かくなってきましたね
chat AI bot:
mrtts0622: お元気ですか?
chat AI bot: 今日はいいですね
mrtts0622: 夏の風物詩といえば?
chat AI bot: 海水浴スイカも好きです
mrtts0622: 少しおなかがすきました
chat AI bot: 海水浴か?
mrtts0622: いや、なんでやねん
chat AI bot: が好きです
mrtts0622: さようなら
chat AI bot:
chat AI bot: 会話が出来て嬉しかったです!
話し手が相手の意図を読み取ることで何とか会話が成立していますね。さすがにツッコミにまでは対応できなかったようです( ´∀` )
また、会話のデータに偏りがあったのか夏の話題には強いようで会話の節々に夏好きがあふれている気がします。
6.まとめ
PyTorchによるチャットボットの作成はかなり複雑で一行一行なにを行っているか理解するのにかなり時間がかかってしまいました。
また、精度を上げることもなかなか骨が折れる作業だったのでやはりLINEやGoogleはとても偉大であるということが実感できる制作でもありました。
今後の課題としては、
・Bertをつかって前後の文脈も考慮したより自然なボットを作る。
・twitterなどで、より自然でかつ大量の会話データを用意する。
などが挙げられそうです。
また、僕自身の課題として来月はKaggleにも挑戦しようと思っています。
最後までお付き合いいただきありがとうございました!
参考にさせてもらった文献
雑談会話コーパス-対話破綻検出チャレンジhttps://sites.google.com/site/dialoguebreakdowndetection/chat-dialogue-corpus
○ 雑談対話APIサイト
https://www.nttdocomo.co.jp/service/developer/smart_phone/analysis/chat/
udemyの講座:人工知能(AI)を搭載したTwitterボットを作ろう
https://www.udemy.com/
PyTorchチュートリアル(日本語翻訳版)
https://yutaroogawa.github.io/pytorch_tutorials_jp/