13
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Normalized Advantage Functionを用いた連続値での深層強化学習の実装

Last updated at Posted at 2019-09-16

Introduction

深層強化学習の分野において,近年ではDeep Q-Network(DQN)などの手法が活用されています.

DQNはAtariのゲームで従来の人工知能による得点を上回り、さらに幾つかのゲームではプロゲーマーと同等またはそれ以上のパフォーマンスを見せたことは記憶に新しいです.

しかし,DQNは行動空間が離散値であり,
ロボットのマニュピレータの操作等,現実では行動空間が連続値として扱われるものが多いです.
行動空間が連続値である強化学習では、Acter Criticのアプローチに基づく手法であるDeep Deter ministic Policy Gradient (DDPG)などがありますが,
Q学習のアプローチに基づく手法であるNormalized Advantage Function (NAF)を実装していきます.
構造的には意外にシンプルであり,DQNを理解しておられる方であれば問題なく理解できると思います.

プログラムの実装は極力kerasを使うことで200行程度でシンプルに記述しました(多少tensorflowを使用しています).

また,私の理解がまだ及んでいない部分がある可能性が非常に高いため「ココが違う!」等のミスがありましたら修正のリクエストをかけていただけるとありがたいです.


※ 今回はDQNを理解されている方を対象としているため,DQNで用いられているExperience ReplayやTarget Network等に関しては省いて説明します.

Normalized Advantage Function (NAF)とは

NAFは深層強化学習のなかでも連続値の行動空間に対応したQ学習です.

また,DQNをベースとした手法であり、Double DQNを拡張したのがNAFとなります.

DQNからNAFまでの軌跡はこのようになっているようです.
図はQiita - 強化学習の系図から引用させていただきました.大変参考になります.

keizu.png

NAFのメリットとしては,同じ行動空間が連続値の深層強化学習であるDDPGと比べ,収束にかかるエピソード数が少ないと言われています.

NAFの特徴

DQNをベースとしているため,Experience ReplayやTarget Network等が同様に用いられます.

大枠としては,状態と実際にとった行動を入力とし、ネットワーク内で予測された行動のQ値を出力する多入力・単出力型のネットワークとなっています.

F20A71E7-5EA8-4EB3-BE86-5E84652B6498.jpeg

図はIntel AI Lab, Reinforcement Learning Coachから引用




また,NAFはMiddlewareの後に3つに分岐し,これらの出力を用いて予測された行動のQ値を求めるNAF Headが存在しています.

この3つは

1つ目は入力した状態に対して最適な行動を出力する層(活性化関数はTanH)
2つ目はベルマン誤差を出力する層(活性化関数はlinear)
3つ目は行動価値を出力する層(活性化関数は指定無し,とりあえずlinear)

となっています.

ちなみに,このネットワークを見ると「入力した状態に対して予測された行動が出力されてないんじゃ?」と思うかもしれませんが,
”入力した状態に対して最適な行動を出力(予測)する層”
から出力を抜き出してしまえば良いわけですね(実装の仕方はプログラム参照).


それでは,このネットワークの出力からQ値を求めてみましょう. そのために,以下の式を使っていきます.

まず,ネットワークにより出力されたベルマン誤差 $L$を用いて状態依存値(state-dependent)$P$を求めます.

P(\textbf{x}|\theta^{P}) = L(\textbf{x}|\theta^{P})L(\textbf{x}|\theta^{P})^T

次に,ネットワークにより出力された予測された最適な行動を $\mu$,
状態$\textbf{x}$で実際にとった行動$\textbf{u}$,
さきほど求めた状態依存値$P$,
を用いてアドバンテージ関数(advantage function)$A$によりアドバンテージを求めます.

A(\textbf{x}, \textbf{u} | \theta^A ) = -\frac{1}{2}(\textbf{u}-\mu(\textbf{x} | \theta^{\mu})) \cdot P(\textbf{x} | \theta^{P})\cdot (\textbf{u}-\mu(\textbf{x} | \theta^{\mu})) 

最後に,ネットワークにより出力された行動価値$V$とアドバンテージ$A$を合わせてQ値を求めます.

Q(\textbf{x}, \textbf{u} | \theta^{Q}) = A(\textbf{x}, \textbf{u} | \theta^A ) + V(\textbf{x} | \theta^V) 

これでQ値が求まります.

実装

では実際に実装をしていきましょう.

今回環境に用いるのはOpenAI GymのPendulum-v0です.
https://github.com/openai/gym/wiki/Pendulum-v0

元論文より、大まかなアルゴリズムはこのようになっています。

44656A55-7120-4D30-A68B-EDE2C0081E0F.jpeg

実装した学習用プログラム

predict.py

# -*- coding: utf-8 -*-

import numpy as np
import gym
import random

import keras
from keras import backend as K
from keras.layers.convolutional import Conv2D
from keras.models import Model
from keras.layers import Input, Dense, Lambda
import tensorflow as tf
from keras.optimizers import Adam


from collections import deque


import sys
sys.path.remove('/opt/ros/kinetic/lib/python2.7/dist-packages')
import cv2

from skimage.color import rgb2gray
from skimage.transform import resize


gamma = 0.99

np.set_printoptions(suppress=True)

resize_x = 84
resize_y = 84

file_history = open("history.txt", "w")
file_history.close()


class Agent:
	def __init__(self, env):
		self.env = env
		self.input_dim = env.observation_space.shape
		
		self.state_dim = 3
		self.actions_dim = 1

		self.q_net_q, self.q_net_a, self.q_net_v = self.createModel()
		self.t_net_q, self.t_net_action, self.t_net_v = self.createModel()
		# 損失関数
		adam = Adam(lr=0.001, clipnorm=1.)
		# モデル生成
		self.q_net_q.compile(optimizer=adam, loss='mae')
		self.t_net_q.compile(optimizer=adam, loss='mae')

	def createModel(self):
		x = Input(shape=(self.state_dim,), name='observation_input')
		u = Input(shape=(self.actions_dim,), name='action_input')
		# Middleware
		h = Dense(32, activation="relu")(x)
		h = Dense(32, activation="relu")(h)
		h = Dense(32, activation="relu")(h)
		
		# NAF Head

		# < Value function >
		V = Dense(1, activation="linear", name='value_function')(h)
		
		# < Action Mean >
		mu = Dense(self.actions_dim, activation="tanh", name='action_mean')(h)
		
		# < L function -> P function >
		l0 = Dense(int(self.actions_dim * (self.actions_dim + 1) / 2), activation="linear", name='l0')(h)
		l1 = Lambda(lambda x: tf.contrib.distributions.fill_triangular(x))(l0)
		L = Lambda(lambda x: tf.matrix_set_diag(x, tf.exp(tf.matrix_diag_part(x))))(l1)
		P = Lambda(lambda x: tf.matmul(x, tf.matrix_transpose(x)))(L)
		
		# < Action function >
		u_mu = keras.layers.Subtract()([u, mu])
		u_mu_P = keras.layers.Dot(axes=1)([u_mu, P]) # transpose 自動でされてた
		u_mu_P_u_mu = keras.layers.Dot(axes=1)([u_mu_P, u_mu])
		A = Lambda(lambda x: -1.0/2.0 * x)(u_mu_P_u_mu)
		
		# < Q function >
		Q = keras.layers.Add()([A, V])

		# Input and Output
		model_q = Model(input=[x, u], output=[Q])
		model_mu = Model(input=[x], output=[mu])
		model_v = Model(input=[x], output=[V])
		model_q.summary()
		model_mu.summary()
		model_v.summary()

		return model_q, model_mu, model_v


	def getAction(self, state):
		action = self.q_net_a.predict_on_batch(state[np.newaxis,:])
		return action

	def Train(self, x_batch, y_batch):
		return self.q_net_q.train_on_batch(x_batch, y_batch)

	def PredictT(self, x_batch):
		return self.t_net_q.predict_on_batch(x_batch)

	def WeightCopy(self):
		self.t_net_q.set_weights(self.q_net_q.get_weights())
	

def CreateBatch(agent, replay_memory, batch_size):
	minibatch = random.sample(replay_memory, batch_size)
	state, action, reward, state2, end_flag =  map(np.array, zip(*minibatch))

	x_batch = state
	next_v_values = agent.t_net_v.predict_on_batch(state2)
	y_batch = np.zeros(batch_size)
	
	for i in range(batch_size):
		y_batch[i] = reward[i] + gamma * next_v_values[i]
	return [x_batch, action], y_batch



def main():
	n_episode = 150 # 繰り返すエピソード回数
	max_memory = 20000 # リプレイメモリの容量
	batch_size = 256 # いい感じに収束しないときはバッチサイズいろいろ変えてみて 
	
	max_sigma = 0.99 # 付与するノイズの最大分散値
	sigma = max_sigma

	reduce_sigma = max_sigma / n_episode # 1エピソードで下げる分散値

	env = gym.make("Pendulum-v0") # 環境
	agent = Agent(env)
	# リプレイメモリ
	replay_memory = deque()

	# ゲーム再スタート
	for episode in range(n_episode):

		print("episode " + str(episode))
		end_flag = False
		state = env.reset()

		sigma -= reduce_sigma
		if sigma < 0:
			sigma = 0

		while not end_flag:
			# 行動にノイズを付与
			action = agent.getAction(state) + np.random.normal(0, sigma, size=1)
			action = action[:,0]
			# Pendulumの行動が-2~2の範囲なので変換
			action = np.clip(action, -1.0, 1.0) * 2
			
			state2, reward, end_flag, info = env.step(action)
			# 前処理
			# リプレイメモリに保存
			replay_memory.append([state, action, reward, state2, end_flag])
			# リプレイメモリが溢れたら前から削除
			if len(replay_memory) > max_memory:
				replay_memory.popleft()
			# リプレイメモリが溜まったら学習
			if len(replay_memory) > batch_size*4:
				x_batch, y_batch = CreateBatch(agent, replay_memory, batch_size)
				agent.Train(x_batch, y_batch)
			
			state = state2
			# 可視化をする場合はこのコメントアウトを解除
			env.render()
		
		# 4episodeに1回ターゲットネットワークに重みをコピー
		if episode != 0 and episode % 4 == 0:
			agent.WeightCopy()
			# Q-networkの重みをTarget-networkにコピー
			agent.t_net_q.save_weights("weight.h5")
				
		
	env.close()
	
	agent.WeightCopy()
	# Q-networkの重みをTarget-networkにコピー
	agent.t_net_q.save_weights("weight.h5")


if __name__ == '__main__':
	main()

学習結果

こちらのテスト用プログラムを動かせば学習時に含ませたノイズがない状態で結果が見れます.

test.py
# -*- coding: utf-8 -*-

import numpy as np
import gym
import keras
import tensorflow as tf
from keras.models import Model
from keras.layers import Input, Dense, Lambda
from keras.optimizers import Adam
from collections import deque



class Agent:
	def __init__(self, env):
		self.env = env
		self.input_dim = env.observation_space.shape
		
		self.state_dim = 3
		self.actions_dim = 1

		self.t_net_q, self.t_net_a, self.t_net_v = self.createModel()

		self.t_net_q.load_weights("weight.h5") # 学習した重みのロード

	def createModel(self):
		x = Input(shape=(self.state_dim,), name='observation_input')
		u = Input(shape=(self.actions_dim,), name='action_input')
		# Middleware
		h = Dense(32, activation="relu")(x)
		h = Dense(32, activation="relu")(h)
		h = Dense(32, activation="relu")(h)
		
		# NAF Head

		# < Value function >
		V = Dense(1, activation="linear", name='value_function')(h)
		
		# < Action Mean >
		mu = Dense(self.actions_dim, activation="tanh", name='action_mean')(h)
		
		# < L function -> P function >
		l0 = Dense(int(self.actions_dim * (self.actions_dim + 1) / 2), activation="linear", name='l0')(h)
		l1 = Lambda(lambda x: tf.contrib.distributions.fill_triangular(x))(l0)
		L = Lambda(lambda x: tf.matrix_set_diag(x, tf.exp(tf.matrix_diag_part(x))))(l1)
		P = Lambda(lambda x: tf.matmul(x, tf.matrix_transpose(x)))(L)
		
		# < Action function >
		u_mu = keras.layers.Subtract()([u, mu])
		u_mu_P = keras.layers.Dot(axes=1)([u_mu, P]) # transpose 自動でされてた
		u_mu_P_u_mu = keras.layers.Dot(axes=1)([u_mu_P, u_mu])
		A = Lambda(lambda x: -1.0/2.0 * x)(u_mu_P_u_mu)
		
		# < Q function >
		Q = keras.layers.Add()([A, V])

		# Input and Output
		model_q = Model(input=[x, u], output=[Q])
		model_mu = Model(input=[x], output=[mu])
		model_v = Model(input=[x], output=[V])
		model_q.summary()
		model_mu.summary()
		model_v.summary()

		return model_q, model_mu, model_v


	def getAction(self, state):
		action = self.t_net_a.predict_on_batch(state[np.newaxis,:])
		return action


def main():
	env = gym.make("Pendulum-v0") # 環境
	agent = Agent(env)

	# ゲーム再スタート
	while 1:
		end_flag = False
		state = env.reset()

		while not end_flag:
			action = agent.getAction(state)
			action = action[:,0]
			# Pendulumの行動が-2~2の範囲なので変換
			action = np.clip(action, -1.0, 1.0) * 2
			
			state2, reward, end_flag, info = env.step(action)
			
			state = state2
			
			env.render()
		
	env.close()


if __name__ == '__main__':
	main()

学習結果としてはこのようになっています(学習されないことがあるので,その場合は何度か学習し直してみてください).

NAF_3layer (1).gif

参考文献

元論文
https://arxiv.org/abs/1603.00748
NAFを用いた日本語論文
https://www.ai-gakkai.or.jp/jsai2017/webprogram/2017/pdf/642.pdf
NAFの図
https://nervanasystems.github.io/coach/components/agents/value_optimization/naf.html#choosing-an-action
実装で参考にしたもの
https://gym.openai.com/evaluations/eval_CzoNQdPSAm0J3ikTBSTCg/
https://github.com/NervanaSystems/coach/blob/master/rl_coach/agents/naf_agent.py
https://github.com/carpedm20/NAF-tensorflow/blob/master/src/network.py
活性化関数での参考
https://pdfs.semanticscholar.org/e1af/36deaf167b4a671d25cb98f392d4a3ffac4a.pdf
中間レイヤーを出力する方法
https://keras.io/ja/getting-started/faq/#_7

13
13
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?