PPOの概要と実装例
PPOとは?(概要)
Q学習との違い:
Q学習とPPOは、強化学習における異なるアプローチを提供します。Q学習は値ベースの方法であり、シンプルさと理論的な収束性が強みですが、スケールが大きくなると課題が生じます。一方、PPOはポリシー最適化の手法として高次元かつ連続的な問題に強く、学習の安定性も高いですが、実装の複雑性やサンプル効率の面で課題があります。
No | 比較項目 | Q学習 | PPO |
---|---|---|---|
1 | アプローチ |
値ベース (Value-Based) |
ポリシー最適化 (Policy Optimization) |
2 | 学習対象 | 状態-行動価値関数(Q関数) | ポリシー & 状態価値関数 |
3 | 更新方法 |
オフポリシー (ベルマン方程式によるQ値の更新) |
オンポリシー (ポリシー勾配法でポリシーを直接最適化) |
4 | 探索戦略 |
明示的 (ε-グリーディー法など探索方法を明示) |
暗黙的、ポリシーに内在 (確率的な行動選択による自然な探索) |
5 | サンプル効率 |
高い (経験再利用/経験リプレイが可能) |
低い (データ再利用が限定的) |
6 | 学習安定性 | 不安定になりやすい | クリッピングにより高い安定性 |
7 | 関数近似 | Qテーブル または DNN(DQN) | DNN |
8 | 適用範囲 |
離散行動空間 (関数近似により連続的な行動空間に対応可) |
離散・連続行動空間の両方 |
9 | 実装の複雑さ |
シンプル (Qテーブル学習は解釈性が高い) |
複雑 (ポリシーNW、バリューNW、クリッピングを実装要) |
10 | ポリシーの性質 |
決定論的 (探索戦略に基づき最適アクションを選択) |
確率的 (アクション選択が確率分布に基づく) |
NO.3 更新方法の違い
-
Q学習
-
ベルマン方程式を用いてQ値を逐次更新します。
[
Q(s, a) \leftarrow Q(s, a) + \alpha \left[ r + \gamma \max_{a'} Q(s', a') - Q(s, a) \right]
] - ここで、(\alpha)は学習率、(\gamma)は割引率、(r)は報酬、(s')は次の状態 です。
-
ベルマン方程式を用いてQ値を逐次更新します。
-
PPO
- ポリシーの更新において、クリッピング手法を用いて古いポリシーから大きく逸脱しないような制約を加えます。
-
目的関数は以下のように定義されます。
[
L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min \left( r_t(\theta) \hat{A}_t, \text{clip}(r_t(\theta), 1 - \epsilon, 1 + \epsilon) \hat{A}_t \right) \right]
] - ここで、(r_t(\theta))はポリシー確率比、(\hat{A}_t)はアドバンテージ、(\epsilon)はクリッピング係数 です。
No.4 探索戦略の違い
項目 | Q学習 | PPO |
---|---|---|
探索戦略 | 貪欲法 | サンプリング |
選択方法 | 確率が最大の行動を決定的に選択 | 確率分布に基づいてランダムに行動を選択 |
探索の度合い | 低い (常に同じ行動を選択) |
高い (多様な行動を試す可能性がある) |
実装の複雑さ | 非常に簡単 (最大値の選択) |
比較的簡単 (確率分布からのサンプリング) |
メリット | 短期的に高い報酬を得やすい | 環境の広範な探索が可能、局所的な最適解に陥りにくい |
デメリット | 新しい行動の発見が困難、局所的な最適解に陥りやすい | 学習が不安定になる可能性、最適行動が選ばれない場合がある |
例 | (上, 下, 左, 右)の確率が(0.0, 0.3, 0.6, 0.1)のとき、確率の最大値である0.6(左に移動)が選択される | (上, 下, 左, 右)の確率が(0.0, 0.3, 0.6, 0.1)のとき、左に移動が選択される確率が最も高いが、下(0.3)や右(0.1)が選択させる可能性も残されている。ただし、確率が0である上は選択されない。 |
PPOの特徴
- クリッピング目的関数: PPOでは、方策の更新時に大幅な変化を防ぐため、クリッピングを用いて更新幅を制限します。具体的には、現在の方策と以前の方策の確率比を計算し、この比率が一定の範囲(通常は1±0.2)を超えないようにします。これにより、方策が急激に変化することを防ぎ、学習の安定性を向上させます。
- ミニバッチ更新: PPOは、収集したデータを用いてミニバッチ更新を行います。これにより、データの再利用が可能となり、サンプル効率が向上します。
- 適応型KLペナルティ: 方策の更新に際して、Kullback-Leibler(KL)ダイバージェンスをペナルティ項として追加し、方策の大幅な変更を抑制します。このペナルティ係数は適応的に調整され、学習の安定性を向上させます。
使い分け、ユースケース:
選択の指針としては、問題の特性や必要な性能、利用可能なリソースに応じて、適切なアルゴリズムを選択することが重要です。例えば、離散的で小規模な環境ではQ学習が適している場合が多く、大規模で連続的な環境ではPPOなどのポリシー最適化手法が有効です。
PPOの実装例
問題設定
今回は、GridWorld(格子世界)を対象とする。GridWorldは、エージェントがグリッド上を移動し、特定のゴール地点に到達することを目指すシンプルな環境です。本稿では5x5のグリッドを使用し、エージェントは左上の位置から右下のゴールを目指します。この環境は、強化学習の基本を学ぶ上で理想的なモデルであり、エージェントが取ることのできる行動(上、下、左、右)と、それに伴う報酬がシンプルに設定されています。
グリッドの定義
以下は使用する5x5のグリッドの具体例です。
0 | 1 | 2 | 3 | 4 | |
---|---|---|---|---|---|
0 | 開始(0,0) | (0,1) | (0,2) | (0,3) | (0,4) |
1 | (1,0) | (1,1) | (1,2) | (1,3) | (1,4) |
2 | (2,0) | (2,1) | (2,2) | (2,3) | (2,4) |
3 | (3,0) | (3,1) | (3,2) | (3,3) | (3,4) |
4 | (4,0) | (4,1) | (4,2) | (4,3) | ゴール(4,4) |
このグリッドでは、左上の位置 (0, 0) から右下の位置 (4, 4) がゴールです。各セルが異なる状態を表し、エージェントはこれらの状態を移動しながらゴールを目指します。
状態(State)、インデックス
エージェントの位置を「状態」として定義します。この状態は、グリッド上の各位置に対応し、それぞれの状態はインデックス(index)として表現されます。
0 | 1 | 2 | 3 | 4 | |
---|---|---|---|---|---|
0 | 0 | 1 | 2 | 3 | 4 |
1 | 5 | 6 | 7 | 8 | 9 |
2 | 10 | 11 | 12 | 13 | 14 |
3 | 15 | 16 | 17 | 18 | 19 |
4 | 20 | 21 | 22 | 23 | 24 |
このように、各グリッドの位置は一意のインデックスに対応しています。
行動(Action)
エージェントは、グリッド上で上下左右に移動することが可能です。
- 上(UP)
- 下(DOWN)
- 左(LEFT)
- 右(RIGHT)
これらの行動を通じて、エージェントはゴールを目指します。行動の選択はエージェントのポリシーに依存しており、PPOではサンプリングという確率分布に基づくランダム性のある探索戦略が採用されています。
ゴール(Goal)
エージェントの目標地点は右下のマス (4, 4) です。この位置に到達すると、エピソードが終了し、報酬が与えられます。このゴール報酬は、エージェントにとって非常に重要なフィードバックであり、最適な経路を学習するための指針となります。エージェントは、この報酬を最大化するように行動ポリシーを改善します。
実装
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from typing import Tuple, List
import random
# グリッドワールド環境の定義
class GridWorld:
def __init__(self,
size: int = 5 # グリッドワールドのサイズ。一辺の長さ。
) -> None:
"""
5x5のグリッドワールドを初期化します。
"""
self.size = size
self.start_pos = (0, 0)
self.goal_pos = (size - 1, size - 1)
self.reset()
def reset(self) -> Tuple[int, int]: #エージェントの初期位置を返す
"""
環境をリセットし、エージェントの位置を開始地点に戻します。
"""
self.agent_pos = self.start_pos # エージェントの位置を開始地点に設定
return self.agent_pos
def step(self,
action: int # 移動アクション(0: 上, 1: 右, 2: 下, 3: 左)
) -> Tuple[Tuple[int, int], float, bool]: # 次の位置, 報酬, ゴール到達フラグ
"""
エージェントを指定されたアクションに従って移動させます。
"""
x, y = self.agent_pos # 現在の位置を取得
if action == 0 and x > 0: # 上に移動
x -= 1
elif action == 1 and y < self.size - 1: # 右に移動
y += 1
elif action == 2 and x < self.size - 1: # 下に移動
x += 1
elif action == 3 and y > 0: # 左に移動
y -= 1
# 移動後の位置を更新
self.agent_pos = (x, y)
# ゴールに到達したかどうかを判定
done = self.agent_pos == self.goal_pos
# 報酬の設定:ゴールに到達したら1、それ以外は0、と今回は簡易的にスパース報酬を設定する。(欠点:学習が不安定にある恐れがある)
# より滑らかな報酬設計が必要な場合は、例えばゴールに近づくほど報酬が増加するように設計する。(効果:学習が安定する)
reward = 1.0 if done else 0.0
return self.agent_pos, reward, done
def get_state(self) -> np.ndarray: # 現在のエージェントの位置をワンホットベクトル状態として返す
"""
現在のエージェントの位置を状態として返します。
"""
state = np.zeros((self.size, self.size)) # 5x5のゼロ行列を作成
state[self.agent_pos] = 1 # エージェントの位置を1に設定
return state.flatten() # 5x5のグリッドワールドを1次元のベクトルに変換して返す
# ニューラルネットワークの定義
class PolicyNetwork(nn.Module):
def __init__(self,
state_size: int, # 状態の次元数. 例: 5x5で25
action_size: int # アクションの数. 例: 上, 右, 下, 左の4つ
) -> None:
"""
ポリシーネットワークを初期化します。
"""
super(PolicyNetwork, self).__init__() # 親クラスの初期化
self.fc1 = nn.Linear(state_size, 128) # 入力層
self.fc2 = nn.Linear(128, 128) # 隠れ層
self.action_head = nn.Linear(128, action_size) # アクションの確率分布を出力する層
self.value_head = nn.Linear(128, 1) # 状態価値を出力する層
def forward(self,
x: torch.Tensor # 入力状態. 例: 5x5のグリッドワールドの状態. 例: [0, 0, 1, 0, 0, ...]
) -> Tuple[torch.Tensor, torch.Tensor]: # アクションの確率分布, 状態価値. 例: ([0.1, 0.2, 0.3, 0.4], 0.5)
"""
フォワードパスを実行します。
"""
x = torch.relu(self.fc1(x)) # 入力層から隠れ層への活性化関数
x = torch.relu(self.fc2(x)) # 隠れ層から隠れ層への活性化関数
action_probs = torch.softmax(self.action_head(x), dim=-1) # アクションの確率分布を計算
state_values = self.value_head(x) # 状態価値を計算
return action_probs, state_values
# PPOエージェントの定義
class PPOAgent:
def __init__(self,
state_size: int, # 状態の次元数
action_size: int, # アクションの数
lr: float = 1e-3, # 学習率
gamma: float = 0.99, # 割引率
eps_clip: float = 0.2, # クリッピング係数
K_epochs: int = 4 # ポリシーの更新回数
) -> None:
"""
PPOエージェントを初期化します。
"""
self.gamma = gamma # 割引率
self.eps_clip = eps_clip # クリッピング係数
self.K_epochs = K_epochs # ポリシーの更新回数
self.policy = PolicyNetwork(state_size, action_size) # ポリシーネットワークの初期化
self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)# オプティマイザの初期化
self.policy_old = PolicyNetwork(state_size, action_size) # 古いポリシーネットワークの初期化
self.policy_old.load_state_dict(self.policy.state_dict()) # 古いポリシーネットワークのパラメータをコピー
self.MseLoss = nn.MSELoss() # 平均二乗誤差
def select_action(self,
state: np.ndarray # 現在の状態
) -> Tuple[int, torch.Tensor, torch.Tensor]: # 選択したアクション, アクションの確率, 状態価値
"""
現在のポリシーに基づいてアクションを選択します。
"""
state = torch.FloatTensor(state) # 状態をテンソルに変換. 例: [0, 0, 1, 0, 0, ...]
with torch.no_grad(): # 勾配を計算しない
action_probs, state_value = self.policy_old(state) # 古いポリシーでアクションの確率と状態価値を取得
action_dist = torch.distributions.Categorical(action_probs) # カテゴリカル分布を作成. 例: [0.1, 0.2, 0.3, 0.4]
action = action_dist.sample() # アクションをサンプリング. 例: 2
return action.item(), action_probs[action.item()], state_value # 例: 2, [0.1, 0.2, 0.3, 0.4], 0.5
def update(self,
memory: List[Tuple[np.ndarray, int, float, np.ndarray, bool]] # エピソード中の経験のリスト
) -> None:
"""
PPOのアップデートステップを実行します。
"""
# 状態、アクション、報酬、次状態、終了フラグを分解
states = torch.FloatTensor([m[0] for m in memory]) # 状態. 例: [[0, 0, 1, 0, 0, ...], [0, 0, 1, 0, 0, ...], ...]
actions = torch.LongTensor([m[1] for m in memory]).unsqueeze(1) # アクション. 例: [2, 1, ...]
rewards = [m[2] for m in memory] # 報酬. 例: [0.0, 0.0, ...]
next_states = torch.FloatTensor([m[3] for m in memory]) # 次状態. 例: [[0, 0, 1, 0, 0, ...], [0, 0, 1, 0, 0, ...], ...]
dones = torch.FloatTensor([m[4] for m in memory]) # 終了フラグ. 例: [False, True, ...]
# 割引累積報酬の計算
discounted_rewards = [] # 割引累積報酬
R = 0 # 累積報酬
for reward, done in zip(reversed(rewards), reversed(dones)): # エピソードの最後から計算
if done: # エピソードが終了したら累積報酬をリセット
R = 0
R = reward + self.gamma * R # 割引累積報酬の計算
discounted_rewards.insert(0, R) # 割引累積報酬を挿入
discounted_rewards = torch.FloatTensor(discounted_rewards).unsqueeze(1) # 割引累積報酬. 例: [[0.0], [0.0], ...]
# 状態価値の計算
_, state_values = self.policy(states) # 例: [0.5, 0.6, ...]
advantages = discounted_rewards - state_values.detach() # アドバンテージ. 例: [[0.0], [0.0], ...]
# 古いポリシーでアクションの確率を取得
action_probs, _ = self.policy_old(states) # 例: [[0.1, 0.2, 0.3, 0.4], [0.2, 0.3, 0.4, 0.1], ...]
action_dist = torch.distributions.Categorical(action_probs) # カテゴリカル分布を作成. 例: [0.1, 0.2, 0.3, 0.4]
old_log_probs = action_dist.log_prob(actions.squeeze()).detach() # 古いポリシーでのアクションの対数確率. 例: [0.2, 0.3, ...]
# PPOの更新
for _ in range(self.K_epochs): # ポリシーの更新回数分繰り返す
# 新しいポリシーでアクションの確率を取得
new_action_probs, new_state_values = self.policy(states) # 例: [[0.1, 0.2, 0.3, 0.4], [0.2, 0.3, 0.4, 0.1], ...]
new_dist = torch.distributions.Categorical(new_action_probs) # カテゴリカル分布を作成. 例: [0.1, 0.2, 0.3, 0.4]
new_log_probs = new_dist.log_prob(actions.squeeze()) # 新しいポリシーでのアクションの対数確率. 例: [0.1, 0.2, ...]
# クリップされた比率の計算
ratios = torch.exp(new_log_probs - old_log_probs) # 比率の計算. 例: [1.0, 1.1, ...]
surr1 = ratios * advantages # クリップされた比率1. 例: [[0.0], [0.0], ...]
surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages # クリップされた比率2. 例: [[0.0], [0.0], ...]
# 損失の計算
loss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(new_state_values, discounted_rewards) # 損失の計算. 例: [[0.0], [0.0], ...]
# 勾配の更新
self.optimizer.zero_grad() # 勾配をリセット. これがないと勾配が加算される
loss.mean().backward() # 損失を逆伝播.
self.optimizer.step() # パラメータを更新.
# 古いポリシーを新しいポリシーで更新
self.policy_old.load_state_dict(self.policy.state_dict()) # パラメータをコピー. これがないと古いポリシーが更新される
def main() -> None:
"""
メイン関数。環境とエージェントを初期化し、訓練を実行します。
"""
env = GridWorld(size=5) # グリッドワールド環境の初期化. 5x5のグリッドワールド
state_size = env.size * env.size # 状態の次元数. 例: 5x5で25
action_size = 4 # 上, 右, 下, 左 の4つ
agent = PPOAgent(state_size, action_size) # PPOエージェントの初期化
max_episodes = 1000 # エピソード数
max_steps = 100 # 最大ステップ数. ゴールに到達できない場合は強制終了.
log_interval = 100 # ログ表示のインターバル. 例: 100エピソードごとに進捗を表示.
for episode in range(1, max_episodes + 1): # エピソードの繰り返し
state = env.reset() # 環境をリセット. エージェントの初期位置を取得
memory: List[Tuple[np.ndarray, int, float, np.ndarray, bool]] = [] # 経験のリスト. 例: [([0, 0, 1, 0, 0, ...], 2, 0.0, [0, 0, 1, 0, 0, ...], False), ...]
for _ in range(max_steps): # ステップ数の繰り返し
action, action_prob, state_value = agent.select_action(env.get_state()) # アクションを選択. 例: 2, [0.1, 0.2, 0.3, 0.4], 0.5
next_state, reward, done = env.step(action) # 環境を進める. 例: [0, 0, 1, 0, 0, ...], 0.0, False
memory.append((env.get_state(), action, reward, next_state, done)) # 経験を追加. 例: [([0, 0, 1, 0, 0, ...], 2, 0.0, [0, 0, 1, 0, 0, ...], False), ...]
if done: # ゴールに到達したら終了
break
state = next_state # 状態を更新. 例: [0, 0, 1, 0, 0, ...]
# PPOの更新
agent.update(memory)
# 定期的に進捗を表示
if episode % log_interval == 0:
print(f"Episode {episode} completed.")
# 訓練後のエージェントの動作確認
state = env.reset()
env_steps = 0
print("訓練後のエージェントの動作:")
while True: # ゴールに到達するか、最大ステップ数に達するまで繰り返す
action, _, _ = agent.select_action(env.get_state()) # アクションを選択
next_state, reward, done = env.step(action) # 環境を進める
print(f"Step {env_steps}: Moved to {next_state}") # 状態を表示
env_steps += 1 # ステップ数を更新
if done or env_steps >= max_steps: # ゴールに到達したら終了
break
if done:
print("ゴールに到達しました!")
else:
print("ゴールに到達できませんでした。")
if __name__ == "__main__":
main()
評価
- 1回目:
- 学習+実行時間: 1分40秒
- 訓練後のエージェントの動作:
Step 0: Moved to (0, 1)
Step 1: Moved to (0, 2)
Step 2: Moved to (1, 2)
Step 3: Moved to (2, 2)
Step 4: Moved to (3, 2)
Step 5: Moved to (3, 3)
Step 6: Moved to (4, 3)
Step 7: Moved to (4, 4)
ゴールに到達しました! - 右に行ったり、下に行ったり、ジグザグとゴールに移動する方策が採用されている。
- 2回目:
- 学習+実行時間: 1分43秒
- 訓練後のエージェントの動作:
Step 0: Moved to (1, 0)
Step 1: Moved to (2, 0)
Step 2: Moved to (3, 0)
Step 3: Moved to (4, 0)
Step 4: Moved to (4, 0)
Step 5: Moved to (4, 1)
Step 6: Moved to (4, 2)
Step 7: Moved to (4, 3)
Step 8: Moved to (4, 4)
ゴールに到達しました! - 最初はずっと下に移動して、その後はずっと右に移動する方策が採用されている。
- 3回目:
- 学習+実行時間: 2分4秒
- 訓練後のエージェントの動作:
Step 0: Moved to (1, 0)
Step 1: Moved to (2, 0)
Step 2: Moved to (3, 0)
Step 3: Moved to (3, 1)
Step 4: Moved to (3, 2)
Step 5: Moved to (4, 2)
Step 6: Moved to (4, 3)
Step 7: Moved to (4, 4)
ゴールに到達しました! - 1回目と2回目の中間のような方策が採用された。
PPOの基本構成
以下の表は、PPOの基本構成要素と、具体的なソースコードにおけるクラスやメソッドとの対応関係を整理したものです。
構成要素 | 役割 | 対応するクラス・メソッド |
---|---|---|
1. 環境 | エージェントが動作する仮想空間を定義。状態のリセット、アクションの実行、次の状態や報酬の取得など、基本メソッドを提供。 例:OpenAI Gym環境やカスタム環境(GridWorldなど)。 |
クラス名: GridWorld 主なメソッド:
|
2. ポリシーNW | エージェントの行動方針を決定するニューラルNW。入力として現在の状態を受け取り、各アクションの確率分布(アクション選択)および状態価値を出力。 |
クラス名: PolicyNetwork 主なメソッド:
|
3. PPO エージェント |
PPOアルゴリズムの実装を担当。アクションの選択、経験の収集、ポリシーおよびバリュー関数の更新などの主要な機能を持つ。 |
クラス名: PPOAgent 主なメソッド:
|
4. メモリ | エージェントが経験した状態、アクション、報酬、次状態、終了フラグなどを一時的に保存するデータ構造。PPOではバッチごとにこれらの経験を利用してポリシーを更新。 |
データ構造: List[Tuple[np.ndarray, int, float, np.ndarray, bool]] (リスト内のタプル形式) |
5. 訓練ループ | 環境とエージェントを初期化し、複数のエピソードを通じてエージェントを訓練。エージェントが環境と相互作用し、経験をメモリに保存し、一定期間ごとにポリシーの更新を行う。 |
関数名: main() 主な処理内容:
|
6. ハイパラ設定 | 学習率、割引率(γ)、クリッピング係数(ε)、エポック数など、PPOの学習に影響を与えるパラメータを設定。これらのパラメータは学習の効率や安定性に大きく影響する。 |
パラメータ設定箇所:
|
メモリについて
メモリは、エージェントが環境と相互作用するなかで得られた経験(エピソード)を記録する。具体的には、以下のように、(1)状態、(2)行動、(3)報酬、(4)次の状態、(5)終了フラグ、という5つの要素が記憶されている。
memory: List[Tuple[np.ndarray, int, float, np.ndarray, bool]] = []
要素 | 説明 | データ型 | 具体例 |
---|---|---|---|
(1)状態 | 現在の環境の状態を表すベクトル。 グリッドワールドの場合、one-hotエンコーディングされた位置情報など。 |
np.ndarray |
(0, 0)の位置にいるとき、 状態ベクトルは [1, 0, 0, ..., 0] |
(2)行動 | エージェントが選択したアクション。 | int |
0: 上, 1: 右, 2: 下, 3: 左 などの整数値。 |
(3)報酬 | エージェントがその行動を取った結果得た報酬。 | float |
ゴールに到達した場合は1.0、 それ以外は-0.01など。 |
(4)次の状態 | アクションを取った後の新しい環境の状態。 現在の状態と同様にone-hotエンコーディングされた位置情報など。 |
np.ndarray | 例: 位置=(0,0)から行動=1(右に移動)を実行して、 **位置=(0, 1)**に移動したとき、 状態ベクトルは [0, 1, 0, ..., 0] |
(5)終了フラグ | エピソードが終了したかどうかを示すフラグ。 | bool | ゴールに到達した場合はTrue、 それ以外はFalse。 |