強化学習の勉強: 倒立振子問題(Q学習)
近年、注目されている深層強化学習をロボットにも応用したいと考えており、そのためにまずは強化学習について理解を深め、深層強化学習を学ぶうえで必要なことを身に着けることが目的。
強化学習アルゴリズム:Q学習
- エージェントなるものが学習を繰り返す中で,有利になる方向へ学習していく
- 報酬を与えられると同じように行動を続ける
- 罰を与えられるとその行動を避ける
- どのように報酬を与えるかについては扱う課題に応じて設計する
倒立振子(CartPole)の問題設定
- カート位置(-2.4 ~ 2.4)
- カート速度(-inf ~ inf)
- 棒の角度(-41.8 ~ 41.8)度
- 棒の角速度(-inf ~ inf)
これら連続値を離散化してデジタル化する!
(今回は6つのデジタル値に変換)
-
カート位置(-2.4 ~ 2.4)
→ -inf~-1.6: 0, -1.6~-0.8: 1, -0.8~-0.0: 2, 0.0~0.8: 3, 0.8~1.6: 4, 1.6~inf: 5
同様にして他の要素についても6つの分割により離散化する
すると変数が4種類あるため,$6^4=1296$となり,CarPoleの状態は1296種類のデジタル値で表現される.
$state = s_{cart:pos}\times6^0 + s_{cart:vel}\times6^1 + s_{pole:angle}\times6^2 + s_{pole:vel}\times6^3$
例:(カート位置,カート速度,棒の角度,棒の角速度)=(1, 2, 3, 4)
$state = 1\times6^0 + 2\times6^1 + 3\times6^2 + 4\times6^3$
方針
以下のように動く本体(Agent)とその脳(Brain)、そして環境(Environment)に分けて構築することを学ぶ。
実装
CartPoleをただ動かしてみるだけ
何も学習しないため、ただ適当に動いて倒れる。"""
迷路探索問題で強化学習を学ぶ
"""
import numpy as np
import gym
class CartPole():
def __init__(self) -> None:
self.env = gym.make('CartPole-v1')
self.observation = self.env.reset()
def run(self, steps):
for step in range(steps):
action = np.random.choice(2)
observation, reward, done, info = self.env.step(action)
if done:
self.env.reset()
return [observation, reward, done, info]
def run_with_frame(self, steps):
frames= []
for step in range(steps):
frames.append(self.env.render(mode='rgb_array'))
action = np.random.choice(2)
observation, reward, done, info = self.env.step(action)
if done: # ここ2行をコメントアウトすると、
self.env.reset() # ただ適当に動かしている一連の様子を記録できる
return frames, [observation, reward, done, info]
if __name__ == '__main__':
cartpole = CartPole()
cartpole.run(200)
frame, _ = cartpole.run_with_frame(200)
記録用として、frameを格納するメソッドもつけてみた。
GIFによる動作の記録
"""
倒立振子問題で強化学習を学ぶ
"""
import matplotlib.pyplot as plt
from matplotlib import animation as ani
from os.path import dirname, abspath
### 動いている様子を可視化 #############
class GIF():
def __init__(self, frames) -> None:
self.frames = frames
plt.figure(figsize=(self.frames[0].shape[1]/72.0, self.frames[0].shape[0]/72.0), dpi=72)
self.patch = plt.imshow(self.frames[0])
def animate(self, i):
"""フレームごとの描画内容"""
patch = self.patch.set_data(self.frames[i])
return (patch,)
def create(self, file_name="maze_random.gif"):
anim = ani.FuncAnimation(plt.gcf(), self.animate, frames=len(self.frames), interval=50, repeat=True)
save_path = dirname(abspath(__file__))
anim.save(f"{save_path}/{file_name}")
if __name__ == '__main__':
from cartpole import CartPole
# 迷路の作成
cartpole = CartPole()
frames, _ = cartpole.run_with_frame(200) # ゴールするまで1つの方策でランダム動き回る
# 記録
gif = GIF(frames)
gif.create(file_name="CartPole_random.gif")
動いている様子
この棒を倒れないように台車を制御することを学ばせる。
Q学習とその結果の記録
Agent
### Agentの実装 ####################
class Agent():
"""CartPoleのエージェントクラス。棒付き台車そのもの。"""
def __init__(self, num_states, num_actions) -> None:
# エージェントが行動決定するための頭脳を生成
self.brain = Brain(num_states, num_actions)
def update_Q_function(self, observation, action, reward, observation_next):
"""Q関数の更新"""
self.brain.update_Q_table(observation, action, reward, observation_next)
# 1 step移動後の状態sを求める
def get_action(self, observation, step):
"""行動の決定"""
action = self.brain.decide_action(observation, step)
return action
Brain
### Brain(Agentの脳)の実装 ####################
class Brain():
def __init__(self, num_states, num_actions) -> None:
self.num_actions = num_actions
# Qテーブルの作成。行は状態、列は行動
self.q_table = np.random.uniform(low=0, high=1, size=(NUM_DIGITIZED**num_states, num_actions)) # 0~1の間で乱数をとり、サイズ(行, 列)の行列を生成
def bins(self, clip_min, clip_max, num):
"""観測した状態(連続値)を離散値にデジタル変換する閾値を求める"""
return np.linspace(clip_min, clip_max, num+1)[1:-1]
def digitize_state(self, observation):
"""観測したobervation状態を、離散値に変換する"""
cart_pos, cart_v, pole_angle, pole_v = observation
digitized = [
np.digitize(cart_pos, bins=self.bins(-2.4, 2.4, NUM_DIGITIZED)),
np.digitize(cart_v, bins=self.bins(-3.0, 2.4, NUM_DIGITIZED)),
np.digitize(pole_angle, bins=self.bins(-0.5, 0.5, NUM_DIGITIZED)),
np.digitize(pole_v, bins=self.bins(-2.0, 2.0, NUM_DIGITIZED)),
]
d_observation = sum([x * (NUM_DIGITIZED**i) for i, x in enumerate(digitized)]) # 6進数 to 10進数
return d_observation
# Q学習による行動価値関数Qの更新
def update_Q_table(self, observation, action, reward, observation_next):
"""Qテーブル(価値関数)をQ学習により更新"""
state = self.digitize_state(observation) # 状態をデジタル表記で数値化
state_next = self.digitize_state(observation_next) # 状態をデジタル表記で数値化
Max_Q_next = max(self.q_table[state_next][:]) # 次の状態における行動価値の大きい方を取得
# Q学習の更新式
self.q_table[state, action] = self.q_table[state, action] + ETA * (reward + GAMMA * Max_Q_next - self.q_table[state, action])
# 行動の決定
def decide_action(self, observation, episode):
"""epsilon-greedy法で徐々に最適行動のみを採用する"""
state = self.digitize_state(observation) # 状態をデジタル表記で数値化
epsilon = 0.5 * (1 / (episode + 1)) # 試行を繰り返す中で判断係数を小さくしていく
# epsilonが小さくなるほど、価値ベースで行動を決定するようになる。
# つまり、試行を繰り返すにつれて価値ベースとなる。
if epsilon <= np.random.uniform(0, 1):
action = np.argmax(self.q_table[state][:])
else:
action = np.random.choice(self.num_actions)
return action
Environment
### Environment(環境:全体)の実装 ####################
class Environment():
def __init__(self) -> None:
self.env = gym.make(ENV)
num_states = self.env.observation_space.shape[0] # 課題の状態数を取得(今回は4)
num_actions = self.env.action_space.n # 課題の行動数を取得(今回は2)
self.agent = Agent(num_states, num_actions) # Agent生成
def run(self):
"""実行"""
# 初期値
complete_episodes = 0 # 195step以上連続で立ち続けた試行数
is_episode_final = False # 最終試行フラグ
frames = [] # 記録用配列
for episode in range(NUM_EPISODES):
observation = self.env.reset()
for step in range(MAX_STEPS):
if is_episode_final:
frames.append(self.env.render(mode='rgb_array'))
# 行動を求める
action = self.agent.get_action(observation, step)
# 行動a_tの実行により、s_{t+1}, r_{t+1}を求める
observation_next, _, done, _ = self.env.step(action)
# 報酬を与える
if done: # step数が200を超えるか、一定角度以上傾くとdoneはTrueになる
if step < 195:
reward = -1
complete_episodes = 0
else:
reward = 1
complete_episodes += 1
else:
reward = 0
# step+1の状態のbservation_nextを用いて、Q関数を更新する
self.agent.update_Q_function(observation, action, reward, observation_next)
# 観測の更新
observation = observation_next
# 終了時の処理
if done:
print(f"{episode} Episode: Finished after {step+1} time steps")
break
if is_episode_final:
from cartpole_rl_gif import GIF
# 記録
gif = GIF(frames)
gif.create(file_name="CartPole_Q.gif")
break
if complete_episodes >= 10:
print('10回連続成功')
is_episode_final = True
すべてを組み合わせて
"""
倒立振子問題で強化学習を学ぶ
"""
import numpy as np
import gym
# 定数の設定
ENV = 'CartPole-v0' # 使用する課題名
NUM_DIGITIZED = 6 # 各状態の離散値への分割数
GAMMA = 0.99 # 時間割引率
ETA = 0.5 # 学習係数
MAX_STEPS = 200 # 1試行のstep数
NUM_EPISODES = 1000 # 最大試行数
### Agentの実装 ####################
class Agent():
"""CartPoleのエージェントクラス。棒付き台車そのもの。"""
def __init__(self, num_states, num_actions) -> None:
# エージェントが行動決定するための頭脳を生成
self.brain = Brain(num_states, num_actions)
def update_Q_function(self, observation, action, reward, observation_next):
"""Q関数の更新"""
self.brain.update_Q_table(observation, action, reward, observation_next)
# 1 step移動後の状態sを求める
def get_action(self, observation, step):
"""行動の決定"""
action = self.brain.decide_action(observation, step)
return action
### Brain(Agentの脳)の実装 ####################
class Brain():
def __init__(self, num_states, num_actions) -> None:
self.num_actions = num_actions
# Qテーブルの作成。行は状態、列は行動
self.q_table = np.random.uniform(low=0, high=1, size=(NUM_DIGITIZED**num_states, num_actions)) # 0~1の間で乱数をとり、サイズ(行, 列)の行列を生成
def bins(self, clip_min, clip_max, num):
"""観測した状態(連続値)を離散値にデジタル変換する閾値を求める"""
return np.linspace(clip_min, clip_max, num+1)[1:-1]
def digitize_state(self, observation):
"""観測したobervation状態を、離散値に変換する"""
cart_pos, cart_v, pole_angle, pole_v = observation
digitized = [
np.digitize(cart_pos, bins=self.bins(-2.4, 2.4, NUM_DIGITIZED)),
np.digitize(cart_v, bins=self.bins(-3.0, 2.4, NUM_DIGITIZED)),
np.digitize(pole_angle, bins=self.bins(-0.5, 0.5, NUM_DIGITIZED)),
np.digitize(pole_v, bins=self.bins(-2.0, 2.0, NUM_DIGITIZED)),
]
d_observation = sum([x * (NUM_DIGITIZED**i) for i, x in enumerate(digitized)]) # 6進数 to 10進数
return d_observation
# Q学習による行動価値関数Qの更新
def update_Q_table(self, observation, action, reward, observation_next):
"""Qテーブル(価値関数)をQ学習により更新"""
state = self.digitize_state(observation) # 状態をデジタル表記で数値化
state_next = self.digitize_state(observation_next) # 状態をデジタル表記で数値化
Max_Q_next = max(self.q_table[state_next][:]) # 次の状態における行動価値の大きい方を取得
# Q学習の更新式
self.q_table[state, action] = self.q_table[state, action] + ETA * (reward + GAMMA * Max_Q_next - self.q_table[state, action])
# 行動の決定
def decide_action(self, observation, episode):
"""epsilon-greedy法で徐々に最適行動のみを採用する"""
state = self.digitize_state(observation) # 状態をデジタル表記で数値化
epsilon = 0.5 * (1 / (episode + 1)) # 試行を繰り返す中で判断係数を小さくしていく
# epsilonが小さくなるほど、価値ベースで行動を決定するようになる。
# つまり、試行を繰り返すにつれて価値ベースとなる。
if epsilon <= np.random.uniform(0, 1):
action = np.argmax(self.q_table[state][:])
else:
action = np.random.choice(self.num_actions)
return action
### Environment(環境:全体)の実装 ####################
class Environment():
def __init__(self) -> None:
self.env = gym.make(ENV)
num_states = self.env.observation_space.shape[0] # 課題の状態数を取得(今回は4)
num_actions = self.env.action_space.n # 課題の行動数を取得(今回は2)
self.agent = Agent(num_states, num_actions) # Agent生成
def run(self):
"""実行"""
# 初期値
complete_episodes = 0 # 195step以上連続で立ち続けた試行数
is_episode_final = False # 最終試行フラグ
frames = [] # 記録用配列
for episode in range(NUM_EPISODES):
observation = self.env.reset()
for step in range(MAX_STEPS):
if is_episode_final:
frames.append(self.env.render(mode='rgb_array'))
# 行動を求める
action = self.agent.get_action(observation, step)
# 行動a_tの実行により、s_{t+1}, r_{t+1}を求める
observation_next, _, done, _ = self.env.step(action)
# 報酬を与える
if done: # step数が200を超えるか、一定角度以上傾くとdoneはTrueになる
if step < 195:
reward = -1
complete_episodes = 0
else:
reward = 1
complete_episodes += 1
else:
reward = 0
# step+1の状態のbservation_nextを用いて、Q関数を更新する
self.agent.update_Q_function(observation, action, reward, observation_next)
# 観測の更新
observation = observation_next
# 終了時の処理
if done:
print(f"{episode} Episode: Finished after {step+1} time steps")
break
if is_episode_final:
from cartpole_rl_gif import GIF
# 記録
gif = GIF(frames)
gif.create(file_name="CartPole_Q.gif")
break
if complete_episodes >= 10:
print('10回連続成功')
is_episode_final = True
if __name__ == '__main__':
carpole_env = Environment()
carpole_env.run()
結果
ここで結果を示し、考察していく。
まず、上記のgif画像から、棒が倒れないように制御できていることがわかる。
実際にこの移動を決めている価値関数Qについて見ていくこととする。
[[0.37493395 0.13411888]
[0.51963093 0.73912863]
[0.63219248 0.63012129]
...
[0.11309911 0.52237338]
[0.05054922 0.18484693]
[0.83869178 0.89865193]]
この行は初めに示したように1296行で倒立振子の状態を表している。そのある状態のときに右か左かどちらに動かすべきなのかということについて報酬が計算されている。例えば、倒立振子の状態が○○のときには、右へ動いた方がよいなどの情報が詰め込まれている。
したがって、状態さえ計算されてしまえば、あとは得た報酬に従って、台車を右へ左へと動かしているということがわかる。
感想
今回は、倒立振子の問題でもう一度Q学習について触れた。学習のアルゴリズムは変わっていないが、Agent・Brain・Environmentの3要素に分けることで、さらにプログラム的には見やすくなったのではないかと思う。また、デジタル化についての考えは、言われてみればなるほどとはなったが、正直そのような考えはなかった。速度や角度など4つの要素からなるものをうまくデジタル化して、それをスカラ値で状態を表現するというところが、驚きとともにこのような方法もあるのかと学ばされた。
次回はDQNに触れながら、いよいよ深層強化学習に入っていく。そのあたりで、ロボットに深層強化学習を適用することを考えている。
参考文献
「作りながら学ぶ深層強化学習 PyTorchによる実践プログラミング」
小川 雄太郎 著 マイナビ 出版
「現場で使える!Python 深層強化学習入門 強化学習と深層強化学習による探索と制御」
伊藤 多一、今津 善充、須藤 広大、仁ノ平 将人、川崎 悠介、酒井 裕企、魏 崇哲 著 翔泳社 出版