0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Sim2Realへの挑戦:深層強化学習(Deep RL)とドメインランダマイゼーションで「転ばないAI」を育てる【Day 7】

Last updated at Posted at 2025-12-06

3軸リアクションホイール倒立振子を作るアドカレ の7日目です。

1. はじめに:なぜモデルベース(LQR)からデータ駆動(RL)へ転向するのか

前回の [Day 6] では、現代制御の王道である LQR (Linear Quadratic Regulator) を実装しました。
シミュレーション上の理想的な環境下では、LQRは数学的に最適な挙動を示し、微動だにしない静定性を発揮しました。

しかし、最終目標は 「実機 (STM32)」 への実装です。
現実世界 (Real) は、シミュレーション (Sim) とは決定的に異なります。

  • 非線形な摩擦: ギアやベアリングの静止摩擦 (Stiction) は線形モデルでは扱いきれません。
  • モデル化誤差: 3Dプリント筐体の剛性不足、重心のズレ、慣性モーメントの推定誤差。
  • 予期せぬ外乱: ケーブルの張力、床の傾き、突発的な衝撃。

これらの「不確実性」に対し、LQRのような固定ゲイン制御は脆さを露呈します。
そこで採用するのが 深層強化学習 (Deep Reinforcement Learning) です。

運動方程式を解くのではなく、シミュレータの中で何百万回も転倒し、「試行錯誤 (Trial and Error)」 を通じて、ノイズや外乱に打ち勝つ方策 (Policy) をニューラルネットワークに学習させます。

本日のゴールは、2.5Nm級の外乱にも耐えうる 「Robust (頑健) なAIモデル」 を構築することです。

2. 強化学習の問題設定 (Formulation)

今回のタスクを MDP (マルコフ決定過程) として定式化します。

2.1 状態空間 (Observation Space) S

エージェント(STM32マイコン)が観測できる情報です。
今回は6次元のベクトルとしました。角度そのものではなく、目標角度との偏差を入力することで、任意の目標値に対応できるようにします。

s_t = [ \theta_{err}, \phi_{err}, \psi_{err}, \dot{\theta}, \dot{\phi}, \dot{\psi} ]^T
  • $\theta_{err}, \phi_{err}, \psi_{err}$: Roll, Pitch, Yaw の目標角度との誤差($-\pi \sim \pi$ に正規化)
  • $\dot{\theta}, \dot{\phi}, \dot{\psi}$: 各軸の角速度 (ジャイロセンサ値)

2.2 行動空間 (Action Space) A

エージェントが出力する指令値です。
3つのリアクションホイールへのトルク指令を、$-1.0 \sim 1.0$ の連続値で出力します。

a_t = [ u_{roll}, u_{pitch}, u_{yaw} ]^T \quad \in [-1.0, 1.0]

2.3 報酬関数 (Reward Function) R

ここがAIの挙動を決定づける最も重要な設計要素です。
「姿勢を維持しろ」かつ「無駄なエネルギーを使うな」というトレードオフを数式化します。

r_t = r_{survival} - ( w_{angle} \sum \theta_{err}^2 + w_{vel} \sum \dot{\theta}^2 + w_{action} \sum a_t^2 )
  • $r_{survival} = 1.0$: 倒れずに耐えているだけで貰える生存報酬。
  • $w_{angle} = 10.0$: 角度誤差に対するペナルティ。ここを重くすることで直立を最優先させる。
  • $w_{action} = 0.01$: エネルギー消費(トルク)に対するペナルティ。LQRの $R$ 行列に相当し、バンバン制御(最大トルクの張り付き)を抑制する。

3. Sim2Realの秘策:ドメインランダマイゼーション

ただ漫然と学習させるだけでは、シミュレータ特有のクリーンな環境に過学習してしまいます(温室育ちのAI)。
そこで、学習プロセスに Random Kick Wrapper を導入します。

これは、エージェントが決定した行動 $a_t$ に対し、確率的に外乱ノイズ $\delta$ を混入させるものです。

a'_{t} = a_t + \delta, \quad \delta \sim U(-K, K)

これにより、エージェントは**「自分が意図した通りに体が動かない状況」**を強制的に経験し、そこからリカバリーするロバスト性を獲得します。

4. 実装コード解説

開発環境には GymnasiumStable Baselines3 (PPO) を使用します。
以下に、実際に使用した全ソースコードを掲載します。

4.1 環境定義: cubli_env.py

PyBulletの物理演算とGymnasiumのインターフェースを繋ぐクラスです。
_get_obs メソッド内で角度の正規化を行っている点に注目してください。

import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pybullet as p
import pybullet_data
import math
import time

class CubliEnv(gym.Env):
    def __init__(self, render_mode=None):
        super(CubliEnv, self).__init__()
        self.render_mode = render_mode
        
        # GUIモードと高速計算モード(DIRECT)の切り替え
        if render_mode == "human":
            self.physicsClient = p.connect(p.GUI)
            p.resetDebugVisualizerCamera(cameraDistance=0.4, cameraYaw=45, cameraPitch=-15, cameraTargetPosition=[0,0,0.1])
        else:
            self.physicsClient = p.connect(p.DIRECT) 

        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        
        # Action Space: 3軸トルク [-1.0 ~ 1.0]
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32)
        # Observation Space: [誤差(3), 角速度(3)]
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32)

        self.dt = 1./240.
        self.max_torque = 0.5  # 実機のモータ仕様に合わせる
        
        # 頂点倒立の目標角度 (立方体の対角線立ち)
        self.target_pitch = math.asin(1/math.sqrt(3))
        self.target_roll  = math.pi * 3 / 4
        self.target_yaw   = 0

        self.boxId = None

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        p.resetSimulation()
        p.setGravity(0, 0, -9.81)
        p.loadURDF("plane.urdf")
        
        # 初期位置にランダムノイズを乗せる(初期値ランダマイズ)
        noise = np.random.uniform(-0.1, 0.1, 3)
        start_r = self.target_roll + noise[0]
        start_p = self.target_pitch + noise[1]
        start_y = self.target_yaw + noise[2]
        
        startPos = [0, 0, 0.09]
        startOrientation = p.getQuaternionFromEuler([start_r, start_p, start_y])
        
        # 自作URDFの読み込み
        self.boxId = p.loadURDF("my_cubli.urdf", startPos, startOrientation, globalScaling=1.0)
        
        # 摩擦係数の設定(Sim2Realで重要)
        p.changeDynamics(self.boxId, -1, lateralFriction=1.0, spinningFriction=0.1)
        
        for i in range(3):
            p.setJointMotorControl2(self.boxId, i, p.VELOCITY_CONTROL, force=0)

        # 統計用変数のリセット
        self.episode_error_sum = 0.0
        self.episode_energy_sum = 0.0
        self.episode_steps = 0

        return self._get_obs(), {}

    def step(self, action):
        # トルク指令の適用
        # Y軸は慣性モーメントが大きいので少しゲイン倍率を変えても良いが、今回は強化学習に任せる
        torque = np.array(action) * self.max_torque * -1.0
        
        p.setJointMotorControl2(self.boxId, 0, p.TORQUE_CONTROL, force=torque[0])
        p.setJointMotorControl2(self.boxId, 1, p.TORQUE_CONTROL, force=torque[1])
        p.setJointMotorControl2(self.boxId, 2, p.TORQUE_CONTROL, force=torque[2])

        p.stepSimulation()
        if self.render_mode == "human":
            time.sleep(self.dt)

        obs = self._get_obs()
        err_r, err_p, err_y, vel_r, vel_p, vel_y = obs
        
        # 報酬関数の計算
        angle_penalty = 10.0 * (err_r**2 + err_p**2 + err_y**2)
        vel_penalty   = 0.1 * (vel_r**2 + vel_p**2 + vel_y**2)
        action_penalty = 0.01 * np.sum(np.square(action))
        survival_reward = 1.0

        reward = survival_reward - angle_penalty - vel_penalty - action_penalty

        # ログ用データ蓄積
        self.episode_error_sum += (abs(err_r) + abs(err_p)) * 180/math.pi
        self.episode_energy_sum += np.sum(np.abs(torque))
        self.episode_steps += 1

        # 終了判定(倒れたら終わり)
        limit_angle = 0.8 # rad (約45度)
        terminated = bool(abs(err_r) > limit_angle or abs(err_p) > limit_angle)
        truncated = False

        info = {}
        if terminated or truncated:
            info["episode_metrics"] = {
                "avg_angle_error_deg": self.episode_error_sum / self.episode_steps,
                "total_energy": self.episode_energy_sum,
                "survival_time": self.episode_steps * self.dt
            }

        return obs, reward, terminated, truncated, info

    def _get_obs(self):
        # PyBulletから状態取得
        pos, quat = p.getBasePositionAndOrientation(self.boxId)
        r, p_ang, y = p.getEulerFromQuaternion(quat)
        lin_vel, ang_vel = p.getBaseVelocity(self.boxId)
        
        # 誤差の計算
        err_r = r - self.target_roll
        err_p = p_ang - self.target_pitch
        err_y = y - self.target_yaw
        
        # 角度の正規化 (-pi ~ pi)
        # これをやらないと、359度と1度の区別がつかずに学習が発散する
        for err in [err_r, err_p, err_y]:
            while err > math.pi: err -= 2*math.pi
            while err < -math.pi: err += 2*math.pi
            
        return np.array([err_r, err_p, err_y, ang_vel[0], ang_vel[1], ang_vel[2]], dtype=np.float32)

    def close(self):
        p.disconnect()

4.2 学習実行: train_robust.py

PPOアルゴリズムを用いて学習を回します。
ここでは VecNormalize を使用している点に注意してください。PPOなどのPolicy Gradient手法は報酬のスケールに敏感なため、移動平均を用いて報酬と観測値を正規化することで、学習の安定性と収束速度を劇的に向上させています。

import gymnasium as gym
import numpy as np
import os
import pandas as pd
import matplotlib.pyplot as plt
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv, VecNormalize, VecMonitor
from stable_baselines3.common.callbacks import CheckpointCallback
from cubli_env import CubliEnv

# 外乱注入用ラッパー
class RandomKickWrapper(gym.ActionWrapper):
    def __init__(self, env, kick_prob=0.01, kick_power=1.5):
        super().__init__(env)
        self.kick_prob = kick_prob
        self.kick_power = kick_power

    def action(self, action):
        # 一定確率でランダムなキック(トルク外乱)を加える
        if np.random.rand() < self.kick_prob:
            kick = np.random.uniform(-self.kick_power, self.kick_power, size=action.shape)
            return action + kick
        return action

def make_env():
    env = CubliEnv(render_mode=None)
    # ここで外乱ラッパーを適用
    env = RandomKickWrapper(env, kick_prob=0.02, kick_power=0.8)
    return env

if __name__ == '__main__':
    MODELS_DIR = "models/PPO_Robust"
    LOG_DIR = "logs/PPO_Robust"
    os.makedirs(MODELS_DIR, exist_ok=True)
    os.makedirs(LOG_DIR, exist_ok=True)

    # 8並列で環境を構築
    num_cpu = 8 
    env = SubprocVecEnv([make_env for _ in range(num_cpu)])
    
    # モニタリングと正規化
    env = VecMonitor(env, filename=f"{LOG_DIR}/monitor.csv")
    env = VecNormalize(env, norm_obs=True, norm_reward=True, clip_obs=10.)

    # PPOモデルの定義
    model = PPO(
        "MlpPolicy",
        env,
        verbose=1,
        tensorboard_log=LOG_DIR,
        learning_rate=0.0003,
        n_steps=2048,
        batch_size=64,
        gamma=0.99,
        device="cpu" # GPUがあれば "cuda"
    )

    checkpoint_callback = CheckpointCallback(
        save_freq=100000 // num_cpu, 
        save_path=MODELS_DIR,
        name_prefix="ppo_cubli_robust"
    )

    print(">>> Start Training...")
    # 200万ステップ学習させる
    model.learn(total_timesteps=2000000, callback=checkpoint_callback)

    model.save(f"{MODELS_DIR}/final_model")
    env.save(f"{MODELS_DIR}/vec_normalize.pkl") # 正規化パラメータも忘れず保存
    print(">>> Done!")
    env.close()

5. 学習結果の分析

学習中の報酬推移(Learning Curve)を確認します。
result_learning_curve.png

  • 初期 (0 - 500 Episodes): 報酬は -10000 以下。スタート直後に転倒しており、何も学習できていません。
  • 中期 (500 - 1500 Episodes): 試行錯誤の末、徐々に生存時間が伸び始めます。
  • 収束 (1500+ Episodes): 報酬が安定してゼロ付近(ペナルティなし=直立維持)に収束しました。

このグラフから、約1500エピソード(並列計算で30分程度)あれば、タスクを解決できるポリシーが獲得できたことが分かります。

6. 実証:2.5Nm キックテスト

学習済みモデルに対し、学習時(0.8Nm)の3倍以上である 2.5Nm の外乱を与えて検証します。

検証コード: enjoy_robust.py

import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from cubli_env import CubliEnv
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import time

# ==========================================
# 設定
# ==========================================
# ★アニメーションを見たい場合は True、グラフだけすぐ欲しい場合は False
SHOW_ANIMATION = True

# 比較したい外乱の強さ
TEST_KICK_POWER = 0.5  # 2.5Nm (enjoy_robust.pyと同じ強さ)

# 1. ノーマルモデル(比較対象)
model_normal_path = "models/PPO/500000"a
stats_normal_path = "models/PPO/500000_env.pkl"

# 2. ロバストモデル(今回作った強いやつ)
model_robust_path = "models/PPO_Robust/final_model"
stats_robust_path = "models/PPO_Robust/vec_normalize.pkl"

# 共通設定
KICK_INTERVAL = 5.0
KICK_DURATION = 0.1
SIM_DURATION  = 30.0  # シミュレーション時間 [秒]
FIXED_SEED    = 42    # ★重要: 両方に「全く同じ方向」の力を加えるための種

# ==========================================
# シミュレーション関数
# ==========================================
def run_simulation(model_path, stats_path, label_name, seed):
    print(f"--- Running Simulation for: {label_name} ---")
    
    if not os.path.exists(model_path + ".zip") or not os.path.exists(stats_path):
        print(f"Error: File not found: {model_path} or {stats_path}")
        return None

    # 環境作成
    # SHOW_ANIMATION設定に合わせてモード切替
    render_mode = "human" if SHOW_ANIMATION else None
    base_env = CubliEnv(render_mode=render_mode) 
    env = DummyVecEnv([lambda: base_env])

    # 正規化ロード
    try:
        env = VecNormalize.load(stats_path, env)
        env.training = False
        env.norm_reward = False
    except Exception as e:
        print(f"Stats load error: {e}")
        return None

    # モデルロード
    model = PPO.load(model_path, env=env)

    # 記録用
    history = {"time": [], "err_r": [], "err_p": [], "torque": []}
    
    obs = env.reset()
    
    # 時間管理用(シミュレーション速度調整)
    steps = int(SIM_DURATION / base_env.dt)
    
    print(f"Simulating {label_name}...")
    
    for i in range(steps):
        current_time = i * base_env.dt
        action, _ = model.predict(obs, deterministic=True)
        
        # --- 外乱注入 (共通ロジック) ---
        if current_time > 1.0:
            time_in_cycle = current_time % KICK_INTERVAL
            
            # キック期間内だけ力を加える
            if time_in_cycle < KICK_DURATION:
                # seedを使って確定的な乱数生成(NormalとRobustで同じシード、同じステップ番号)
                kick_rng = np.random.default_rng(seed + i)
                disturbance = kick_rng.uniform(-TEST_KICK_POWER, TEST_KICK_POWER, size=3)
                
                action[0] += disturbance
                
                # ログ表示(画面で見ているとき用)
                if SHOW_ANIMATION and i % 10 == 0:
                     print(f"KICK! {disturbance}")

        # 実行
        obs, reward, done, info = env.step(action)
        
        # ★重要: アニメーションONの場合は待機時間を入れないと物理計算が不安定になる
        if SHOW_ANIMATION:
            time.sleep(base_env.dt)

        # データ記録
        raw_obs = base_env._get_obs()
        flat_action = action[0]
        
        history["time"].append(current_time)
        history["err_r"].append(np.rad2deg(raw_obs[0]))
        history["err_p"].append(np.rad2deg(raw_obs[1]))
        history["torque"].append(flat_action[0] * base_env.max_torque * -1.0)

        if done[0]:
            print(f" -> {label_name} fell down at {current_time:.2f}s")
            break
            
    env.close()
    return history

# ==========================================
# 実行
# ==========================================
# 1. ノーマルモデル実行
hist_normal = run_simulation(model_normal_path, stats_normal_path, "Normal Model", FIXED_SEED)

# 少し待機
if SHOW_ANIMATION: time.sleep(1.0)

# 2. ロバストモデル実行
hist_robust = run_simulation(model_robust_path, stats_robust_path, "Robust Model", FIXED_SEED)

# ==========================================
# CSV保存 (データを結合)
# ==========================================
print("\nProcessing Data...")

# Robustデータをベースにする
if hist_robust:
    df_rob = pd.DataFrame(hist_robust)
    df_rob = df_rob.rename(columns={"err_r": "Robust_Roll_Error", "err_p": "Robust_Pitch_Error", "torque": "Robust_Torque"})
else:
    df_rob = pd.DataFrame({"time": []})

# Normalデータをマージ
if hist_normal:
    df_norm = pd.DataFrame(hist_normal)
    df_norm = df_norm.rename(columns={"err_r": "Normal_Roll_Error", "err_p": "Normal_Pitch_Error", "torque": "Normal_Torque"})
    
    # 時間をキーにして結合
    df_merged = pd.merge(df_rob, df_norm, on="time", how="outer")
    df_merged = df_merged.sort_values(by="time")
else:
    df_merged = df_rob

csv_path = "comparison_result.csv"
df_merged.to_csv(csv_path, index=False)
print(f">>> CSV Saved: {csv_path}")

# ==========================================
# グラフ描画
# ==========================================
print("Plotting comparison results...")
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8), sharex=True)

c_roll = "red"
c_pitch = "blue"

# 上段: Roll & Pitch角度
if hist_normal:
    ax1.plot(hist_normal["time"], hist_normal["err_r"], color=c_roll, linestyle="--", label="Normal Roll", alpha=0.6)
    ax1.plot(hist_normal["time"], hist_normal["err_p"], color=c_pitch, linestyle="--", label="Normal Pitch", alpha=0.6)
if hist_robust:
    ax1.plot(hist_robust["time"], hist_robust["err_r"], color=c_roll, linestyle="-", label="Robust Roll", linewidth=2)
    ax1.plot(hist_robust["time"], hist_robust["err_p"], color=c_pitch, linestyle="-", label="Robust Pitch", linewidth=2)

ax1.set_title(f"Stability Comparison (Disturbance: {TEST_KICK_POWER} Nm)")
ax1.set_ylabel("Angle Error [deg]")
ax1.grid(True)
ax1.legend()
ax1.set_ylim(-20, 20)

# 下段: トルク
if hist_normal:
    ax2.plot(hist_normal["time"], hist_normal["torque"], color="red", linestyle="--", label="Normal Torque", alpha=0.6)
if hist_robust:
    ax2.plot(hist_robust["time"], hist_robust["torque"], color="blue", linestyle="-", label="Robust Torque", linewidth=2)

# キックタイミング表示
kick_times = [1.0 + i * KICK_INTERVAL for i in range(int(SIM_DURATION // KICK_INTERVAL))]
for kt in kick_times:
    ax2.axvline(x=kt, color="black", linestyle="--", alpha=0.5)

ax2.set_ylabel("Motor Torque [Nm]")
ax2.set_xlabel("Time [s]")
ax2.grid(True)
ax2.legend()

img_path = "comparison_result.png"
plt.tight_layout()
plt.savefig(img_path)
print(f">>> Graph Saved: {img_path}")
plt.show()

VecNormalize.load を使って、学習時と同じ正規化係数を適用するのが最大のポイントです。これを忘れると入力スケールが合わずに全く動きません。

トルク応答解析

検証の結果得られたデータです。

25N.png
グラフの 点線 (Normal Model)実線 (Robust Model) の挙動の違いに注目してください。

上段:角度誤差 (Angle Error)

  • Normal Model (点線): $t=5s$ の最初の衝撃で角度が大きく振れ、そのまま復帰できずにグラフが途切れています。これは転倒してシミュレーションが終了したことを意味します。外乱なしの環境でしか学習していないため、未知の衝撃に対応できませんでした。
  • Robust Model (実線): 同じ $t=5s$ で衝撃を受けていますが、瞬時にゼロ付近へ収束しています。その後も $t=10s, 15s, 20s...$ と繰り返される強烈なキックに対し、すべて耐え抜いています。

下段:モータトルク (Motor Torque)

  • Robust Model (実線) の凄み: 衝撃が入った瞬間(縦の点線)、鋭いスパイク状のカウンタートルク を出力しているのが分かります。
    • これは「傾いてからゆっくり戻す(PID的なフィードバック)」動きではありません。
    • 角速度の急変を検知した瞬間に、「このままだと倒れる!」と予測し、全力でブレーキを掛けるような 予測的な制御 を行っています。

この「人間がとっさに手をつくような反射神経」こそが、ドメインランダマイゼーションによって鍛えられたDeep RLモデルの真骨頂です。

7. まとめ

本日は、Deep RL (PPO) に「ドメインランダマイゼーション」を組み合わせることで、未知の衝撃にも耐えうるロバストな制御モデルを構築しました。

  • Sim2Realの鍵: 環境にノイズ(Random Kick)を混ぜることで、過学習を防ぐ。
  • 実装のコツ: 観測値の正規化 (VecNormalize) と報酬関数の設計 (Reward Shaping) が命。
  • 性能: ハードウェア限界に近い 2.5Nm の外乱にも耐えるAIが完成。

次回 Day 8 からは、いよいよこの最強の「脳(AI)」を宿す「体(ハードウェア)」の製作に入ります。
テーマは 「ハードウェア選定の要:なぜSTM32G4なのか」 です。お楽しみに。


アドベントカレンダー参加中
STM32×AIで「3軸倒立振子」を作る25日間(ひとりアドカレ)Advent Calendar 2025

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?