0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PPO(強化学習)による水位制御 その1

Last updated at Posted at 2025-02-06

はじめに

最近、LLMでの強化学習が話題ですよね。たGRPOで大きく躍進したDeepSeek
X(旧Twitter)ではGensisによるPPOロボット制御が話題になり、時代は強化学習を求めている。PPO!PPO!

しかしPPOとは何か。解説記事を読んでもさっぱりわからない。だが時代の流れに乗ってみたい。
ということでPPOを実装してみようと思います。

制御対象

今回はタンクの水位を一定に保つ制御を行ってみようと思います。
蛇口を捻れば水が貯まる。貯まった水は常に排出している。なので一定の水の供給が必要。
そんな想定でいきます。

image.png

実装

ということで技術的な解説はスキップして実装してみます。
今回はシンプルにMlpPolicy=多層パーセプトロン(MLP)を選択しました。学習方法はPPOになります。
以下のプログラムは同じフォルダに配置してください。

タンク水位のシミュレータ

まずはタンクのシミュレータがなければ話になりません。
ということで作成しました。
1stepごとにsimulation()を呼び出すことで、タンクの水位をシミュレーションします。

simulator.py
import numpy as np
import random

class Simulator:
    def __init__(self):
        """ シミュレーターの初期化 """
        self.dt = 1  # タイムステップは1秒固定
        self.water_level = 0.0  # 初期水位
        self.Q_in_max = 0.02  # 操作量1.0に対応する最大流入量 (m/秒)
        self.k_leak = 0.00001  # 漏洩係数 (1/秒)
        self.k_outflow = 0.0005  # 放水係数 (1/秒)
        self.min_water_level = 0.0  # 最小水位
        self.max_water_level = 1.0  # 最大水位

    def reset(self):
        """
        シミュレーションの初期化
        ランダムな初期水位とパラメータを設定
        """
        self.water_level = random.uniform(0.0, 0.0)  # 初期水位をランダムに設定
        self.k_leak = random.uniform(0.001, 0.002)  # 漏洩係数をランダムに設定
        self.k_outflow = random.uniform(0.005, 0.015)  # 放水係数をランダムに設定
        self.Q_in_max  = random.uniform(0.01, 0.02)  # 最大流入量
        return self.water_level

    def simulation(self, action):
        """
        シミュレーションを1ステップ実行
        :param action: 操作量 (0.0 ~ 100.0)
        :return: 更新後の水位
        """
        # アクションを有効範囲内にクリップ
        action = np.clip(action, 0.0, 1.0)

        # 操作量に基づく流入量の計算
        Q_in = action * self.Q_in_max  # 流入量 (m/秒)

        # 放水量の計算 (水位に依存)
        Q_out = self.k_outflow * self.water_level  # 放水量 (m/秒)

        # 漏洩量の計算 (水位に依存)
        Q_leak = self.k_leak * self.water_level  # 漏洩量 (m/秒)

        # 水位の更新
        delta_water = (Q_in - Q_out - Q_leak) * self.dt
        new_water_level = self.water_level + delta_water

        # 水位を0と最大水位でクリップ
        new_water_level = np.clip(new_water_level, self.min_water_level, self.max_water_level)

        # 更新された水位を設定
        self.water_level = new_water_level

        return self.water_level

Gym環境の作成

現在はGymではなく、Gymnasiumが良いようです。
(実行するたびに警告がでます)
今回はGym環境で進めます。

報酬定義などもここで決定しています。
報酬は単純に目標水位と現在水位の差としました。

water_level_env.py
# water_level_env.py

import gym
from gym import spaces
import numpy as np
from simulator import Simulator
import random

class WaterLevelEnv(gym.Env):
    """ カスタム環境:水位調整用のGym環境 """
    def __init__(self, desired_level=None):
        super(WaterLevelEnv, self).__init__()
        self.sim = Simulator()

        if desired_level is None:
            self.desired_level_mode="random"
            self.set_target_level() # 目標水位をランダムで取得
        else:
            self.desired_level_mode = "static"
            self.desired_level = desired_level  # 目標水位を設定

        # アクション空間の定義:操作量は0.0~1.0
        self.action_space = spaces.Box(low=np.array([0.0]), high=np.array([1.0]), dtype=np.float32)

        # 観測空間の定義:水位および目標水位
        self.observation_space = spaces.Box(low=np.array([self.sim.min_water_level, 0.0]),
                                            high=np.array([self.sim.max_water_level, self.sim.max_water_level]),
                                            dtype=np.float32)

        self.state = None
        self.dt = self.sim.dt  # タイムステップは1秒固定
        self.max_steps = 300  # 最大ステップ数
        self.current_step = 0
        self.last_action = None  # 最後に実行されたアクションを初期化

    def set_target_level(self):
        # 目標水位をランダムに設定
        self.desired_level = random.uniform(0.1, 0.9)

    def reset(self):
        """ 環境の初期化 """
        self.state = self.sim.reset()  # シミュレーターをリセットし、初期水位を設定
        self.current_step = 0

        if self.desired_level_mode == "random":
            self.set_target_level()

        self.last_action = None  # リセット時にアクションを初期化

        return np.array([self.state, self.desired_level], dtype=np.float32)

    def step(self, action):
        """ 環境のステップ実行 """
        # アクションを有効範囲内にクリップ
        action = np.clip(action, self.action_space.low, self.action_space.high)[0]
        self.last_action = action  # 最後に実行されたアクションを保存

        # シミュレーションを実行して新しい水位を取得
        water_level = self.sim.simulation(action)
        self.state = water_level

        # 報酬の定義:目標水位との差の絶対値の負
        reward = -abs(water_level - self.desired_level)

        # エピソード終了条件の定義
        #今回は指定ステップまで実行で1エピソード終了
        self.current_step += 1
        done = self.current_step >= self.max_steps

        # 追加情報(必要に応じて使用)
        info = {"status":float(self.state)}
        return np.array([self.state, self.desired_level], dtype=np.float32), reward, done, info

    def render(self, mode='human'):
        """ 環境のレンダリング """
        if self.last_action is not None:
            print(f"現在の水位: {self.state:.2f} m (目標: {self.desired_level} m) | 最後の操作量: {self.last_action:.2f}")
        else:
            print(f"現在の水位: {self.state:.2f} m (目標: {self.desired_level} m) | 最後の操作量: なし")


学習

続いて学習です。
こちらのpyを実行することで、学習が開始されます。
total_timesteps=100000 で学習回数を指定しています。10万回であれば数分で終了すると思います。

train.py
# train.py

from water_level_env import WaterLevelEnv
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize

def make_env():
    env = WaterLevelEnv()
    env.is_training = True  # 環境に訓練モードを設定
    return env

if __name__ == "__main__":
    # DummyVecEnvによる環境ラッピング
    env = DummyVecEnv([make_env])
    # 観測値と報酬の正規化
    env = VecNormalize(env, norm_obs=True, norm_reward=True)

    # PPOエージェントの作成
    model = PPO("MlpPolicy", env, verbose=1)

    # エージェントの訓練
    model.learn(total_timesteps=100000)

    # モデルの保存(VecNormalizeの統計情報も一緒に保存すると、テスト時に同じ統計で正規化可能)
    model.save("ppo_water_level")
    env.save("vecnormalize_stats.pkl")

    env.close()
    

テスト実行

いよいよPPO強化学習が実行できます。

test.py
from water_level_env import WaterLevelEnv
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
import os

def make_test_env():
    return WaterLevelEnv(desired_level=0.5)

if __name__ == "__main__":
    # DummyVecEnvでテスト環境をラッピング
    test_env = DummyVecEnv([make_test_env])
    # VecNormalizeで観測・報酬の正規化
    test_env = VecNormalize(test_env, training=False, norm_obs=True, norm_reward=True)

    # 学習時に保存したVecNormalizeの統計情報を読み込む
    if os.path.exists("vecnormalize_stats.pkl"):
        test_env = VecNormalize.load("vecnormalize_stats.pkl", test_env)

    # 訓練済みモデルの読み込み
    model = PPO.load("ppo_water_level", env=test_env)

    # テストの実行
    obs = test_env.reset()
    for _ in range(1000):
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, done, info = test_env.step(action)
        # VecEnvの場合、各環境に対してrender()を呼び出すためenv_method()を利用
        test_env.env_method("render")
        if done:
            obs = test_env.reset()

結果

目標0.5mでテストを行いました。
黄色が目標値で、青が水位、赤が操作量です。
ほんと?と驚くほどドンピシャですね。

test.png

Tips

  • action,水位は0~1.0で正規化した方が良さそうです
  • test.pyのdeterministic=False またはdeterministic=Trueの指定で結構結果が変わりました

まとめ

思ったよりもシンプルに実装できました。
CPUなので時間がかかるかなと思っていましたが、爆速(1-2分?)で完了しました。
結果も良好です。
しかし今回の方法では時系列情報は持っていません。
実際の制御対象は少なくとも無駄時間、遅れ時間を持っており、その当該step時間のみの関係性では限界があります。また遊び、外乱、量子化ノイズなど様々な要素が複雑に絡みます。
次回は時系列的な情報を含めたもう少し実用的なPPO制御を探っていこうと思います

それではまた。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?