Python
機械学習
RNN
LSTM
TensorFlow

RNNにsin波を学習させて予測してみた

More than 1 year has passed since last update.

0. ざっくり言うと

  • TensorFlowで簡単なRNN(Recurrent Neural Network)を実装した。
  • RNNを使い、sin波を学習させて、sin(t)からsin(t+1)(次ステップ)を予測させた。
  • RNNの出力結果を連鎖させて、sin(t+n)(複数ステップ)の予測を実現できた。
  • RNNのセルにはLSTM(Long Short-Term Memory)を使った。

2016年5月27日追記:

続編『RNNにsin波を学習させて予測してみた:ハイパーパラメータ調整編』を書きました。

1. TensorFlow、RNN、LSTMについて

ざっくり割愛します。TensorFlowのチュートリアルや、そこから参照されている記事などが参考になると思います。

2. 学習データの準備

1サイクル50ステップのsin波を100サイクル、合計5,000ステップ分生成し、学習データとしました。
また、学習データとして、ノイズなし、ノイズありの2種類を用意しました。

学習データはsin(t)(時刻tにおけるsin値)とsin(t+1)(時刻t+1におけるsin値)のペアで構成されています。
学習データの生成の詳細については、ipynbファイル(IPython Notebook)を残していますので、そちらを参照ください。(余談ですが、GitHubでipynbファイルがプレビューされて驚きました)

2.1. ノイズなし

train_data/normal.ipynb

train_data_normal.png

2.2. ノイズあり

train_data/noised.ipynb

train_data_noised.png

3. 学習・予測

今回は1つのコードで学習と予測を行っています。ソースコードは文末の付録に示します。

3.1. 処理の流れ

学習、予測の流れは以下の通り。

  1. 学習データを用いて学習
  2. 初期データ(学習データの先頭部分)を用いてsin(t+1)を予測
  3. 予測したsin(t+1)を用いてsin(t+2)を予測
  4. 3の繰り返し

3.2. ネットワーク構成

「入力層 - 隠れ層 - RNNセル - 出力層」というネットワークを使用しました。
また、RNNセルにはLSTMを使用しました。

3.3. ハイパーパラメータ

学習、予測に用いたハイパーパラメータは以下の通り。

変数名 意味
num_of_input_nodes 入力層のノード数 1 ノード
num_of_hidden_nodes 隠れ層のノード数 2 ノード
num_of_output_nodes 出力層のノード数 1 ノード
length_of_sequences RNNのシーケンス長 50 ステップ
num_of_training_epochs 学習の繰り返し回数 2,000 回
length_of_initial_sequences 初期データのシーケンス長 50 ステップ
num_of_prediction_epochs 予測の繰り返し回数 100 回
size_of_mini_batch ミニバッチあたりのサンプル数 100 サンプル
learning_rate 学習率 0.1
forget_bias (よく分かっていません) 1.0 (デフォルト値)

4. 予測結果

予測結果をプロットした図を以下にに示します。凡例は以下の通りです。

  • 黒の点線: 学習データ
  • 青の実線: 初期データ
  • 緑の実線: 予測データ

4.1. ノイズなし

それっぽい波形が出力されています。全体的に振幅が浅く、頂点が歪み、周波数が少し低くなっています。
具体的な値はbasic/output.ipynbを参照ください。

output_normal.png

4.2. ノイズあり

ノイズなしの場合よりもさらに振幅が浅く、周波数は少し高くなっています。また、学習データに含まれていたノイズ成分が減っているように見えます。
具体的な値はnoised/output.ipynbを参照ください。

output_noised.png

5. 今後の予定

ネットワーク構成やハイパーパラメータを変化させてみて、どんな予測結果になるかを試してみたいと思っています。

2016年5月27日追記:

続編『RNNにsin波を学習させて予測してみた:ハイパーパラメータ調整編』を書きました。

付録: ソースコード

ノイズなし版のソースコードを以下に示します。ノイズあり版のソースコードはGitHubを参照ください。
ノイズなし版とノイズあり版は、入力ファイル名が違うだけです。

rnn.py
import tensorflow as tf
from tensorflow.models.rnn import rnn, rnn_cell
import numpy as np
import random

def make_mini_batch(train_data, size_of_mini_batch, length_of_sequences):
    inputs  = np.empty(0)
    outputs = np.empty(0)
    for _ in range(size_of_mini_batch):
        index   = random.randint(0, len(train_data) - length_of_sequences)
        part    = train_data[index:index + length_of_sequences]
        inputs  = np.append(inputs, part[:, 0])
        outputs = np.append(outputs, part[-1, 1])
    inputs  = inputs.reshape(-1, length_of_sequences, 1)
    outputs = outputs.reshape(-1, 1)
    return (inputs, outputs)

def make_prediction_initial(train_data, index, length_of_sequences):
    return train_data[index:index + length_of_sequences, 0]

train_data_path             = "../train_data/normal.npy"
num_of_input_nodes          = 1
num_of_hidden_nodes         = 2
num_of_output_nodes         = 1
length_of_sequences         = 50
num_of_training_epochs      = 2000
length_of_initial_sequences = 50
num_of_prediction_epochs    = 100
size_of_mini_batch          = 100
learning_rate               = 0.1
forget_bias                 = 1.0
print("train_data_path             = %s" % train_data_path)
print("num_of_input_nodes          = %d" % num_of_input_nodes)
print("num_of_hidden_nodes         = %d" % num_of_hidden_nodes)
print("num_of_output_nodes         = %d" % num_of_output_nodes)
print("length_of_sequences         = %d" % length_of_sequences)
print("num_of_training_epochs      = %d" % num_of_training_epochs)
print("length_of_initial_sequences = %d" % length_of_initial_sequences)
print("num_of_prediction_epochs    = %d" % num_of_prediction_epochs)
print("size_of_mini_batch          = %d" % size_of_mini_batch)
print("learning_rate               = %f" % learning_rate)
print("forget_bias                 = %f" % forget_bias)

train_data = np.load(train_data_path)
print("train_data:", train_data)

# 乱数シードを固定する。
random.seed(0)
np.random.seed(0)
tf.set_random_seed(0)

optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)

with tf.Graph().as_default():
    input_ph      = tf.placeholder(tf.float32, [None, length_of_sequences, num_of_input_nodes], name="input")
    supervisor_ph = tf.placeholder(tf.float32, [None, num_of_output_nodes], name="supervisor")
    istate_ph     = tf.placeholder(tf.float32, [None, num_of_hidden_nodes * 2], name="istate") # 1セルあたり2つの値を必要とする。

    with tf.name_scope("inference") as scope:
        weight1_var = tf.Variable(tf.truncated_normal([num_of_input_nodes, num_of_hidden_nodes], stddev=0.1), name="weight1")
        weight2_var = tf.Variable(tf.truncated_normal([num_of_hidden_nodes, num_of_output_nodes], stddev=0.1), name="weight2")
        bias1_var   = tf.Variable(tf.truncated_normal([num_of_hidden_nodes], stddev=0.1), name="bias1")
        bias2_var   = tf.Variable(tf.truncated_normal([num_of_output_nodes], stddev=0.1), name="bias2")

        in1 = tf.transpose(input_ph, [1, 0, 2])         # (batch, sequence, data) -> (sequence, batch, data)
        in2 = tf.reshape(in1, [-1, num_of_input_nodes]) # (sequence, batch, data) -> (sequence * batch, data)
        in3 = tf.matmul(in2, weight1_var) + bias1_var
        in4 = tf.split(0, length_of_sequences, in3)     # sequence * (batch, data)

        cell = rnn_cell.BasicLSTMCell(num_of_hidden_nodes, forget_bias=forget_bias)
        rnn_output, states_op = rnn.rnn(cell, in4, initial_state=istate_ph)
        output_op = tf.matmul(rnn_output[-1], weight2_var) + bias2_var

    with tf.name_scope("loss") as scope:
        square_error = tf.reduce_mean(tf.square(output_op - supervisor_ph))
        loss_op      = square_error
        tf.scalar_summary("loss", loss_op)

    with tf.name_scope("training") as scope:
        training_op = optimizer.minimize(loss_op)

    summary_op = tf.merge_all_summaries()
    init = tf.initialize_all_variables()

    with tf.Session() as sess:
        saver = tf.train.Saver()
        summary_writer = tf.train.SummaryWriter("data", graph=sess.graph)
        sess.run(init)

        for epoch in range(num_of_training_epochs):
            inputs, supervisors = make_mini_batch(train_data, size_of_mini_batch, length_of_sequences)

            train_dict = {
                input_ph:      inputs,
                supervisor_ph: supervisors,
                istate_ph:     np.zeros((size_of_mini_batch, num_of_hidden_nodes * 2)),
            }
            sess.run(training_op, feed_dict=train_dict)

            if (epoch + 1) % 10 == 0:
                summary_str, train_loss = sess.run([summary_op, loss_op], feed_dict=train_dict)
                summary_writer.add_summary(summary_str, epoch)
                print("train#%d, train loss: %e" % (epoch + 1, train_loss))

        inputs  = make_prediction_initial(train_data, 0, length_of_initial_sequences)
        outputs = np.empty(0)
        states  = np.zeros((num_of_hidden_nodes * 2)),

        print("initial:", inputs)
        np.save("initial.npy", inputs)

        for epoch in range(num_of_prediction_epochs):
            pred_dict = {
                input_ph:  inputs.reshape((1, length_of_sequences, 1)),
                istate_ph: states,
            }
            output, states = sess.run([output_op, states_op], feed_dict=pred_dict)
            print("prediction#%d, output: %f" % (epoch + 1, output))

            inputs  = np.delete(inputs, 0)
            inputs  = np.append(inputs, output)
            outputs = np.append(outputs, output)

        print("outputs:", outputs)
        np.save("output.npy", outputs)

        saver.save(sess, "data/model")