強化学習のQ-learningで迷路探索を行います。
Pythonのソースコードはこちら。
import numpy as np
import matplotlib.pyplot as plt
import copy
def main():
global is_end_episode
global rewards
for episode in range(NB_EPISODE):
episode_reward = []
while(is_end_episode == False):
action = act_func()
state_return, reward, is_end_episode = step_func(action)
observe_func(state_return, reward)
episode_reward.append(reward)
rewards.append(np.sum(episode_reward))
state_return2 = reset_state_func()
observe_func(state_return2)
is_end_episode = False
# 結果のプロット
plt.plot(np.arange(NB_EPISODE), rewards)
plt.xlabel("episode")
plt.ylabel("reward")
plt.savefig("result.jpg")
plt.show()
NB_EPISODE = 100
EPSILON = 0.1
ALPHA = 0.1
GAMMA = 0.9
ACTIONS = np.arange(4)
filed_type = {
"N": 0, # 通常
"G": 1, # ゴール
"W": 2, # 壁
"D": 3, # ダミー
}
actions_key = {
"UP": 0,
"DOWN": 1,
"LEFT": 2,
"RIGHT": 3
}
map_field = [[3, 2, 0, 1],
[ 0, 0, 0, 2],
[ 2, 2, 0, 0],
[ 0, 0, 0, 2],
[ 0, 2, 0, 0]]
start_pos = 0, 4
agent_pos = copy.deepcopy(start_pos)
alpha = ALPHA
gamma = GAMMA
epsilon = EPSILON
actions = ACTIONS
previous_state = None
previous_action = None
state = start_pos
rewards = []
is_end_episode = False
def _init_q_values():
global q_values
global actions
q_values = {}
q_values[state] = np.repeat(0.0, len(actions))
return q_values
q_values = _init_q_values()
def act_func():
global epsilon
global q_values
global state
global previous_action
if np.random.uniform() < epsilon:
action = np.random.randint(0, len(q_values[state]))
else:
action = np.argmax(q_values[state])
previous_action = action
return action
def step_func(action):
global agent_pos
global actions_key
to_x, to_y = copy.deepcopy(agent_pos)
if _is_possible_action(to_x, to_y, action) == False:
return agent_pos, -1, False
if action == actions_key["UP"]:
to_y += -1
elif action == actions_key["DOWN"]:
to_y += 1
elif action == actions_key["LEFT"]:
to_x += -1
elif action == actions_key["RIGHT"]:
to_x += 1
is_goal = _is_end_episode(to_x, to_y)
reward = _compute_reward(to_x, to_y)
agent_pos = to_x, to_y
return agent_pos, reward, is_goal
def _is_possible_action(x, y, action):
global actions_key
global map_field
to_x = x
to_y = y
if action == actions_key["UP"]:
to_y += -1
elif action == actions_key["DOWN"]:
to_y += 1
elif action == actions_key["LEFT"]:
to_x += -1
elif action == actions_key["RIGHT"]:
to_x += 1
if len(map_field) <= to_y or 0 > to_y:
return False
elif len(map_field[0]) <= to_x or 0 > to_x:
return False
elif _is_wall(to_x, to_y):
return False
return True
def _is_end_episode(x,y):
global map_field
global field_type
if map_field[y][x] == filed_type["G"]:
return True
elif map_field[y][x] == filed_type["D"]:
return True
else:
return False
def _compute_reward(x, y):
global map_field
global field_type
if map_field[y][x] == filed_type["N"]:
return 0
elif map_field[y][x] == filed_type["G"]:
return 100
elif map_field[y][x] == filed_type["D"]:
return -100
def _is_wall(x, y):
global map_field
global field_type
if map_field[y][x] == filed_type["W"]:
return True
else:
return False
def observe_func(next_state, reward=None):
global q_values
global actions
global previous_state
global state
next_state = str(next_state)
if next_state not in q_values:
q_values[next_state] = np.repeat(0.0, len(actions))
previous_state = copy.deepcopy(state)
state = next_state
if reward is not None:
learn(reward)
def learn(reward):
global q_values
global previous_state
global previous_action
global state
global alpha
global gamma
q = q_values[previous_state][previous_action] # Q(s, a)
max_q = max(q_values[state]) # max Q(s')
# Q(s, a) = Q(s, a) + alpha*(r+gamma*maxQ(s')-Q(s, a))
q_values[previous_state][previous_action] = q + (alpha * (reward + (gamma * max_q) - q))
def reset_state_func():
global agent_pos
global start_pos
agent_pos = start_pos
return start_pos
if __name__ == '__main__':
main()
今回エージェントが探索する迷路はこちら。
探索結果がこちら。
横軸が何回迷路をスタートからゴールまで繰り返して探索したか、縦軸が報酬値。
回数を繰り返す毎に多くの報酬が貰えてる事がわかる。
以下、ソースコードの解説をします。
興味のある方はどうぞ。
def main():
global is_end_episode
global rewards
for episode in range(NB_EPISODE):
episode_reward = []
while(is_end_episode == False):
action = act_func()
state_return, reward, is_end_episode = step_func(action)
observe_func(state_return, reward)
episode_reward.append(reward)
rewards.append(np.sum(episode_reward))
state_return2 = reset_state_func()
observe_func(state_return2)
is_end_episode = False
# 結果のプロット
plt.plot(np.arange(NB_EPISODE), rewards)
plt.xlabel("episode")
plt.ylabel("reward")
plt.savefig("result.jpg")
plt.show()
メイン関数です。
for文でepisode(今回は100episode)を繰り返します。
まずepisode_reward(1episodeの経路毎の報酬値)を初期化します。
その後while文でスタートからゴールに辿り着くまでを繰り返します。
最初にact_func()関数で上下左右どの方向に決めてその値をaction変数に返します。
次にstep_func()関数でマス目を進めて、次のマス目の座標、次のマス目の報酬値、ゴールしてるかのフラグを返します。
observe_func()関数で現在位置と過去位置の更新を行った後にQ-valueの値を更新します。
最後にepisode_reward配列の末尾にマス目の報酬値を追加してwhile文の一周を終えます。
while文を抜けたら1経路分の報酬値の合計値をrewards配列の末尾に追加します。
reset_state_func()関数でスタート位置を返してobserve_func()関数で現在位置をスタート位置に更新します。
is_end_episodeフラグ(ゴールしたかのフラグ)をFalse(ゴールしてない)に戻します。
これがfor文の繰り返し内容になります。
for文を抜けたらグラフ表示を行い、横軸がepisode数、縦軸が報酬値となります。
ついでにカレントディレクトリにjpgファイルを保存してます。
NB_EPISODE = 100
EPSILON = 0.1
ALPHA = 0.1
GAMMA = 0.9
ACTIONS = np.arange(4)
filed_type = {
"N": 0, # 通常
"G": 1, # ゴール
"W": 2, # 壁
"D": 3, # ダミー
}
actions_key = {
"UP": 0,
"DOWN": 1,
"LEFT": 2,
"RIGHT": 3
}
map_field = [[3, 2, 0, 1],
[ 0, 0, 0, 2],
[ 2, 2, 0, 0],
[ 0, 0, 0, 2],
[ 0, 2, 0, 0]]
start_pos = 0, 4
agent_pos = copy.deepcopy(start_pos)
alpha = ALPHA
gamma = GAMMA
epsilon = EPSILON
actions = ACTIONS
previous_state = None
previous_action = None
state = start_pos
rewards = []
is_end_episode = False
NB_EPISODEはepisode数、EPSILONはε-greedy法の行動決定率(後述)、ALPHAは学習率(0~1)(後述)、GAMMAは割引率(0~1)(後述)、ACTIONSは上下左右の要素数(4)、field_typeは迷路の経路, ゴール, 壁, ダミーゴールの判別、actions_keyは上下左右を表す、map_fieldは迷路配列、start_posはスタート位置、agent_posはエージェントの現在位置、alpha,gamma,epsilon,actionsに定数値を代入、previous_stateは過去(一つ前)の座標、previous_actionは過去(一つ前)の移動方向値、stateは移動後座標、rewards配列は報酬値保存用配列、is_end_episodeはゴールしたかどうかのフラグです。
def _init_q_values():
global q_values
global actions
q_values = {}
q_values[state] = np.repeat(0.0, len(actions))
return q_values
q_values = _init_q_values()
_init_q_values()関数の処理です。
q_values配列のstate座標(スタート座標)を上下左右の4要素を0.0で初期化しておきます。
q_values変数にあらかじめ代入しておきます。
def act_func():
global epsilon
global q_values
global state
global previous_action
if np.random.uniform() < epsilon:
action = np.random.randint(0, len(q_values[state]))
else:
action = np.argmax(q_values[state])
previous_action = action
return action
act_func()関数ですε-greedy法に則ってepsilon(ε)を確率とする(0~1までの値)値で上下左右のランダム行動かもしくはQ-value値による上下左右の最大のQ-value値の選択行動か選びます。今回はepsilonを0.1に設定してるので10%の確率でランダム行動、90%でQ-value値のmax値を選択する行動になります。previous_action変数に移動方向を保存してaction変数(移動方向)を返します。
def step_func(action):
global agent_pos
global actions_key
to_x, to_y = copy.deepcopy(agent_pos)
if _is_possible_action(to_x, to_y, action) == False:
return agent_pos, -1, False
if action == actions_key["UP"]:
to_y += -1
elif action == actions_key["DOWN"]:
to_y += 1
elif action == actions_key["LEFT"]:
to_x += -1
elif action == actions_key["RIGHT"]:
to_x += 1
is_goal = _is_end_episode(to_x, to_y)
reward = _compute_reward(to_x, to_y)
agent_pos = to_x, to_y
return agent_pos, reward, is_goal
step_func()関数の処理です。
現在のエージェントの位置のx座標、y座標を求めます。
_is_possible_action()関数で壁や迷路の枠を越えているかを判定します。
移動不可能の場合は現在位置を次の位置、報酬を-1、ゴールしてないとフラグを設定してその3つを返戻値として返します。
action変数が上下左右か確認して次の座標位置を定めます。
_is_end_episode()関数でゴールしてるかの判定を行い、_compute_reward()関数で報酬値を得て、エージェントの位置を次の位置にしてその3つを返戻値として返します。
def _is_possible_action(x, y, action):
global actions_key
global map_field
to_x = x
to_y = y
if action == actions_key["UP"]:
to_y += -1
elif action == actions_key["DOWN"]:
to_y += 1
elif action == actions_key["LEFT"]:
to_x += -1
elif action == actions_key["RIGHT"]:
to_x += 1
if len(map_field) <= to_y or 0 > to_y:
return False
elif len(map_field[0]) <= to_x or 0 > to_x:
return False
elif _is_wall(to_x, to_y):
return False
return True
_is_possible_action()関数の処理です。
次の位置が壁かマップの外かを判定して真偽値を返します。
def _is_end_episode(x,y):
global map_field
global field_type
if map_field[y][x] == filed_type["G"]:
return True
elif map_field[y][x] == filed_type["D"]:
return True
else:
return False
_is_end_episode()関数の処理です。
ゴールしてるかの判定を行います。
ダミーゴールでもゴールしたものとみなします。
def _compute_reward(x, y):
global map_field
global field_type
if map_field[y][x] == filed_type["N"]:
return 0
elif map_field[y][x] == filed_type["G"]:
return 100
elif map_field[y][x] == filed_type["D"]:
return -100
_compute_reward()関数の処理です。
指定したマス目の報酬値を返します。
通常なら0、ゴールなら100、ダミーゴールなら-100を返します。
def _is_wall(x, y):
global map_field
global field_type
if map_field[y][x] == filed_type["W"]:
return True
else:
return False
_is_wall()関数の処理です。
指定したマス目が壁かどうかの判定を行います。
def observe_func(next_state, reward=None):
global q_values
global actions
global previous_state
global state
next_state = str(next_state)
if next_state not in q_values:
q_values[next_state] = np.repeat(0.0, len(actions))
previous_state = copy.deepcopy(state)
state = next_state
if reward is not None:
learn(reward)
observe_func()関数の処理です。
次に進む位置のq_value配列の要素が存在しなければ要素を追加して、エージェントの現在座標、過去座標の更新を行い、報酬値(reward)がNoneじゃなければQ学習によるQ-valueの更新を行います。
def learn(reward):
global q_values
global previous_state
global previous_action
global state
global alpha
global gamma
q = q_values[previous_state][previous_action] # Q(s, a)
max_q = max(q_values[state]) # max Q(s')
# Q(s, a) = Q(s, a) + alpha*(r+gamma*maxQ(s')-Q(s, a))
q_values[previous_state][previous_action] = q + (alpha * (reward + (gamma * max_q) - q))
learn()関数の処理です。
公式に従ってQ-valueの更新を行います。
Q-valueの更新の公式は以下です。
def reset_state_func():
global agent_pos
global start_pos
agent_pos = start_pos
return start_pos
reset_state_func()関数の処理です。
エージェントの位置を初期位置に戻して返します。
if __name__ == '__main__':
main()
main()関数を実行します。
以上がQ-learningにおける迷路探索の強化学習アルゴリズムの説明になります。