3軸リアクションホイール倒立振子を作るアドカレ の7日目です。
1. はじめに:なぜモデルベース(LQR)からデータ駆動(RL)へ転向するのか
前回の [Day 6] では、現代制御の王道である LQR (Linear Quadratic Regulator) を実装しました。
シミュレーション上の理想的な環境下では、LQRは数学的に最適な挙動を示し、微動だにしない静定性を発揮しました。
しかし、最終目標は 「実機 (STM32)」 への実装です。
現実世界 (Real) は、シミュレーション (Sim) とは決定的に異なります。
- 非線形な摩擦: ギアやベアリングの静止摩擦 (Stiction) は線形モデルでは扱いきれません。
- モデル化誤差: 3Dプリント筐体の剛性不足、重心のズレ、慣性モーメントの推定誤差。
- 予期せぬ外乱: ケーブルの張力、床の傾き、突発的な衝撃。
これらの「不確実性」に対し、LQRのような固定ゲイン制御は脆さを露呈します。
そこで採用するのが 深層強化学習 (Deep Reinforcement Learning) です。
運動方程式を解くのではなく、シミュレータの中で何百万回も転倒し、「試行錯誤 (Trial and Error)」 を通じて、ノイズや外乱に打ち勝つ方策 (Policy) をニューラルネットワークに学習させます。
本日のゴールは、2.5Nm級の外乱にも耐えうる 「Robust (頑健) なAIモデル」 を構築することです。
2. 強化学習の問題設定 (Formulation)
今回のタスクを MDP (マルコフ決定過程) として定式化します。
2.1 状態空間 (Observation Space) S
エージェント(STM32マイコン)が観測できる情報です。
今回は6次元のベクトルとしました。角度そのものではなく、目標角度との偏差を入力することで、任意の目標値に対応できるようにします。
s_t = [ \theta_{err}, \phi_{err}, \psi_{err}, \dot{\theta}, \dot{\phi}, \dot{\psi} ]^T
- $\theta_{err}, \phi_{err}, \psi_{err}$: Roll, Pitch, Yaw の目標角度との誤差($-\pi \sim \pi$ に正規化)
- $\dot{\theta}, \dot{\phi}, \dot{\psi}$: 各軸の角速度 (ジャイロセンサ値)
2.2 行動空間 (Action Space) A
エージェントが出力する指令値です。
3つのリアクションホイールへのトルク指令を、$-1.0 \sim 1.0$ の連続値で出力します。
a_t = [ u_{roll}, u_{pitch}, u_{yaw} ]^T \quad \in [-1.0, 1.0]
2.3 報酬関数 (Reward Function) R
ここがAIの挙動を決定づける最も重要な設計要素です。
「姿勢を維持しろ」かつ「無駄なエネルギーを使うな」というトレードオフを数式化します。
r_t = r_{survival} - ( w_{angle} \sum \theta_{err}^2 + w_{vel} \sum \dot{\theta}^2 + w_{action} \sum a_t^2 )
- $r_{survival} = 1.0$: 倒れずに耐えているだけで貰える生存報酬。
- $w_{angle} = 10.0$: 角度誤差に対するペナルティ。ここを重くすることで直立を最優先させる。
- $w_{action} = 0.01$: エネルギー消費(トルク)に対するペナルティ。LQRの $R$ 行列に相当し、バンバン制御(最大トルクの張り付き)を抑制する。
3. Sim2Realの秘策:ドメインランダマイゼーション
ただ漫然と学習させるだけでは、シミュレータ特有のクリーンな環境に過学習してしまいます(温室育ちのAI)。
そこで、学習プロセスに Random Kick Wrapper を導入します。
これは、エージェントが決定した行動 $a_t$ に対し、確率的に外乱ノイズ $\delta$ を混入させるものです。
a'_{t} = a_t + \delta, \quad \delta \sim U(-K, K)
これにより、エージェントは**「自分が意図した通りに体が動かない状況」**を強制的に経験し、そこからリカバリーするロバスト性を獲得します。
4. 実装コード解説
開発環境には Gymnasium と Stable Baselines3 (PPO) を使用します。
以下に、実際に使用した全ソースコードを掲載します。
4.1 環境定義: cubli_env.py
PyBulletの物理演算とGymnasiumのインターフェースを繋ぐクラスです。
_get_obs メソッド内で角度の正規化を行っている点に注目してください。
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pybullet as p
import pybullet_data
import math
import time
class CubliEnv(gym.Env):
def __init__(self, render_mode=None):
super(CubliEnv, self).__init__()
self.render_mode = render_mode
# GUIモードと高速計算モード(DIRECT)の切り替え
if render_mode == "human":
self.physicsClient = p.connect(p.GUI)
p.resetDebugVisualizerCamera(cameraDistance=0.4, cameraYaw=45, cameraPitch=-15, cameraTargetPosition=[0,0,0.1])
else:
self.physicsClient = p.connect(p.DIRECT)
p.setAdditionalSearchPath(pybullet_data.getDataPath())
# Action Space: 3軸トルク [-1.0 ~ 1.0]
self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32)
# Observation Space: [誤差(3), 角速度(3)]
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32)
self.dt = 1./240.
self.max_torque = 0.5 # 実機のモータ仕様に合わせる
# 頂点倒立の目標角度 (立方体の対角線立ち)
self.target_pitch = math.asin(1/math.sqrt(3))
self.target_roll = math.pi * 3 / 4
self.target_yaw = 0
self.boxId = None
def reset(self, seed=None, options=None):
super().reset(seed=seed)
p.resetSimulation()
p.setGravity(0, 0, -9.81)
p.loadURDF("plane.urdf")
# 初期位置にランダムノイズを乗せる(初期値ランダマイズ)
noise = np.random.uniform(-0.1, 0.1, 3)
start_r = self.target_roll + noise[0]
start_p = self.target_pitch + noise[1]
start_y = self.target_yaw + noise[2]
startPos = [0, 0, 0.09]
startOrientation = p.getQuaternionFromEuler([start_r, start_p, start_y])
# 自作URDFの読み込み
self.boxId = p.loadURDF("my_cubli.urdf", startPos, startOrientation, globalScaling=1.0)
# 摩擦係数の設定(Sim2Realで重要)
p.changeDynamics(self.boxId, -1, lateralFriction=1.0, spinningFriction=0.1)
for i in range(3):
p.setJointMotorControl2(self.boxId, i, p.VELOCITY_CONTROL, force=0)
# 統計用変数のリセット
self.episode_error_sum = 0.0
self.episode_energy_sum = 0.0
self.episode_steps = 0
return self._get_obs(), {}
def step(self, action):
# トルク指令の適用
# Y軸は慣性モーメントが大きいので少しゲイン倍率を変えても良いが、今回は強化学習に任せる
torque = np.array(action) * self.max_torque * -1.0
p.setJointMotorControl2(self.boxId, 0, p.TORQUE_CONTROL, force=torque[0])
p.setJointMotorControl2(self.boxId, 1, p.TORQUE_CONTROL, force=torque[1])
p.setJointMotorControl2(self.boxId, 2, p.TORQUE_CONTROL, force=torque[2])
p.stepSimulation()
if self.render_mode == "human":
time.sleep(self.dt)
obs = self._get_obs()
err_r, err_p, err_y, vel_r, vel_p, vel_y = obs
# 報酬関数の計算
angle_penalty = 10.0 * (err_r**2 + err_p**2 + err_y**2)
vel_penalty = 0.1 * (vel_r**2 + vel_p**2 + vel_y**2)
action_penalty = 0.01 * np.sum(np.square(action))
survival_reward = 1.0
reward = survival_reward - angle_penalty - vel_penalty - action_penalty
# ログ用データ蓄積
self.episode_error_sum += (abs(err_r) + abs(err_p)) * 180/math.pi
self.episode_energy_sum += np.sum(np.abs(torque))
self.episode_steps += 1
# 終了判定(倒れたら終わり)
limit_angle = 0.8 # rad (約45度)
terminated = bool(abs(err_r) > limit_angle or abs(err_p) > limit_angle)
truncated = False
info = {}
if terminated or truncated:
info["episode_metrics"] = {
"avg_angle_error_deg": self.episode_error_sum / self.episode_steps,
"total_energy": self.episode_energy_sum,
"survival_time": self.episode_steps * self.dt
}
return obs, reward, terminated, truncated, info
def _get_obs(self):
# PyBulletから状態取得
pos, quat = p.getBasePositionAndOrientation(self.boxId)
r, p_ang, y = p.getEulerFromQuaternion(quat)
lin_vel, ang_vel = p.getBaseVelocity(self.boxId)
# 誤差の計算
err_r = r - self.target_roll
err_p = p_ang - self.target_pitch
err_y = y - self.target_yaw
# 角度の正規化 (-pi ~ pi)
# これをやらないと、359度と1度の区別がつかずに学習が発散する
for err in [err_r, err_p, err_y]:
while err > math.pi: err -= 2*math.pi
while err < -math.pi: err += 2*math.pi
return np.array([err_r, err_p, err_y, ang_vel[0], ang_vel[1], ang_vel[2]], dtype=np.float32)
def close(self):
p.disconnect()
4.2 学習実行: train_robust.py
PPOアルゴリズムを用いて学習を回します。
ここでは VecNormalize を使用している点に注意してください。PPOなどのPolicy Gradient手法は報酬のスケールに敏感なため、移動平均を用いて報酬と観測値を正規化することで、学習の安定性と収束速度を劇的に向上させています。
import gymnasium as gym
import numpy as np
import os
import pandas as pd
import matplotlib.pyplot as plt
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv, VecNormalize, VecMonitor
from stable_baselines3.common.callbacks import CheckpointCallback
from cubli_env import CubliEnv
# 外乱注入用ラッパー
class RandomKickWrapper(gym.ActionWrapper):
def __init__(self, env, kick_prob=0.01, kick_power=1.5):
super().__init__(env)
self.kick_prob = kick_prob
self.kick_power = kick_power
def action(self, action):
# 一定確率でランダムなキック(トルク外乱)を加える
if np.random.rand() < self.kick_prob:
kick = np.random.uniform(-self.kick_power, self.kick_power, size=action.shape)
return action + kick
return action
def make_env():
env = CubliEnv(render_mode=None)
# ここで外乱ラッパーを適用
env = RandomKickWrapper(env, kick_prob=0.02, kick_power=0.8)
return env
if __name__ == '__main__':
MODELS_DIR = "models/PPO_Robust"
LOG_DIR = "logs/PPO_Robust"
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)
# 8並列で環境を構築
num_cpu = 8
env = SubprocVecEnv([make_env for _ in range(num_cpu)])
# モニタリングと正規化
env = VecMonitor(env, filename=f"{LOG_DIR}/monitor.csv")
env = VecNormalize(env, norm_obs=True, norm_reward=True, clip_obs=10.)
# PPOモデルの定義
model = PPO(
"MlpPolicy",
env,
verbose=1,
tensorboard_log=LOG_DIR,
learning_rate=0.0003,
n_steps=2048,
batch_size=64,
gamma=0.99,
device="cpu" # GPUがあれば "cuda"
)
checkpoint_callback = CheckpointCallback(
save_freq=100000 // num_cpu,
save_path=MODELS_DIR,
name_prefix="ppo_cubli_robust"
)
print(">>> Start Training...")
# 200万ステップ学習させる
model.learn(total_timesteps=2000000, callback=checkpoint_callback)
model.save(f"{MODELS_DIR}/final_model")
env.save(f"{MODELS_DIR}/vec_normalize.pkl") # 正規化パラメータも忘れず保存
print(">>> Done!")
env.close()
5. 学習結果の分析
学習中の報酬推移(Learning Curve)を確認します。

- 初期 (0 - 500 Episodes): 報酬は -10000 以下。スタート直後に転倒しており、何も学習できていません。
- 中期 (500 - 1500 Episodes): 試行錯誤の末、徐々に生存時間が伸び始めます。
- 収束 (1500+ Episodes): 報酬が安定してゼロ付近(ペナルティなし=直立維持)に収束しました。
このグラフから、約1500エピソード(並列計算で30分程度)あれば、タスクを解決できるポリシーが獲得できたことが分かります。
6. 実証:2.5Nm キックテスト
学習済みモデルに対し、学習時(0.8Nm)の3倍以上である 2.5Nm の外乱を与えて検証します。
検証コード: enjoy_robust.py
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from cubli_env import CubliEnv
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import time
# ==========================================
# 設定
# ==========================================
# ★アニメーションを見たい場合は True、グラフだけすぐ欲しい場合は False
SHOW_ANIMATION = True
# 比較したい外乱の強さ
TEST_KICK_POWER = 0.5 # 2.5Nm (enjoy_robust.pyと同じ強さ)
# 1. ノーマルモデル(比較対象)
model_normal_path = "models/PPO/500000"a
stats_normal_path = "models/PPO/500000_env.pkl"
# 2. ロバストモデル(今回作った強いやつ)
model_robust_path = "models/PPO_Robust/final_model"
stats_robust_path = "models/PPO_Robust/vec_normalize.pkl"
# 共通設定
KICK_INTERVAL = 5.0
KICK_DURATION = 0.1
SIM_DURATION = 30.0 # シミュレーション時間 [秒]
FIXED_SEED = 42 # ★重要: 両方に「全く同じ方向」の力を加えるための種
# ==========================================
# シミュレーション関数
# ==========================================
def run_simulation(model_path, stats_path, label_name, seed):
print(f"--- Running Simulation for: {label_name} ---")
if not os.path.exists(model_path + ".zip") or not os.path.exists(stats_path):
print(f"Error: File not found: {model_path} or {stats_path}")
return None
# 環境作成
# SHOW_ANIMATION設定に合わせてモード切替
render_mode = "human" if SHOW_ANIMATION else None
base_env = CubliEnv(render_mode=render_mode)
env = DummyVecEnv([lambda: base_env])
# 正規化ロード
try:
env = VecNormalize.load(stats_path, env)
env.training = False
env.norm_reward = False
except Exception as e:
print(f"Stats load error: {e}")
return None
# モデルロード
model = PPO.load(model_path, env=env)
# 記録用
history = {"time": [], "err_r": [], "err_p": [], "torque": []}
obs = env.reset()
# 時間管理用(シミュレーション速度調整)
steps = int(SIM_DURATION / base_env.dt)
print(f"Simulating {label_name}...")
for i in range(steps):
current_time = i * base_env.dt
action, _ = model.predict(obs, deterministic=True)
# --- 外乱注入 (共通ロジック) ---
if current_time > 1.0:
time_in_cycle = current_time % KICK_INTERVAL
# キック期間内だけ力を加える
if time_in_cycle < KICK_DURATION:
# seedを使って確定的な乱数生成(NormalとRobustで同じシード、同じステップ番号)
kick_rng = np.random.default_rng(seed + i)
disturbance = kick_rng.uniform(-TEST_KICK_POWER, TEST_KICK_POWER, size=3)
action[0] += disturbance
# ログ表示(画面で見ているとき用)
if SHOW_ANIMATION and i % 10 == 0:
print(f"KICK! {disturbance}")
# 実行
obs, reward, done, info = env.step(action)
# ★重要: アニメーションONの場合は待機時間を入れないと物理計算が不安定になる
if SHOW_ANIMATION:
time.sleep(base_env.dt)
# データ記録
raw_obs = base_env._get_obs()
flat_action = action[0]
history["time"].append(current_time)
history["err_r"].append(np.rad2deg(raw_obs[0]))
history["err_p"].append(np.rad2deg(raw_obs[1]))
history["torque"].append(flat_action[0] * base_env.max_torque * -1.0)
if done[0]:
print(f" -> {label_name} fell down at {current_time:.2f}s")
break
env.close()
return history
# ==========================================
# 実行
# ==========================================
# 1. ノーマルモデル実行
hist_normal = run_simulation(model_normal_path, stats_normal_path, "Normal Model", FIXED_SEED)
# 少し待機
if SHOW_ANIMATION: time.sleep(1.0)
# 2. ロバストモデル実行
hist_robust = run_simulation(model_robust_path, stats_robust_path, "Robust Model", FIXED_SEED)
# ==========================================
# CSV保存 (データを結合)
# ==========================================
print("\nProcessing Data...")
# Robustデータをベースにする
if hist_robust:
df_rob = pd.DataFrame(hist_robust)
df_rob = df_rob.rename(columns={"err_r": "Robust_Roll_Error", "err_p": "Robust_Pitch_Error", "torque": "Robust_Torque"})
else:
df_rob = pd.DataFrame({"time": []})
# Normalデータをマージ
if hist_normal:
df_norm = pd.DataFrame(hist_normal)
df_norm = df_norm.rename(columns={"err_r": "Normal_Roll_Error", "err_p": "Normal_Pitch_Error", "torque": "Normal_Torque"})
# 時間をキーにして結合
df_merged = pd.merge(df_rob, df_norm, on="time", how="outer")
df_merged = df_merged.sort_values(by="time")
else:
df_merged = df_rob
csv_path = "comparison_result.csv"
df_merged.to_csv(csv_path, index=False)
print(f">>> CSV Saved: {csv_path}")
# ==========================================
# グラフ描画
# ==========================================
print("Plotting comparison results...")
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8), sharex=True)
c_roll = "red"
c_pitch = "blue"
# 上段: Roll & Pitch角度
if hist_normal:
ax1.plot(hist_normal["time"], hist_normal["err_r"], color=c_roll, linestyle="--", label="Normal Roll", alpha=0.6)
ax1.plot(hist_normal["time"], hist_normal["err_p"], color=c_pitch, linestyle="--", label="Normal Pitch", alpha=0.6)
if hist_robust:
ax1.plot(hist_robust["time"], hist_robust["err_r"], color=c_roll, linestyle="-", label="Robust Roll", linewidth=2)
ax1.plot(hist_robust["time"], hist_robust["err_p"], color=c_pitch, linestyle="-", label="Robust Pitch", linewidth=2)
ax1.set_title(f"Stability Comparison (Disturbance: {TEST_KICK_POWER} Nm)")
ax1.set_ylabel("Angle Error [deg]")
ax1.grid(True)
ax1.legend()
ax1.set_ylim(-20, 20)
# 下段: トルク
if hist_normal:
ax2.plot(hist_normal["time"], hist_normal["torque"], color="red", linestyle="--", label="Normal Torque", alpha=0.6)
if hist_robust:
ax2.plot(hist_robust["time"], hist_robust["torque"], color="blue", linestyle="-", label="Robust Torque", linewidth=2)
# キックタイミング表示
kick_times = [1.0 + i * KICK_INTERVAL for i in range(int(SIM_DURATION // KICK_INTERVAL))]
for kt in kick_times:
ax2.axvline(x=kt, color="black", linestyle="--", alpha=0.5)
ax2.set_ylabel("Motor Torque [Nm]")
ax2.set_xlabel("Time [s]")
ax2.grid(True)
ax2.legend()
img_path = "comparison_result.png"
plt.tight_layout()
plt.savefig(img_path)
print(f">>> Graph Saved: {img_path}")
plt.show()
VecNormalize.load を使って、学習時と同じ正規化係数を適用するのが最大のポイントです。これを忘れると入力スケールが合わずに全く動きません。
トルク応答解析
検証の結果得られたデータです。

グラフの 点線 (Normal Model) と 実線 (Robust Model) の挙動の違いに注目してください。
上段:角度誤差 (Angle Error)
- Normal Model (点線): $t=5s$ の最初の衝撃で角度が大きく振れ、そのまま復帰できずにグラフが途切れています。これは転倒してシミュレーションが終了したことを意味します。外乱なしの環境でしか学習していないため、未知の衝撃に対応できませんでした。
- Robust Model (実線): 同じ $t=5s$ で衝撃を受けていますが、瞬時にゼロ付近へ収束しています。その後も $t=10s, 15s, 20s...$ と繰り返される強烈なキックに対し、すべて耐え抜いています。
下段:モータトルク (Motor Torque)
-
Robust Model (実線) の凄み: 衝撃が入った瞬間(縦の点線)、鋭いスパイク状のカウンタートルク を出力しているのが分かります。
- これは「傾いてからゆっくり戻す(PID的なフィードバック)」動きではありません。
- 角速度の急変を検知した瞬間に、「このままだと倒れる!」と予測し、全力でブレーキを掛けるような 予測的な制御 を行っています。
この「人間がとっさに手をつくような反射神経」こそが、ドメインランダマイゼーションによって鍛えられたDeep RLモデルの真骨頂です。
7. まとめ
本日は、Deep RL (PPO) に「ドメインランダマイゼーション」を組み合わせることで、未知の衝撃にも耐えうるロバストな制御モデルを構築しました。
- Sim2Realの鍵: 環境にノイズ(Random Kick)を混ぜることで、過学習を防ぐ。
-
実装のコツ: 観測値の正規化 (
VecNormalize) と報酬関数の設計 (Reward Shaping) が命。 - 性能: ハードウェア限界に近い 2.5Nm の外乱にも耐えるAIが完成。
次回 Day 8 からは、いよいよこの最強の「脳(AI)」を宿す「体(ハードウェア)」の製作に入ります。
テーマは 「ハードウェア選定の要:なぜSTM32G4なのか」 です。お楽しみに。
アドベントカレンダー参加中
STM32×AIで「3軸倒立振子」を作る25日間(ひとりアドカレ)Advent Calendar 2025