Keras
TensorFlow
Pyrhon3

kerasを使ってみる

データ取得スクリプトについてはかなり長めのインターバルを入れて作りましたが、割愛します。

関連書籍

ゼロから作るDeep Learning
詳解 ディープラーニング

はじめに

今回はDNNのチュートリアルみたいな物です。RNNではありませんので、ご容赦ください。
資料にあったkerasを使ってみて簡単だったので、時系列データのパターンを使った予測(回帰問題)で試してみようと思います。

description.png

データは沢山あるのですが、とりあえず今回は気象庁で公開されている過去のデータを利用させていただきます。
こういう時には助かります。
(今回は暑さで有名な熊谷を選択)
出典:気象庁ホームページ

kerasの使い方

入力層-隠れ層-出力層の作成

DNN1.png
とりあえず、上記のような入力層に2、隠れ層に3、出力層に1となるニューラルネットワークを作ってみます。

dnn_1
# 入力層
n_in = 2
# 隠れ層
n_hidden = 3
# 出力層
n_out = 1

# モデルの作成
model = Sequential()
# 入力層 から 隠れ層
model.add(Dense(n_hidden, input_shape=(n_in,), kernel_initializer=TruncatedNormal(stddev=0.01)))
model.add(Activation('tanh'))
# 隠れ層 から 出力層 恒等関数の使用
model.add(Dense(n_out,  kernel_initializer=TruncatedNormal(stddev=0.01)))
model.add(Activation('linear'))

kernel_initializerで重みの初期化を自動で行ってます。公式に説明があります。
推奨なので初心者の自分はそのままです。
隠れ層の活性化関数公式もreluとかtanhとか色々ありますが、とりあえずtanh
出力層はliner

隠れ層を追加する

DNN_2.png

dnn_2
# モデルの準備
model = Sequential()
# 入力層 から 隠れ層
model.add(Dense(n_hidden, input_shape=(n_in,), kernel_initializer=TruncatedNormal(stddev=0.01)))
model.add(Activation('tanh'))
# 隠れ層から隠れ層
model.add((Dense(n_hidden,  kernel_initializer=TruncatedNormal(stddev=0.01))))
model.add(Activation('tanh'))
# 隠れ層 から 出力層 恒等関数の使用
model.add(Dense(n_out,  kernel_initializer=TruncatedNormal(stddev=0.01)))
model.add(Activation('linear'))

二層目以降は入力数の指定は必要ないです。

確率的勾配降下法と誤差関数を決めてモデルの設定

compile
# 確率的勾配降下法 Adam
optimizer = Adam(lr=0.001, beta_1=0.9, beta_2=0.999)
# 損失関数 二乗平均誤差
model.compile(loss='mean_squared_error',
              optimizer=optimizer)

ほぼテンプレです。

学習の実施!!

# 学習のエポック数
epoch = 1000
# バッチサイズ
batch_size = 50
# 学習の実施
history = model.fit(X_train, Y_train,
                    epochs=epoch,
                    batch_size=batch_size,
                    validation_data=(X_validation, Y_validation)
                    )

X_trainに学習用データ、Y_trainに正解データを渡します。
X_validation, Y_validationにはそれぞれ検証用に用意した学習用データとは違うデータを渡します。
batch_sizeを指定して、ミニバッチ学習を行います。
historyで返り値を取得しているのは、historyオブジェクトで学習の履歴を見るためですが、割愛します。

作成したモデルの使用

Y = model.predict(X)

Xに学習データと同じ、連続したデータを渡し、Yで予測された正解データに似たデータが取り出せます。

時系列データの準備

データの取得

気象庁の過去の気象データから年ごとの平均気温を取得します。
何度も取得する訳にもいかないので、ファイルに保存します。ファイル名、パスは任意の物へ変更してください。
データ取得スクリプトについてはかなり長めのインターバルを入れて作りましたが、割愛します。

get_data
FILE = 'weather_log.npz'
# 気温のリストを取得
def get_data():
    # 一度ファイルを保存済みであれば、それを読み込む
    if os.path.isfile(FILE):
        result_list = np.load(FILE)['x']
    else:
        ###################################
        # 割愛します。
        ###################################
        result_list = np.array(tmp_list)
        # ファイルへ保存
        np.savez(FILE, x=result_list)
    return result_list

データの正規化、データセットの作成

時系列データなので連続したデータのセットを入力、直後のデータを正解として作成し、連続したデータを与えると直後を予測したデータを返してくれるようにします。
例過去10個の連続したデータセットを与えて、次の結果を予測したい場合は入力層が10としてそこに過去10個のデータを与え、出力層を1とします。
DNN_3.png

# データの正規化
def normalize(x):
    result_list = (x - x.mean()) / x.std()
    return result_list

# 気温データを取得
temp_list = get_data()
# 正規化を実施
normalize_data = normalize(temp_list)
# 入力層の次元数 学習で参考データとする過去の長さ
input_data_length = 10
data = []
target = []
# 学習で使用するデータの作成
for i in range(0, len(normalize_data) - (input_data_length + 1)):
    data.append(normalize_data[i: i + input_data_length])
    target.append(normalize_data[i + input_data_length])
X = np.array(data)
Y = np.array(target)
# 学習データから検証データを作成
X_train, X_validation, Y_train, Y_validation = \
    train_test_split(X, Y, test_size=0.2)

気温データをそのまま使用せず一旦正規化を行っています。
input_data_lengthで参考とする過去のデータ数を決めています。これはニューラルネットワークの入力の数にもなります。
train_test_splitを使用して学習用のデータと検証データを8:2で分けています。

コード全体

非正規化の処理を入れたりとか取得したデータの後半をモデルのテストデータとして利用していますが基本的には説明したコードで構成されています。

learning.py
import requests, bs4, re, time
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Dense, Activation
from keras.layers.core import Dropout
from keras.optimizers import Adam
from keras.initializers import TruncatedNormal
import os.path

FILE = 'weather_log.npz'

# 一部ゴミが入ってしまうので
#  ゴミの削除のための関数
def remove_dust(x):
    tmp_regex = re.compile(r'\d+.+\d')
    tmp = []
    for line in x:
        tmp += tmp_regex.findall(line)
    return tmp


# データの正規化
def normalize(x):
    result_list = (x - x.mean()) / x.std()
    return result_list


# 正規化したデータのリセット
def de_normalize(x, y):
    result_list = y * x.std() + x.mean()
    return result_list


# 気温のリストを取得
def get_data():
    # 一度ファイルを保存済みであれば、それを読み込む
    if os.path.isfile(FILE):
        result_list = np.load(FILE)['x']
    else:
        ###################################
        # 割愛します。
        ###################################
        result_list = np.array(tmp_list)
        # ファイルへ保存
        np.savez(FILE, x=result_list)
    return result_list


# メイン
if __name__ == '__main__':
    # 入力層の次元数 学習で参考データとする過去の長さ
    input_data_length = 10
    data = []
    target = []
    split_line = 1300
    temp_list = get_data()
    normalize_data = normalize(temp_list[:split_line])
    # 学習で使用するデータの作成
    for i in range(0, len(normalize_data) - (input_data_length + 1)):
        data.append(normalize_data[i: i + input_data_length])
        target.append(normalize_data[i + input_data_length])
    X = np.array(data)
    Y = np.array(target)
    # 学習データから検証データを作成
    X_train, X_validation, Y_train, Y_validation = \
        train_test_split(X, Y, test_size=0.2)

    # 隠れ層
    n_hidden = 200
    # 出力層
    n_out = 1
    # 学習のエポック数
    epoch = 1000
    # バッチサイズ
    batch_size = 100
    # モデルの作成
    model = Sequential()
    # 入力層 から 隠れ層
    model.add(Dense(n_hidden, input_shape=(input_data_length,), kernel_initializer=TruncatedNormal(stddev=0.01)))
    model.add(Activation('tanh'))
    model.add(Dropout(0.5))
    # 隠れ層の作成
    model.add((Dense(n_hidden,  kernel_initializer=TruncatedNormal(stddev=0.01))))
    model.add(Activation('tanh'))
    model.add(Dropout(0.5))
    # 隠れ層 から 出力層 恒等関数の使用
    model.add(Dense(n_out,  kernel_initializer=TruncatedNormal(stddev=0.01)))
    model.add(Activation('linear'))
    # 確率的勾配法 Adam
    optimizer = Adam(lr=0.001, beta_1=0.9, beta_2=0.999)
    # 損失関数 二乗和誤差
    model.compile(loss='mean_squared_error',
                  optimizer=optimizer)
    # 学習の実施
    history = model.fit(X_train, Y_train,
                        epochs=epoch,
                        batch_size=batch_size,
                        validation_data=(X_validation, Y_validation)
                        )

    # テスト用データの作成
    normalize_data = normalize(temp_list[split_line:])
    data = []
    target = []
    for i in range(0, len(normalize_data) - (input_data_length + 1)):
        data.append(normalize_data[i: i + input_data_length])
        target.append(normalize_data[i + input_data_length])
    X_test = np.array(data)
    Y_test = np.array(target)

    # 学習済みのモデルで
    # テストデータを使って出力の確認
    predicted = model.predict(X_test)
    test_data = de_normalize(temp_list, Y_test)
    result_data = de_normalize(temp_list, predicted)
    # X軸のリスト作成
    month_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    tmp_ticks = []
    for line in range(0,len(result_data)//12 + 1):
        tmp_ticks.extend(month_list)
    X_ticks = np.arange(0,len(result_data))
    X_ticks_str = tmp_ticks[12-len(result_data)%12:]
    # 実際のデータ
    plt.plot(test_data, label='test_data')
    # モデルが出力したデータ
    plt.plot(result_data, label='result_data')
    plt.xticks(X_ticks, X_ticks_str)
    plt.legend()
    plt.xlabel("month")
    plt.ylabel("temp")
    plt.show()

表示された結果の一部ですが、こんな感じです。
パターンが簡単なのでい一応予測できています。
result.png