8
12

More than 3 years have passed since last update.

DDPGでPendulum-v0を学習

Last updated at Posted at 2020-06-19

概要

DQNなどの手法では方策によって各状態のQ(s,a)を計算し、Q値を最大化する行動を選択・行動をしていたが、これでは離散的な行動しか扱えなかった。それに対して、DDPGでは連続行動空間に対応するためQ値を最大化する行動を求めるのではなく、方策をパラメータ化し直接行動を出力することで対応した。そのため、決定的な方策となっている。

実装

リプレイバッファ

深層強化学習ではおなじみのリプレイバッファです。現在の状態、その時の行動、次状態、即時報酬、終端状態かどうかを一つのタプルとして保存しています。

from collections import deque, namedtuple
import random

Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))

class ReplayBuffer(object):
    def __init__(self, capacity=1e6):
        self.capacity = capacity
        self.memory = deque([], maxlen=int(capacity))

    def append(self, *args):
        transition = Transition(*args)
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def reset(self):
        self.memory.clear()

    def length(self):
        return len(self.memory)

    def __len__(self):
        return len(self.memory)

モデル

DDPGでは現在の状態から行動を連続値で出力するActor$\mu(s)$と現在の状態と行動からQ値を出力するCritic$Q(s,a)$が存在します。各層の重みの初期化については元論文に沿っているので、詳しくはそちらを確認してください(下にリンクがあります)。特徴的なのはActorの最終層にtanhがあることと、Criticで行動を受け取る際に第二層で受け取ることですかね。もしPendulumで実験する場合には、行動範囲が[-2, 2]なので、出力に2を書けても良いかもしれません。

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

def init_weight(size):
    f = size[0]
    v = 1. / np.sqrt(f)
    return torch.tensor(np.random.uniform(low=-v, high=v, size=size), dtype=torch.float)

class ActorNetwork(nn.Module):
    def __init__(self, num_state, num_action, hidden1_size=400, hidden2_size=300, init_w=3e-3):
        super(ActorNetwork, self).__init__()
        self.fc1 = nn.Linear(num_state[0], hidden1_size)
        self.fc2 = nn.Linear(hidden1_size, hidden2_size)
        self.fc3 = nn.Linear(hidden2_size, num_action[0])

        self.num_state = num_state
        self.num_action = num_action

        self.fc1.weight.data = init_weight(self.fc1.weight.data.size())
        self.fc2.weight.data = init_weight(self.fc2.weight.data.size())
        self.fc3.weight.data.uniform_(-init_w, init_w)

    def forward(self, x):
        h = F.relu(self.fc1(x))
        h = F.relu(self.fc2(h))
        y = torch.tanh(self.fc3(h))  # 2をかけてもよいかも?
        return y

class CriticNetwork(nn.Module):
    def __init__(self, num_state, num_action, hidden1_size=400, hidden2_size=300, init_w=3e-4):
        super(CriticNetwork, self).__init__()
        self.fc1 = nn.Linear(num_state[0], hidden1_size)
        self.fc2 = nn.Linear(hidden1_size+num_action[0], hidden2_size)
        self.fc3 = nn.Linear(hidden2_size, 1)

        self.num_state = num_state
        self.num_action = num_action

        self.fc1.weight.data = init_weight(self.fc1.weight.data.size())
        self.fc2.weight.data = init_weight(self.fc2.weight.data.size())
        self.fc3.weight.data.uniform_(-init_w, init_w)

    def forward(self, x, action):
        h = F.relu(self.fc1(x))
        h = F.relu(self.fc2(torch.cat([h, action], dim=1)))
        y = self.fc3(h)
        return y

エージェント

エージェントでは行動を選択する際にそのままでは行動が決定的になってしまうため、ノイズ$\mathcal{N}$を加えます。このときのノイズはオルンシュタイン=ウーレンベック過程という確立過程に従います。詳しくはわからないです。時間経つに従って平均に近づいていくノイズだと思えばいいと思います。しらんけど。

a = \mu(s) + \mathcal{N}

各モデルの学習についてはCriticはDQNなどと同様にTD誤差を最小化するように勾配を求めてモデルの更新を行います。損失関数については以下のとおりです。Nはバッチサイズです。

L = \frac{1}{N} \sum_{i=1}^N (r_i + \gamma Q^{\prime}(s_{i+1}, \mu^{\prime}(s_{i+1})) - Q(s_i, a_i))^2

Actorの方はQ値を最大化するようにモデルの更新を行います。このとき最大化を行うので、Lossにマイナスがつくことに注意です。目的関数は以下の通りです。

J = \frac{1}{N}\sum_{i=1}^N Q(s_{i}, \mu{s_i})

上記の目的関数ではダッシュがついてるものはtargetネットワークになります。これは学習を安定化させるためによく使われるものです。DQNなどではこのtargetネットワークの更新が数エポック毎に行われるのに対して、DDPGではハイパパラメータ$\tau(\ll 1)$を用いて

\theta \gets \tau \theta + (1 - \tau) \theta^{\prime}

のように、緩やかに更新されます。これにより、学習が安定しますが学習時間が若干長くなってしまうらしいです。

import torch
import torch.nn.functional as F
import numpy as np
import copy

class OrnsteinUhlenbeckProcess:
    def __init__(self, theta=0.15, mu=0.0, sigma=0.2, dt=1e-2, x0=None, size=1, sigma_min=None, n_steps_annealing=1000):
        self.theta = theta
        self.mu = mu
        self.sigma = sigma
        self.dt = dt
        self.x0 = x0
        self.size = size
        self.num_steps = 0

        self.x_prev = self.x0 if self.x0 is not None else np.zeros(self.size)

        if sigma_min is not None:
            self.m = -float(sigma - sigma_min) / float(n_steps_annealing)
            self.c = sigma
            self.sigma_min = sigma_min
        else:
            self.m = 0
            self.c = sigma
            self.sigma_min = sigma

    def current_sigma(self):
        sigma = max(self.sigma_min, self.m * float(self.num_steps) + self.c)
        return sigma

    def sample(self):
        x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + self.current_sigma() * np.sqrt(self.dt) * np.random.normal(size=self.size)
        self.x_prev = x
        self.num_steps += 1
        return x


class DDPG:
    def __init__(self, actor, critic, optimizer_actor, optimizer_critic, replay_buffer, device, gamma=0.99, tau=1e-3, epsilon=1.0, batch_size=64):
        self.actor = actor
        self.critic = critic
        self.actor_target = copy.deepcopy(self.actor)
        self.critic_target = copy.deepcopy(self.critic)
        self.optimizer_actor = optimizer_actor
        self.optimizer_critic = optimizer_critic
        self.replay_buffer = replay_buffer
        self.device = device
        self.gamma = gamma
        self.tau = tau
        self.epsilon = epsilon
        self.batch_size = batch_size
        self.random_process = OrnsteinUhlenbeckProcess(size=actor.num_action[0])

        self.num_state = actor.num_state
        self.num_action = actor.num_action

    def add_memory(self, *args):
        self.replay_buffer.append(*args)

    def reset_memory(self):
        self.replay_buffer.reset()

    def get_action(self, state, greedy=False):
        state_tensor = torch.tensor(state, dtype=torch.float, device=self.device).view(-1, *self.num_state)
        action = self.actor(state_tensor)
        if not greedy:
            action += self.epsilon*torch.tensor(self.random_process.sample(), dtype=torch.float, device=self.device)

        return action.squeeze(0).detach().cpu().numpy()

    def train(self):
        if len(self.replay_buffer) < self.batch_size:
            return None
        transitions = self.replay_buffer.sample(self.batch_size)
        batch = Transition(*zip(*transitions))

        state_batch = torch.tensor(batch.state, device=self.device, dtype=torch.float)
        action_batch = torch.tensor(batch.action, device=self.device, dtype=torch.float)
        next_state_batch = torch.tensor(batch.next_state, device=self.device, dtype=torch.float)
        reward_batch = torch.tensor(batch.reward, device=self.device, dtype=torch.float).unsqueeze(1)
        not_done = np.array([(not done) for done in batch.done])
        not_done_batch = torch.tensor(not_done, device=self.device, dtype=torch.float).unsqueeze(1)

        # need to change
        qvalue = self.critic(state_batch, action_batch)
        next_qvalue = self.critic_target(next_state_batch, self.actor_target(next_state_batch))
        target_qvalue = reward_batch + (self.gamma * next_qvalue * not_done_batch) 

        critic_loss = F.mse_loss(qvalue, target_qvalue)
        self.optimizer_critic.zero_grad()
        critic_loss.backward()
        self.optimizer_critic.step()

        actor_loss = -self.critic(state_batch, self.actor(state_batch)).mean()
        self.optimizer_actor.zero_grad()
        actor_loss.backward()
        self.optimizer_actor.step()

        # soft parameter update
        for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
            target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)
        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)

学習

ここに関しては特に新しい点はありません。その他の強化学習のアルゴリズムと同様に環境からの状態を受け取って、行動、学習をしている感じになります。各ハイパーパラメータは元論文に沿っています。(多分)

import torch
import torch.optim as optim
import gym

max_episodes = 300
memory_capacity = 1e6  # バッファの容量
gamma = 0.99  # 割引率
tau = 1e-3  # ターゲットの更新率
epsilon = 1.0  # ノイズの量をいじりたい場合、多分いらない
batch_size = 64
lr_actor = 1e-4
lr_critic = 1e-3
logger_interval = 10
weight_decay = 1e-2

env = gym.make('Pendulum-v0')
num_state = env.observation_space.shape
num_action = env.action_space.shape
max_steps = env.spec.max_episode_steps

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

actorNet = ActorNetwork(num_state, num_action).to(device)
criticNet = CriticNetwork(num_state, num_action).to(device)
optimizer_actor = optim.Adam(actorNet.parameters(), lr=lr_actor)
optimizer_critic = optim.Adam(criticNet.parameters(), lr=lr_critic, weight_decay=weight_decay)
replay_buffer = ReplayBuffer(capacity=memory_capacity)
agent = DDPG(actorNet, criticNet, optimizer_actor, optimizer_critic, replay_buffer, device, gamma, tau, epsilon, batch_size)

for episode in range(max_episodes):
    observation = env.reset()
    total_reward = 0

    for step in range(max_steps):
        action = agent.get_action(observation)
        next_observation, reward, done, _ = env.step(action)
        total_reward += reward
        agent.add_memory(observation, action, next_observation, reward, done)

        agent.train()

        observation = next_observation

        if done:
            break

    if episode % logger_interval == 0:
        print("episode:{} total reward:{}".format(episode, total_reward))

for episode in range(3):
    observation = env.reset()
    env.render()
    for step in range(max_steps):
        action = agent.get_action(observation, greedy=True)
        next_observation, reward, done, _ = env.step(action)
        observation = next_observation
        env.render()

        if done:
            break

env.close()

gym.makeの環境を変えれば他の環境での学習もできるはず。

結果

累積報酬と学習エピソードのグラフです。いい感じに学習できているのではないでしょうか。
Figure_1-1.png
ちゃんとたてられていますね。えらい。(gifは作り方がわからないので、載せられてないです)

おわりに

基本的に上のコードをそのまま一つファイルにまとめて実行してもらえれば、動作の確認ができると思います。実装で手一杯だったのでいくつか余分な変数などが残っています。Pendulum-v0であれば、GPUが利用可能な環境でなくてもcpuのみで学習はできます。ただ、少し学習が不安定になるときがあるのでそのときには再度実行してください。 機会があれば、他の手法の実装も行っていくつもりです。

参考文献

Continuous Control with Deep Reinforcement Learning

8
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
12