はじめに
前回までは強化学習にトライしました。今回は、MuJoCo の画面を見ながら、ゲームパッド Logicool F310 を使ってアームロボット向け模倣学習のお手本データを作成します。
準備
pygame をインストールしておきます。
pip install pygame
ゲームパッド Logicool F310 は、背面のスイッチで X(XInput)と D(DirectInput)を切り替えることができます。今回は、スイッチを X 側にします。
そして、アームの第一関節、第二関節を左スティックでコントロールすることにします。
ゲームパッドの Start ボタンで終了します。終了とともに、タイムスタンプを利用したファイル名で動作の内容を保存します。
実装
では、実装するプログラムです。
import gymnasium as gym
import numpy as np
import pygame
import pickle
import os
from datetime import datetime
# =========================================
# 保存フォルダ作成
# =========================================
# -----------------------------------------
# Output location: data/demo_dataset_<timestamp>.pkl
# -----------------------------------------
os.makedirs("data", exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
save_path = f"data/demo_dataset_{timestamp}.pkl"
print("save_path =", save_path)

# -----------------------------------------
# MuJoCo environment (rendered in a window)
# -----------------------------------------
env_options = dict(
    render_mode="human",
    width=1400,
    height=900,
    max_episode_steps=5000,
)
env = gym.make("Reacher-v5", **env_options)
obs, info = env.reset()

# -----------------------------------------
# Fixed target: overwrite the last two qpos entries
# with the desired target coordinates.
# -----------------------------------------
target_x, target_y = 0.15, 0.15
env.unwrapped.data.qpos[-2:] = (target_x, target_y)

# -----------------------------------------
# Camera: adjust distance and elevation of the viewer camera
# -----------------------------------------
viewer = env.unwrapped.mujoco_renderer.viewer
camera = viewer.cam
camera.distance = 0.7
camera.elevation = -20

# -----------------------------------------
# pygame / gamepad: use the first joystick, abort when none is attached
# -----------------------------------------
pygame.init()
pygame.joystick.init()
if not pygame.joystick.get_count():
    raise Exception("ゲームパッドが見つかりません")
joystick = pygame.joystick.Joystick(0)
joystick.init()
print("gamepad =", joystick.get_name())
# =========================================
# deadzone 関数
# =========================================
def deadzone(x, dz=0.05):
    """Suppress small stick readings around the neutral position.

    Returns 0.0 when the magnitude of ``x`` is strictly below ``dz``;
    otherwise ``x`` is returned unchanged.
    """
    return 0.0 if abs(x) < dz else x
# =========================================
# action gain
# =========================================
# Scale applied to the (dead-zoned) stick values before they are sent to env.step().
gain = 0.05

# =========================================
# dataset
# =========================================
# One dict per environment step with the keys "obs", "action" and "reward".
dataset = []


def pin_target():
    """Move the target to (target_x, target_y) and return a fresh observation.

    Writing ``data.qpos`` alone does not update the observation that
    ``env.reset()`` already returned, so the first sample of every episode
    would still describe the randomly drawn target. Going through
    ``set_state`` and re-reading the observation keeps the recorded ``obs``
    consistent with the target the operator actually sees.
    """
    base_env = env.unwrapped
    qpos = base_env.data.qpos.copy()
    qvel = base_env.data.qvel.copy()
    # The last two qpos entries hold the target coordinates.
    qpos[-2:] = (target_x, target_y)
    base_env.set_state(qpos, qvel)
    # NOTE(review): _get_obs() is a private method of the Gymnasium Reacher
    # environment (it is what its reset uses) - confirm it exists in the
    # installed Gymnasium version.
    return base_env._get_obs()


# Refresh the observation obtained from the initial reset so that the very
# first recorded sample already reflects the fixed target.
obs = pin_target()

# =========================================
# main loop
# =========================================
running = True
step = 0
try:
    while running:
        # ---------------------------------
        # refresh pygame's internal event state
        # ---------------------------------
        pygame.event.pump()

        # ---------------------------------
        # START button ends the recording
        # (button index 7 on the F310 in XInput mode)
        # ---------------------------------
        if joystick.get_button(7):
            print("START button pressed")
            running = False
            continue

        # ---------------------------------
        # gamepad input (left stick) -> action
        # ---------------------------------
        axis0 = deadzone(joystick.get_axis(0))
        axis1 = deadzone(joystick.get_axis(1))
        action = gain * np.array(
            [axis0, axis1],
            dtype=np.float32
        )

        # ---------------------------------
        # env step
        # ---------------------------------
        next_obs, reward, terminated, truncated, info = env.step(action)

        # ---------------------------------
        # store the observation the action was chosen for
        # ---------------------------------
        dataset.append({
            "obs": obs,
            "action": action,
            "reward": reward
        })

        # ---------------------------------
        # progress output
        # ---------------------------------
        print(
            f"step={step}",
            f"action={action}",
            f"reward={reward:.3f}"
        )

        # ---------------------------------
        # autosave every 100 steps (overwrites the same file)
        # ---------------------------------
        if step % 100 == 0:
            with open(save_path, "wb") as f:
                pickle.dump(dataset, f)
            print("autosaved")

        # ---------------------------------
        # advance to the next state
        # ---------------------------------
        obs = next_obs
        step += 1

        # ---------------------------------
        # episode reset
        # ---------------------------------
        if terminated or truncated:
            print("Episode finished")
            obs, info = env.reset()
            # Re-apply the fixed target and replace the stale observation.
            obs = pin_target()
finally:
    try:
        # =====================================
        # final save
        # =====================================
        with open(save_path, "wb") as f:
            pickle.dump(dataset, f)
        print("dataset saved")
    finally:
        # =====================================
        # shutdown - runs even when the final save fails
        # =====================================
        env.close()
        pygame.quit()
        print("program finished")
なお、実行すると以下のWarningがでます。今回使っている pkg_resources が、将来使えなくなるよというお知らせです。現時点では、使えるので無視します(将来は使えなくなるかも)。
C:\Users\abc\AppData\Local\miniconda3\envs\mujoco\lib\site-packages\pygame\pkgdata.py:25: UserWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html. The pkg_resources package is slated for removal as early as 2025-11-30. Refrain from using this package or pin to Setuptools<81.
実行して MuJoCo のウィンドウでターゲットにアーム先端を近づけるようにゲームパッドを操作してから、「Start ボタン」を押して終了させると、操作内容が data フォルダーに demo_dataset_<タイムスタンプ>.pkl という名前で保存されます。実行時のターミナルでの表示例を以下に示します。
save_path = data/demo_dataset_20260518_182918.pkl
gamepad = Controller (Gamepad F310)
step=0 action=[0. 0.] reward=-0.140
autosaved
step=1 action=[0. 0.] reward=-0.140
step=2 action=[0. 0.] reward=-0.140
step=3 action=[0. 0.] reward=-0.140
.....
step=466 action=[0. 0.] reward=-0.066
step=467 action=[0. 0.] reward=-0.066
step=468 action=[0. 0.] reward=-0.066
START button pressed
dataset saved
program finished
何回か実行して、中身を確認してみます。確認方法は、以下を inspect_dataset.py の名前で保存して実行するだけです。
import pickle
import glob
import numpy as np
# =========================================
# pkl ファイル一覧取得
# =========================================
def main():
    """Print a summary of the first demonstration file found under data/.

    Lists every ``data/*.pkl`` file, loads the first one (in sorted order),
    shows one sample and prints basic statistics over all samples. Returns
    early with a message when there is no file or the file holds no samples.
    """
    # =========================================
    # list the pkl files
    # =========================================
    # glob returns paths in arbitrary order; sorting makes "the first file"
    # deterministic.
    files = sorted(glob.glob("data/*.pkl"))
    print("found files =", len(files))
    for i, file in enumerate(files):
        print(f"[{i}] {file}")
    if not files:
        print("no .pkl files under data/ - record a demonstration first")
        return

    # =========================================
    # load the first file
    # =========================================
    file_path = files[0]
    print()
    print("loading =", file_path)
    # NOTE: pickle.load can execute arbitrary code contained in the file;
    # only open files you recorded yourself.
    with open(file_path, "rb") as f:
        dataset = pickle.load(f)

    # =========================================
    # basic information
    # =========================================
    print()
    print("dataset length =", len(dataset))
    if not dataset:
        print("dataset is empty - nothing to inspect")
        return

    # =========================================
    # show the first sample
    # =========================================
    sample = dataset[0]
    print()
    print("sample keys =", sample.keys())
    print()
    print("obs shape =", np.array(sample["obs"]).shape)
    print("action shape =", np.array(sample["action"]).shape)
    print("reward =", sample["reward"])

    # =========================================
    # raw contents
    # =========================================
    print()
    print("obs =")
    print(sample["obs"])
    print()
    print("action =")
    print(sample["action"])

    # =========================================
    # statistics over the whole dataset
    # =========================================
    obs_array = np.array([data["obs"] for data in dataset])
    action_array = np.array([data["action"] for data in dataset])
    reward_array = np.array([data["reward"] for data in dataset])
    print()
    print("obs_array shape =", obs_array.shape)
    print("action_array shape =", action_array.shape)
    print("reward mean =", reward_array.mean())
    print("reward min =", reward_array.min())
    print("reward max =", reward_array.max())
    print()
    print("action abs mean =",
          np.abs(action_array).mean())
    print("action abs max =",
          np.abs(action_array).max())


if __name__ == "__main__":
    main()
inspect_dataset.py の実行時の表示例を以下に示します。二回試したので、pkl ファイルが二つできています。
found files = 2
[0] data\demo_dataset_20260518_182728.pkl
[1] data\demo_dataset_20260518_182918.pkl
loading = data\demo_dataset_20260518_182728.pkl
dataset length = 930
sample keys = dict_keys(['obs', 'action', 'reward'])
obs shape = (10,)
action shape = (2,)
reward = -0.15734788175607903
obs =
[ 0.99977933 0.9967627 -0.02100681 0.08039975 0.0674552 0.01173788
0.00146486 0.00337638 0.14232822 -0.00729981]
action =
[0. 0.]
obs_array shape = (930, 10)
action_array shape = (930, 2)
reward mean = -0.1533252871789793
reward min = -0.413994888729662
reward max = -0.009019351791080646
action abs mean = 0.0023440116
action abs max = 0.05
pklファイルを使って、模倣学習の基本である Behavior Cloning の学習をします。train_bc.py の名前で保存して、実行します。
import os
import glob
import pickle
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
# =========================================
# データ読み込み
# =========================================
# glob returns paths in arbitrary order; sorting keeps the load order (and the
# log below) reproducible.
files = sorted(glob.glob("data/*.pkl"))
print("found files =", len(files))

# Observations and actions from every file are pooled into two flat lists.
all_obs = []
all_action = []
for path in files:
    print("loading =", path)
    # NOTE: pickle.load can execute arbitrary code contained in the file;
    # only load files you recorded yourself.
    with open(path, "rb") as fp:
        data = pickle.load(fp)
    all_obs.extend(d["obs"] for d in data)
    all_action.extend(d["action"] for d in data)

# Without this guard the arrays below would have shape (0,), and the script
# would only fail later with an IndexError when reading obs.shape[1].
if not all_obs:
    raise SystemExit("no demonstration samples found in data/*.pkl - record some first")

obs = np.array(all_obs, dtype=np.float32)
action = np.array(all_action, dtype=np.float32)
print("obs shape =", obs.shape)
print("action shape =", action.shape)
# =========================================
# Dataset定義
# =========================================
class BCdataset(Dataset):
    """Dataset of (observation, action) pairs for behavior cloning."""

    def __init__(self, obs, action):
        # Both inputs are converted to float32 tensors once, up front.
        as_float = dict(dtype=torch.float32)
        self.obs = torch.tensor(obs, **as_float)
        self.action = torch.tensor(action, **as_float)

    def __len__(self):
        # The sample count is taken from the observation tensor.
        return len(self.obs)

    def __getitem__(self, idx):
        # Return the observation together with the action stored at the same index.
        pair = (self.obs[idx], self.action[idx])
        return pair
# Wrap the loaded arrays and serve them as shuffled mini-batches of 64 samples.
dataset = BCdataset(obs=obs, action=action)
loader = DataLoader(dataset=dataset, shuffle=True, batch_size=64)
# =========================================
# ネットワーク(MLP)
# =========================================
class Policy(nn.Module):
    """MLP policy: obs_dim -> 128 -> 128 -> action_dim with ReLU in between."""

    def __init__(self, obs_dim, action_dim):
        super().__init__()
        hidden = 128
        layers = [
            nn.Linear(obs_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, action_dim),
        ]
        # Kept as one nn.Sequential stored under "net" so the state_dict keys
        # (net.0.*, net.2.*, net.4.*) stay the same.
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        out = self.net(x)
        return out
# Network input/output sizes are read from the second axis of the loaded arrays.
obs_dim, action_dim = obs.shape[1], action.shape[1]
model = Policy(obs_dim=obs_dim, action_dim=action_dim)

# =========================================
# training setup: MSE between predicted and demonstrated action, Adam optimizer
# =========================================
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# =========================================
# training loop
# =========================================
epochs = 20
for epoch in range(epochs):
    # Sum of the per-batch losses seen during this epoch.
    total_loss = 0.0
    for batch_obs, batch_action in loader:
        optimizer.zero_grad()
        batch_loss = loss_fn(model(batch_obs), batch_action)
        batch_loss.backward()
        optimizer.step()
        total_loss += batch_loss.item()
    print(f"epoch {epoch+1}/{epochs}, loss = {total_loss:.6f}")

# =========================================
# save the trained weights
# =========================================
os.makedirs("models", exist_ok=True)
weights = model.state_dict()
torch.save(weights, "models/bc_policy.pth")
print("model saved -> models/bc_policy.pth")
実行時の表示を以下に示します。2つの pkl ファイルを読み込んで処理しています。エポック数20で、徐々に loss が減少していることがわかります。
found files = 2
loading = data\demo_dataset_20260518_182728.pkl
loading = data\demo_dataset_20260518_182918.pkl
obs shape = (1399, 10)
action shape = (1399, 2)
epoch 1/20, loss = 0.012059
epoch 2/20, loss = 0.002140
epoch 3/20, loss = 0.001143
epoch 4/20, loss = 0.000813
epoch 5/20, loss = 0.000639
epoch 6/20, loss = 0.000523
epoch 7/20, loss = 0.000484
epoch 8/20, loss = 0.000481
epoch 9/20, loss = 0.000397
epoch 10/20, loss = 0.000386
epoch 11/20, loss = 0.000375
epoch 12/20, loss = 0.000333
epoch 13/20, loss = 0.000307
epoch 14/20, loss = 0.000278
epoch 15/20, loss = 0.000260
epoch 16/20, loss = 0.000285
epoch 17/20, loss = 0.000234
epoch 18/20, loss = 0.000231
epoch 19/20, loss = 0.000222
epoch 20/20, loss = 0.000217
model saved -> models/bc_policy.pth
では、評価します。以下を eval_bc.py の名前で保存して、実行します。
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
# =========================================
# ネットワーク定義(trainと完全一致必須)
# =========================================
class Policy(nn.Module):
    """MLP policy used for evaluation.

    The layer layout (Linear/ReLU/Linear/ReLU/Linear inside one
    ``nn.Sequential`` stored as ``net``) has to match the network that
    produced the saved state_dict.
    """

    def __init__(self, obs_dim, action_dim):
        super().__init__()
        widths = (obs_dim, 128, 128)
        stack = []
        # Two hidden Linear layers, each followed by a ReLU.
        for n_in, n_out in zip(widths, widths[1:]):
            stack += [nn.Linear(n_in, n_out), nn.ReLU()]
        # Output layer maps the last hidden width to the action dimension.
        stack.append(nn.Linear(widths[-1], action_dim))
        self.net = nn.Sequential(*stack)

    def forward(self, x):
        return self.net(x)
# =========================================
# 環境
# =========================================
env = gym.make(
    "Reacher-v5",
    render_mode="human",
    width=1400,
    height=900
)

# The demonstrations were recorded with the target pinned at (0.15, 0.15).
# Evaluating on randomly placed targets would test the policy on inputs it
# never saw, so the same target is used here.
target_x = 0.15
target_y = 0.15


def reset_with_fixed_target():
    """Reset the episode, move the target to (target_x, target_y), return the obs.

    The target is applied through ``set_state`` and the observation is read
    again afterwards, so the returned observation matches the target
    that is actually simulated and rendered.
    """
    env.reset()
    base_env = env.unwrapped
    qpos = base_env.data.qpos.copy()
    qvel = base_env.data.qvel.copy()
    # The last two qpos entries hold the target coordinates.
    qpos[-2:] = (target_x, target_y)
    base_env.set_state(qpos, qvel)
    # NOTE(review): _get_obs() is a private method of the Gymnasium Reacher
    # environment - confirm it exists in the installed Gymnasium version.
    return base_env._get_obs()


obs = reset_with_fixed_target()

# =========================================
# camera adjustment (easier to see)
# =========================================
viewer = env.unwrapped.mujoco_renderer.viewer
viewer.cam.distance = 0.7
viewer.cam.elevation = -20

# =========================================
# load the trained model
# =========================================
model = Policy(obs_dim=10, action_dim=2)  # sizes assume Reacher-v5
model.load_state_dict(torch.load("models/bc_policy.pth"))
model.eval()
print("model loaded")

# =========================================
# main loop (stop with Ctrl+C)
# =========================================
step = 0
episode_reward = 0.0
try:
    while True:
        # -------------------------------------
        # state -> tensor with a leading batch axis
        # -------------------------------------
        obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)

        # -------------------------------------
        # inference (no gradients needed)
        # -------------------------------------
        with torch.no_grad():
            action = model(obs_tensor).numpy().squeeze()

        # -------------------------------------
        # environment step
        # -------------------------------------
        next_obs, reward, terminated, truncated, info = env.step(action)
        episode_reward += reward

        # -------------------------------------
        # progress output
        # -------------------------------------
        print(
            f"step={step}",
            f"action={action}",
            f"reward={reward:.3f}",
            f"ep_rew={episode_reward:.3f}"
        )
        obs = next_obs
        step += 1

        # -------------------------------------
        # end of episode
        # -------------------------------------
        if terminated or truncated:
            print("\nEpisode finished")
            print("Total episode reward =", episode_reward, "\n")
            obs = reset_with_fixed_target()
            episode_reward = 0.0
            step = 0
except KeyboardInterrupt:
    # The loop has no other exit; treat Ctrl+C as a normal stop request.
    print("interrupted by user")
finally:
    # Close the viewer window and release the environment in every case.
    env.close()
今回は2ケースしか学習元がないので、アーム先端はターゲットにうまく到達できていませんでした。もっと、増やせばターゲットに到達できるような気がします。
おわりに
MuJoCo でゲームパッドが使えるようになりました。Behavior Cloning がうまくいくほどのデータを採取していないので、検証結果もそれなりでした。色々できるようになってきたので、楽しめそうです。