はじめに
初投稿です。よろしくお願いします。
趣味でMastodon上で稼働するbotを作ったりしています。開発する上でQiitaにはいっぱいお世話になったので、得た知識などフィードバックさせていただこうと思います。
開発環境
ubuntu18.04/python3.6/cuda9.0/tensorflow1.9.0(GPU版)
前提条件
DeepLearningやkerasにある程度入門できている方を想定しています。
過去の投稿内容を元に、LSTMを利用して日本語文章を自動生成する
作ったbotの機能の一つとしてテキトーなつぶやき(のようなもの)を自動投稿する、というものがあります。
その実例を元に解説します。
ベースにしたサンプルは keras github(example lstm_text_generation)です。
データを集める
MastodonはAPIが公開されているので、今回はそれを利用して必要な投稿をテキストデータ取得しています。(ここでは詳細は省略しますが、50MB程度のサイズになりました)
モデル構築
文章の自動生成を考える上で、問題を解くためのモデルを考えます。サンプルを見ると以下のような考えのモデルのようです。
- 使える文字の種類はm個
- n個の連続した文字を与えると、続く次の1文字は何かを予想するようなモデル(m個の分類モデル)
- 学習データは、上記で事前準備したテキストを読み込んで「n個の文字、次の1文字(正解ラベル)」の対を作って読み込ませる
素人なりにサンプルをアレンジして以下のようにしてみました。
def lstm_model(maxlen, wl_chars):
model = Sequential()
model.add(CuDNNLSTM(512, return_sequences=True, input_shape=(maxlen, len(wl_chars))))
model.add(Dropout(0.2))
model.add(CuDNNLSTM(128, return_sequences=True))
model.add(Dropout(0.2))
model.add(CuDNNLSTM(64, return_sequences=True))
model.add(Dropout(0.2))
model.add(CuDNNLSTM(32))
model.add(Dropout(0.2))
model.add(Dense(len(wl_chars), activation='softmax'))
return model
maxlen:前述のn個の連続した文字、のnに当たります。サンプル(英文)では40文字でしたが、日本語なら25文字くらいでいいだろうと思いました(勘)。
len(wl_chars):前述の使える文字の種類mに当たります。(wl_charsに文字がリストとして羅列されてるイメージです。事前に頻出する文字をカウントしてTOPの約2000文字を選んでいます)
LSTM層:今回のメインです。効果は不明ですが4層重ねてます。return_sequences=Trueにしてますが、ドキュメントを読んでもあまりよくわかってないです。最後の層は指定いらないようです。
Dropout層:過学習防止のために入れています。0.2が妥当かどうかはわからないですが・・・
Dense層:文字数の種類(m種)分だけ分類します。
Activation層:softmaxで合計1.0になるように各文字の出現割合を計算してくれます。
#学習
##データジェネレータ
学習に使うデータやn,mのサイズによりますが、一度にメモリに展開できなかったため、kerasに用意されている「Sequence」クラスを使ってイテレートの度にデータを返すデータジェネレータを作成します。
def sample(preds, temperature=1.0):
# helper function to sample an index from a probability array
preds = np.asarray(preds).astype('float64')
preds = np.log(preds) / temperature
exp_preds = np.exp(preds)
preds = exp_preds / np.sum(exp_preds)
probas = np.random.multinomial(1, preds, 1)
return np.argmax(probas)
class TextGenerator(Sequence):
def __init__(self, text_path, batch_size, maxlen, wl_chars):
# コンストラクタ
self.text = list(open(text_path).read())
self.batch_size = batch_size
self.maxlen = maxlen
deta_len = len(self.text) - maxlen - 1
self.sample_per_epoch = int(deta_len/self.batch_size) #端数は切り捨て。端数分は・・・
self.wl_chars = wl_chars
self.char_indices = dict((c, i) for i, c in enumerate(self.wl_chars))
self.indices_char = dict((i, c) for i, c in enumerate(self.wl_chars))
def __getitem__(self, idx):
# データの取得実装
sentences = []
next_chars = []
for i in range(self.batch_size*idx,self.batch_size*(idx+1)):
try:
sentences.append(self.text[i: i + self.maxlen])
next_chars.append(self.text[i + self.maxlen])
except:
break
x = np.zeros((len(sentences), self.maxlen, len(self.wl_chars)), dtype=np.bool)
y = np.zeros((len(sentences), len(self.wl_chars)), dtype=np.bool)
for i, (sentence,next_char) in enumerate(zip(sentences,next_chars)):
for t, char in enumerate(sentence):
try:
x[i, t, self.char_indices[char]] = 1
except:
continue
try:
y[i, self.char_indices[next_char]] = 1
except:
continue
return x, y
def __len__(self):
# 全データ数をバッチサイズで割って、何バッチになるか返すよー!
return self.sample_per_epoch
def on_epoch_end(self):
# Function invoked at end of each epoch. Prints generated text.
print()
print('----- Generating text after Epoch')
start_index = random.randint(0, len(self.text) - self.maxlen - 1)
for diversity in [0.25, 0.5, 0.75, 1.0]:
print()
print('----- diversity:', diversity)
generated = ''
sentence = self.text[start_index: start_index + self.maxlen]
print('----- Generating with seed: "' + ''.join(sentence) + '"')
sys.stdout.write(generated)
for i in range(50):
#print('debug1 i=%d' %i)
x_pred = np.zeros((1, self.maxlen, len(self.wl_chars)))
for t, char in enumerate(sentence):
try:
x_pred[0, t, self.char_indices[char]] = 1.
except:
print('error:char=',t,char)
#print('debug2 x_pred=',x_pred)
with graph.as_default():
preds = model.predict(x_pred, verbose=0)[0]
next_index = sample(preds, diversity)
#print('debug3 next_index=',next_index)
next_char = self.indices_char[next_index]
generated += next_char
sentence = sentence[1:]
sentence.append(next_char)
sys.stdout.write(next_char)
sys.stdout.flush()
print()
__getitem__:idx番目のデータを返すロジックを実装します。返却値はnumpy配列です。xはインプットデータから切り出した25文字、yはそのあとに続く1文字をです。つまり、xが学習データ、yが正解データになります。
ここで、バッチサイズを考慮することに注意です。バッチサイズ100なら、idx=0のデータは0~99番目のデータを返す必要があります。
on_epoch_end:1エポック終了後に自動的に実行される処理です。ここでは学習済みモデルの保存と、文章の自動生成テストを行います。
自動生成テストのためにsampleを定義していますが、難しいことをしているように見えます(参考にした英文のサンプルのまま)。ざっくりいうと単純にスコアが最も高かった文字を選ぶのではなく、ある程度スコアを考慮した中からランダムに選ぶイメージです。temperatureが高いと選択の幅が広がり、小さくすると狭まる感じです。
##学習
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("--input", type=str)
parser.add_argument("--model_path", type=str)
parser.add_argument("--gpu", type=str, default='1')
parser.add_argument("--idx", type=int, default=0)
parser.add_argument("--batch_size", type=int, default=256)
args = parser.parse_args()
return args
def on_epoch_end(epoch, logs):
### save
print('----- saving model...')
model.save_weights(args.model_path + 'w')
model.save(args.model_path)
if __name__ == '__main__':
#パラメータ取得
args = get_args()
#GPU設定
config = tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=False,
visible_device_list=args.gpu
))
session = tf.Session(config=config)
backend.set_session(session)
GPUs = len(args.gpu.split(','))
#辞書読み込み
wl_chars = list(open('wl.txt').read())
wl_chars.append(r'\n')
wl_chars.sort()
p_model = None
model = lstm_model(maxlen, wl_chars)
model.summary()
if os.path.exists(args.model_path):
# loading the model
print('load model...')
# model = load_model(sys.argv[2])
model.load_weights(args.model_path, by_name=False)
model.compile(loss='categorical_crossentropy', optimizer=RMSprop())
m = model
if GPUs > 1:
p_model = multi_gpu_model(model, gpus=GPUs)
p_model.compile(loss='categorical_crossentropy', optimizer=RMSprop())
m = p_model
start_idx = args.idx
batch_size = args.batch_size
generator = TextGenerator(args.input, batch_size, maxlen, wl_chars)
print_callback = LambdaCallback(on_epoch_end=on_epoch_end)
ES = EarlyStopping(monitor='loss', min_delta=0.001, patience=10, verbose=0, mode='auto')
m.fit_generator(generator,
callbacks=[print_callback,ES],
epochs=epochs,
verbose=1,
# steps_per_epoch=60,
initial_epoch=start_idx,
max_queue_size=process_count,
workers=4,
use_multiprocessing=True)
model.compile:損失関数、最適化関数を指定します。(kerasでは「コンパイル」という呼び方をしているようです)
fit_generator:先ほど作成したデータジェネレータを使って学習する機能です。これだけで自動でデータを順次生成しながら学習を進めてくれます。
multi_gpu:複数GPUで分散させる場合に使用します。注意として、重み保存をする場合は元のモデル、学習する際はこの機能が返却するモデルを使用します。この機能が返却するモデルで重みの保存はできないようです。
LambdaCallback:1エポック終了時に実行する処理を定義します。ここではモデルの重み保存を行うようにしています。
EarlyStopping:学習を自動終了させる条件を指定します。
#参考
以下にソースコード全体を載せておきます。
# coding: utf-8
from tensorflow.python.keras.models import Sequential,load_model
from tensorflow.python.keras.callbacks import LambdaCallback,EarlyStopping
from tensorflow.python.keras.layers import Dense, Activation, CuDNNLSTM, Dropout #, Embedding, Conv1D, MaxPooling1D, Flatten, Input
from tensorflow.python.keras.optimizers import RMSprop
from tensorflow.python.keras.utils import Sequence, multi_gpu_model
from tensorflow.python.keras import backend
import multiprocessing
import numpy as np
import random,json
import sys,io,re,os
from time import sleep
import argparse
import tensorflow as tf
graph = tf.get_default_graph()
#変更するとモデル再構築必要
maxlen = 25
#いろいろなパラメータ
epochs = 10000
# 同時実行プロセス数
process_count = multiprocessing.cpu_count() - 1
def lstm_model(maxlen, wl_chars):
model = Sequential()
model.add(CuDNNLSTM(512, return_sequences=True, input_shape=(maxlen, len(wl_chars))))
model.add(Dropout(0.2))
model.add(CuDNNLSTM(128, return_sequences=True))
model.add(Dropout(0.2))
model.add(CuDNNLSTM(64, return_sequences=True))
model.add(Dropout(0.2))
model.add(CuDNNLSTM(32))
model.add(Dropout(0.2))
model.add(Dense(len(wl_chars), activation='softmax'))
return model
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("--input", type=str)
parser.add_argument("--model_path", type=str)
parser.add_argument("--gpu", type=str, default='1')
parser.add_argument("--idx", type=int, default=0)
parser.add_argument("--batch_size", type=int, default=256)
args = parser.parse_args()
return args
def sample(preds, temperature=1.0):
# helper function to sample an index from a probability array
preds = np.asarray(preds).astype('float64')
preds = np.log(preds) / temperature
exp_preds = np.exp(preds)
preds = exp_preds / np.sum(exp_preds)
probas = np.random.multinomial(1, preds, 1)
return np.argmax(probas)
class TextGenerator(Sequence):
def __init__(self, text_path, batch_size, maxlen, wl_chars):
# コンストラクタ
self.text = list(open(text_path).read())
self.batch_size = batch_size
self.maxlen = maxlen
deta_len = len(self.text) - maxlen - 1
self.sample_per_epoch = int(deta_len/self.batch_size) #端数は切り捨て。端数分は・・・
self.wl_chars = wl_chars
self.char_indices = dict((c, i) for i, c in enumerate(self.wl_chars))
self.indices_char = dict((i, c) for i, c in enumerate(self.wl_chars))
def __getitem__(self, idx):
# データの取得実装
sentences = []
next_chars = []
for i in range(self.batch_size*idx,self.batch_size*(idx+1)):
try:
sentences.append(self.text[i: i + self.maxlen])
next_chars.append(self.text[i + self.maxlen])
except:
break
x = np.zeros((len(sentences), self.maxlen, len(self.wl_chars)), dtype=np.bool)
y = np.zeros((len(sentences), len(self.wl_chars)), dtype=np.bool)
for i, (sentence,next_char) in enumerate(zip(sentences,next_chars)):
for t, char in enumerate(sentence):
try:
x[i, t, self.char_indices[char]] = 1
except:
continue
try:
y[i, self.char_indices[next_char]] = 1
except:
continue
return x, y
def __len__(self):
# 全データ数をバッチサイズで割って、何バッチになるか返すよー!
return self.sample_per_epoch
def on_epoch_end(self):
# Function invoked at end of each epoch. Prints generated text.
print()
print('----- Generating text after Epoch')
start_index = random.randint(0, len(self.text) - self.maxlen - 1)
for diversity in [0.25, 0.5, 0.75, 1.0]:
print()
print('----- diversity:', diversity)
generated = ''
sentence = self.text[start_index: start_index + self.maxlen]
print('----- Generating with seed: "' + ''.join(sentence) + '"')
sys.stdout.write(generated)
for i in range(50):
#print('debug1 i=%d' %i)
x_pred = np.zeros((1, self.maxlen, len(self.wl_chars)))
for t, char in enumerate(sentence):
try:
x_pred[0, t, self.char_indices[char]] = 1.
except:
print('error:char=',t,char)
#print('debug2 x_pred=',x_pred)
with graph.as_default():
preds = model.predict(x_pred, verbose=0)[0]
next_index = sample(preds, diversity)
#print('debug3 next_index=',next_index)
next_char = self.indices_char[next_index]
generated += next_char
sentence = sentence[1:]
sentence.append(next_char)
sys.stdout.write(next_char)
sys.stdout.flush()
print()
def on_epoch_end(epoch, logs):
### save
print('----- saving model...')
model.save_weights(args.model_path + 'w')
model.save(args.model_path)
if __name__ == '__main__':
#パラメータ取得
args = get_args()
#GPU設定
config = tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=False,
visible_device_list=args.gpu
))
session = tf.Session(config=config)
backend.set_session(session)
GPUs = len(args.gpu.split(','))
#辞書読み込み
wl_chars = list(open('wl.txt').read())
# wl_chars = list(open('../dic/wl.txt').read())
wl_chars.append(r'\n')
wl_chars.sort()
p_model = None
model = lstm_model(maxlen, wl_chars)
model.summary()
if os.path.exists(args.model_path):
# loading the model
print('load model...')
# model = load_model(sys.argv[2])
model.load_weights(args.model_path, by_name=False)
model.compile(loss='categorical_crossentropy', optimizer=RMSprop())
m = model
if GPUs > 1:
p_model = multi_gpu_model(model, gpus=GPUs)
p_model.compile(loss='categorical_crossentropy', optimizer=RMSprop())
m = p_model
start_idx = args.idx
batch_size = args.batch_size
generator = TextGenerator(args.input, batch_size, maxlen, wl_chars)
print_callback = LambdaCallback(on_epoch_end=on_epoch_end)
ES = EarlyStopping(monitor='loss', min_delta=0.001, patience=10, verbose=0, mode='auto')
m.fit_generator(generator,
callbacks=[print_callback,ES],
epochs=epochs,
verbose=1,
# steps_per_epoch=60,
initial_epoch=start_idx,
max_queue_size=process_count,
workers=4,
use_multiprocessing=True)