Edited at

ディープラーニングを実装から学ぶ(8)実装変更

ニューラルネットワークの各手法を試してきました。今後、CNN、RNNに対応するため実装の見直しを行います。

変更の方針


  • 活性化関数、ドロップアウト、バッチ正規化など各層一律の適用でしたが、任意の階層に任意の活性化関数を適用できるようにします。ドロップアウト、バッチ正規化み1つの層とみなします。

  • 指定回数エポック実行後に、追加で実行できるようにします。途中で状態を保存し、後日、続きから実行することも可能にします。

  • 学習率、オプティマイザーを途中から変更できるようにします。


定義


モデル


定義

任意の階層を定義できるようにします。今までの中間層のノード数が100,50の場合は、以下のように定義します。

model = create_model(28*28)

model = add_layer(model, "affine1", affine, 100)
model = add_layer(model, "relu1", relu)
model = add_layer(model, "affine2", affine, 50)
model = add_layer(model, "relu2", relu)
model = add_layer(model, "affine3", affine, 10)
model = set_output(model, softmax)
model = set_error(model, cross_entropy_error)

create_modelでモデルを作成し、add_layerでレイヤーを追加していきます。出力関数と誤差関数は、set_optput,set_errorで設定します。

# モデル作成関数

model = create_model(input_shape)
# 引数
# input_shape : 入力データの次元
# 戻り値
# モデル

# 層追加関数
model = add_layer(model, name, func, d=None, **kwargs)
# 引数
# model : モデル
# name : レイヤの名前
# func : 中間層の関数
# d : ノード数
# kwargs: 中間層の関数のパラメータ
# 戻り値
# モデル

# 出力関数
mode = set_output(model, func, **kwargs)
# 引数
# model : モデル
# func : 出力層の関数
# kwargs: 出力層の関数のパラメータ
# 戻り値
# モデル

# 損失関数
mode = set_error(model, func, **kwargs)
# 引数
# model : モデル
# func : 損失関数
# kwargs: 損失関数のパラメータ
# 戻り値
# モデル

モデルの定義部分の実装です。

from collections import OrderedDict

import pickle

def create_model(d):
# modelの生成
model = {}
model["input"] = {"d":d}
model["layer"] = OrderedDict()
model["weight_decay"] = None
return model

def add_layer(model, name, func, d=None, **kwargs):
# layerの設定
model["layer"][name] = {"func":func, "back_func":eval(func.__name__ + "_back"), "d":d, "params":kwargs, "weights":None}
return model

def set_output(model, func, **kwargs):
# layerの設定
model["output"] = {"func":func, "d":1, "params":kwargs}
return model

def set_error(model, func, **kwargs):
# layerの設定
model["error"] = {"func":func, "d":1, "params":kwargs}
return model

重み減衰は、レイヤではありませんが、modelに設定します。

# 重み減衰関数

mode = set_weight_decay(model, func, **kwargs):
# 引数
# model : モデル
# func : 重み減衰関数
# kwargs: 重み減衰関数のパラメータ
# 戻り値
# モデル

def set_weight_decay(model, func, **kwargs):

model["weight_decay"] = {"func":func, "back_func":eval(func.__name__ + "_back"), "params":kwargs}
return model


初期化

init_modelで、affineの重み、バイアスなど、初期化が必要な変数の初期化を行います。重み、バイアスは、modelの中に格納します。

def init_model(model):

# modelの初期化
# input
d_prev = 0
d_next = model["input"]["d"]
print("input", "-", d_prev, d_next)
# layer
d_prev = d_next
for k, v in model["layer"].items():
# 要素数取得
d = v["d"] if v["d"] != None else d_prev
# 重み、バイアスの初期化
d_next = d
if v["func"].__name__ + "_init_layer" in globals():
init_layer_func = eval(v["func"].__name__ + "_init_layer")
d_next, v["weights"] = init_layer_func(d_prev, d, **v["params"])
# 要素数設定
v["d_prev"], v["d_next"] = d_prev, d_next
# モデル表示
print(k, v["func"].__name__, d_prev, d_next)
d_prev = d_next
# output
model["output"]["d_prev"] = d_prev
print("output", model["output"]["func"].__name__, d_prev)
# error
print("error", model["error"]["func"].__name__)

return model


保存、読み込み

save_model,load_modelでモデルを保存、読み込みができます。

def save_model(model, file_path):

f = open(file_path, "wb")
pickle.dump(model, f)
f.close
return

def load_model(file_path):
f = open(file_path, "rb")
model = pickle.load(f)
f.close
return model


オプティマイザ


定義

オプティマイザを定義します。

Adamの設定例です。

optimizer = create_optimizer(Adam, lr=0.1, beta1=0.9, beta2=0.999)

# オプティマイザ作成関数

optimizer = create_optimizer(func, **kwargs)
# 引数
# func : オプティマイザ関数
# kwargs: オプティマイザ関数のパラメータ
# 戻り値
# オプティマイザ

オプティマイザ定義部分の実装です。

初期化関数でオプティマイザで利用する変数を初期化します。modelと同様に、保存、読み込み関数を定義します。

def create_optimizer(func, **kwargs):

optimizer = {"func":func, "params":kwargs}
return optimizer

def init_optimizer(optimizer, model):
optimizer_stats = {}
for k, v in model["layer"].items():
# オプティマイザの初期化
if v["func"].__name__ + "_init_optimizer" in globals():
init_optimizer_func = eval(v["func"].__name__ + "_init_optimizer")
optimizer_stats[k] = init_optimizer_func()
optimizer["stats"] = optimizer_stats

return optimizer

def save_optimizer(optimizer, file_path):
f = open(file_path, "wb")
pickle.dump(optimizer, f)
f.close
return

def load_optimizer(file_path):
f = open(file_path, "rb")
optimizer = pickle.load(f)
f.close
return optimizer


順伝播、逆伝播、重みの更新


順伝播

モデルの各レイヤを実行します。その後に、出力関数と誤差関数を実行します。

活性化関数には、前層の出力データを渡します。ただし、affineなど重さを利用する場合は、propergation関数を定義し、重さを受け渡しできるようにします。

propergation関数が定義されている場合は、定義されたpropergation関数を呼び出します。

def propagation(model, x, t=None, learn_flag=True):

us = {}
u = x
err = None
weight_decay_sum = 0
# layer
for k, v in model["layer"].items():
# propagation関数設定
propagation_func = middle_propagation
if v["func"].__name__ + "_propagation" in globals():
propagation_func = eval(v["func"].__name__ + "_propagation")
# propagation関数実行
us[k], weight_decay_r = propagation_func(v["func"], u, v["weights"], model["weight_decay"], learn_flag, **v["params"])
u = us[k]["z"]
weight_decay_sum = weight_decay_sum + weight_decay_r
# output
if "output" in model:
propagation_func = output_propagation
# propagation関数実行
us["output"] = propagation_func(model["output"]["func"], u, learn_flag, **model["output"]["params"])
u = us["output"]["z"]
# error
y = u
# 学習時には、誤差は計算しない
if learn_flag == False:
if "error" in model:
if t is not None:
err = model["error"]["func"](y, t)
# 重み減衰
if "weight_decay" is not None:
if learn_flag:
err = err + weight_decay_sum

return y, err, us


逆伝播

逆伝播も順伝播と同様です。

重さを使う場合は、back_propagation関数を定義します。back_propagation関数が定義されている場合は、定義されたback_propagation関数を呼び出します。

def back_propagation(model, x=None, t=None, y=None, us=None, du=None):

dus = {}
if du is None:
# 出力層+誤差勾配関数
output_error_back_func = eval(model["output"]["func"].__name__ + "_" + model["error"]["func"].__name__ + "_back")
du = output_error_back_func(y, us["output"]["u"], t)
dus["output"] = {"du":du}
dz = du
for k, v in reversed(model["layer"].items()):
# back propagation関数設定
back_propagation_func = middle_back_propagation
if v["func"].__name__ + "_back_propagation" in globals():
back_propagation_func = eval(v["func"].__name__ + "_back_propagation")
# back propagation関数実行
dus[k] = back_propagation_func(v["back_func"], dz, us[k], v["weights"], model["weight_decay"], v["calc_du_flag"], **v["params"])
dz = dus[k]["du"]
# du計算フラグがFalseだと以降計算しない
if v["calc_du_flag"] == False:
break

return dz, dus


重みの更新

affineなど重みを更新するレイヤでは、update_weight関数を定義し、重みの更新を行います。

def update_weight(model, dus, optimizer):

for k, v in model["layer"].items():
# 重み更新
if v["func"].__name__ + "_update_weight" in globals():
update_weight_func = eval(v["func"].__name__ + "_update_weight")
v["weights"], optimizer["stats"][k] = update_weight_func(optimizer["func"], dus[k], v["weights"], optimizer["stats"][k], **optimizer["params"])
return model, optimizer


誤差

誤差関数を呼び出します。

def error(model, y, t):

return model["error"]["func"](y, t)


個別実装

それぞれの関数ごとに個別の実装を追加します。

個別に定義する関数名は、以下とします。


  • モデル初期化関数 - 関数名 + "_init_layer"

  • オプティマイザ初期化関数 - 関数名 + "_init_optimizer"

  • 順伝播 - 関数名 + "_propagation"

  • 逆伝播 - 関数名 + "_back_propagation"

  • 重みの更新 - 関数名 + "_update_weight"

必要な関数のみ定義します。


共通

特別の処理が不要な活性化関数は、以下の関数で共通的に対応します。

def middle_propagation(func, u, weights, weight_decay, learn_flag, **params):

z = func(u, **params)
return {"u":u, "z":z}, 0

def middle_back_propagation(back_func, dz, us, weights, weight_decay, calc_du_flag, **params):
du = back_func(dz, us["u"], us["z"], **params)
return {"du":du}

def output_propagation(func, u, learn_flag, **params):
z = func(u, **params)
return {"u":u, "z":z}


affine

重みW、バイアスbを利用しますので、各関数を定義します。

def affine_init_layer(d_prev, d, weight_init_func=he_normal, weight_init_params={}, bias_init_func=zeros_b, bias_init_params={}):

W = weight_init_func(d_prev, d, **weight_init_params)
b = bias_init_func(d, **bias_init_params)
return d, {"W":W, "b":b}

def affine_init_optimizer():
sW = {}
sb = {}
return {"sW":sW, "sb":sb}

def affine_propagation(func, u, weights, weight_decay, learn_flag, **params):
z = func(u, weights["W"], weights["b"])
# 重み減衰対応
weight_decay_r = 0
if weight_decay is not None:
weight_decay_r = weight_decay["func"](weights["W"], **weight_decay["params"])
return {"u":u, "z":z}, weight_decay_r

def affine_back_propagation(back_func, dz, us, weights, weight_decay, calc_du_flag, **params):
du, dW, db = back_func(dz, us["u"], weights["W"], weights["b"], calc_du_flag)
# 重み減衰対応
if weight_decay is not None:
dW = dW + weight_decay["back_func"](weights["W"], **weight_decay["params"])
return {"du":du, "dW":dW, "db":db}

def affine_update_weight(func, du, weights, optimizer_stats, **params):
weights["W"], optimizer_stats["sW"] = func(weights["W"], du["dW"], **params, **optimizer_stats["sW"])
weights["b"], optimizer_stats["sb"] = func(weights["b"], du["db"], **params, **optimizer_stats["sb"])
return weights, optimizer_stats


maxout

重みW、バイアスbを利用しますので、各関数を定義します。

def maxout_init_layer(d_prev, d, unit=1, weight_init_func=he_normal, weight_init_params={}, bias_init_func=zeros_b, bias_init_params={}):

W = weight_init_func(d_prev, d*unit, **weight_init_params)
b = bias_init_func(d*unit, **bias_init_params)
return d, {"W":W, "b":b}

def maxout_init_optimizer():
sW = {}
sb = {}
return {"sW":sW, "sb":sb}

def maxout_propagation(func, u, weights, weight_decay, learn_flag, unit=1, **params):
u_affine = affine(u, weights["W"], weights["b"])
z = func(u_affine, unit)
# 重み減衰対応(未対応)
weight_decay_r = 0
#if weight_decay is not None:
# weight_decay_r = weight_decay["func"](weights["W"], **weight_decay["params"])
return {"u":u, "z":z, "u_affine":u_affine}, weight_decay_r

def maxout_back_propagation(back_func, dz, us, weights, weight_decay, calc_du_flag, unit=1, **params):
du_affine = back_func(dz, us["u_affine"], us["z"], unit)
du, dW, db = affine_back(du_affine, us["u"], weights["W"], weights["b"])
# 重み減衰対応(未対応)
#if weight_decay is not None:
# dW = dW + weight_decay["back_func"](weights["W"], **weight_decay["params"])
return {"du":du, "dW":dW, "db":db}

def maxout_update_weight(func, du, weights, optimizer_stats, **params):
weights["W"], optimizer_stats["sW"] = func(weights["W"], du["dW"], **params, **optimizer_stats["sW"])
weights["b"], optimizer_stats["sb"] = func(weights["b"], du["db"], **params, **optimizer_stats["sb"])
return weights, optimizer_stats


prelu

alphaを学習するため、各関数を定義します。

def prelu_init_layer(d_prev, d):

alpha = np.zeros(d)
return d, {"alpha":alpha}

def prelu_init_optimizer():
salpha = {}
return {"salpha":salpha}

def prelu_propagation(func, u, weights, weight_decay, learn_flag, **params):
z = func(u, weights["alpha"])
return {"u":u, "z":z}, 0

def prelu_back_propagation(back_func, dz, us, weights, weight_decay, calc_du_flag, **params):
du, dalpha = back_func(dz, us["u"], us["z"], weights["alpha"])
return {"du":du, "dalpha":dalpha}

def prelu_update_weight(func, du, weights, optimizer_stats, **params):
weights["alpha"], optimizer_stats["salpha"] = func(weights["alpha"], du["dalpha"], **params, **optimizer_stats["salpha"])
return weights, optimizer_stats


rrelu

実行時に乱数を利用し、alphaを決定するため、各関数を定義します。

def rrelu_init_layer(d_prev, d, min=0.0, max=0.1):

alpha = np.random.uniform(min, max, d)
return d, {"alpha":alpha}

def rrelu_propagation(func, u, weights, weight_decay, learn_flag, min=0.0, max=0.1):
z = func(u, weights["alpha"])
return {"u":u, "z":z}, 0

def rrelu_back_propagation(back_func, dz, us, weights, weight_decay, calc_du_flag, min=0.0, max=0.1):
du = back_func(dz, us["u"], us["z"], weights["alpha"])
return {"du":du}


batch_normalization

gamma、betaを学習するため、各関数を定義します。

今までは、予測時も予測データの平均、分散を利用していましたが、本来は学習時の平均、分散を利用すべきでした。予測時には、学習時の平均、分散を利用するように実装変更しました。

def batch_normalization_init_layer(d_prev, d, batch_norm_node=True, use_gamma_beta=True, use_train_stats=True, alpha=0.1):

# バッチ正規化
dim = d
if type(d) == int:
dim = (d,)
# gamma,beta初期化
if batch_norm_node:
gamma = np.ones((1,) + dim)
beta = np.zeros((1,) + dim)
else:
gamma = np.ones(1)
beta = np.zeros(1)
# 学習時の統計情報初期化
train_mean = None
train_var = None
if