0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Mujoco ゲームパッド Logicool F310 でアームロボット向け模倣学習のお手本を作成

0
Posted at

はじめに

前回まで強化学習にトライしました。今回は、Mujoco の画面見ながらゲームパッド Logicool F310 でアームロボット向け模倣学習のお手本データを作成します。

準備

 pygame をインストールしておきます。

pip install pygame

 ゲームパッド Logicool F310 は、背面のスイッチで X(XInput)と D(DirectInput)を切り替えることができます。今回は、スイッチを X 側にします。

 そして、アームの第一関節、第二関節を左スティックでコントローすることにします。
ゲームパッドの Start ボタンで終了します。終了とともに、タイムスタンプを利用したファイル名で動作の内容を保存します。

実装

 では、実装するプログラムです。

import gymnasium as gym
import numpy as np
import pygame
import pickle
import os

from datetime import datetime

# =========================================
# 保存フォルダ作成
# =========================================
os.makedirs("data", exist_ok=True)

# =========================================
# 保存ファイル名
# =========================================
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

save_path = f"data/demo_dataset_{timestamp}.pkl"

print("save_path =", save_path)

# =========================================
# MuJoCo 環境
# =========================================
env = gym.make(
    "Reacher-v5",
    render_mode="human",
    width=1400,
    height=900,
    max_episode_steps=5000
)

obs, info = env.reset()

# =========================================
# ターゲット固定
# =========================================
target_x = 0.15
target_y = 0.15

env.unwrapped.data.qpos[-2] = target_x
env.unwrapped.data.qpos[-1] = target_y

# =========================================
# カメラ設定
# =========================================
viewer = env.unwrapped.mujoco_renderer.viewer

viewer.cam.distance = 0.7
viewer.cam.elevation = -20

# =========================================
# pygame 初期化
# =========================================
pygame.init()
pygame.joystick.init()

if pygame.joystick.get_count() == 0:
    raise Exception("ゲームパッドが見つかりません")

joystick = pygame.joystick.Joystick(0)
joystick.init()

print("gamepad =", joystick.get_name())

# =========================================
# deadzone 関数
# =========================================
def deadzone(x, dz=0.05):

    if abs(x) < dz:
        return 0.0

    return x

# =========================================
# action gain
# =========================================
gain = 0.05

# =========================================
# dataset
# =========================================
dataset = []

# =========================================
# main loop
# =========================================
running = True
step = 0

try:

    while running:

        # ---------------------------------
        # pygame event 更新
        # ---------------------------------
        pygame.event.pump()

        # ---------------------------------
        # STARTボタンで終了
        # F310(XInput)では通常 7
        # ---------------------------------
        if joystick.get_button(7):

            print("START button pressed")
            running = False
            continue

        # ---------------------------------
        # ゲームパッド入力
        # ---------------------------------
        axis0 = deadzone(joystick.get_axis(0))
        axis1 = deadzone(joystick.get_axis(1))

        # action
        action = gain * np.array(
            [axis0, axis1],
            dtype=np.float32
        )

        # ---------------------------------
        # env step
        # ---------------------------------
        next_obs, reward, terminated, truncated, info = env.step(action)

        # ---------------------------------
        # dataset追加
        # ---------------------------------
        dataset.append({
            "obs": obs,
            "action": action,
            "reward": reward
        })

        # ---------------------------------
        # 表示
        # ---------------------------------
        print(
            f"step={step}",
            f"action={action}",
            f"reward={reward:.3f}"
        )

        # ---------------------------------
        # autosave
        # ---------------------------------
        if step % 100 == 0:

            with open(save_path, "wb") as f:
                pickle.dump(dataset, f)

            print("autosaved")

        # ---------------------------------
        # 次状態
        # ---------------------------------
        obs = next_obs
        step += 1

        # ---------------------------------
        # episode reset
        # ---------------------------------
        if terminated or truncated:

            print("Episode finished")

            obs, info = env.reset()

            # ターゲット再固定
            env.unwrapped.data.qpos[-2] = target_x
            env.unwrapped.data.qpos[-1] = target_y

finally:

    # =====================================
    # 最終保存
    # =====================================
    with open(save_path, "wb") as f:
        pickle.dump(dataset, f)

    print("dataset saved")

    # =====================================
    # 終了処理
    # =====================================
    env.close()
    pygame.quit()

    print("program finished")

 なお、実行すると以下のWarningがでます。今回使っている pkg_resources が、将来使えなくなるよというお知らせです。現時点では、使えるので無視します(将来は使えなくなるかも)。

C:\Users\abc\AppData\Local\miniconda3\envs\mujoco\lib\site-packages\pygame\pkgdata.py:25: UserWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html. The pkg_resources package is slated for removal as early as 2025-11-30. Refrain from using this package or pin to Setuptools<81.

 実行してMujoco窓でターゲットにアーム先端を近づけるようにゲームパッドを操作してから、「Startボタン」を押して終了させると、操作内容が DATA フォルダーに ###.pkl が保存されます。実行時のターミナルでの表示例を以下に示します。

save_path = data/demo_dataset_20260518_182918.pkl
gamepad = Controller (Gamepad F310)
step=0 action=[0. 0.] reward=-0.140
autosaved
step=1 action=[0. 0.] reward=-0.140
step=2 action=[0. 0.] reward=-0.140
step=3 action=[0. 0.] reward=-0.140
.....
step=466 action=[0. 0.] reward=-0.066
step=467 action=[0. 0.] reward=-0.066
step=468 action=[0. 0.] reward=-0.066
START button pressed
dataset saved
program finished

何回か実行して、中身を確認してみます。確認方法は、以下を inspect_dataset.py の名前で保存して実行するだけです。

import pickle
import glob
import numpy as np

# =========================================
# pkl ファイル一覧取得
# =========================================
files = glob.glob("data/*.pkl")

print("found files =", len(files))

for i, file in enumerate(files):

    print(f"[{i}] {file}")

# =========================================
# 最初のファイルを読む
# =========================================
file_path = files[0]

print()
print("loading =", file_path)

with open(file_path, "rb") as f:

    dataset = pickle.load(f)

# =========================================
# dataset 基本情報
# =========================================
print()
print("dataset length =", len(dataset))

# =========================================
# 最初の1サンプル表示
# =========================================
sample = dataset[0]

print()
print("sample keys =", sample.keys())

print()
print("obs shape =", np.array(sample["obs"]).shape)

print("action shape =", np.array(sample["action"]).shape)

print("reward =", sample["reward"])

# =========================================
# 中身確認
# =========================================
print()
print("obs =")
print(sample["obs"])

print()
print("action =")
print(sample["action"])

# =========================================
# 全体統計
# =========================================
obs_list = []
action_list = []
reward_list = []

for data in dataset:

    obs_list.append(data["obs"])
    action_list.append(data["action"])
    reward_list.append(data["reward"])

obs_array = np.array(obs_list)
action_array = np.array(action_list)
reward_array = np.array(reward_list)

print()
print("obs_array shape =", obs_array.shape)

print("action_array shape =", action_array.shape)

print("reward mean =", reward_array.mean())

print("reward min =", reward_array.min())

print("reward max =", reward_array.max())

print()
print("action abs mean =",
      np.abs(action_array).mean())

print("action abs max =",
      np.abs(action_array).max())

 inspect_dataset.py の実行時の表示例を以下に示します。二回試したので、pkl ファイルが二つできています。

found files = 2
[0] data\demo_dataset_20260518_182728.pkl
[1] data\demo_dataset_20260518_182918.pkl

loading = data\demo_dataset_20260518_182728.pkl

dataset length = 930

sample keys = dict_keys(['obs', 'action', 'reward'])

obs shape = (10,)
action shape = (2,)
reward = -0.15734788175607903

obs =
[ 0.99977933  0.9967627  -0.02100681  0.08039975  0.0674552   0.01173788
  0.00146486  0.00337638  0.14232822 -0.00729981]

action =
[0. 0.]

obs_array shape = (930, 10)
action_array shape = (930, 2)
reward mean = -0.1533252871789793
reward min = -0.413994888729662
reward max = -0.009019351791080646

action abs mean = 0.0023440116
action abs max = 0.05

pklファイルを使って、模倣学習の基本である Behavior Cloning の学習をします。train_bc.py の名前で保存して、実行します。

import os
import glob
import pickle
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# =========================================
# データ読み込み
# =========================================
files = glob.glob("data/*.pkl")
print("found files =", len(files))

all_obs = []
all_action = []

for f in files:

    print("loading =", f)

    with open(f, "rb") as fp:
        data = pickle.load(fp)

    for d in data:
        all_obs.append(d["obs"])
        all_action.append(d["action"])

obs = np.array(all_obs, dtype=np.float32)
action = np.array(all_action, dtype=np.float32)

print("obs shape =", obs.shape)
print("action shape =", action.shape)

# =========================================
# Dataset定義
# =========================================
class BCdataset(Dataset):

    def __init__(self, obs, action):

        self.obs = torch.tensor(obs, dtype=torch.float32)
        self.action = torch.tensor(action, dtype=torch.float32)

    def __len__(self):
        return len(self.obs)

    def __getitem__(self, idx):

        return self.obs[idx], self.action[idx]


dataset = BCdataset(obs, action)
loader = DataLoader(dataset, batch_size=64, shuffle=True)

# =========================================
# ネットワーク(MLP)
# =========================================
class Policy(nn.Module):

    def __init__(self, obs_dim, action_dim):

        super().__init__()

        self.net = nn.Sequential(
            nn.Linear(obs_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )

    def forward(self, x):
        return self.net(x)


obs_dim = obs.shape[1]
action_dim = action.shape[1]

model = Policy(obs_dim, action_dim)

# =========================================
# 学習設定
# =========================================
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()

# =========================================
# 学習ループ
# =========================================
epochs = 20

for epoch in range(epochs):

    total_loss = 0.0

    for o, a in loader:

        pred = model(o)
        loss = loss_fn(pred, a)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"epoch {epoch+1}/{epochs}, loss = {total_loss:.6f}")

# =========================================
# モデル保存
# =========================================
os.makedirs("models", exist_ok=True)

torch.save(model.state_dict(), "models/bc_policy.pth")

print("model saved -> models/bc_policy.pth")

実行時の表示を以下に示します。2つの pkl ファイルを読み込んで処理しています。エポック数20で、徐々に loss が減少していることがわかります。

found files = 2
loading = data\demo_dataset_20260518_182728.pkl
loading = data\demo_dataset_20260518_182918.pkl
obs shape = (1399, 10)
action shape = (1399, 2)
epoch 1/20, loss = 0.012059
epoch 2/20, loss = 0.002140
epoch 3/20, loss = 0.001143
epoch 4/20, loss = 0.000813
epoch 5/20, loss = 0.000639
epoch 6/20, loss = 0.000523
epoch 7/20, loss = 0.000484
epoch 8/20, loss = 0.000481
epoch 9/20, loss = 0.000397
epoch 10/20, loss = 0.000386
epoch 11/20, loss = 0.000375
epoch 12/20, loss = 0.000333
epoch 13/20, loss = 0.000307
epoch 14/20, loss = 0.000278
epoch 15/20, loss = 0.000260
epoch 16/20, loss = 0.000285
epoch 17/20, loss = 0.000234
epoch 18/20, loss = 0.000231
epoch 19/20, loss = 0.000222
epoch 20/20, loss = 0.000217
model saved -> models/bc_policy.pth

では、評価します。以下を eval_bc.py の名前で保存して、実行します。

import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn

# =========================================
# ネットワーク定義(trainと完全一致必須)
# =========================================
class Policy(nn.Module):

    def __init__(self, obs_dim, action_dim):

        super().__init__()

        self.net = nn.Sequential(
            nn.Linear(obs_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )

    def forward(self, x):
        return self.net(x)

# =========================================
# 環境
# =========================================
env = gym.make(
    "Reacher-v5",
    render_mode="human",
    width=1400,
    height=900
)

obs, info = env.reset()

# =========================================
# カメラ調整(見やすく)
# =========================================
viewer = env.unwrapped.mujoco_renderer.viewer
viewer.cam.distance = 0.7
viewer.cam.elevation = -20

# =========================================
# モデル読み込み
# =========================================
model = Policy(obs_dim=10, action_dim=2)  # Reacher-v5前提

model.load_state_dict(torch.load("models/bc_policy.pth"))
model.eval()

print("model loaded")

# =========================================
# メインループ
# =========================================
step = 0
episode_reward = 0.0

while True:

    # -------------------------------------
    # state → tensor
    # -------------------------------------
    obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)

    # -------------------------------------
    # 推論
    # -------------------------------------
    with torch.no_grad():
        action = model(obs_tensor).numpy().squeeze()

    # -------------------------------------
    # 環境ステップ
    # -------------------------------------
    next_obs, reward, terminated, truncated, info = env.step(action)

    episode_reward += reward

    # -------------------------------------
    # 表示
    # -------------------------------------
    print(
        f"step={step}",
        f"action={action}",
        f"reward={reward:.3f}",
        f"ep_rew={episode_reward:.3f}"
    )

    obs = next_obs
    step += 1

    # -------------------------------------
    # episode終了
    # -------------------------------------
    if terminated or truncated:

        print("\nEpisode finished")
        print("Total episode reward =", episode_reward, "\n")

        obs, info = env.reset()

        episode_reward = 0.0

        step = 0

 今回は2ケースしか学習元がないので、アーム先端はターゲットにうまく到達できていませんでした。もっと、増やせばターゲットに到達できるような気がします。

おわりに

 Mujocoでゲームパッドが使えるようになりました。Behavior Cloning できるほどデータ採取してないので、検証結果もそれなりでした。色々できるようになってきたので、楽しめそうです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?