LoginSignup
0
0

はじめに

本記事は以下の人を対象としています.

  • 筋骨格シミュレータに興味がある人
  • MuJoCoに興味がある人

本記事では7_Fatigue_Modeling.ipynbをGoogle Colabで実行した際の手順と結果に関してまとめる.

MyoSuiteとは

MuJoCoを学習モジュールとして組み込んだ筋骨格シミュレータ

2024年にICRAにてworkshopを展開
https://sites.google.com/view/myosuite/myosymposium/icra24

Tutorial Notebook

実施事項

Tutorialに記載のコードを1つずつ試して, Errorになったところは都度修正版を記載する.

ライブラリのinstallと環境変数設定

!pip install -U myosuite
!pip install tabulate matplotlib torch gym==0.13 git+https://github.com/aravindr93/mjrl.git@pvr_beta_1vk
!pip install scikit-learn
!pip install stable-baselines3

%env MUJOCO_GL=egl

ライブラリのimportと関数設定

import myosuite
from myosuite.utils import gym
import skvideo.io
import numpy as np
import matplotlib.pyplot as plt
import pickle
import time
import os
import mujoco

from IPython.display import HTML
from base64 import b64encode

def show_video(video_path, video_width=400):

  video_file = open(video_path, "r+b").read()

  video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
  return HTML(f"""<video autoplay width={video_width} controls><source src="{video_url}"></video>""")


import PIL.Image, PIL.ImageDraw, PIL.ImageFont

def add_text_to_frame(frame, text, pos=(20, 20), color=(255, 0, 0), fontsize=12):
    if isinstance(frame, np.ndarray):
        frame = PIL.Image.fromarray(frame)

    draw = PIL.ImageDraw.Draw(frame)
    draw.text(pos, text, fill=color)
    return frame

疲労モデルの設定

envFatigue = gym.make('myoFatiElbowPose1D6MRandom-v0', normalize_act=False)

## for comparison
env = gym.make('myoElbowPose1D6MRandom-v0', normalize_act=False)

MyoSuiteで提供されている疲労モデルは以下の論文をベースに構築しているとのこと
https://www.sciencedirect.com/science/article/abs/pii/S0021929018304342?via%3Dihub

筋肉の状態遷移の設定

上記論文にて以下3状態を設定し、今どこにいるかを予測できるようにしている

  • $M_{F}$: 疲労時、Fatigued
  • $M_{R}$: 安静時、Resting
  • $M_{A}$: 活動時、Active
envFatigue.unwrapped.muscle_fatigue.MF   #percentage of fatigued motor units for each muscle
envFatigue.unwrapped.muscle_fatigue.MR   #percentage of resting motor units for each muscle
envFatigue.unwrapped.muscle_fatigue.MA   #percentage of active motor units for each muscle

どれくらい早く回復するかについての係数を以下で設定

envFatigue.unwrapped.muscle_fatigue.set_RecoveryMultiplier(10)
envFatigue.unwrapped.muscle_fatigue.set_RecoveryCoefficient(0.0022)
envFatigue.unwrapped.muscle_fatigue.set_FatigueCoefficient(0.0146)

envFatigue.unwrapped.muscle_fatigue.r, envFatigue.unwrapped.muscle_fatigue.R, envFatigue.unwrapped.muscle_fatigue.F
(10, 0.0022, 0.0146)

envFatigue.unwrapped.muscle_fatigue.R, envFatigue.unwrapped.muscle_fatigue.Fはjointやmuscleによって適宜調整する必要がある

疲労状態のinitialize

完全回復状態

envFatigue.reset()
envFatigue.unwrapped.muscle_fatigue.MF, envFatigue.unwrapped.muscle_fatigue.MR, envFatigue.unwrapped.muscle_fatigue.MA
(array([0., 0., 0., 0., 0., 0.]),
 array([1., 1., 1., 1., 1., 1.]),
 array([0., 0., 0., 0., 0., 0.]))

randomな疲労状態へのreset

envFatigue.unwrapped.set_fatigue_reset_random(True)

envFatigue.reset()
envFatigue.unwrapped.muscle_fatigue.MF, envFatigue.unwrapped.muscle_fatigue.MR, envFatigue.unwrapped.muscle_fatigue.MA
(array([0.0653, 0.7334, 0.183 , 0.5725, 0.5757, 0.5274]),
 array([0.7949, 0.0634, 0.784 , 0.0369, 0.3437, 0.4019]),
 array([0.1398, 0.2033, 0.033 , 0.3906, 0.0807, 0.0707]))

疲労が徐々に増えていく様のシミュレーション

元コード
env.reset()
envFatigue.reset()
data_store = []
data_store_f = []
for i in range(7*3): # 7 batches of 3 episodes, with 2 episodes of maximum muscle controls for some muscles followed by a resting episode (i.e., zero muscle controls) in each batch
    a = np.zeros(env.unwrapped.sim.model.nu,)
    if i%3!=2:
        a[3:]=1
    else:
        a[:]=0

    for _ in range(500): # 500 samples (=10s) for each episode
        next_o, r, done, *_, ifo = env.step(a) # take an action
        next_f_o, r_f, done_F, *_, ifo_f = envFatigue.step(a) # take an action

        data_store.append({"action":a.copy(),
                            "jpos":env.unwrapped.sim.data.qpos.copy(),
                            "mlen":env.unwrapped.sim.data.actuator_length.copy(),
                            "act":env.unwrapped.sim.data.act.copy()})
        data_store_f.append({"action":a.copy(),
                            "jpos":envFatigue.unwrapped.sim.data.qpos.copy(),
                            "mlen":envFatigue.unwrapped.sim.data.actuator_length.copy(),
                            "MF":envFatigue.unwrapped.muscle_fatigue.MF.copy(),
                            "MR":envFatigue.unwrapped.muscle_fatigue.MR.copy(),
                            "MA":envFatigue.unwrapped.muscle_fatigue.MA.copy(),
                            "act":envFatigue.unwrapped.sim.data.act.copy()})

env.close()
envFatigue.close()

muscle_names = [env.unwrapped.sim.model.id2name(i, "actuator") for i in range(env.unwrapped.sim.model.nu) if env.unwrapped.sim.model.actuator_dyntype[i] == mujoco.mjtDyn.mjDYN_MUSCLE]
muscle_id = -1

plt.figure(figsize=(12, 6))
plt.subplot(221)
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['act'][muscle_id] for d in data_store]), label="Normal model/Desired activations")
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['act'][muscle_id] for d in data_store_f]), label='Fatigued model')
plt.legend()
plt.title(f'Muscle activations over time ({muscle_names[muscle_id]})')
plt.xlabel('time (s)'),plt.ylabel('act')

plt.subplot(222)
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['jpos'] for d in data_store]), label="Normal model")
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['jpos'] for d in data_store_f]), label="Fatigued model")
plt.legend()
plt.title('Joint angle over time')
plt.xlabel('time (s)'),plt.ylabel('angle')

plt.subplot(223)
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['mlen'][muscle_id] for d in data_store]), label="Normal model")
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['mlen'][muscle_id] for d in data_store_f]), label="Fatigued model")
plt.legend()
plt.title(f'Muscle lengths over time ({muscle_names[muscle_id]})')
plt.xlabel('time (s)'),plt.ylabel('muscle length')

plt.subplot(224)
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['MF'][muscle_id] for d in data_store_f]), color="tab:orange")
plt.title(f'Fatigued motor units over time ({muscle_names[muscle_id]})')
plt.xlabel('time (s)'),plt.ylabel('%MVC')

plt.tight_layout()
plt.show()

出力結果
image.png

疲労のない通常のモデルと比較し,疲労によってactの効率が低下していたり,疲労が増えていることが見える

疲労を伴うagentの学習

from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import CheckpointCallback

env_name = "myoFatiElbowPose1D6MRandom-v0"

env = gym.make(env_name)
env.unwrapped.set_fatigue_reset_random(True)
env.reset()

# Save a checkpoint every 50 steps
checkpoint_callback = CheckpointCallback(
  save_freq=50, # 50step毎にiterationの結果を保存
  save_path=f"./{env_name}/iterations/",
  name_prefix="rl_model",
  save_replay_buffer=True,
  save_vecnormalize=True,
)

model = PPO("MlpPolicy", env, verbose=0)
model.learn(total_timesteps=200, callback=checkpoint_callback) # totalで2000step実施

PPOの中身を見ていないが,max_step=total_timesteps*10とかになってる?

学習をしたモデルを用いた結果の確認

シミュレートした結果をvideoで返す
env_name = "myoFatiElbowPose1D6MRandom-v0"

GENERATE_VIDEO = True
GENERATE_VIDEO_EPS = 4  #number of episodes that are rendered BOTH at the beginning (i.e., without fatigue) and at the end (i.e., with fatigue)

STORE_DATA = True  #store collected data from evaluation run in .npy file
n_eps = 250

###################################

env = gym.make(env_name)

from stable_baselines3 import PPO
model = PPO.load(f"{env_name}/iterations/rl_model_2000_steps")

env.unwrapped.set_fatigue_reset_random(False)
env.reset(fatigue_reset=True)  #ensure that fatigue is reset before the simulation starts

env.unwrapped.sim.model.cam_poscom0[0]= np.array([-1.3955, -0.3287,  0.6579])

data_store = []
if GENERATE_VIDEO:
    frames = []

env.unwrapped.target_jnt_value = env.unwrapped.target_jnt_range[:, 1]
env.unwrapped.target_type = 'fixed'
env.unwrapped.update_target(restore_sim=True)

start_time = time.time()
for ep in range(n_eps):
    print("Ep {} of {}".format(ep, n_eps))

    for _cstep in range(env.spec.max_episode_steps):
        if GENERATE_VIDEO and (ep in range(GENERATE_VIDEO_EPS) or ep in range(n_eps-GENERATE_VIDEO_EPS, n_eps)):
            frame = env.unwrapped.sim.renderer.render_offscreen(width=400, height=400, camera_id=0)

            # Add text overlay
            _current_time = (ep*env.spec.max_episode_steps + _cstep)*env.unwrapped.dt
            frame = np.array(add_text_to_frame(frame,
                    f"t={str(int(_current_time//60)).zfill(2)}:{str(int(_current_time%60)).zfill(2)}min",
                    pos=(285, 3), color=(0, 0, 0), fontsize=18))

            frames.append(frame)
        o = env.unwrapped.get_obs()
        a = model.predict(o)[0]
        next_o, r, done, _, ifo = env.step(a) # take an action based on the current observation

        data_store.append({"action":a.copy(),
                            "jpos":env.unwrapped.sim.data.qpos.copy(),
                            "mlen":env.unwrapped.sim.data.actuator_length.copy(),
                            "act":env.unwrapped.sim.data.act.copy(),
                            "reward":r,
                            "solved":env.unwrapped.rwd_dict['solved'].item(),
                            "pose_err":env.unwrapped.get_obs_dict(env.unwrapped.sim)["pose_err"],
                            "MA":env.unwrapped.muscle_fatigue.MA.copy(),
                            "MR":env.unwrapped.muscle_fatigue.MR.copy(),
                            "MF":env.unwrapped.muscle_fatigue.MF.copy(),
                            "ctrl":env.unwrapped.last_ctrl.copy()})
env.close()

## OPTIONALLY: Stored simulated data
if STORE_DATA:
    os.makedirs(f"{env_name}/logs", exist_ok=True)
    np.save(f"{env_name}/logs/fatitest.npy", data_store)

## OPTIONALLY: Render video
if GENERATE_VIDEO:
    os.makedirs(f'{env_name}/videos', exist_ok=True)
    # make a local copy
    skvideo.io.vwrite(f'{env_name}/videos/fatitest.mp4', np.asarray(frames),inputdict={'-r': str(int(1/env.unwrapped.dt))},outputdict={"-pix_fmt": "yuv420p"})

end_time = time.time()
print(f"DURATION: {end_time - start_time:.2f}s")

if GENERATE_VIDEO:
    display(show_video(f'{env_name}/videos/fatitest.mp4'))

7_fatigue_modeling-onlyfatigue.gif

疲労がなければもっと目標に近づく,って結果な気がする
疲労なしのバージョンも余裕あれば横に追加する

グラフで確認
env_name = "myoFatiElbowPose1D6MRandom-v0"

####################

env_test = gym.make(env_name, normalize_act=False)
muscle_names = [env_test.unwrapped.sim.model.id2name(i, "actuator") for i in range(env_test.unwrapped.sim.model.nu) if env_test.unwrapped.sim.model.actuator_dyntype[i] == mujoco.mjtDyn.mjDYN_MUSCLE]
_env_dt = env_test.unwrapped.dt  #0.02

data_store = np.load(f"{env_name}/logs/fatitest.npy", allow_pickle=True)

plt.figure()
for _muscleid in range(len(data_store[0]['MF'])):
    plt.plot(_env_dt*np.arange(len(data_store)), np.array([d['MF'][_muscleid] for d in data_store]), label=muscle_names[_muscleid])
plt.legend()
plt.title('Fatigued Motor Units')

plt.figure()
for _muscleid in range(len(data_store[0]['MR'])):
    plt.plot(_env_dt*np.arange(len(data_store)), np.array([d['MR'][_muscleid] for d in data_store]), label=muscle_names[_muscleid])
plt.legend()
plt.title('Resting Motor Units')

plt.figure()
for _muscleid in range(len(data_store[0]['MA'])):
    plt.plot(_env_dt*np.arange(len(data_store)), np.array([d['MA'][_muscleid] for d in data_store]), label=muscle_names[_muscleid])
plt.legend()
plt.title('Active Motor Units')

plt.figure()
plt.plot(_env_dt*np.arange(len(data_store)), np.array([np.linalg.norm(d['pose_err']) for d in data_store])), plt.title('Pose Error')

plt.figure()
plt.plot(_env_dt*np.arange(len(data_store)), np.array([d['reward'] for d in data_store])), plt.title(f"Reward (Total: {np.array([d['reward'] for d in data_store]).sum():.2f})")

if "solved" in data_store[0]:
    plt.figure()
    plt.scatter(_env_dt*np.arange(len(data_store))[np.array([d['solved'] for d in data_store])], np.array([d['solved'] for d in data_store])[np.array([d['solved'] for d in data_store])]), plt.title(f"Success")

print(f"Muscle Fatigue Equilibrium: {data_store[-1]['MF']}")

image.png

他にもいくつか結果が出力されるが,結構多いのとよくわからないのでわかりやすいPose Errorの画像だけ表示

本Tutorialのまとめ

  • 疲労を考慮したモデルの学習の仕方,結果の表示についてのTutorial
  • 用意されている筋肉の状態はFatigued, Resting, Activeの3状態
  • gym.make.unwrapped.muscle_fatigueのインスタンスで疲労度の調整が可能
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0