概要
本記事では、4足歩行ロボット(生物?)の歩行動作の学習をPythonを用いて行います。
前提知識として、強化学習の基礎(Q値や報酬、状態、行動、割引率、ポリシー、Actor-Criticなどについて)とニューラルネットワークの基礎(中間層、ニューロン数、学習率、活性化関数、最適化関数などについて)とPythonの基礎文法に関する知識があれば、スムーズに読めると思います。
実行環境
・Windows10
・Python3.7.9 ダウンロード
ライブラリ
以下のライブラリをpipでインストールしてください。
・ numpy (数値計算用の便利なやつ)
・ chainer (ディープラーニング用)
・ chainerrl (強化学習の色々なアルゴリズムが実装されている)
・ pybullet (物理エンジン、シミュレータ用)
・ gym (強化学習の環境を楽に定義できる便利なやつ)
今回の学習対象について
Pybulletのサンプル環境である"AntBulletEnv-v0"を用います。
蟻を模した四足歩行ロボットの歩行動作の学習を行う環境です。
状態空間は、連続値8次元(体の位置と姿勢と速度)+ 連続値16次元(関節の角度と角速度)+ 離散値4次元(足の設置状態)= 28次元です。
行動空間は、全て連続値で8次元(足4本×2関節ずつ)です。行動は各関節に及ぼすトルクの値です。
報酬は前進することによって貰え、ロボットが倒れたらエピソード終了です。
強化学習アルゴリズムについて
今回の学習対象の行動空間は全て連続値です。
よって、DQNなどのQ関数をニューラルネットワーク(以下、NN)で近似する、いわゆる価値ベースなアルゴリズムは使えません。
そこで今回はActor-Criticという方策関数もNNで表現したアルゴリズムを用います。
Actor-Critic系のアルゴリズムでは、Soft-Actor-Critic(SAC)とProximal-Policy-Optimization(PPO)がよく用いられます。
SACは経験再生を使う、いわゆるオフポリシーなアルゴリズムで学習速度が速いです。一方、PPOは学習速度は遅いですが、より優れた方策が得られるというメリットがあります。
今回は学習速度が速いSACを使うことにします。
コード全体
以下にコードの全体を示します。これをそのままコピペして実行すると、四足歩行ロボットが足をじたばた動かしていると思います。
最初に以下のような警告が表示されることもありますが、無視して大丈夫です。
Warning (from warnings module):
File "C:\Users\SUSID\AppData\Local\Programs\Python\Python37\lib\site-packages\gym\logger.py", line 30
warnings.warn(colorize('%s: %s'%('WARN', msg % args), 'yellow'))
UserWarning: [33mWARN: Box bound precision lowered by casting to float32[0m
import chainer
import chainer.functions as F
import chainer.links as L
import chainerrl
from chainerrl.agents.soft_actor_critic import SoftActorCritic
from chainerrl import distribution
import numpy as np
import gym
import pybullet_envs
class PolicyNetwork(chainer.Chain):
def __init__(self):
w = chainer.initializers.HeNormal(scale=1.0)
super(PolicyNetwork, self).__init__()
with self.init_scope():
self.l1 = L.Linear(28, 128, initialW = w)
self.l2 = L.Linear(128, 128, initialW = w)
self.l3 = L.Linear(128, 128, initialW = w)
self.l4m = L.Linear(128, 8, initialW = w)
self.l4v = L.Linear(128, 8, initialW = w)
def __call__(self, s):
h = F.relu(self.l1(s))
h = F.relu(self.l2(h))
h = F.relu(self.l3(h))
m = F.tanh(self.l4m(h))
log_scale = F.tanh(self.l4v(h))
log_scale = F.clip(log_scale, -20., 2.)
v = F.exp(log_scale * 2)
return chainerrl.distribution.SquashedGaussianDistribution(m, v)
class QFunction(chainer.Chain):
def __init__(self):
w = chainer.initializers.HeNormal(scale=1.0)
super(QFunction, self).__init__()
with self.init_scope():
self.l1 = L.Linear(36, 128, initialW = w)
self.l2 = L.Linear(128, 128, initialW = w)
self.l3 = L.Linear(128, 128, initialW = w)
self.l4 = L.Linear(128, 1, initialW = w)
def __call__(self, s, a):
h = F.concat((s, a), axis=1)
h = F.relu(self.l1(h))
h = F.relu(self.l2(h))
h = F.relu(self.l3(h))
return self.l4(h)
def main():
env = gym.make('AntBulletEnv-v0')
env.render(mode='human')
num_episodes = 30000
q_func1 = QFunction()
q_func2 = QFunction()
policy = PolicyNetwork()
optimizer_p = chainer.optimizers.Adam(alpha=1e-3)
optimizer_q1 = chainer.optimizers.Adam(alpha=1e-3)
optimizer_q2 = chainer.optimizers.Adam(alpha=1e-3)
optimizer_p.setup(policy)
optimizer_q1.setup(q_func1)
optimizer_q2.setup(q_func2)
replay_buffer = chainerrl.replay_buffer.ReplayBuffer(capacity=10 ** 6)
phi = lambda x: x.astype(np.float32, copy=False)
def burnin_action_func():return np.random.uniform(env.action_space.low, env.action_space.high).astype(np.float32)
agent = SoftActorCritic(policy, q_func1, q_func2, optimizer_p, optimizer_q1, optimizer_q2,
replay_buffer, gamma=0.99,
replay_start_size=128,
update_interval=8,
soft_update_tau=0.005,
phi=phi, gpu=None, minibatch_size=128,
initial_temperature=1.0,
temperature_optimizer=None,
act_deterministically=False,
burnin_action_func=burnin_action_func,
entropy_target=-env.action_space.low.size)
outdir = 'result/'
#agent.load(outdir + "agent_sac_ant_trained")
reward = 0
for episode in range(1, num_episodes + 1):
done = False
obs = env.reset()
while not done:
action = agent.act_and_train(obs, reward)
obs, reward, done, info = env.step(action)
agent.stop_episode_and_train(obs, reward, done)
if episode % 10 == 0:
print('Episode {0:4d}: statistics: {1}'.format(episode, agent.get_statistics()))
if episode % 100 == 0:
agent.save(outdir + 'agent_sac_ant_trained')
if __name__ == '__main__':
main()
コードの解説
最初に必要なライブラリのimportを行っています。
import chainer
import chainer.functions as F
import chainer.links as L
import chainerrl
from chainerrl.agents.soft_actor_critic import SoftActorCritic
from chainerrl import distribution
import numpy as np
import gym
import pybullet_envs
次に以下のクラスで方策ネットワークの定義を行っております。
方策ネットワークは状態を入力とし、行動を出力とします。
class PolicyNetwork(chainer.Chain):
def __init__(self):
(省略)
def __call__(self, s):
(省略)
コンストラクタ(init)の内部でNNの定義をしています。
PolicyNetworkのインスタンスを作成したときに、このコンストラクタが一回実行され、NNが生成されます。
self.l1 = L.Linear(28, 128, initialW = w)
self.l2 = L.Linear(128, 128, initialW = w)
self.l3 = L.Linear(128, 128, initialW = w)
self.l4m = L.Linear(128, 8, initialW = w)
self.l4v = L.Linear(128, 8, initialW = w)
今回は中間層2層の全結合NNを用いました。中間層(l2とl3)のニューロン数は128です。
入力層(l1)への入力は先ほど述べました状態空間の28次元です。
出力層(l4mとl4v)の出力は行動空間の8次元です。
l4mでは行動(トルク指令)の平均、l4vでは行動の分散を出力しています。
最終的に行動は、ここで出力した平均と分散に基づくガウス分布から値を一つサンプリングすることによって決定しています。ここの説明は後でもう少し詳しくします。
上記では、NNの定義を行いました。次は、実際に入力された状態から行動を出力する関数についての説明です。以下の__call__関数は状態sを引数として、8次元の行動を返します。
def __call__(self, s):
(省略)
return chainerrl.distribution.SquashedGaussianDistribution(m, v)
sは、l1 → relu関数 → l2 → relu関数 → l3 → relu関数 → l4m, l4v → tanh関数 → m, vという流れで
処理されていきます。relu関数およびtanh関数は活性化関数と呼ばれる関数です。途中のlog_scaleを介している部分は値のスケールを調節するための処理です。
h = F.relu(self.l1(x))
h = F.relu(self.l2(h))
h = F.relu(self.l3(h))
m = F.tanh(self.l4m(h))
log_scale = F.tanh(self.l4v(h))
log_scale = F.clip(log_scale, -20., 2.)
v = F.exp(log_scale * 2)
そして、最後にSquashedGaussianDistribution関数を用いて、m(平均)とv(分散)からガウス分布に基づくサンプリングにより行動を決定しています。ガウス分布を用いることによって確率的な行動決定が可能になります。
return chainerrl.distribution.SquashedGaussianDistribution(m, v)
次は、以下のQ関数ネットワーククラスの定義についてです。
class QFunction(chainer.Chain):
def __init__(self):
(省略)
def __call__(self, s, action):
(省略)
それではコンストラクタの内部から見ていきましょう。コンストラクタでは、ポリシーネットワークと同様にNNの定義を行っております。
self.l1 = L.Linear(36, 128, initialW = w)
self.l2 = L.Linear(128, 128, initialW = w)
self.l3 = L.Linear(128, 128, initialW = w)
self.l4 = L.Linear(128, 1, initialW = w)
ポリシーネットワークと異なる点は、入力の次元数と出力の次元数です。中間層の数とニューロン数はポリシーネットワークと同様の値にしていますが、異なっていても学習には問題ありません。
Q関数ネットワークは、状態と行動の組み合わせから、その行動に対するQ値を出力します。
Q値は1次元なのでl4の出力は1次元となっています。また、l1の入力は状態と行動の組み合わせなので、
28次元(状態空間) + 8次元(行動空間) = 36次元となっています。
次は、状態sと行動aからQ値を出力する関数についてです。
基本的にはポリシーネットワークと同様ですが、Q関数ネットワークの方が少しシンプルです。
引数として状態sと行動aをとり、Q値としてl4の出力を返しています。
def __call__(self, s, action):
h = F.concat((s, action), axis=1)
h = F.relu(self.l1(h))
h = F.relu(self.l2(h))
h = F.relu(self.l3(h))
return self.l4(h)
さて、次はいよいよメイン関数の中身についてです。まず初めに今回の学習環境であるPybulletのサンプル環境"AntBulletEnv-v0"を生成しています。以下のコードの2行目はビジュアライズを行うための関数です。ビジュアライズしない方がプログラムが軽くなるため、実際に学習させるときは2行目はコメントアウトして学習を行います。
env = gym.make('AntBulletEnv-v0')
env.render(mode='human')
次は強化学習のパラメータおよび最適化関数のセットアップを行っています。今回は最適化関数にはAdamを用いました。
num_episodes = 30000
q_func1 = QFunction()
q_func2 = QFunction()
policy = PolicyNetwork()
optimizer_p = chainer.optimizers.Adam(alpha=1e-3)
optimizer_q1 = chainer.optimizers.Adam(alpha=1e-3)
optimizer_q2 = chainer.optimizers.Adam(alpha=1e-3)
optimizer_p.setup(policy)
optimizer_q1.setup(q_func1)
optimizer_q2.setup(q_func2)
replay_buffer = chainerrl.replay_buffer.ReplayBuffer(capacity=10 ** 6)
phi = lambda x: x.astype(np.float32, copy=False)
def burnin_action_func():return np.random.uniform(env.action_space.low, env.action_space.high).astype(np.float32)
一通り学習に必要な変数が定義できたら、いよいよ強化学習の主体であるSACエージェントのインスタンスを生成します。
引数として、方策ネットワークと価値ネットワーク、最適化関数、リプレイバッファを渡します。
次は、重要なデフォルト引数の説明をします。
gammaは割引率で、今回は0.99としました。
replay_start_sizeは学習開始から数えて何ステップ目からNNを更新し始めるかを決める値です。
update_intervalはNNを更新する間隔です。つまり、以下のコードだと、128ステップ目から8ステップ間隔でNNの更新を行うことになります。
minibatch_sizeとはNNの更新を行うときに、リプレイバッファから何個のデータを読み込むかを決める値です。
act_deterministicallyは行動を決定論的に行うか否かを決める真偽値です。
agent = SoftActorCritic(policy, q_func1, q_func2, optimizer_p, optimizer_q1, optimizer_q2,
replay_buffer, gamma=0.99,
replay_start_size=128,
update_interval=8,
soft_update_tau=0.005,
phi=phi, gpu=None, minibatch_size=128,
initial_temperature=1.0,
temperature_optimizer=None,
act_deterministically=False,
burnin_action_func=burnin_action_func,
entropy_target=-env.action_space.low.size)
次は学習データを保存するフォルダを指定しています。以下のコードの2行目のコメントを外せば、学習済みデータを読み込むことができます。
outdir = 'result/'
agent.load(outdir + "agent_sac_ant_trained")
ここまでで一通り学習に必要な準備はできました。いよいよ学習スタートです。
先ほど定義したエピソード数分だけ学習のループを回しています。
10エピソード毎に学習の統計データを表示し、100エピソード毎に学習データを保存しています。
for episode in range(1, num_episodes + 1):
(省略)
if episode % 10 == 0:
print('Episode {0:4d}: statistics: {1}'.format(episode, agent.get_statistics()))
if episode % 100 == 0:
agent.save(outdir + 'agent_sac_ant')
エピソードの初めはまず環境の初期化を行います。本環境では、毎回同じ状態から学習をスタートします。
obs = env.reset()
そしてエピソードの終了条件を満たすまで学習を行います。以下のコードの2行目で行動を選択するとともに学習も行っています。
3行目では、選択した行動を環境に対して実行します。そして、次の状態や報酬などの情報が環境から返ってきます。
while not done:
action = agent.act_and_train(obs, reward)
obs, reward, done, info = env.step(action)
一連のエピソードが終了したら、以下の関数を実行して、エピソードが終了したことをエージェントに伝えます。
agent.stop_episode_and_train(obs, reward, done)
コード解説は以上となります。