7
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

強化学習 Q-learningで迷路探索アルゴリズム

Posted at

強化学習のQ-learningで迷路探索を行います。
Pythonのソースコードはこちら。

import numpy as np
import matplotlib.pyplot as plt
import copy

def main():
	global is_end_episode
	global rewards

	for episode in range(NB_EPISODE):
		episode_reward = []
		while(is_end_episode == False):
			action = act_func()
			state_return, reward, is_end_episode = step_func(action)
			observe_func(state_return, reward)
			episode_reward.append(reward)
		rewards.append(np.sum(episode_reward))
		state_return2 = reset_state_func()
		observe_func(state_return2)
		is_end_episode = False

	# 結果のプロット
	plt.plot(np.arange(NB_EPISODE), rewards)
	plt.xlabel("episode")
	plt.ylabel("reward")
	plt.savefig("result.jpg")
	plt.show()

NB_EPISODE = 100
EPSILON = 0.1
ALPHA = 0.1
GAMMA = 0.9
ACTIONS = np.arange(4)

filed_type = {
	"N": 0,  # 通常
	"G": 1,  # ゴール
	"W": 2,  # 壁
	"D": 3,  # ダミー
}

actions_key = {
	"UP": 0,
	"DOWN": 1,
	"LEFT": 2,
	"RIGHT": 3
}

map_field =	[[3, 2, 0, 1],
			[ 0, 0, 0, 2],
			[ 2, 2, 0, 0],
			[ 0, 0, 0, 2],
			[ 0, 2, 0, 0]]

start_pos = 0, 4
agent_pos = copy.deepcopy(start_pos)

alpha = ALPHA
gamma = GAMMA
epsilon = EPSILON

actions = ACTIONS

previous_state = None
previous_action = None

state = start_pos

rewards = []

is_end_episode = False

def _init_q_values():
	global q_values
	global actions

	q_values = {}
	q_values[state] = np.repeat(0.0, len(actions))
	return q_values

q_values = _init_q_values()

def act_func():
	global epsilon
	global q_values
	global state
	global previous_action

	if np.random.uniform() < epsilon:
		action = np.random.randint(0, len(q_values[state]))
	else:
		action = np.argmax(q_values[state])

	previous_action = action
	return action

def step_func(action):
	global agent_pos
	global actions_key

	to_x, to_y = copy.deepcopy(agent_pos)

	if _is_possible_action(to_x, to_y, action) == False:
		return agent_pos, -1, False

	if action == actions_key["UP"]:
		to_y += -1
	elif action == actions_key["DOWN"]:
		to_y += 1
	elif action == actions_key["LEFT"]:
		to_x += -1
	elif action == actions_key["RIGHT"]:
		to_x += 1

	is_goal = _is_end_episode(to_x, to_y)
	reward = _compute_reward(to_x, to_y)
	agent_pos = to_x, to_y
	return agent_pos, reward, is_goal

def _is_possible_action(x, y, action):
	global actions_key
	global map_field

	to_x = x
	to_y = y

	if action == actions_key["UP"]:
		to_y += -1
	elif action == actions_key["DOWN"]:
		to_y += 1
	elif action == actions_key["LEFT"]:
		to_x += -1
	elif action == actions_key["RIGHT"]:
		to_x += 1

	if len(map_field) <= to_y or 0 > to_y:
		return False
	elif len(map_field[0]) <= to_x or 0 > to_x:
		return False
	elif _is_wall(to_x, to_y):
		return False

	return True

def _is_end_episode(x,y):
	global map_field
	global field_type

	if map_field[y][x] == filed_type["G"]:
		return True
	elif map_field[y][x] == filed_type["D"]:
		return True
	else:
		return False

def _compute_reward(x, y):
	global map_field
	global field_type

	if map_field[y][x] == filed_type["N"]:
		return 0
	elif map_field[y][x] == filed_type["G"]:
		return 100
	elif map_field[y][x] == filed_type["D"]:
		return -100

def _is_wall(x, y):
	global map_field
	global field_type

	if map_field[y][x] == filed_type["W"]:
		return True
	else:
		return False

def observe_func(next_state, reward=None):
	global q_values
	global actions
	global previous_state
	global state

	next_state = str(next_state)
	if next_state not in q_values:
		q_values[next_state] = np.repeat(0.0, len(actions))

	previous_state = copy.deepcopy(state)
	state = next_state

	if reward is not None:
		learn(reward)

def learn(reward):
	global q_values
	global previous_state
	global previous_action
	global state
	global alpha
	global gamma

	q = q_values[previous_state][previous_action]  # Q(s, a)
	max_q = max(q_values[state])  # max Q(s')
	# Q(s, a) = Q(s, a) + alpha*(r+gamma*maxQ(s')-Q(s, a))
	q_values[previous_state][previous_action] = q + (alpha * (reward + (gamma * max_q) - q))

def reset_state_func():
	global agent_pos
	global start_pos

	agent_pos = start_pos
	return start_pos

if __name__ == '__main__':
	main()

今回エージェントが探索する迷路はこちら。

maze.jpg

探索結果がこちら。
横軸が何回迷路をスタートからゴールまで繰り返して探索したか、縦軸が報酬値。
回数を繰り返す毎に多くの報酬が貰えてる事がわかる。

res.jpg

以下、ソースコードの解説をします。
興味のある方はどうぞ。

def main():
	global is_end_episode
	global rewards

	for episode in range(NB_EPISODE):
		episode_reward = []
		while(is_end_episode == False):
			action = act_func()
			state_return, reward, is_end_episode = step_func(action)
			observe_func(state_return, reward)
			episode_reward.append(reward)
		rewards.append(np.sum(episode_reward))
		state_return2 = reset_state_func()
		observe_func(state_return2)
		is_end_episode = False

	# 結果のプロット
	plt.plot(np.arange(NB_EPISODE), rewards)
	plt.xlabel("episode")
	plt.ylabel("reward")
	plt.savefig("result.jpg")
	plt.show()

メイン関数です。
for文でepisode(今回は100episode)を繰り返します。
まずepisode_reward(1episodeの経路毎の報酬値)を初期化します。
その後while文でスタートからゴールに辿り着くまでを繰り返します。
最初にact_func()関数で上下左右どの方向に決めてその値をaction変数に返します。
次にstep_func()関数でマス目を進めて、次のマス目の座標、次のマス目の報酬値、ゴールしてるかのフラグを返します。
observe_func()関数で現在位置と過去位置の更新を行った後にQ-valueの値を更新します。
最後にepisode_reward配列の末尾にマス目の報酬値を追加してwhile文の一周を終えます。
while文を抜けたら1経路分の報酬値の合計値をrewards配列の末尾に追加します。
reset_state_func()関数でスタート位置を返してobserve_func()関数で現在位置をスタート位置に更新します。
is_end_episodeフラグ(ゴールしたかのフラグ)をFalse(ゴールしてない)に戻します。
これがfor文の繰り返し内容になります。
for文を抜けたらグラフ表示を行い、横軸がepisode数、縦軸が報酬値となります。
ついでにカレントディレクトリにjpgファイルを保存してます。

NB_EPISODE = 100
EPSILON = 0.1
ALPHA = 0.1
GAMMA = 0.9
ACTIONS = np.arange(4)

filed_type = {
	"N": 0,  # 通常
	"G": 1,  # ゴール
	"W": 2,  # 壁
	"D": 3,  # ダミー
}

actions_key = {
	"UP": 0,
	"DOWN": 1,
	"LEFT": 2,
	"RIGHT": 3
}

map_field =	[[3, 2, 0, 1],
			[ 0, 0, 0, 2],
			[ 2, 2, 0, 0],
			[ 0, 0, 0, 2],
			[ 0, 2, 0, 0]]

start_pos = 0, 4
agent_pos = copy.deepcopy(start_pos)

alpha = ALPHA
gamma = GAMMA
epsilon = EPSILON

actions = ACTIONS

previous_state = None
previous_action = None

state = start_pos

rewards = []

is_end_episode = False

NB_EPISODEはepisode数、EPSILONはε-greedy法の行動決定率(後述)、ALPHAは学習率(0~1)(後述)、GAMMAは割引率(0~1)(後述)、ACTIONSは上下左右の要素数(4)、field_typeは迷路の経路, ゴール, 壁, ダミーゴールの判別、actions_keyは上下左右を表す、map_fieldは迷路配列、start_posはスタート位置、agent_posはエージェントの現在位置、alpha,gamma,epsilon,actionsに定数値を代入、previous_stateは過去(一つ前)の座標、previous_actionは過去(一つ前)の移動方向値、stateは移動後座標、rewards配列は報酬値保存用配列、is_end_episodeはゴールしたかどうかのフラグです。

def _init_q_values():
	global q_values
	global actions

	q_values = {}
	q_values[state] = np.repeat(0.0, len(actions))
	return q_values

q_values = _init_q_values()

_init_q_values()関数の処理です。
q_values配列のstate座標(スタート座標)を上下左右の4要素を0.0で初期化しておきます。
q_values変数にあらかじめ代入しておきます。

def act_func():
	global epsilon
	global q_values
	global state
	global previous_action

	if np.random.uniform() < epsilon:
		action = np.random.randint(0, len(q_values[state]))
	else:
		action = np.argmax(q_values[state])

	previous_action = action
	return action

act_func()関数ですε-greedy法に則ってepsilon(ε)を確率とする(0~1までの値)値で上下左右のランダム行動かもしくはQ-value値による上下左右の最大のQ-value値の選択行動か選びます。今回はepsilonを0.1に設定してるので10%の確率でランダム行動、90%でQ-value値のmax値を選択する行動になります。previous_action変数に移動方向を保存してaction変数(移動方向)を返します。

def step_func(action):
	global agent_pos
	global actions_key

	to_x, to_y = copy.deepcopy(agent_pos)

	if _is_possible_action(to_x, to_y, action) == False:
		return agent_pos, -1, False

	if action == actions_key["UP"]:
		to_y += -1
	elif action == actions_key["DOWN"]:
		to_y += 1
	elif action == actions_key["LEFT"]:
		to_x += -1
	elif action == actions_key["RIGHT"]:
		to_x += 1

	is_goal = _is_end_episode(to_x, to_y)
	reward = _compute_reward(to_x, to_y)
	agent_pos = to_x, to_y
	return agent_pos, reward, is_goal

step_func()関数の処理です。
現在のエージェントの位置のx座標、y座標を求めます。
_is_possible_action()関数で壁や迷路の枠を越えているかを判定します。
移動不可能の場合は現在位置を次の位置、報酬を-1、ゴールしてないとフラグを設定してその3つを返戻値として返します。
action変数が上下左右か確認して次の座標位置を定めます。
_is_end_episode()関数でゴールしてるかの判定を行い、_compute_reward()関数で報酬値を得て、エージェントの位置を次の位置にしてその3つを返戻値として返します。

def _is_possible_action(x, y, action):
	global actions_key
	global map_field

	to_x = x
	to_y = y

	if action == actions_key["UP"]:
		to_y += -1
	elif action == actions_key["DOWN"]:
		to_y += 1
	elif action == actions_key["LEFT"]:
		to_x += -1
	elif action == actions_key["RIGHT"]:
		to_x += 1

	if len(map_field) <= to_y or 0 > to_y:
		return False
	elif len(map_field[0]) <= to_x or 0 > to_x:
		return False
	elif _is_wall(to_x, to_y):
		return False

	return True

_is_possible_action()関数の処理です。
次の位置が壁かマップの外かを判定して真偽値を返します。

def _is_end_episode(x,y):
	global map_field
	global field_type

	if map_field[y][x] == filed_type["G"]:
		return True
	elif map_field[y][x] == filed_type["D"]:
		return True
	else:
		return False

_is_end_episode()関数の処理です。
ゴールしてるかの判定を行います。
ダミーゴールでもゴールしたものとみなします。

def _compute_reward(x, y):
	global map_field
	global field_type

	if map_field[y][x] == filed_type["N"]:
		return 0
	elif map_field[y][x] == filed_type["G"]:
		return 100
	elif map_field[y][x] == filed_type["D"]:
		return -100

_compute_reward()関数の処理です。
指定したマス目の報酬値を返します。
通常なら0、ゴールなら100、ダミーゴールなら-100を返します。

def _is_wall(x, y):
	global map_field
	global field_type

	if map_field[y][x] == filed_type["W"]:
		return True
	else:
		return False

_is_wall()関数の処理です。
指定したマス目が壁かどうかの判定を行います。

def observe_func(next_state, reward=None):
	global q_values
	global actions
	global previous_state
	global state

	next_state = str(next_state)
	if next_state not in q_values:
		q_values[next_state] = np.repeat(0.0, len(actions))

	previous_state = copy.deepcopy(state)
	state = next_state

	if reward is not None:
		learn(reward)

observe_func()関数の処理です。
次に進む位置のq_value配列の要素が存在しなければ要素を追加して、エージェントの現在座標、過去座標の更新を行い、報酬値(reward)がNoneじゃなければQ学習によるQ-valueの更新を行います。

def learn(reward):
	global q_values
	global previous_state
	global previous_action
	global state
	global alpha
	global gamma

	q = q_values[previous_state][previous_action]  # Q(s, a)
	max_q = max(q_values[state])  # max Q(s')
	# Q(s, a) = Q(s, a) + alpha*(r+gamma*maxQ(s')-Q(s, a))
	q_values[previous_state][previous_action] = q + (alpha * (reward + (gamma * max_q) - q))

learn()関数の処理です。
公式に従ってQ-valueの更新を行います。
Q-valueの更新の公式は以下です。

Q-value公式.jpg

def reset_state_func():
	global agent_pos
	global start_pos

	agent_pos = start_pos
	return start_pos

reset_state_func()関数の処理です。
エージェントの位置を初期位置に戻して返します。

if __name__ == '__main__':
	main()

main()関数を実行します。

以上がQ-learningにおける迷路探索の強化学習アルゴリズムの説明になります。

7
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?