はじめに
最近、LLMでの強化学習が話題ですよね。たGRPOで大きく躍進したDeepSeek
X(旧Twitter)ではGensisによるPPOロボット制御が話題になり、時代は強化学習を求めている。PPO!PPO!
しかしPPOとは何か。解説記事を読んでもさっぱりわからない。だが時代の流れに乗ってみたい。
ということでPPOを実装してみようと思います。
制御対象
今回はタンクの水位を一定に保つ制御を行ってみようと思います。
蛇口を捻れば水が貯まる。貯まった水は常に排出している。なので一定の水の供給が必要。
そんな想定でいきます。
実装
ということで技術的な解説はスキップして実装してみます。
今回はシンプルにMlpPolicy=多層パーセプトロン(MLP)を選択しました。学習方法はPPOになります。
以下のプログラムは同じフォルダに配置してください。
タンク水位のシミュレータ
まずはタンクのシミュレータがなければ話になりません。
ということで作成しました。
1stepごとにsimulation()
を呼び出すことで、タンクの水位をシミュレーションします。
import numpy as np
import random
class Simulator:
def __init__(self):
""" シミュレーターの初期化 """
self.dt = 1 # タイムステップは1秒固定
self.water_level = 0.0 # 初期水位
self.Q_in_max = 0.02 # 操作量1.0に対応する最大流入量 (m/秒)
self.k_leak = 0.00001 # 漏洩係数 (1/秒)
self.k_outflow = 0.0005 # 放水係数 (1/秒)
self.min_water_level = 0.0 # 最小水位
self.max_water_level = 1.0 # 最大水位
def reset(self):
"""
シミュレーションの初期化
ランダムな初期水位とパラメータを設定
"""
self.water_level = random.uniform(0.0, 0.0) # 初期水位をランダムに設定
self.k_leak = random.uniform(0.001, 0.002) # 漏洩係数をランダムに設定
self.k_outflow = random.uniform(0.005, 0.015) # 放水係数をランダムに設定
self.Q_in_max = random.uniform(0.01, 0.02) # 最大流入量
return self.water_level
def simulation(self, action):
"""
シミュレーションを1ステップ実行
:param action: 操作量 (0.0 ~ 100.0)
:return: 更新後の水位
"""
# アクションを有効範囲内にクリップ
action = np.clip(action, 0.0, 1.0)
# 操作量に基づく流入量の計算
Q_in = action * self.Q_in_max # 流入量 (m/秒)
# 放水量の計算 (水位に依存)
Q_out = self.k_outflow * self.water_level # 放水量 (m/秒)
# 漏洩量の計算 (水位に依存)
Q_leak = self.k_leak * self.water_level # 漏洩量 (m/秒)
# 水位の更新
delta_water = (Q_in - Q_out - Q_leak) * self.dt
new_water_level = self.water_level + delta_water
# 水位を0と最大水位でクリップ
new_water_level = np.clip(new_water_level, self.min_water_level, self.max_water_level)
# 更新された水位を設定
self.water_level = new_water_level
return self.water_level
Gym環境の作成
現在はGymではなく、Gymnasiumが良いようです。
(実行するたびに警告がでます)
今回はGym環境で進めます。
報酬定義などもここで決定しています。
報酬は単純に目標水位と現在水位の差としました。
# water_level_env.py
import gym
from gym import spaces
import numpy as np
from simulator import Simulator
import random
class WaterLevelEnv(gym.Env):
""" カスタム環境:水位調整用のGym環境 """
def __init__(self, desired_level=None):
super(WaterLevelEnv, self).__init__()
self.sim = Simulator()
if desired_level is None:
self.desired_level_mode="random"
self.set_target_level() # 目標水位をランダムで取得
else:
self.desired_level_mode = "static"
self.desired_level = desired_level # 目標水位を設定
# アクション空間の定義:操作量は0.0~1.0
self.action_space = spaces.Box(low=np.array([0.0]), high=np.array([1.0]), dtype=np.float32)
# 観測空間の定義:水位および目標水位
self.observation_space = spaces.Box(low=np.array([self.sim.min_water_level, 0.0]),
high=np.array([self.sim.max_water_level, self.sim.max_water_level]),
dtype=np.float32)
self.state = None
self.dt = self.sim.dt # タイムステップは1秒固定
self.max_steps = 300 # 最大ステップ数
self.current_step = 0
self.last_action = None # 最後に実行されたアクションを初期化
def set_target_level(self):
# 目標水位をランダムに設定
self.desired_level = random.uniform(0.1, 0.9)
def reset(self):
""" 環境の初期化 """
self.state = self.sim.reset() # シミュレーターをリセットし、初期水位を設定
self.current_step = 0
if self.desired_level_mode == "random":
self.set_target_level()
self.last_action = None # リセット時にアクションを初期化
return np.array([self.state, self.desired_level], dtype=np.float32)
def step(self, action):
""" 環境のステップ実行 """
# アクションを有効範囲内にクリップ
action = np.clip(action, self.action_space.low, self.action_space.high)[0]
self.last_action = action # 最後に実行されたアクションを保存
# シミュレーションを実行して新しい水位を取得
water_level = self.sim.simulation(action)
self.state = water_level
# 報酬の定義:目標水位との差の絶対値の負
reward = -abs(water_level - self.desired_level)
# エピソード終了条件の定義
#今回は指定ステップまで実行で1エピソード終了
self.current_step += 1
done = self.current_step >= self.max_steps
# 追加情報(必要に応じて使用)
info = {"status":float(self.state)}
return np.array([self.state, self.desired_level], dtype=np.float32), reward, done, info
def render(self, mode='human'):
""" 環境のレンダリング """
if self.last_action is not None:
print(f"現在の水位: {self.state:.2f} m (目標: {self.desired_level} m) | 最後の操作量: {self.last_action:.2f}")
else:
print(f"現在の水位: {self.state:.2f} m (目標: {self.desired_level} m) | 最後の操作量: なし")
学習
続いて学習です。
こちらのpyを実行することで、学習が開始されます。
total_timesteps=100000
で学習回数を指定しています。10万回であれば数分で終了すると思います。
# train.py
from water_level_env import WaterLevelEnv
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
def make_env():
env = WaterLevelEnv()
env.is_training = True # 環境に訓練モードを設定
return env
if __name__ == "__main__":
# DummyVecEnvによる環境ラッピング
env = DummyVecEnv([make_env])
# 観測値と報酬の正規化
env = VecNormalize(env, norm_obs=True, norm_reward=True)
# PPOエージェントの作成
model = PPO("MlpPolicy", env, verbose=1)
# エージェントの訓練
model.learn(total_timesteps=100000)
# モデルの保存(VecNormalizeの統計情報も一緒に保存すると、テスト時に同じ統計で正規化可能)
model.save("ppo_water_level")
env.save("vecnormalize_stats.pkl")
env.close()
テスト実行
いよいよPPO強化学習が実行できます。
from water_level_env import WaterLevelEnv
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
import os
def make_test_env():
return WaterLevelEnv(desired_level=0.5)
if __name__ == "__main__":
# DummyVecEnvでテスト環境をラッピング
test_env = DummyVecEnv([make_test_env])
# VecNormalizeで観測・報酬の正規化
test_env = VecNormalize(test_env, training=False, norm_obs=True, norm_reward=True)
# 学習時に保存したVecNormalizeの統計情報を読み込む
if os.path.exists("vecnormalize_stats.pkl"):
test_env = VecNormalize.load("vecnormalize_stats.pkl", test_env)
# 訓練済みモデルの読み込み
model = PPO.load("ppo_water_level", env=test_env)
# テストの実行
obs = test_env.reset()
for _ in range(1000):
action, _states = model.predict(obs, deterministic=True)
obs, reward, done, info = test_env.step(action)
# VecEnvの場合、各環境に対してrender()を呼び出すためenv_method()を利用
test_env.env_method("render")
if done:
obs = test_env.reset()
結果
目標0.5mでテストを行いました。
黄色が目標値で、青が水位、赤が操作量です。
ほんと?と驚くほどドンピシャですね。
Tips
- action,水位は0~1.0で正規化した方が良さそうです
- test.pyの
deterministic=False
またはdeterministic=True
の指定で結構結果が変わりました
まとめ
思ったよりもシンプルに実装できました。
CPUなので時間がかかるかなと思っていましたが、爆速(1-2分?)で完了しました。
結果も良好です。
しかし今回の方法では時系列情報は持っていません。
実際の制御対象は少なくとも無駄時間、遅れ時間を持っており、その当該step時間のみの関係性では限界があります。また遊び、外乱、量子化ノイズなど様々な要素が複雑に絡みます。
次回は時系列的な情報を含めたもう少し実用的なPPO制御を探っていこうと思います
それではまた。