LoginSignup
6
9

More than 5 years have passed since last update.

TensorFlowによる言語予測モデル

Last updated at Posted at 2017-05-24

本記事ではRG (Reber Grammar), ERG (Embedded -), CERG (Continuous -) により生成する言語を予測する学習モデルを紹介する.

LSTMのTensorFlow実装のチュートリアルとしてPTB (Penn Treebank) という言語データセットを用いた学習があるが,入門としては理解が難しい.よって,入門として8つの文字からなる言語をオートマトン的に生成するアルゴリズム(RG, ERG, CERG) から生成される言語を予測するモデルの学習を行う.

RG, ERGについて

alt

https://www.willamette.edu/~gorr/classes/cs449/reber.html
の参考サイトがわかりやすい.
例えば"BTSSXSE.BPVVR."はRGの一つで,"BPBTSXXVVEPE."はERGの一つになる.CERGはERGから終端文字"."を取り除いた言語となる.本記事は8文字からなるこれらの言語を予測することにする.

ソースコード

githubのソースコードを下に載せる.Python3,TensorFlow API r1.1での動作を確認した.TensorFlowはバージョンごとの変更が大きいので,バージョンが違う場合はそのまま動くと思わないほうがいいかも.

RG_prediction_model.py
#! /usr/local/bin/python
# -*- coding:utf-8 -*-

import tensorflow as tf
import numpy as np
import random
from create_RG import ERG_generator

# Value handed to generate() of both the training and the test generator
# (presumably the amount of data to generate -- confirm in create_RG.py).
num_of_sample_length = 10000

class RG_predict_model:
  """LSTM model that predicts a one-hot character from a window of characters.

  Network layout: fully connected layer -> LSTM unrolled over ``chunk_size``
  time steps -> fully connected layer. Written against the TensorFlow 1.x
  graph API (``tf.placeholder``, ``tf.Session``, ``tf.contrib.rnn``).
  """

  def __init__(self, data_model):
    """Set the hyper-parameters and create the train/test data generators.

    Args:
      data_model: zero-argument callable (e.g. a generator class) returning
        an object that provides ``generate(n)``, ``get_batch(batch_size,
        chunk_size)`` and a ``CHAR_VEC`` attribute giving the width of one
        character vector.

    Raises:
      Exception: whatever ``data_model`` or its ``generate`` raises; it is
        re-raised after a diagnostic message has been printed.
    """
    self.num_of_hidden_nodes = 60
    self.chunk_size = 20          # number of time steps fed to the LSTM
    self.model_file_name = "./tmp/model.ckpt"
    self.batch_size = 100
    self.forget_bias = 0.8
    self.learning_rate = 0.001
    self.num_of_epochs = 50000    # number of optimizer steps (one batch each)
    try:
      # train data set
      self.rggen = data_model()
      self.rggen.generate(num_of_sample_length)
      self.num_of_output_nodes = self.rggen.CHAR_VEC
      self.num_of_input_nodes = self.rggen.CHAR_VEC
      # test data set
      self.test_rggen = data_model()
      self.test_rggen.generate(num_of_sample_length)
    except Exception:
      # Only real errors are reported here; KeyboardInterrupt/SystemExit
      # propagate without the (then misleading) message.
      print("could not specify generator model")
      raise

  def inference(self, input_ph, istate_ph):
    """Build the forward graph and return the logits of the last time step.

    Args:
      input_ph: float tensor of shape (batch, chunk_size, num_of_input_nodes).
      istate_ph: float tensor of shape (batch, num_of_hidden_nodes * 2) used
        as the initial LSTM state (``state_is_tuple=False`` keeps the state
        in one concatenated tensor).

    Returns:
      Tensor of shape (batch, num_of_output_nodes) with unnormalised scores.
    """
    # NOTE: the order in which the ops below are created is kept as-is;
    # with a graph-level seed the initial values depend on it.
    with tf.name_scope("inference"):
      weight1_var = tf.Variable(tf.truncated_normal(
          [self.num_of_input_nodes, self.num_of_hidden_nodes], stddev=0.1), name="weight1")
      weight2_var = tf.Variable(tf.truncated_normal(
          [self.num_of_hidden_nodes, self.num_of_output_nodes], stddev=0.1), name="weight2")
      bias1_var = tf.Variable(tf.truncated_normal(
          [self.num_of_hidden_nodes], stddev=0.1), name="bias1")
      bias2_var = tf.Variable(tf.truncated_normal(
          [self.num_of_output_nodes], stddev=0.1), name="bias2")

      # Reshape so the input projection is a single matmul, then split the
      # result back into one tensor per time step for static_rnn.
      in1 = tf.transpose(input_ph, [1, 0, 2])  # (chunk_size, batch_size, CHAR_VEC_DIM)
      in2 = tf.reshape(in1, [-1, self.num_of_input_nodes])  # (chunk_size * batch_size, CHAR_VEC_DIM)
      in3 = tf.matmul(in2, weight1_var) + bias1_var  # (chunk_size * batch_size, num_of_hidden_nodes)
      in4 = tf.split(in3, self.chunk_size, axis=0)  # chunk_size * (batch_size, num_of_hidden_nodes)

      cell = tf.contrib.rnn.BasicLSTMCell(
          self.num_of_hidden_nodes, forget_bias=self.forget_bias, state_is_tuple=False)
      outputs, states = tf.contrib.rnn.static_rnn(cell, in4, initial_state=istate_ph)
      # Only the output of the final time step is used for the prediction.
      output = tf.matmul(outputs[-1], weight2_var) + bias2_var
      return output

  def evaluate(self, output, label):
    """Return ``(softmax probabilities, accuracy)`` tensors for the logits.

    Also registers an "accuracy" scalar summary.
    """
    with tf.name_scope("evaluate"):
      prediction = tf.nn.softmax(output)
      correct_prediction = tf.equal(tf.argmax(output, 1), tf.argmax(label, 1))
      accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
      tf.summary.scalar("accuracy", accuracy)
      return prediction, accuracy

  def loss(self, output, label):
    """Return the softmax cross-entropy loss tensor and log it as "loss".

    Args:
      output: logits of shape (batch, num_of_output_nodes).
      label: one-hot labels of the same shape.
    """
    with tf.name_scope("loss"):
      # tf.losses.softmax_cross_entropy already reduces to a scalar, so no
      # additional reduce_mean is needed.
      loss = tf.losses.softmax_cross_entropy(label, output)
      tf.summary.scalar("loss", loss)
      return loss

  def training(self, loss):
    """Return the Adam training op that minimises ``loss``."""
    with tf.name_scope("training"):
      optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(loss)
      return optimizer

  def train(self):
    """Build the graph, train, print test predictions and save a checkpoint.

    Side effects: writes summaries to ``./tmp/RG_log``, prints progress every
    100 steps, prints per-sample test results and saves the variables to
    ``self.model_file_name``.
    """
    input_ph = tf.placeholder(tf.float32, [None, self.chunk_size, self.num_of_input_nodes], name="input")
    label_ph = tf.placeholder(tf.float32, [None, self.num_of_output_nodes], name="label")
    istate_ph = tf.placeholder(tf.float32, [None, self.num_of_hidden_nodes * 2], name="istate")
    logits = self.inference(input_ph, istate_ph)
    loss = self.loss(logits, label_ph)
    optimizer = self.training(loss)
    evaluater = self.evaluate(logits, label_ph)
    summary = tf.summary.merge_all()

    # Every batch starts from the same all-zero LSTM state; build it once.
    zero_state = np.zeros((self.batch_size, self.num_of_hidden_nodes * 2))

    with tf.Session() as sess:
      summary_writer = tf.summary.FileWriter("./tmp/RG_log", graph=sess.graph)
      try:
        sess.run(tf.global_variables_initializer())
        ####### train ########
        for epoch in range(self.num_of_epochs):
          inputs, labels = self.rggen.get_batch(self.batch_size, self.chunk_size)
          train_dict = {
            input_ph: inputs,
            label_ph: labels,
            istate_ph: zero_state,
          }
          sess.run([optimizer], feed_dict=train_dict)

          if epoch % 100 == 0:
            # Re-evaluate the batch just trained on for logging purposes.
            summary_str, train_loss, (_, acc) = sess.run(
                [summary, loss, evaluater], feed_dict=train_dict)
            print("train#%d, loss: %e, accuracy: %e" % (epoch, train_loss, acc))
            summary_writer.add_summary(summary_str, epoch)
      finally:
        # Flush buffered events to disk even if training is interrupted.
        summary_writer.close()

      ####### test #########
      inputs, labels = self.test_rggen.get_batch(self.batch_size, self.chunk_size)
      test_dict = {
        input_ph: inputs,
        label_ph: labels,
        istate_ph: zero_state,
      }
      prediction, acc = sess.run(evaluater, feed_dict=test_dict)
      for pred, label in zip(prediction, labels):
        print(np.argmax(pred) == np.argmax(label))
        print(['{:.2f}'.format(n) for n in pred])
        print(['{:.2f}'.format(n) for n in label])

      ####### save ########
      print("Training has done successfully")
      saver = tf.train.Saver()
      saver.save(sess, self.model_file_name)


if __name__ == '__main__':
  # Seed Python, NumPy and the TensorFlow graph with the same value so that
  # a run is repeatable, then train the model on the ERG generator.
  seed = 0
  random.seed(seed)
  np.random.seed(seed)
  tf.set_random_seed(seed)
  RG_predict_model(ERG_generator).train()

次に順に詳細を解説する.

def __init__(self, data_model):
    """Set the hyper-parameters and create the train/test data generators.

    Args:
      data_model: zero-argument callable (e.g. a generator class) returning
        an object that provides ``generate(n)`` and a ``CHAR_VEC`` attribute
        giving the width of one character vector.

    Raises:
      Exception: whatever ``data_model`` or its ``generate`` raises; it is
        re-raised after a diagnostic message has been printed.
    """
    self.num_of_hidden_nodes = 60
    self.chunk_size = 20          # number of time steps fed to the LSTM
    self.model_file_name = "./tmp/model.ckpt"
    self.batch_size = 100
    self.forget_bias = 0.8
    self.learning_rate = 0.001
    self.num_of_epochs = 50000    # number of optimizer steps (one batch each)
    try:
      # train data set
      self.rggen = data_model()
      self.rggen.generate(num_of_sample_length)
      self.num_of_output_nodes = self.rggen.CHAR_VEC
      self.num_of_input_nodes = self.rggen.CHAR_VEC
      # test data set
      self.test_rggen = data_model()
      self.test_rggen.generate(num_of_sample_length)
    except Exception:
      # Only real errors are reported here; KeyboardInterrupt/SystemExit
      # propagate without the (then misleading) message.
      print("could not specify generator model")
      raise

入力ベクトルはone hotなベクトルで表現するため,8次元ベクトルであらわされる(e.g. B=(1,0,0,0,0,0,0,0))が,LSTMセルに入力する手前でfully connectedな層をはさみ,num_of_hidden_nodes=60次元ベクトルに特徴量を増加させる.LSTMはいくつ前の入力までを出力に影響させるかを決めるパラメータが必要で,これをchunk_sizeで指定する.今回は一回の予測に連続した20文字を入力とする.引数のdata_modelにはRG,ERG,CERGのいずれかの生成器クラス(上のコードではERG_generator)が入る.

def inference(self, input_ph, istate_ph):
    """Build the forward graph: dense layer -> unrolled LSTM -> dense layer.

    Args:
      input_ph: float tensor; transposed below with perm [1, 0, 2], i.e. it
        arrives as (batch_size, chunk_size, CHAR_VEC_DIM).
      istate_ph: initial LSTM state passed to static_rnn.

    Returns:
      The last time step's LSTM output projected to num_of_output_nodes
      values per sample (unnormalised scores; no softmax is applied here).
    """
    with tf.name_scope("inference") as scope:
      # Input/output projection parameters, drawn from a truncated normal
      # with stddev 0.1.
      weight1_var = tf.Variable(tf.truncated_normal(
          [self.num_of_input_nodes, self.num_of_hidden_nodes], stddev=0.1), name="weight1")
      weight2_var = tf.Variable(tf.truncated_normal(
          [self.num_of_hidden_nodes, self.num_of_output_nodes], stddev=0.1), name="weight2")
      bias1_var = tf.Variable(tf.truncated_normal(
          [self.num_of_hidden_nodes], stddev=0.1), name="bias1")
      bias2_var = tf.Variable(tf.truncated_normal(
          [self.num_of_output_nodes], stddev=0.1), name="bias2")

      # Flatten time and batch so the input projection is one matmul, then
      # split back into a list with one tensor per time step for static_rnn.
      in1 = tf.transpose(input_ph, [1, 0, 2]) #(chunk_size, batch_size, CHAR_VEC_DIM)
      in2 = tf.reshape(in1, [-1, self.num_of_input_nodes]) #(chunk_size * batch_size, CHAR_VEC_DIM)
      in3 = tf.matmul(in2, weight1_var) + bias1_var #(chunk_size * batch_size, num_of_hidden_nodes)
      in4 = tf.split(in3, self.chunk_size, axis=0) #chunk_size * (batch_size, num_of_hidden_nodes)

      # state_is_tuple=False: the state is handled as one tensor rather than
      # a (c, h) tuple, which is why istate_ph is a single placeholder.
      cell = tf.contrib.rnn.BasicLSTMCell(
          self.num_of_hidden_nodes, forget_bias=self.forget_bias, state_is_tuple=False)
      outputs, states = tf.contrib.rnn.static_rnn(cell, in4, initial_state=istate_ph)
      # Only the output of the final time step feeds the output layer.
      output = tf.matmul(outputs[-1], weight2_var) + bias2_var
      return output

input(8D) -> 全結合層-> (60D) -> LSTM-> (60D) -> 全結合層 -> output(8D)という流れで層を組んでいく.in1~in4はWx+b計算を容易に行うために変換しているだけである.tf.contrib.rnn.static_rnnは第二引数の配列サイズを勝手に取得して,cell(第一引数)を作り結合してくれる.各セルは[内部の特徴量=60]x[バッチサイズ=100]の入力を受け取る.
inference全体:
LSTM_inference.png
rnnの内部(chunk_size分のcellを生成してくれている):
LSTM_cell.png

evaluate,loss,trainingは省略.

def train(self):
    # Excerpt: only the graph-construction part of train() is shown here.
    # Placeholders for the input windows, the one-hot labels and the initial
    # LSTM state (width num_of_hidden_nodes * 2); the batch dimension is
    # left open (None).
    input_ph = tf.placeholder(tf.float32, [None, self.chunk_size, self.num_of_input_nodes], name="input")
    label_ph = tf.placeholder(tf.float32, [None, self.num_of_input_nodes], name="label")
    istate_ph = tf.placeholder(tf.float32, [None, self.num_of_hidden_nodes * 2], name="istate")
    # "prediction" holds the raw output of inference(); evaluate() applies
    # the softmax.
    prediction = self.inference(input_ph, istate_ph)
    loss = self.loss(prediction, label_ph)
    optimizer = self.training(loss)
    evaluater = self.evaluate(prediction, label_ph)
    # Merge every summary registered so far into a single op.
    summary = tf.summary.merge_all()

inputと正解ラベル用のインスタンス,LSTMの先頭に入力する状態情報のインスタンス,各出力のインスタンスを定義.summaryはtensorboardなどでデバッグするのを容易にするための結果のログの出力.

####### train ########
      # One optimizer step per iteration, each on a batch fetched from the
      # training generator.
      for epoch in range(self.num_of_epochs):
        inputs, labels = self.rggen.get_batch(self.batch_size, self.chunk_size)
        train_dict = {
          input_ph: inputs,
          label_ph: labels,
          # The LSTM starts every batch from an all-zero state.
          istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
        }
        sess.run([optimizer], feed_dict=train_dict)

        # Every 100 iterations: evaluate the same batch again (after the
        # update), print loss/accuracy and write the merged summary.
        if (epoch) % 100 ==0:
          summary_str, train_loss, (prediction, acc) = sess.run([summary, loss, evaluater], feed_dict=train_dict)
          print("train#%d, loss: %e, accuracy: %e" % (epoch, train_loss, acc))
          summary_writer.add_summary(summary_str, epoch)

dictionaryに入力をまとめ,学習を開始.結果をsummaryに追加する.rggen.get_batchは指定したバッチサイズ・チャンクサイズの入力データを持ってくる.詳しくはgithubのcreate_RG.pyを見よう.

####### test #########
      # Evaluate one batch taken from the separate test generator.
      inputs, labels = self.test_rggen.get_batch(self.batch_size, self.chunk_size)
      test_dict = {
        input_ph: inputs,
        label_ph: labels,
        istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
      }
      prediction, acc = sess.run(evaluater, feed_dict=test_dict)
      # Per sample: whether the arg-max matches, then the predicted
      # probabilities and the label, both formatted with two decimals.
      for pred, label in zip(prediction, labels):
        print(np.argmax(pred) == np.argmax(label))
        print(['{:.2f}'.format(n) for n in pred])
        print(['{:.2f}'.format(n) for n in label])

同様にtest用の入力辞書を作り,出力を表示する.{:.2f}は小数点第二位まで出力するための記法.#正解ラベルと出力ラベルの表示が縦に揃うように出力してるだけ.

参考サイト:

6
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
9