#LSTM(Long Short Term Memory)
時系列データの分析においても、勾配消失問題(あるいは勾配爆発問題)が起きた。また、「今の時点では関係ないけれど、将来の時点では関係ある」という重みは大きくすべきであり、同時に小さくすべきであるという矛盾を抱えることになった(入力重み衝突)。こうした問題を解決するために考えられたのが、LSTMと呼ばれる手法である。
大まかには2つの機構から成っている。
・誤差を内部にとどまらせるためのセル
・必要な情報を必要なタイミングで保持・忘却させるためのゲート
セルはCEC(Constant Error Carousel)と言われ、誤差を内部にとどめ、勾配消失を防ぐためのものになる。このCECは記憶しかしない。学習をすることはない。
一方、学習する機能を持たせるため、「入力ゲート」「忘却ゲート」「出力ゲート」を設けた。入力重み衝突、出力重み衝突のためのゲート機構、そして忘却ゲートは誤差が過剰にセルに停留するのを防ぐために、リセットの役割を果たすためのゲート機構になる。
答え:忘却ゲート
答え
LSTMの課題:4つの部品「CEC」「入力ゲート」「出力ゲート」「忘却ゲート」があり、多くのパラメータを更新する必要があり、計算量が多くなっていること
CECの課題:記憶機能しかない、つまり学習能力がない。そのため、周囲に「入力ゲート」「出力ゲート」「忘却ゲート」を用意しなければならなくなった。
#GRU(Gated Recurrent Unit)
LSTMは時系列データを扱ううえではデファクトスタンダードになっているモデルとも言える。一方、LSTMはセルやゲートそれぞれを最適化しなくてはならないため、計算量を多く要す。そのため、LSTMを少し簡略化したGRUと呼ばれる手法が代わりに用いられることが多い。
【実装例】単語の予測
import tensorflow as tf
import numpy as np
import re
import glob
import collections
import random
import pickle
import time
import datetime
import os
# logging levelを変更
#tf.logging.set_verbosity(tf.logging.ERROR)
class Corpus:
def __init__(self):
self.unknown_word_symbol = "<???>" # 出現回数の少ない単語は未知語として定義しておく
self.unknown_word_threshold = 3 # 未知語と定義する単語の出現回数の閾値
self.corpus_file = "./corpus/**/*.txt"
self.corpus_encoding = "utf-8"
self.dictionary_filename = "./data_for_predict/word_dict.dic"
self.chunk_size = 5
self.load_dict()
words = []
for filename in glob.glob(self.corpus_file, recursive=True):
with open(filename, "r", encoding=self.corpus_encoding) as f:
# word breaking
text = f.read()
# 全ての文字を小文字に統一し、改行をスペースに変換
text = text.lower().replace("\n", " ")
# 特定の文字以外の文字を空文字に置換する
text = re.sub(r"[^a-z '\-]", "", text)
# 複数のスペースはスペース一文字に変換
text = re.sub(r"[ ]+", " ", text)
# 前処理: '-' で始まる単語は無視する
words = [ word for word in text.split() if not word.startswith("-")]
self.data_n = len(words) - self.chunk_size
self.data = self.seq_to_matrix(words)
def prepare_data(self):
"""
訓練データとテストデータを準備する。
data_n = ( text データの総単語数 ) - chunk_size
input: (data_n, chunk_size, vocabulary_size)
output: (data_n, vocabulary_size)
"""
# 入力と出力の次元テンソルを準備
all_input = np.zeros([self.chunk_size, self.vocabulary_size, self.data_n])
all_output = np.zeros([self.vocabulary_size, self.data_n])
# 準備したテンソルに、コーパスの one-hot 表現(self.data) のデータを埋めていく
# i 番目から ( i + chunk_size - 1 ) 番目までの単語が1組の入力となる
# このときの出力は ( i + chunk_size ) 番目の単語
for i in range(self.data_n):
all_output[:, i] = self.data[:, i + self.chunk_size] # (i + chunk_size) 番目の単語の one-hot ベクトル
for j in range(self.chunk_size):
all_input[j, :, i] = self.data[:, i + self.chunk_size - j - 1]
# 後に使うデータ形式に合わせるために転置を取る
all_input = all_input.transpose([2, 0, 1])
all_output = all_output.transpose()
# 訓練データ:テストデータを 4 : 1 に分割する
training_num = ( self.data_n * 4 ) // 5
return all_input[:training_num], all_output[:training_num], all_input[training_num:], all_output[training_num:]
def build_dict(self):
# コーパス全体を見て、単語の出現回数をカウントする
counter = collections.Counter()
for filename in glob.glob(self.corpus_file, recursive=True):
with open(filename, "r", encoding=self.corpus_encoding) as f:
# word breaking
text = f.read()
# 全ての文字を小文字に統一し、改行をスペースに変換
text = text.lower().replace("\n", " ")
# 特定の文字以外の文字を空文字に置換する
text = re.sub(r"[^a-z '\-]", "", text)
# 複数のスペースはスペース一文字に変換
text = re.sub(r"[ ]+", " ", text)
# 前処理: '-' で始まる単語は無視する
words = [word for word in text.split() if not word.startswith("-")]
counter.update(words)
# 出現頻度の低い単語を一つの記号にまとめる
word_id = 0
dictionary = {}
for word, count in counter.items():
if count <= self.unknown_word_threshold:
continue
dictionary[word] = word_id
word_id += 1
dictionary[self.unknown_word_symbol] = word_id
print("総単語数:", len(dictionary))
# 辞書を pickle を使って保存しておく
with open(self.dictionary_filename, "wb") as f:
pickle.dump(dictionary, f)
print("Dictionary is saved to", self.dictionary_filename)
self.dictionary = dictionary
print(self.dictionary)
def load_dict(self):
with open(self.dictionary_filename, "rb") as f:
self.dictionary = pickle.load(f)
self.vocabulary_size = len(self.dictionary)
self.input_layer_size = len(self.dictionary)
self.output_layer_size = len(self.dictionary)
print("総単語数: ", self.input_layer_size)
def get_word_id(self, word):
# print(word)
# print(self.dictionary)
# print(self.unknown_word_symbol)
# print(self.dictionary[self.unknown_word_symbol])
# print(self.dictionary.get(word, self.dictionary[self.unknown_word_symbol]))
return self.dictionary.get(word, self.dictionary[self.unknown_word_symbol])
# 入力された単語を one-hot ベクトルにする
def to_one_hot(self, word):
index = self.get_word_id(word)
data = np.zeros(self.vocabulary_size)
data[index] = 1
return data
def seq_to_matrix(self, seq):
print(seq)
data = np.array([self.to_one_hot(word) for word in seq]) # (data_n, vocabulary_size)
return data.transpose() # (vocabulary_size, data_n)
class Language:
"""
input layer: self.vocabulary_size
hidden layer: rnn_size = 30
output layer: self.vocabulary_size
"""
def __init__(self):
self.corpus = Corpus()
self.dictionary = self.corpus.dictionary
self.vocabulary_size = len(self.dictionary) # 単語数
self.input_layer_size = self.vocabulary_size # 入力層の数
self.hidden_layer_size = 30 # 隠れ層の RNN ユニットの数
self.output_layer_size = self.vocabulary_size # 出力層の数
self.batch_size = 128 # バッチサイズ
self.chunk_size = 5 # 展開するシーケンスの数。c_0, c_1, ..., c_(chunk_size - 1) を入力し、c_(chunk_size) 番目の単語の確率が出力される。
self.learning_rate = 0.005 # 学習率
self.epochs = 1000 # 学習するエポック数
self.forget_bias = 1.0 # LSTM における忘却ゲートのバイアス
self.model_filename = "./data_for_predict/predict_model.ckpt"
self.unknown_word_symbol = self.corpus.unknown_word_symbol
def inference(self, input_data, initial_state):
"""
:param input_data: (batch_size, chunk_size, vocabulary_size) 次元のテンソル
:param initial_state: (batch_size, hidden_layer_size) 次元の行列
:return:
"""
# 重みとバイアスの初期化
hidden_w = tf.Variable(tf.truncated_normal([self.input_layer_size, self.hidden_layer_size], stddev=0.01))
hidden_b = tf.Variable(tf.ones([self.hidden_layer_size]))
output_w = tf.Variable(tf.truncated_normal([self.hidden_layer_size, self.output_layer_size], stddev=0.01))
output_b = tf.Variable(tf.ones([self.output_layer_size]))
# BasicLSTMCell, BasicRNNCell は (batch_size, hidden_layer_size) が chunk_size 数ぶんつながったリストを入力とする。
# 現時点での入力データは (batch_size, chunk_size, input_layer_size) という3次元のテンソルなので
# tf.transpose や tf.reshape などを駆使してテンソルのサイズを調整する。
input_data = tf.transpose(input_data, [1, 0, 2]) # 転置。(chunk_size, batch_size, vocabulary_size)
input_data = tf.reshape(input_data, [-1, self.input_layer_size]) # 変形。(chunk_size * batch_size, input_layer_size)
input_data = tf.matmul(input_data, hidden_w) + hidden_b # 重みWとバイアスBを適用。 (chunk_size, batch_size, hidden_layer_size)
input_data = tf.split(input_data, self.chunk_size, 0) # リストに分割。chunk_size * (batch_size, hidden_layer_size)
# RNN のセルを定義する。RNN Cell の他に LSTM のセルや GRU のセルなどが利用できる。
cell = tf.nn.rnn_cell.BasicRNNCell(self.hidden_layer_size)
outputs, states = tf.nn.static_rnn(cell, input_data, initial_state=initial_state)
# 最後に隠れ層から出力層につながる重みとバイアスを処理する
# 最終的に softmax 関数で処理し、確率として解釈される。
# softmax 関数はこの関数の外で定義する。
output = tf.matmul(outputs[-1], output_w) + output_b
return output
def loss(self, logits, labels):
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels))
return cost
def training(self, cost):
# 今回は最適化手法として Adam を選択する。
# ここの AdamOptimizer の部分を変えることで、Adagrad, Adadelta などの他の最適化手法を選択することができる
optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(cost)
return optimizer
def train(self):
# 変数などの用意
input_data = tf.placeholder("float", [None, self.chunk_size, self.input_layer_size])
actual_labels = tf.placeholder("float", [None, self.output_layer_size])
initial_state = tf.placeholder("float", [None, self.hidden_layer_size])
prediction = self.inference(input_data, initial_state)
cost = self.loss(prediction, actual_labels)
optimizer = self.training(cost)
correct = tf.equal(tf.argmax(prediction, 1), tf.argmax(actual_labels, 1))
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
# TensorBoard で可視化するため、クロスエントロピーをサマリーに追加
tf.summary.scalar("Cross entropy: ", cost)
summary = tf.summary.merge_all()
# 訓練・テストデータの用意
# corpus = Corpus()
trX, trY, teX, teY = self.corpus.prepare_data()
training_num = trX.shape[0]
# ログを保存するためのディレクトリ
timestamp = time.time()
dirname = datetime.datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S")
# ここから実際に学習を走らせる
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
summary_writer = tf.summary.FileWriter("./log/" + dirname, sess.graph)
# エポックを回す
for epoch in range(self.epochs):
step = 0
epoch_loss = 0
epoch_acc = 0
# 訓練データをバッチサイズごとに分けて学習させる (= optimizer を走らせる)
# エポックごとの損失関数の合計値や(訓練データに対する)精度も計算しておく
while (step + 1) * self.batch_size < training_num:
start_idx = step * self.batch_size
end_idx = (step + 1) * self.batch_size
batch_xs = trX[start_idx:end_idx, :, :]
batch_ys = trY[start_idx:end_idx, :]
_, c, a = sess.run([optimizer, cost, accuracy],
feed_dict={input_data: batch_xs,
actual_labels: batch_ys,
initial_state: np.zeros([self.batch_size, self.hidden_layer_size])
}
)
epoch_loss += c
epoch_acc += a
step += 1
# コンソールに損失関数の値や精度を出力しておく
print("Epoch", epoch, "completed ouf of", self.epochs, "-- loss:", epoch_loss, " -- accuracy:",
epoch_acc / step)
# Epochが終わるごとにTensorBoard用に値を保存
summary_str = sess.run(summary, feed_dict={input_data: trX,
actual_labels: trY,
initial_state: np.zeros(
[trX.shape[0],
self.hidden_layer_size]
)
}
)
summary_writer.add_summary(summary_str, epoch)
summary_writer.flush()
# 学習したモデルも保存しておく
saver = tf.train.Saver()
saver.save(sess, self.model_filename)
# 最後にテストデータでの精度を計算して表示する
a = sess.run(accuracy, feed_dict={input_data: teX, actual_labels: teY,
initial_state: np.zeros([teX.shape[0], self.hidden_layer_size])})
print("Accuracy on test:", a)
def predict(self, seq):
"""
文章を入力したときに次に来る単語を予測する
:param seq: 予測したい単語の直前の文字列。chunk_size 以上の単語数が必要。
:return:
"""
# 最初に復元したい変数をすべて定義してしまいます
tf.reset_default_graph()
input_data = tf.placeholder("float", [None, self.chunk_size, self.input_layer_size])
initial_state = tf.placeholder("float", [None, self.hidden_layer_size])
prediction = tf.nn.softmax(self.inference(input_data, initial_state))
predicted_labels = tf.argmax(prediction, 1)
# 入力データの作成
# seq を one-hot 表現に変換する。
words = [word for word in seq.split() if not word.startswith("-")]
x = np.zeros([1, self.chunk_size, self.input_layer_size])
for i in range(self.chunk_size):
word = seq[len(words) - self.chunk_size + i]
index = self.dictionary.get(word, self.dictionary[self.unknown_word_symbol])
x[0][i][index] = 1
feed_dict = {
input_data: x, # (1, chunk_size, vocabulary_size)
initial_state: np.zeros([1, self.hidden_layer_size])
}
# tf.Session()を用意
with tf.Session() as sess:
# 保存したモデルをロードする。ロード前にすべての変数を用意しておく必要がある。
saver = tf.train.Saver()
saver.restore(sess, self.model_filename)
# ロードしたモデルを使って予測結果を計算
u, v = sess.run([prediction, predicted_labels], feed_dict=feed_dict)
keys = list(self.dictionary.keys())
# コンソールに文字ごとの確率を表示
for i in range(self.vocabulary_size):
c = self.unknown_word_symbol if i == (self.vocabulary_size - 1) else keys[i]
print(c, ":", u[0][i])
print("Prediction:", seq + " " + ("<???>" if v[0] == (self.vocabulary_size - 1) else keys[v[0]]))
return u[0]
def build_dict():
cp = Corpus()
cp.build_dict()
if __name__ == "__main__":
#build_dict()
ln = Language()
# 学習するときに呼び出す
#ln.train()
# 保存したモデルを使って単語の予測をする
ln.predict("some of them looks like")
答え
GRUの方が計算量が少なくてすむ。
GRUでは、リセットゲート、更新ゲートという2つのゲートで、LSTMの「入力ゲート」「出力ゲート」「忘却ゲート」の代わりを果たしている