GenesisとUnitree Go2を用いた強化学習プロジェクト解説
この記事では、物理シミュレーションと強化学習を組み合わせたプラットフォームであるGenesisを用いたサンプルプロジェクトについて解説します。
具体的には、Genesisを使って四足歩行ロボットUnitree Go2の歩行を学習させるプロジェクトを例に、強化学習の基本的な概念、Genesisのシミュレーション環境、そして強化学習エージェントの訓練について説明します。
Unitree Go2の歩行学習
今回解説するサンプルプロジェクトでは、Genesisを用いて四足歩行ロボット Unitree Go2 の歩行を学習させます。
実物のGo2ロボットを使用する代わりに、Genesisのシミュレーション環境内でロボットのモデルを動かし、強化学習アルゴリズムによって歩行パターンを獲得させます。
1. 強化学習の基本概念
強化学習について簡単に復習しましょう。
- エージェント (Agent): 学習し、行動する主体です。今回の例では、Go2ロボットがエージェントに相当します。
- 環境 (Environment): エージェントが相互作用する対象です。Genesisのシミュレーション空間が環境となります。
- 状態 (State): エージェントが観測する環境の情報です。例えば、Go2の各関節の角度や角速度、胴体の姿勢などです。
- 行動 (Action): エージェントが環境に対して行う操作です。Go2の場合、各関節の目標トルクなどが行動になります。
- 報酬 (Reward): エージェントの行動の良し悪しを評価する指標です。歩行タスクでは、前進速度や安定性などに基づいて報酬が設計されます。
強化学習の目標は、エージェントが累積報酬を最大化するような行動方針(方策, Policy)を獲得することです。
1.1 PPO (Proximal Policy Optimization) アルゴリズム
このプロジェクトでは、強化学習アルゴリズムとして PPO (Proximal Policy Optimization) が使用されています。
PPOは、方策勾配法 をベースとしたアルゴリズムの一種です。
方策勾配法では、方策(確率的な行動選択ルール)を直接パラメータ化し、そのパラメータを報酬が高くなる方向に少しずつ更新していきます。
PPOは、方策の更新幅をクリッピングすることで、学習の安定性を向上させているのが特徴です。
具体的には、新しい方策と古い方策の比率が一定範囲内に収まるように制限することで、方策が急激に変化するのを防ぎます。
2. Genesisのシミュレーション環境 (go2_env.py)
go2_env.py
ファイルは、Genesisプラットフォームにおける強化学習の「環境」を定義します。Unitree Go2ロボットとエージェントの相互作用(状態、行動、報酬、シミュレーションのステップ)を記述します。
import torch
import math
import genesis as gs
from genesis.utils.geom import quat_to_xyz, transform_by_quat, inv_quat, transform_quat_by_quat
def gs_rand_float(lower, upper, shape, device):
return (upper - lower) * torch.rand(size=shape, device=device) + lower
class Go2Env:
def __init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_viewer=False, device="cuda"):
self.device = torch.device(device)
self.num_envs = num_envs
self.num_obs = obs_cfg["num_obs"]
self.num_privileged_obs = None # 特権情報は使用しない
self.num_actions = env_cfg["num_actions"]
self.num_commands = command_cfg["num_commands"]
self.simulate_action_latency = True # 実機での1ステップの遅延をシミュレート
self.dt = 0.02 # 制御周期 (実機は50Hz)
self.max_episode_length = math.ceil(env_cfg["episode_length_s"] / self.dt)
self.env_cfg = env_cfg
self.obs_cfg = obs_cfg
self.reward_cfg = reward_cfg
self.command_cfg = command_cfg
self.obs_scales = obs_cfg["obs_scales"]
self.reward_scales = reward_cfg["reward_scales"]
# create scene
self.scene = gs.Scene(
sim_options=gs.options.SimOptions(dt=self.dt, substeps=2),
viewer_options=gs.options.ViewerOptions(
max_FPS=int(0.5 / self.dt),
camera_pos=(2.0, 0.0, 2.5),
camera_lookat=(0.0, 0.0, 0.5),
camera_fov=40,
),
vis_options=gs.options.VisOptions(n_rendered_envs=1),
rigid_options=gs.options.RigidOptions(
dt=self.dt,
constraint_solver=gs.constraint_solver.Newton,
enable_collision=True,
enable_joint_limit=True,
),
show_viewer=show_viewer,
)
# add plain
self.scene.add_entity(gs.morphs.URDF(file="urdf/plane/plane.urdf", fixed=True))
# add robot
self.base_init_pos = torch.tensor(self.env_cfg["base_init_pos"], device=self.device)
self.base_init_quat = torch.tensor(self.env_cfg["base_init_quat"], device=self.device)
self.inv_base_init_quat = inv_quat(self.base_init_quat) # 初期姿勢の逆クォータニオン
self.robot = self.scene.add_entity(
gs.morphs.URDF(
file="urdf/go2/urdf/go2.urdf",
pos=self.base_init_pos.cpu().numpy(),
quat=self.base_init_quat.cpu().numpy(),
),
)
# build
self.scene.build(n_envs=num_envs)
# names to indices
self.motor_dofs = [self.robot.get_joint(name).dof_idx_local for name in self.env_cfg["dof_names"]]
# PD control parameters
self.robot.set_dofs_kp([self.env_cfg["kp"]] * self.num_actions, self.motor_dofs)
self.robot.set_dofs_kv([self.env_cfg["kd"]] * self.num_actions, self.motor_dofs)
# prepare reward functions and multiply reward scales by dt
self.reward_functions, self.episode_sums = dict(), dict()
for name in self.reward_scales.keys():
self.reward_scales[name] *= self.dt # 時間ステップを乗算
self.reward_functions[name] = getattr(self, "_reward_" + name) # Reward関数への参照
self.episode_sums[name] = torch.zeros((self.num_envs,), device=self.device, dtype=gs.tc_float)
# initialize buffers
self.base_lin_vel = torch.zeros((self.num_envs, 3), device=self.device, dtype=gs.tc_float)
self.base_ang_vel = torch.zeros((self.num_envs, 3), device=self.device, dtype=gs.tc_float)
self.projected_gravity = torch.zeros((self.num_envs, 3), device=self.device, dtype=gs.tc_float)
self.global_gravity = torch.tensor([0.0, 0.0, -1.0], device=self.device, dtype=gs.tc_float).repeat(
self.num_envs, 1
)
self.obs_buf = torch.zeros((self.num_envs, self.num_obs), device=self.device, dtype=gs.tc_float)
self.rew_buf = torch.zeros((self.num_envs,), device=self.device, dtype=gs.tc_float)
self.reset_buf = torch.ones((self.num_envs,), device=self.device, dtype=gs.tc_int)
self.episode_length_buf = torch.zeros((self.num_envs,), device=self.device, dtype=gs.tc_int)
self.commands = torch.zeros((self.num_envs, self.num_commands), device=self.device, dtype=gs.tc_float)
self.commands_scale = torch.tensor(
[self.obs_scales["lin_vel"], self.obs_scales["lin_vel"], self.obs_scales["ang_vel"]],
device=self.device,
dtype=gs.tc_float,
)
self.actions = torch.zeros((self.num_envs, self.num_actions), device=self.device, dtype=gs.tc_float)
self.last_actions = torch.zeros_like(self.actions)
self.dof_pos = torch.zeros_like(self.actions)
self.dof_vel = torch.zeros_like(self.actions)
self.last_dof_vel = torch.zeros_like(self.actions)
self.base_pos = torch.zeros((self.num_envs, 3), device=self.device, dtype=gs.tc_float)
self.base_quat = torch.zeros((self.num_envs, 4), device=self.device, dtype=gs.tc_float)
self.default_dof_pos = torch.tensor(
[self.env_cfg["default_joint_angles"][name] for name in self.env_cfg["dof_names"]],
device=self.device,
dtype=gs.tc_float,
)
self.extras = dict() # ログ用の追加情報
def _resample_commands(self, envs_idx):
self.commands[envs_idx, 0] = gs_rand_float(*self.command_cfg["lin_vel_x_range"], (len(envs_idx),), self.device)
self.commands[envs_idx, 1] = gs_rand_float(*self.command_cfg["lin_vel_y_range"], (len(envs_idx),), self.device)
self.commands[envs_idx, 2] = gs_rand_float(*self.command_cfg["ang_vel_range"], (len(envs_idx),), self.device)
def step(self, actions):
self.actions = torch.clip(actions, -self.env_cfg["clip_actions"], self.env_cfg["clip_actions"])
exec_actions = self.last_actions if self.simulate_action_latency else self.actions # 遅延を考慮
target_dof_pos = exec_actions * self.env_cfg["action_scale"] + self.default_dof_pos
self.robot.control_dofs_position(target_dof_pos, self.motor_dofs)
self.scene.step()
# update buffers
self.episode_length_buf += 1
self.base_pos[:] = self.robot.get_pos()
self.base_quat[:] = self.robot.get_quat()
self.base_euler = quat_to_xyz(
transform_quat_by_quat(torch.ones_like(self.base_quat) * self.inv_base_init_quat, self.base_quat)
)
inv_base_quat = inv_quat(self.base_quat)
self.base_lin_vel[:] = transform_by_quat(self.robot.get_vel(), inv_base_quat)
self.base_ang_vel[:] = transform_by_quat(self.robot.get_ang(), inv_base_quat)
self.projected_gravity = transform_by_quat(self.global_gravity, inv_base_quat)
self.dof_pos[:] = self.robot.get_dofs_position(self.motor_dofs)
self.dof_vel[:] = self.robot.get_dofs_velocity(self.motor_dofs)
# resample commands
envs_idx = (
(self.episode_length_buf % int(self.env_cfg["resampling_time_s"] / self.dt) == 0)
.nonzero(as_tuple=False)
.flatten()
)
self._resample_commands(envs_idx)
# check termination and reset
self.reset_buf = self.episode_length_buf > self.max_episode_length
self.reset_buf |= torch.abs(self.base_euler[:, 1]) > self.env_cfg["termination_if_pitch_greater_than"]
self.reset_buf |= torch.abs(self.base_euler[:, 0]) > self.env_cfg["termination_if_roll_greater_than"]
time_out_idx = (self.episode_length_buf > self.max_episode_length).nonzero(as_tuple=False).flatten()
self.extras["time_outs"] = torch.zeros_like(self.reset_buf, device=self.device, dtype=gs.tc_float)
self.extras["time_outs"][time_out_idx] = 1.0
self.reset_idx(self.reset_buf.nonzero(as_tuple=False).flatten())
# compute reward
self.rew_buf[:] = 0.0
for name, reward_func in self.reward_functions.items():
rew = reward_func() * self.reward_scales[name]
self.rew_buf += rew
self.episode_sums[name] += rew
# compute observations
self.obs_buf = torch.cat(
[
self.base_ang_vel * self.obs_scales["ang_vel"], # 3
self.projected_gravity, # 3
self.commands * self.commands_scale, # 3
(self.dof_pos - self.default_dof_pos) * self.obs_scales["dof_pos"], # 12
self.dof_vel * self.obs_scales["dof_vel"], # 12
self.actions, # 12
],
axis=-1,
)
self.last_actions[:] = self.actions[:]
self.last_dof_vel[:] = self.dof_vel[:]
return self.obs_buf, None, self.rew_buf, self.reset_buf, self.extras
def get_observations(self):
return self.obs_buf
def get_privileged_observations(self):
return None
def reset_idx(self, envs_idx):
if len(envs_idx) == 0:
return
# reset dofs
self.dof_pos[envs_idx] = self.default_dof_pos
self.dof_vel[envs_idx] = 0.0
self.robot.set_dofs_position(
position=self.dof_pos[envs_idx],
dofs_idx_local=self.motor_dofs,
zero_velocity=True,
envs_idx=envs_idx,
)
# reset base
self.base_pos[envs_idx] = self.base_init_pos
self.base_quat[envs_idx] = self.base_init_quat.reshape(1, -1)
self.robot.set_pos(self.base_pos[envs_idx], zero_velocity=False, envs_idx=envs_idx)
self.robot.set_quat(self.base_quat[envs_idx], zero_velocity=False, envs_idx=envs_idx)
self.base_lin_vel[envs_idx] = 0
self.base_ang_vel[envs_idx] = 0
self.robot.zero_all_dofs_velocity(envs_idx)
# reset buffers
self.last_actions[envs_idx] = 0.0
self.last_dof_vel[envs_idx] = 0.0
self.episode_length_buf[envs_idx] = 0
self.reset_buf[envs_idx] = True
# fill extras
self.extras["episode"] = {}
for key in self.episode_sums.keys():
self.extras["episode"]["rew_" + key] = (
torch.mean(self.episode_sums[key][envs_idx]).item() / self.env_cfg["episode_length_s"]
)
self.episode_sums[key][envs_idx] = 0.0
self._resample_commands(envs_idx)
def reset(self):
self.reset_buf[:] = True
self.reset_idx(torch.arange(self.num_envs, device=self.device))
return self.obs_buf, None
# ------------ reward functions----------------
def _reward_tracking_lin_vel(self):
# Tracking of linear velocity commands (xy axes)
lin_vel_error = torch.sum(torch.square(self.commands[:, :2] - self.base_lin_vel[:, :2]), dim=1)
return torch.exp(-lin_vel_error / self.reward_cfg["tracking_sigma"])
def _reward_tracking_ang_vel(self):
# Tracking of angular velocity commands (yaw)
ang_vel_error = torch.square(self.commands[:, 2] - self.base_ang_vel[:, 2])
return torch.exp(-ang_vel_error / self.reward_cfg["tracking_sigma"])
def _reward_lin_vel_z(self):
# Penalize z axis base linear velocity
return torch.square(self.base_lin_vel[:, 2])
def _reward_action_rate(self):
# Penalize changes in actions
return torch.sum(torch.square(self.last_actions - self.actions), dim=1)
def _reward_similar_to_default(self):
# Penalize joint poses far away from default pose
return torch.sum(torch.abs(self.dof_pos - self.default_dof_pos), dim=1)
def _reward_base_height(self):
# Penalize base height away from target
return torch.square(self.base_pos[:, 2] - self.reward_cfg["base_height_target"])
2.1 Go2Env
クラス
__init__(self, num_envs, env_cfg, obs_cfg, reward_cfg, command_cfg, show_viewer=False, device="cuda")
環境の初期化を行います。
-
引数:
-
num_envs
: 並列実行する環境の数。 -
env_cfg
,obs_cfg
,reward_cfg
,command_cfg
: 環境、観測、報酬、コマンドに関する設定(YAMLファイルなどから読み込まれる)。 -
show_viewer
: シミュレーションビューアを表示するかどうか。 -
device
: 計算に使用するデバイス (CPU or GPU)。
-
-
内部処理:
- 各種設定の読み込みと初期化。
- Genesisの
Scene
オブジェクトの作成:-
SimOptions
: シミュレーションの基本設定(時間刻みdt
、サブステップ数など)。 -
ViewerOptions
: ビューアの設定(最大FPS、カメラ位置、視野角など)。 -
VisOptions
: 可視化オプション(表示する環境数)。 -
RigidOptions
: 剛体シミュレーションの設定(時間刻み、制約ソルバ、衝突判定、関節制限など)。
-
- シーンへのエンティティの追加 (
self.scene.add_entity()
):-
plane.urdf
: 地面(固定)。 -
go2.urdf
: Go2 ロボット。
-
-
self.scene.build()
: シミュレーション環境を構築。 -
motor_dofs
: 制御対象の関節のインデックスリストを取得。 -
self.robot.set_dofs_kp()
,self.robot.set_dofs_kv()
: 関節のP制御ゲイン(Kp)とD制御ゲイン(Kd)を設定。 - 報酬関数の準備:
-
self.reward_scales
: 各報酬項の重み。時間ステップdt
を乗算し、時間あたりの報酬になるように調整。 -
self.reward_functions
: 各報酬関数への参照を辞書形式で保持。getattr(self, "_reward_" + name)
で、_reward_...
という名前のメソッドを取得。 -
self.episode_sums
: 各報酬項の累積値を保持するバッファ。
-
- 各種バッファの初期化:
-
base_lin_vel
: 胴体の線形速度(ローカル座標系)。 -
base_ang_vel
: 胴体の角速度(ローカル座標系)。 -
projected_gravity
: 重力ベクトルを胴体座標系に射影。 -
obs_buf
: 観測を格納するバッファ。 -
rew_buf
: 報酬を格納するバッファ。 -
reset_buf
: 環境をリセットする必要があるかを示すフラグ (1: リセット, 0: 継続)。 -
episode_length_buf
: 各エピソードの長さをカウント。 -
commands
: 目標速度(線形速度 x, y, 角速度 z)。 -
commands_scale
: コマンドの値をスケーリングするための係数。 -
actions
: エージェントが出力する行動(関節の目標位置)。 -
last_actions
: 1ステップ前の行動。 -
dof_pos
: 関節角度。 -
dof_vel
: 関節角速度。 -
last_dof_vel
: 1ステップ前の関節角速度。 -
base_pos
: 胴体の位置。 -
base_quat
: 胴体の姿勢(クォータニオン)。 -
default_dof_pos
: 関節のデフォルト位置。 -
extras
: ログ用の追加情報を格納する辞書。
-
_resample_commands(self, envs_idx)
指定された環境 (envs_idx
) に対して、目標速度 (self.commands
) をランダムに再設定します。gs_rand_float
関数を使って、設定ファイル (command_cfg
) で指定された範囲から一様乱数を生成します。
step(self, actions)
エージェントから行動を受け取り、シミュレーションを1ステップ進めます。
-
行動のクリッピング:
actions
をclip_actions
で指定された範囲に制限。 -
行動の遅延:
simulate_action_latency
がTrue
の場合、1ステップ前の行動 (self.last_actions
) を使用。 -
目標関節位置の計算:
actions
にaction_scale
を掛け、default_dof_pos
を加算して目標関節位置を計算。 -
ロボットの制御:
self.robot.control_dofs_position()
で、計算された目標関節位置にロボットを制御。 -
シミュレーションの実行:
self.scene.step()
で、Genesis の物理シミュレーションを1ステップ進めます。 -
状態の更新:
-
episode_length_buf
: エピソード長をインクリメント。 -
base_pos
,base_quat
: ロボット胴体の位置と姿勢を取得。 -
base_euler
: 胴体姿勢をオイラー角(roll, pitch, yaw)に変換(Genesisユーティリティ関数quat_to_xyz
,transform_quat_by_quat
を使用)。 -
base_lin_vel
,base_ang_vel
: 胴体の線形/角速度を取得し、胴体座標系に変換(transform_by_quat
)。 -
projected_gravity
: 重力ベクトルを胴体座標系に変換。 -
dof_pos
,dof_vel
: 関節角度と角速度を取得。
-
-
コマンドの再サンプリング:
resampling_time_s
で指定された時間が経過するごとに、_resample_commands
を呼び出して目標速度を更新。 -
終了判定:
- エピソード長が
max_episode_length
を超えた場合。 - 胴体のピッチ角(
base_euler[:, 1]
)またはロール角(base_euler[:, 0]
)が閾値を超えた場合(転倒判定)。 -
time_out_idx
: 時間切れになった環境のインデックス。extras["time_outs"]
に記録。 -
reset_buf
: リセットが必要な環境のインデックスを記録。
- エピソード長が
-
環境のリセット:
reset_idx
を呼び出し、リセットが必要な環境をリセット。 -
報酬の計算:
-
rew_buf
を0で初期化。 -
reward_functions
に登録されている各報酬関数を呼び出し、対応するreward_scales
を掛けて、rew_buf
に加算。 - 各報酬項の累積値を
episode_sums
に加算。
-
-
観測の計算:
-
obs_buf
に観測値を格納。 - 観測値は、
obs_scales
でスケーリング。
-
-
バッファの更新:
last_actions
,last_dof_vel
を更新。 -
戻り値:
obs_buf
,None
,rew_buf
,reset_buf
,extras
を返す。
get_observations()
現在の観測 (obs_buf
) を返します。
get_privileged_observations()
特権情報を返します (今回は使用しないため None
)。
reset_idx(self, envs_idx)
指定された環境 (envs_idx
) をリセットします。
-
リセット対象の確認:
envs_idx
が空でないことを確認。 -
関節のリセット:
-
dof_pos
をdefault_dof_pos
に設定。 -
dof_vel
を0に設定。 -
self.robot.set_dofs_position()
で、関節位置と速度をリセット。
-
-
胴体のリセット:
-
base_pos
をbase_init_pos
に設定。 -
base_quat
をbase_init_quat
に設定。 -
self.robot.set_pos()
,self.robot.set_quat()
で、胴体の位置と姿勢をリセット。 -
base_lin_vel
,base_ang_vel
を0に設定。 -
self.robot.zero_all_dofs_velocity()
で、すべての関節速度を0にリセット。
-
-
バッファのリセット:
-
last_actions
,last_dof_vel
を0に設定。 -
episode_length_buf
を0に設定。 -
reset_buf
をTrue
に設定。
-
-
ログ情報の更新:
-
extras["episode"]
に、各報酬項の平均値を記録(1秒あたりの値)。 -
episode_sums
を0にリセット。
-
-
コマンドの再サンプリング:
_resample_commands
を呼び出して、目標速度を更新。
reset()
すべての環境をリセットします。reset_buf
をすべて True
にし、reset_idx
をすべての環境インデックスについて呼び出します。
報酬関数 (_reward_...
)
各報酬関数は、現在の状態に基づいて報酬値を計算し、torch.Tensor
として返します。報酬の設計は、タスクの目標に合わせて調整されます。
-
例:
_reward_tracking_lin_vel()
- 目標線形速度(xy軸)との誤差に基づく報酬を計算します。
- 誤差の二乗和を
tracking_sigma
でスケーリングした指数関数を使用します。
genesis.utils.geom
の関数
-
quat_to_xyz(q)
: クォータニオンq
をオイラー角 (roll, pitch, yaw) に変換。 -
transform_by_quat(v, q)
: ベクトルv
をクォータニオンq
で回転。 -
inv_quat(q)
: クォータニオンq
の逆クォータニオンを計算。 -
transform_quat_by_quat(q1, q2)
: クォータニオンq1
をクォータニオンq2
で回転 (クォータニオンの積)。
2.2 Go2Env クラスの役割
Go2Env
クラスは、主に以下の役割を担います。
-
状態の定義: エージェントが観測する状態空間を定義します。
- 例: 関節角度、角速度、胴体の姿勢、足先の位置など。
- これらの情報をどのように組み合わせて状態とするかは、タスクの性質や設計者の意図によって異なります。
-
行動の定義: エージェントが実行できる行動空間を定義します。
- 例: 各関節の目標トルク、目標角度など。
- 連続値 (Continuous) で表現するか、離散値 (Discrete) で表現するかは、タスクやアルゴリズムに合わせて選択します。
-
報酬の定義: エージェントの行動を評価するための報酬関数を定義します。
- 例: 前進速度、エネルギー効率、安定性などに基づいて報酬を与える。
- 報酬の設計は、強化学習の性能に大きく影響する重要な要素です。
-
環境のリセット (reset): 各エピソードの開始時に、環境を初期状態に戻します。
- Go2ロボットを初期姿勢に戻したり、ランダムな位置からスタートさせたりします。
-
ステップの実行 (step): エージェントから行動を受け取り、シミュレーションを1ステップ進め、次の状態、報酬、エピソード終了判定 (done)、追加情報 (info) を返します。
2.3 環境とのインタラクション
強化学習エージェントは、Go2Env
クラスの reset()
メソッドと step()
メソッドを通じて環境と相互作用します。
-
reset()
で環境を初期化し、最初の状態を観測します。 - 現在の状態に基づいて行動を選択します。
-
step()
に行動を渡し、次の状態、報酬、終了判定などを受け取ります。 - 2と3を繰り返し、エピソードが終了するまで学習を続けます。
3. 強化学習エージェントの訓練 (go2_train.py)
go2_train.py
は、go2_env.py
で定義された環境を用いて、PPO (Proximal Policy Optimization) アルゴリズムで Unitree Go2 ロボットの歩行を学習させるための訓練スクリプトです。
import argparse
import os
import pickle
import shutil
from go2_env import Go2Env
from rsl_rl.runners import OnPolicyRunner
import genesis as gs
def get_train_cfg(exp_name, max_iterations):
train_cfg_dict = {
"algorithm": {
"clip_param": 0.2,
"desired_kl": 0.01,
"entropy_coef": 0.01,
"gamma": 0.99,
"lam": 0.95,
"learning_rate": 0.001,
"max_grad_norm": 1.0,
"num_learning_epochs": 5,
"num_mini_batches": 4,
"schedule": "adaptive",
"use_clipped_value_loss": True,
"value_loss_coef": 1.0,
},
"init_member_classes": {},
"policy": {
"activation": "elu",
"actor_hidden_dims": [512, 256, 128],
"critic_hidden_dims": [512, 256, 128],
"init_noise_std": 1.0,
},
"runner": {
"algorithm_class_name": "PPO",
"checkpoint": -1,
"experiment_name": exp_name,
"load_run": -1,
"log_interval": 1,
"max_iterations": max_iterations,
"num_steps_per_env": 24,
"policy_class_name": "ActorCritic",
"record_interval": -1,
"resume": False,
"resume_path": None,
"run_name": "",
"runner_class_name": "runner_class_name",
"save_interval": 100,
},
"runner_class_name": "OnPolicyRunner",
"seed": 1,
}
return train_cfg_dict
def get_cfgs():
env_cfg = {
"num_actions": 12,
# joint/link names
"default_joint_angles": { # [rad]
"FL_hip_joint": 0.0,
"FR_hip_joint": 0.0,
"RL_hip_joint": 0.0,
"RR_hip_joint": 0.0,
"FL_thigh_joint": 0.8,
"FR_thigh_joint": 0.8,
"RL_thigh_joint": 1.0,
"RR_thigh_joint": 1.0,
"FL_calf_joint": -1.5,
"FR_calf_joint": -1.5,
"RL_calf_joint": -1.5,
"RR_calf_joint": -1.5,
},
"dof_names": [
"FR_hip_joint",
"FR_thigh_joint",
"FR_calf_joint",
"FL_hip_joint",
"FL_thigh_joint",
"FL_calf_joint",
"RR_hip_joint",
"RR_thigh_joint",
"RR_calf_joint",
"RL_hip_joint",
"RL_thigh_joint",
"RL_calf_joint",
],
# PD
"kp": 20.0,
"kd": 0.5,
# termination
"termination_if_roll_greater_than": 10, # degree
"termination_if_pitch_greater_than": 10,
# base pose
"base_init_pos": [0.0, 0.0, 0.42],
"base_init_quat": [1.0, 0.0, 0.0, 0.0],
"episode_length_s": 20.0,
"resampling_time_s": 4.0,
"action_scale": 0.25,
"simulate_action_latency": True,
"clip_actions": 100.0,
}
obs_cfg = {
"num_obs": 45,
"obs_scales": {
"lin_vel": 2.0,
"ang_vel": 0.25,
"dof_pos": 1.0,
"dof_vel": 0.05,
},
}
reward_cfg = {
"tracking_sigma": 0.25,
"base_height_target": 0.3,
"feet_height_target": 0.075,
"reward_scales": {
"tracking_lin_vel": 1.0,
"tracking_ang_vel": 0.2,
"lin_vel_z": -1.0,
"base_height": -50.0,
"action_rate": -0.005,
"similar_to_default": -0.1,
},
}
command_cfg = {
"num_commands": 3,
"lin_vel_x_range": [0.5, 0.5],
"lin_vel_y_range": [0, 0],
"ang_vel_range": [0, 0],
}
return env_cfg, obs_cfg, reward_cfg, command_cfg
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--exp_name", type=str, default="go2-walking")
parser.add_argument("-B", "--num_envs", type=int, default=4096)
parser.add_argument("--max_iterations", type=int, default=100)
args = parser.parse_args()
gs.init(logging_level="warning")
log_dir = f"logs/{args.exp_name}"
env_cfg, obs_cfg, reward_cfg, command_cfg = get_cfgs()
train_cfg = get_train_cfg(args.exp_name, args.max_iterations)
if os.path.exists(log_dir):
shutil.rmtree(log_dir)
os.makedirs(log_dir, exist_ok=True)
env = Go2Env(
num_envs=args.num_envs, env_cfg=env_cfg, obs_cfg=obs_cfg, reward_cfg=reward_cfg, command_cfg=command_cfg
)
runner = OnPolicyRunner(env, train_cfg, log_dir, device="cuda:0")
pickle.dump(
[env_cfg, obs_cfg, reward_cfg, command_cfg, train_cfg],
open(f"{log_dir}/cfgs.pkl", "wb"),
)
runner.learn(num_learning_iterations=args.max_iterations, init_at_random_ep_len=True)
if __name__ == "__main__":
main()
"""
# training
python examples/locomotion/go2_train.py
"""
3.1 主要な関数とクラス
get_train_cfg(exp_name, max_iterations)
PPOアルゴリズムのハイパーパラメータや学習の進行に関する設定(エポック数、ミニバッチサイズなど)を定義した辞書を返します。
-
algorithm
: PPOアルゴリズム自体の設定。-
clip_param
: 方策更新のクリッピング範囲。 -
desired_kl
: KLダイバージェンスの目標値(学習率の調整に使用)。 -
entropy_coef
: エントロピーボーナスの係数(方策の多様性を促進)。 -
gamma
: 割引率。 -
lam
: GAE (Generalized Advantage Estimation)のパラメータ。 -
learning_rate
: 学習率。 -
max_grad_norm
: 勾配のクリッピングの閾値。 -
num_learning_epochs
: 各更新ステップでの学習エポック数。 -
num_mini_batches
: ミニバッチの数。 -
schedule
: 学習率のスケジューリング方法('adaptive' または 'fixed')。 -
use_clipped_value_loss
: 価値関数の損失計算にクリッピングを使用するかどうか。 -
value_loss_coef
: 価値関数の損失の係数。
-
-
policy
: 方策ネットワークと価値ネットワークの設定。-
activation
: 活性化関数('elu')。 -
actor_hidden_dims
: 方策ネットワーク(Actor)の隠れ層の次元。 -
critic_hidden_dims
: 価値ネットワーク(Critic)の隠れ層の次元。 -
init_noise_std
: 行動の初期ノイズの標準偏差。
-
-
runner
: 学習の実行に関する設定。-
algorithm_class_name
: 使用するアルゴリズムのクラス名('PPO')。 -
checkpoint
: チェックポイントを読み込むステップ数(-1 は読み込まない)。 -
experiment_name
: 実験名。 -
load_run
: 読み込む実行ID(-1 は読み込まない)。 -
log_interval
: ログを出力する間隔(ステップ数)。 -
max_iterations
: 最大学習回数。 -
num_steps_per_env
: 各環境での1ステップで収集するデータの数。 -
policy_class_name
: 方策と価値関数をまとめるクラス名('ActorCritic')。 -
record_interval
: 動画を記録する間隔(-1 は記録しない)。 -
resume
: 学習を再開するかどうか。 -
resume_path
: 再開するチェックポイントのパス。 -
run_name
: 実行名。 -
save_interval
: モデルを保存する間隔。 -
runner_class_name
: 訓練ループを管理するクラス名('OnPolicyRunner')。 -
seed
: 乱数シード。
-
get_cfgs()
環境 (env_cfg
)、観測 (obs_cfg
)、報酬 (reward_cfg
)、コマンド (command_cfg
) に関する設定を定義した辞書を返します。これらの設定は、Go2Env
クラスの初期化に使用されます。
-
env_cfg
: 環境に関する設定。-
num_actions
: 行動の次元数(関節の数)。 -
default_joint_angles
: 関節のデフォルトの角度。 -
dof_names
: 関節の名前。 -
kp
: P制御の比例ゲイン。 -
kd
: P制御の微分ゲイン。 -
termination_if_roll_greater_than
,termination_if_pitch_greater_than
: 終了判定の閾値(ロール角とピッチ角)。 -
base_init_pos
,base_init_quat
: ロボットの初期位置と姿勢。 -
episode_length_s
: エピソードの最大秒数。 -
resampling_time_s
: コマンドを再サンプリングする間隔。 -
action_scale
: 行動のスケーリング係数。 -
simulate_action_latency
: 行動の遅延をシミュレートするかどうか。 -
clip_actions
: 行動のクリッピング範囲。
-
-
obs_cfg
: 観測に関する設定。-
num_obs
: 観測の次元数。 -
obs_scales
: 各観測値をスケーリングするための係数。
-
-
reward_cfg
: 報酬に関する設定。-
tracking_sigma
: 速度追従報酬の計算に使用するパラメータ。 -
base_height_target
: 目標とする胴体の高さ。 -
feet_height_target
: 目標とする足先の高さ(今回は使用されていない)。 -
reward_scales
: 各報酬項の重み。
-
-
command_cfg
: コマンドに関する設定。-
num_commands
: コマンドの次元 -
lin_vel_x_range
: x軸方向の最大最小速度 -
lin_vel_y_range
: y軸方向の最大最小速度 -
ang_vel_range
: 角速度の最大最小速度
-
main()
-
コマンドライン引数の解析:
argparse
を使用して、コマンドライン引数(実験名、環境数、最大学習回数)を解析。 -
Genesis の初期化:
gs.init()
で Genesis を初期化。 - ログディレクトリの作成: ログを保存するディレクトリを作成。
-
設定の取得:
get_cfgs()
とget_train_cfg()
を呼び出して設定を取得。 -
環境の準備:
Go2Env
インスタンスを作成し、強化学習環境を準備。 -
訓練ループの準備:
OnPolicyRunner
インスタンスを作成。OnPolicyRunner
は RSL-RL ライブラリのクラスで、PPO などのオンポリシー型強化学習アルゴリズムの訓練を管理。 -
設定の保存: 設定をファイル (
cfgs.pkl
) に保存 (pickle形式)。 -
学習の開始:
runner.learn()
を呼び出して学習を開始。-
num_learning_iterations
: 学習の繰り返し回数。 -
init_at_random_ep_len
: 各エピソードの開始時にランダムな長さで初期化するかどうか。
-
OnPolicyRunner
(RSL-RL ライブラリ)
このクラスが、PPOアルゴリズムによる学習の進行を管理します。learn()
メソッド内で、以下の処理が繰り返し実行されます。
-
データの収集:
-
env.reset()
で環境を初期化。 - 現在のポリシーに基づいて行動を選択し、
env.step()
を呼び出して環境を1ステップ進める。 - 状態、行動、報酬、次の状態、終了フラグなどの情報を収集。
-
-
Advantage (価値) の計算:
- 収集したデータと現在の価値関数を使用して、各ステップの Advantage を推定(GAEを使用)。
-
ポリシーと価値関数の更新:
- 収集したデータと Advantage を使用して、PPO アルゴリズムに基づいて方策ネットワークと価値ネットワークを更新。
- 複数エポック、ミニバッチに分割して学習。
-
ログの出力とモデルの保存:
- 定期的に、報酬、エピソード長、学習率などの情報をログに出力。
- 定期的に、モデルのチェックポイントを保存。
まとめ
この記事では、Genesisプラットフォームを用いた強化学習プロジェクト (Unitree Go2の歩行学習) について解説しました。
強化学習の基本的な概念、Genesisのシミュレーション環境 (go2_env.py
)、そして強化学習エージェントの訓練 (go2_train.py
) について説明しました。
Genesisは、物理シミュレーションと強化学習を組み合わせることで、ロボット制御などの複雑なタスクの学習を可能にする強力なプラットフォームです。
この記事が、Genesisを用いた強化学習プロジェクトの理解の一助となれば幸いです。
(Optional) さらに詳しく学ぶには
- Genesis公式ドキュメント: https://genesis-world.readthedocs.io/ja/latest/user_guide/