はじめに
本記事は以下の人を対象としています.
- 筋骨格シミュレータに興味がある人
- MuJoCoに興味がある人
本記事では7_Fatigue_Modeling.ipynbをGoogle Colabで実行した際の手順と結果に関してまとめる.
MyoSuiteとは
MuJoCoを学習モジュールとして組み込んだ筋骨格シミュレータ
2024年にICRAにてworkshopを展開
https://sites.google.com/view/myosuite/myosymposium/icra24
Tutorial Notebook
実施事項
Tutorialに記載のコードを1つずつ試して, Errorになったところは都度修正版を記載する.
ライブラリのinstallと環境変数設定
!pip install -U myosuite
!pip install tabulate matplotlib torch gym==0.13 git+https://github.com/aravindr93/mjrl.git@pvr_beta_1vk
!pip install scikit-learn
!pip install stable-baselines3
%env MUJOCO_GL=egl
ライブラリのimportと関数設定
import myosuite
from myosuite.utils import gym
import skvideo.io
import numpy as np
import matplotlib.pyplot as plt
import pickle
import time
import os
import mujoco
from IPython.display import HTML
from base64 import b64encode
def show_video(video_path, video_width=400):
video_file = open(video_path, "r+b").read()
video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
return HTML(f"""<video autoplay width={video_width} controls><source src="{video_url}"></video>""")
import PIL.Image, PIL.ImageDraw, PIL.ImageFont
def add_text_to_frame(frame, text, pos=(20, 20), color=(255, 0, 0), fontsize=12):
if isinstance(frame, np.ndarray):
frame = PIL.Image.fromarray(frame)
draw = PIL.ImageDraw.Draw(frame)
draw.text(pos, text, fill=color)
return frame
疲労モデルの設定
envFatigue = gym.make('myoFatiElbowPose1D6MRandom-v0', normalize_act=False)
## for comparison
env = gym.make('myoElbowPose1D6MRandom-v0', normalize_act=False)
MyoSuiteで提供されている疲労モデルは以下の論文をベースに構築しているとのこと
https://www.sciencedirect.com/science/article/abs/pii/S0021929018304342?via%3Dihub
筋肉の状態遷移の設定
上記論文にて以下3状態を設定し、今どこにいるかを予測できるようにしている
- $M_{F}$: 疲労時、Fatigued
- $M_{R}$: 安静時、Resting
- $M_{A}$: 活動時、Active
envFatigue.unwrapped.muscle_fatigue.MF #percentage of fatigued motor units for each muscle
envFatigue.unwrapped.muscle_fatigue.MR #percentage of resting motor units for each muscle
envFatigue.unwrapped.muscle_fatigue.MA #percentage of active motor units for each muscle
どれくらい早く回復するかについての係数を以下で設定
envFatigue.unwrapped.muscle_fatigue.set_RecoveryMultiplier(10)
envFatigue.unwrapped.muscle_fatigue.set_RecoveryCoefficient(0.0022)
envFatigue.unwrapped.muscle_fatigue.set_FatigueCoefficient(0.0146)
envFatigue.unwrapped.muscle_fatigue.r, envFatigue.unwrapped.muscle_fatigue.R, envFatigue.unwrapped.muscle_fatigue.F
(10, 0.0022, 0.0146)
envFatigue.unwrapped.muscle_fatigue.R
, envFatigue.unwrapped.muscle_fatigue.F
はjointやmuscleによって適宜調整する必要がある
疲労状態のinitialize
完全回復状態
envFatigue.reset()
envFatigue.unwrapped.muscle_fatigue.MF, envFatigue.unwrapped.muscle_fatigue.MR, envFatigue.unwrapped.muscle_fatigue.MA
(array([0., 0., 0., 0., 0., 0.]),
array([1., 1., 1., 1., 1., 1.]),
array([0., 0., 0., 0., 0., 0.]))
randomな疲労状態へのreset
envFatigue.unwrapped.set_fatigue_reset_random(True)
envFatigue.reset()
envFatigue.unwrapped.muscle_fatigue.MF, envFatigue.unwrapped.muscle_fatigue.MR, envFatigue.unwrapped.muscle_fatigue.MA
(array([0.0653, 0.7334, 0.183 , 0.5725, 0.5757, 0.5274]),
array([0.7949, 0.0634, 0.784 , 0.0369, 0.3437, 0.4019]),
array([0.1398, 0.2033, 0.033 , 0.3906, 0.0807, 0.0707]))
疲労が徐々に増えていく様のシミュレーション
元コード
env.reset()
envFatigue.reset()
data_store = []
data_store_f = []
for i in range(7*3): # 7 batches of 3 episodes, with 2 episodes of maximum muscle controls for some muscles followed by a resting episode (i.e., zero muscle controls) in each batch
a = np.zeros(env.unwrapped.sim.model.nu,)
if i%3!=2:
a[3:]=1
else:
a[:]=0
for _ in range(500): # 500 samples (=10s) for each episode
next_o, r, done, *_, ifo = env.step(a) # take an action
next_f_o, r_f, done_F, *_, ifo_f = envFatigue.step(a) # take an action
data_store.append({"action":a.copy(),
"jpos":env.unwrapped.sim.data.qpos.copy(),
"mlen":env.unwrapped.sim.data.actuator_length.copy(),
"act":env.unwrapped.sim.data.act.copy()})
data_store_f.append({"action":a.copy(),
"jpos":envFatigue.unwrapped.sim.data.qpos.copy(),
"mlen":envFatigue.unwrapped.sim.data.actuator_length.copy(),
"MF":envFatigue.unwrapped.muscle_fatigue.MF.copy(),
"MR":envFatigue.unwrapped.muscle_fatigue.MR.copy(),
"MA":envFatigue.unwrapped.muscle_fatigue.MA.copy(),
"act":envFatigue.unwrapped.sim.data.act.copy()})
env.close()
envFatigue.close()
muscle_names = [env.unwrapped.sim.model.id2name(i, "actuator") for i in range(env.unwrapped.sim.model.nu) if env.unwrapped.sim.model.actuator_dyntype[i] == mujoco.mjtDyn.mjDYN_MUSCLE]
muscle_id = -1
plt.figure(figsize=(12, 6))
plt.subplot(221)
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['act'][muscle_id] for d in data_store]), label="Normal model/Desired activations")
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['act'][muscle_id] for d in data_store_f]), label='Fatigued model')
plt.legend()
plt.title(f'Muscle activations over time ({muscle_names[muscle_id]})')
plt.xlabel('time (s)'),plt.ylabel('act')
plt.subplot(222)
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['jpos'] for d in data_store]), label="Normal model")
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['jpos'] for d in data_store_f]), label="Fatigued model")
plt.legend()
plt.title('Joint angle over time')
plt.xlabel('time (s)'),plt.ylabel('angle')
plt.subplot(223)
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['mlen'][muscle_id] for d in data_store]), label="Normal model")
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['mlen'][muscle_id] for d in data_store_f]), label="Fatigued model")
plt.legend()
plt.title(f'Muscle lengths over time ({muscle_names[muscle_id]})')
plt.xlabel('time (s)'),plt.ylabel('muscle length')
plt.subplot(224)
plt.plot(env.unwrapped.dt*np.arange(len(data_store)), np.array([d['MF'][muscle_id] for d in data_store_f]), color="tab:orange")
plt.title(f'Fatigued motor units over time ({muscle_names[muscle_id]})')
plt.xlabel('time (s)'),plt.ylabel('%MVC')
plt.tight_layout()
plt.show()
疲労のない通常のモデルと比較し,疲労によってactの効率が低下していたり,疲労が増えていることが見える
疲労を伴うagentの学習
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import CheckpointCallback
env_name = "myoFatiElbowPose1D6MRandom-v0"
env = gym.make(env_name)
env.unwrapped.set_fatigue_reset_random(True)
env.reset()
# Save a checkpoint every 50 steps
checkpoint_callback = CheckpointCallback(
save_freq=50, # 50step毎にiterationの結果を保存
save_path=f"./{env_name}/iterations/",
name_prefix="rl_model",
save_replay_buffer=True,
save_vecnormalize=True,
)
model = PPO("MlpPolicy", env, verbose=0)
model.learn(total_timesteps=200, callback=checkpoint_callback) # totalで2000step実施
PPO
の中身を見ていないが,max_step=total_timesteps*10
とかになってる?
学習をしたモデルを用いた結果の確認
シミュレートした結果をvideoで返す
env_name = "myoFatiElbowPose1D6MRandom-v0"
GENERATE_VIDEO = True
GENERATE_VIDEO_EPS = 4 #number of episodes that are rendered BOTH at the beginning (i.e., without fatigue) and at the end (i.e., with fatigue)
STORE_DATA = True #store collected data from evaluation run in .npy file
n_eps = 250
###################################
env = gym.make(env_name)
from stable_baselines3 import PPO
model = PPO.load(f"{env_name}/iterations/rl_model_2000_steps")
env.unwrapped.set_fatigue_reset_random(False)
env.reset(fatigue_reset=True) #ensure that fatigue is reset before the simulation starts
env.unwrapped.sim.model.cam_poscom0[0]= np.array([-1.3955, -0.3287, 0.6579])
data_store = []
if GENERATE_VIDEO:
frames = []
env.unwrapped.target_jnt_value = env.unwrapped.target_jnt_range[:, 1]
env.unwrapped.target_type = 'fixed'
env.unwrapped.update_target(restore_sim=True)
start_time = time.time()
for ep in range(n_eps):
print("Ep {} of {}".format(ep, n_eps))
for _cstep in range(env.spec.max_episode_steps):
if GENERATE_VIDEO and (ep in range(GENERATE_VIDEO_EPS) or ep in range(n_eps-GENERATE_VIDEO_EPS, n_eps)):
frame = env.unwrapped.sim.renderer.render_offscreen(width=400, height=400, camera_id=0)
# Add text overlay
_current_time = (ep*env.spec.max_episode_steps + _cstep)*env.unwrapped.dt
frame = np.array(add_text_to_frame(frame,
f"t={str(int(_current_time//60)).zfill(2)}:{str(int(_current_time%60)).zfill(2)}min",
pos=(285, 3), color=(0, 0, 0), fontsize=18))
frames.append(frame)
o = env.unwrapped.get_obs()
a = model.predict(o)[0]
next_o, r, done, _, ifo = env.step(a) # take an action based on the current observation
data_store.append({"action":a.copy(),
"jpos":env.unwrapped.sim.data.qpos.copy(),
"mlen":env.unwrapped.sim.data.actuator_length.copy(),
"act":env.unwrapped.sim.data.act.copy(),
"reward":r,
"solved":env.unwrapped.rwd_dict['solved'].item(),
"pose_err":env.unwrapped.get_obs_dict(env.unwrapped.sim)["pose_err"],
"MA":env.unwrapped.muscle_fatigue.MA.copy(),
"MR":env.unwrapped.muscle_fatigue.MR.copy(),
"MF":env.unwrapped.muscle_fatigue.MF.copy(),
"ctrl":env.unwrapped.last_ctrl.copy()})
env.close()
## OPTIONALLY: Stored simulated data
if STORE_DATA:
os.makedirs(f"{env_name}/logs", exist_ok=True)
np.save(f"{env_name}/logs/fatitest.npy", data_store)
## OPTIONALLY: Render video
if GENERATE_VIDEO:
os.makedirs(f'{env_name}/videos', exist_ok=True)
# make a local copy
skvideo.io.vwrite(f'{env_name}/videos/fatitest.mp4', np.asarray(frames),inputdict={'-r': str(int(1/env.unwrapped.dt))},outputdict={"-pix_fmt": "yuv420p"})
end_time = time.time()
print(f"DURATION: {end_time - start_time:.2f}s")
if GENERATE_VIDEO:
display(show_video(f'{env_name}/videos/fatitest.mp4'))
疲労がなければもっと目標に近づく,って結果な気がする
疲労なしのバージョンも余裕あれば横に追加する
グラフで確認
env_name = "myoFatiElbowPose1D6MRandom-v0"
####################
env_test = gym.make(env_name, normalize_act=False)
muscle_names = [env_test.unwrapped.sim.model.id2name(i, "actuator") for i in range(env_test.unwrapped.sim.model.nu) if env_test.unwrapped.sim.model.actuator_dyntype[i] == mujoco.mjtDyn.mjDYN_MUSCLE]
_env_dt = env_test.unwrapped.dt #0.02
data_store = np.load(f"{env_name}/logs/fatitest.npy", allow_pickle=True)
plt.figure()
for _muscleid in range(len(data_store[0]['MF'])):
plt.plot(_env_dt*np.arange(len(data_store)), np.array([d['MF'][_muscleid] for d in data_store]), label=muscle_names[_muscleid])
plt.legend()
plt.title('Fatigued Motor Units')
plt.figure()
for _muscleid in range(len(data_store[0]['MR'])):
plt.plot(_env_dt*np.arange(len(data_store)), np.array([d['MR'][_muscleid] for d in data_store]), label=muscle_names[_muscleid])
plt.legend()
plt.title('Resting Motor Units')
plt.figure()
for _muscleid in range(len(data_store[0]['MA'])):
plt.plot(_env_dt*np.arange(len(data_store)), np.array([d['MA'][_muscleid] for d in data_store]), label=muscle_names[_muscleid])
plt.legend()
plt.title('Active Motor Units')
plt.figure()
plt.plot(_env_dt*np.arange(len(data_store)), np.array([np.linalg.norm(d['pose_err']) for d in data_store])), plt.title('Pose Error')
plt.figure()
plt.plot(_env_dt*np.arange(len(data_store)), np.array([d['reward'] for d in data_store])), plt.title(f"Reward (Total: {np.array([d['reward'] for d in data_store]).sum():.2f})")
if "solved" in data_store[0]:
plt.figure()
plt.scatter(_env_dt*np.arange(len(data_store))[np.array([d['solved'] for d in data_store])], np.array([d['solved'] for d in data_store])[np.array([d['solved'] for d in data_store])]), plt.title(f"Success")
print(f"Muscle Fatigue Equilibrium: {data_store[-1]['MF']}")
他にもいくつか結果が出力されるが,結構多いのとよくわからないのでわかりやすいPose Errorの画像だけ表示
本Tutorialのまとめ
- 疲労を考慮したモデルの学習の仕方,結果の表示についてのTutorial
- 用意されている筋肉の状態はFatigued, Resting, Activeの3状態
-
gym.make.unwrapped.muscle_fatigue
のインスタンスで疲労度の調整が可能