DQN

DQN + Batch Normalization + Experience Replay

Known issue: when Experience Replay is used, learning does not progress.
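
For reference, a minimal sketch of the kind of replay buffer such a setup typically relies on. The ReplayBuffer class and its method names below are illustrative and are not part of this repository:

# Minimal replay buffer sketch (illustrative; not part of this repository)
import random
from collections import deque

class ReplayBuffer:
    def __init__(self, capacity=10000):
        # Old transitions are discarded automatically once capacity is reached
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        # Uniform random mini-batch; sampling breaks the correlation
        # between consecutive transitions
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)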

network.py


# -*- coding: utf-8 -*-

from __future__ import absolute_import
import numpy as np
import random
import math
from network.function import *
from network.optimizer import *

# Neural network with two hidden layers (input, hidden1, hidden2, output)

class Network:
    # Batch normalization
    eps = 1e-8

    # Pre-activations saved in fw() and reused in bw()
    u1 = None
    u2 = None

    # Caches returned by batch_norm.fw(), consumed by batch_norm.bw()
    cache1 = None
    cache2 = None
    cache3 = None

    # params: weights, biases (and BN gamma/beta); bn_params: BN running statistics
    params = None
    bn_params = None

    def __init__(self,
                 learning_rate=None,
                 num_input=None,
                 num_hidden1=None,
                 num_hidden2=None,
                 num_output=None,
                 epochs=1000,  # number of single-sample updates when experience replay is off
                 load_weight_flg=True,
                 batch_norm_flg=False
                 ):

        self.num_input = num_input
        self.num_hidden1 = num_hidden1
        self.num_hidden2 = num_hidden2
        self.num_output = num_output
        self.epochs = epochs
        self.load_weight_flg = load_weight_flg
        self.batch_norm_flg = batch_norm_flg
        self.fn = ActivationFunction()
        self.fn.batch_norm

        self.optimizer = Optimiser(learning_rate).RMSPropGraves

        if self.load_weight_flg:
            self.load_weight()
        else:
            self.create_weight()

    # Forward propagation
    def fw(self, X, train_flg=True):

        params = self.params

        W1 = params["W1"]
        W2 = params["W2"]
        W3 = params["W3"]

        b1 = params["b1"]
        b2 = params["b2"]
        b3 = params["b3"]

        # BatchNorm
        if self.batch_norm_flg:
            bn_params = self.bn_params
            gamma1 = params["gamma1"]
            gamma2 = params["gamma2"]
            beta1 = params["beta1"]
            beta2 = params["beta2"]

        # Layer 1 pre-activation
        u1 = np.dot(X, W1) + b1
        h1 = u1

        # BatchNorm
        if self.batch_norm_flg:
            if train_flg:
                # Training: normalize with the batch mean/variance and update
                # the running statistics used at inference time
                h1, self.cache1, mu, var = self.fn.batch_norm.fw(u1, gamma1, beta1)
                bn_params['bn1_mean'] = .9 * bn_params['bn1_mean'] + .1 * mu
                bn_params['bn1_var'] = .9 * bn_params['bn1_var'] + .1 * var

            else:
                # Inference: normalize with the running statistics of layer 1
                h1 = (h1 - bn_params['bn1_mean']) / np.sqrt(bn_params['bn1_var'] + 1e-8)
                h1 = gamma1 * h1 + beta1

        # h1 = self.fn.dropout.forward(h1, train_flg=train_flg)

        h1 = self.fn.ReLU.fw(h1)

        # Layer 2 pre-activation
        u2 = np.dot(h1, W2) + b2
        h2 = u2

        # BatchNorm
        if self.batch_norm_flg:

            if train_flg:
                h2, self.cache2, mu, var = self.fn.batch_norm.fw(u2, gamma2, beta2)
                bn_params['bn2_mean'] = .9 * bn_params['bn2_mean'] + .1 * mu
                bn_params['bn2_var'] = .9 * bn_params['bn2_var'] + .1 * var
                self.bn_params = bn_params
            else:
                # Inference: normalize with the running statistics of layer 2
                h2 = (h2 - bn_params['bn2_mean']) / np.sqrt(bn_params['bn2_var'] + 1e-8)
                h2 = gamma2 * h2 + beta2

        # h2 = self.fn.dropout.forward(h2, train_flg=train_flg)

        h2 = self.fn.ReLU.fw(h2)

        out = np.dot(h2, W3) + b3
        out = self.fn.identity.fw(out)

        self.params = params

        self.u1 = u1
        self.u2 = u2

        return h1, h2, out

    # Backpropagation
    def bw(self, x, t, train_flg=False):

        params = self.params
        W1 = params["W1"]
        W2 = params["W2"]
        W3 = params["W3"]

        # Forward-propagate the input to get the hidden-layer outputs
        h1, h2, y = self.fw(x, train_flg=train_flg)
        u1 = self.u1
        u2 = self.u2

        # Back propagation
        # Output-layer delta (y - t); with the identity output this is the
        # gradient of the squared-error loss
        dout = y - t
        db3 = np.sum(dout, axis=0)
        dW3 = dout * 1

        # Propagate the error back to get the hidden-layer deltas
        dh2 = np.dot(dW3, W3.T)
        dh2 = self.fn.ReLU.bw(u2) * dh2

        # BatchNorm
        if self.batch_norm_flg:
            dh2, dgamma2, dbeta2 = self.fn.batch_norm.bw(dh2, self.cache2)
        db2 = np.sum(dh2, axis=0)
        dW2 = dh2 * 1

        dh1 = np.dot(dW2, W2.T)
        dh1 = self.fn.ReLU.bw(u1) * dh1

        # BatchNorm
        if self.batch_norm_flg:
            dh1, dgamma1, dbeta1 = self.fn.batch_norm.bw(dh1, self.cache1)
        db1 = np.sum(dh1, axis=0)
        dW1 = dh1 * 1

        # Weight gradients: the product of each layer's input and its delta
        # (dW1..dW3 above are the layer deltas, not the weight gradients themselves)
        grads = {}
        grads["W3"] = np.dot(h2.T, dW3)
        grads["W2"] = np.dot(h1.T, dW2)
        grads["W1"] = np.dot(x.T, dW1)

        grads["b3"] = db3
        grads["b2"] = db2
        grads["b1"] = db1

        # BatchNorm
        if self.batch_norm_flg:
            # Gradients for the batch-normalization scale (gamma) and shift (beta)
            grads["gamma1"] = dgamma1
            grads["gamma2"] = dgamma2
            grads["beta1"] = dbeta1
            grads["beta2"] = dbeta2

        self.optimizer.update(params, grads)

        # _, _, y = self.fw(x, train_flg=True)
        # loss = np.sum((y - t)*(y - t)) * 0.5
        # return loss

    # Mini-batch training
    def train(self, X, T, experience_replay=True):

        if experience_replay:
            # print("Experience replay data.")
            X_batch = X
            T_batch = T

            # Backward process
            self.bw(X_batch, T_batch, train_flg=True)

            # while loss > 0.001:
            #     loss = self.bw(X_batch, T_batch)

        else:
            print("Select data randomly from input.")

            # Pick a sample at random from the training data
            for i in range(self.epochs):
                idx = np.random.randint(X.shape[0])

                x = np.array([X[idx]])
                t = np.array([T[idx]])

                # Backward
                self.bw(x, t, train_flg=True)

    def predict(self, x):

        # Inference-time forward pass; with batch normalization enabled,
        # train_flg=False makes fw() use the running mean/variance instead
        # of batch statistics
        _, _, y = self.fw(x, train_flg=False)

        return y

    # Create the weight parameters
    def create_weight(self):

        W1 = np.random.randn(self.num_input, self.num_hidden1).astype(np.float32)
        W2 = np.random.randn(self.num_hidden1, self.num_hidden2).astype(np.float32)
        W3 = np.random.randn(self.num_hidden2, self.num_output).astype(np.float32)

        b1 = np.zeros(self.num_hidden1, dtype=np.float32)
        b2 = np.zeros(self.num_hidden2, dtype=np.float32)
        b3 = np.zeros(self.num_output, dtype=np.float32)

        # Xavier initialization: for linear activations.
        # W1 /= np.sqrt(self.numInp)
        # W2 /= np.sqrt(self.numHid1)
        # W3 /= np.sqrt(self.numHid2)
        # W4 /= np.sqrt(self.numHid3)

        # With Batch Normalization, even this scaling may no longer be necessary?
        # He initialization: for ReLU activations (scale by sqrt(2 / fan_in))
        W1 *= np.sqrt(2.0 / (self.num_input))
        W2 *= np.sqrt(2.0 / (self.num_hidden1))
        W3 *= np.sqrt(2.0 / (self.num_hidden2))

        params = {}
        params["W1"] = W1
        params["W2"] = W2
        params["W3"] = W3
        params["b1"] = b1
        params["b2"] = b2
        params["b3"] = b3

        # Batch normalization: scale (gamma), shift (beta), and running statistics
        if self.batch_norm_flg:
            params["gamma1"] = np.ones(self.num_hidden1, dtype=np.float32)
            params["gamma2"] = np.ones(self.num_hidden2, dtype=np.float32)
            params["beta1"] = np.zeros(self.num_hidden1, dtype=np.float32)
            params["beta2"] = np.zeros(self.num_hidden2, dtype=np.float32)

            bn_params = {}
            bn_params['bn1_mean'] = 0.0
            bn_params['bn1_var'] = 0.0
            bn_params['bn2_mean'] = 0.0
            bn_params['bn2_var'] = 0.0
            self.bn_params = bn_params

        self.params = params

        print("Weight created.")