概要
DQNなどの手法では方策によって各状態のQ(s,a)を計算し、Q値を最大化する行動を選択・行動をしていたが、これでは離散的な行動しか扱えなかった。それに対して、DDPGでは連続行動空間に対応するためQ値を最大化する行動を求めるのではなく、方策をパラメータ化し直接行動を出力することで対応した。そのため、決定的な方策となっている。
実装
リプレイバッファ
深層強化学習ではおなじみのリプレイバッファです。現在の状態、その時の行動、次状態、即時報酬、終端状態かどうかを一つのタプルとして保存しています。
from collections import deque, namedtuple
import random
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))
class ReplayBuffer(object):
def __init__(self, capacity=1e6):
self.capacity = capacity
self.memory = deque([], maxlen=int(capacity))
def append(self, *args):
transition = Transition(*args)
self.memory.append(transition)
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def reset(self):
self.memory.clear()
def length(self):
return len(self.memory)
def __len__(self):
return len(self.memory)
モデル
DDPGでは現在の状態から行動を連続値で出力するActor$\mu(s)$と現在の状態と行動からQ値を出力するCritic$Q(s,a)$が存在します。各層の重みの初期化については元論文に沿っているので、詳しくはそちらを確認してください(下にリンクがあります)。特徴的なのはActorの最終層にtanhがあることと、Criticで行動を受け取る際に第二層で受け取ることですかね。もしPendulumで実験する場合には、行動範囲が[-2, 2]なので、出力に2を書けても良いかもしれません。
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
def init_weight(size):
f = size[0]
v = 1. / np.sqrt(f)
return torch.tensor(np.random.uniform(low=-v, high=v, size=size), dtype=torch.float)
class ActorNetwork(nn.Module):
def __init__(self, num_state, num_action, hidden1_size=400, hidden2_size=300, init_w=3e-3):
super(ActorNetwork, self).__init__()
self.fc1 = nn.Linear(num_state[0], hidden1_size)
self.fc2 = nn.Linear(hidden1_size, hidden2_size)
self.fc3 = nn.Linear(hidden2_size, num_action[0])
self.num_state = num_state
self.num_action = num_action
self.fc1.weight.data = init_weight(self.fc1.weight.data.size())
self.fc2.weight.data = init_weight(self.fc2.weight.data.size())
self.fc3.weight.data.uniform_(-init_w, init_w)
def forward(self, x):
h = F.relu(self.fc1(x))
h = F.relu(self.fc2(h))
y = torch.tanh(self.fc3(h)) # 2をかけてもよいかも?
return y
class CriticNetwork(nn.Module):
def __init__(self, num_state, num_action, hidden1_size=400, hidden2_size=300, init_w=3e-4):
super(CriticNetwork, self).__init__()
self.fc1 = nn.Linear(num_state[0], hidden1_size)
self.fc2 = nn.Linear(hidden1_size+num_action[0], hidden2_size)
self.fc3 = nn.Linear(hidden2_size, 1)
self.num_state = num_state
self.num_action = num_action
self.fc1.weight.data = init_weight(self.fc1.weight.data.size())
self.fc2.weight.data = init_weight(self.fc2.weight.data.size())
self.fc3.weight.data.uniform_(-init_w, init_w)
def forward(self, x, action):
h = F.relu(self.fc1(x))
h = F.relu(self.fc2(torch.cat([h, action], dim=1)))
y = self.fc3(h)
return y
エージェント
エージェントでは行動を選択する際にそのままでは行動が決定的になってしまうため、ノイズ$\mathcal{N}$を加えます。このときのノイズはオルンシュタイン=ウーレンベック過程という確立過程に従います。詳しくはわからないです。時間経つに従って平均に近づいていくノイズだと思えばいいと思います。しらんけど。
a = \mu(s) + \mathcal{N}
各モデルの学習についてはCriticはDQNなどと同様にTD誤差を最小化するように勾配を求めてモデルの更新を行います。損失関数については以下のとおりです。Nはバッチサイズです。
L = \frac{1}{N} \sum_{i=1}^N (r_i + \gamma Q^{\prime}(s_{i+1}, \mu^{\prime}(s_{i+1})) - Q(s_i, a_i))^2
Actorの方はQ値を最大化するようにモデルの更新を行います。このとき最大化を行うので、Lossにマイナスがつくことに注意です。目的関数は以下の通りです。
J = \frac{1}{N}\sum_{i=1}^N Q(s_{i}, \mu{s_i})
上記の目的関数ではダッシュがついてるものはtargetネットワークになります。これは学習を安定化させるためによく使われるものです。DQNなどではこのtargetネットワークの更新が数エポック毎に行われるのに対して、DDPGではハイパパラメータ$\tau(\ll 1)$を用いて
\theta \gets \tau \theta + (1 - \tau) \theta^{\prime}
のように、緩やかに更新されます。これにより、学習が安定しますが学習時間が若干長くなってしまうらしいです。
import torch
import torch.nn.functional as F
import numpy as np
import copy
class OrnsteinUhlenbeckProcess:
def __init__(self, theta=0.15, mu=0.0, sigma=0.2, dt=1e-2, x0=None, size=1, sigma_min=None, n_steps_annealing=1000):
self.theta = theta
self.mu = mu
self.sigma = sigma
self.dt = dt
self.x0 = x0
self.size = size
self.num_steps = 0
self.x_prev = self.x0 if self.x0 is not None else np.zeros(self.size)
if sigma_min is not None:
self.m = -float(sigma - sigma_min) / float(n_steps_annealing)
self.c = sigma
self.sigma_min = sigma_min
else:
self.m = 0
self.c = sigma
self.sigma_min = sigma
def current_sigma(self):
sigma = max(self.sigma_min, self.m * float(self.num_steps) + self.c)
return sigma
def sample(self):
x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + self.current_sigma() * np.sqrt(self.dt) * np.random.normal(size=self.size)
self.x_prev = x
self.num_steps += 1
return x
class DDPG:
def __init__(self, actor, critic, optimizer_actor, optimizer_critic, replay_buffer, device, gamma=0.99, tau=1e-3, epsilon=1.0, batch_size=64):
self.actor = actor
self.critic = critic
self.actor_target = copy.deepcopy(self.actor)
self.critic_target = copy.deepcopy(self.critic)
self.optimizer_actor = optimizer_actor
self.optimizer_critic = optimizer_critic
self.replay_buffer = replay_buffer
self.device = device
self.gamma = gamma
self.tau = tau
self.epsilon = epsilon
self.batch_size = batch_size
self.random_process = OrnsteinUhlenbeckProcess(size=actor.num_action[0])
self.num_state = actor.num_state
self.num_action = actor.num_action
def add_memory(self, *args):
self.replay_buffer.append(*args)
def reset_memory(self):
self.replay_buffer.reset()
def get_action(self, state, greedy=False):
state_tensor = torch.tensor(state, dtype=torch.float, device=self.device).view(-1, *self.num_state)
action = self.actor(state_tensor)
if not greedy:
action += self.epsilon*torch.tensor(self.random_process.sample(), dtype=torch.float, device=self.device)
return action.squeeze(0).detach().cpu().numpy()
def train(self):
if len(self.replay_buffer) < self.batch_size:
return None
transitions = self.replay_buffer.sample(self.batch_size)
batch = Transition(*zip(*transitions))
state_batch = torch.tensor(batch.state, device=self.device, dtype=torch.float)
action_batch = torch.tensor(batch.action, device=self.device, dtype=torch.float)
next_state_batch = torch.tensor(batch.next_state, device=self.device, dtype=torch.float)
reward_batch = torch.tensor(batch.reward, device=self.device, dtype=torch.float).unsqueeze(1)
not_done = np.array([(not done) for done in batch.done])
not_done_batch = torch.tensor(not_done, device=self.device, dtype=torch.float).unsqueeze(1)
# need to change
qvalue = self.critic(state_batch, action_batch)
next_qvalue = self.critic_target(next_state_batch, self.actor_target(next_state_batch))
target_qvalue = reward_batch + (self.gamma * next_qvalue * not_done_batch)
critic_loss = F.mse_loss(qvalue, target_qvalue)
self.optimizer_critic.zero_grad()
critic_loss.backward()
self.optimizer_critic.step()
actor_loss = -self.critic(state_batch, self.actor(state_batch)).mean()
self.optimizer_actor.zero_grad()
actor_loss.backward()
self.optimizer_actor.step()
# soft parameter update
for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)
for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)
学習
ここに関しては特に新しい点はありません。その他の強化学習のアルゴリズムと同様に環境からの状態を受け取って、行動、学習をしている感じになります。各ハイパーパラメータは元論文に沿っています。(多分)
import torch
import torch.optim as optim
import gym
max_episodes = 300
memory_capacity = 1e6 # バッファの容量
gamma = 0.99 # 割引率
tau = 1e-3 # ターゲットの更新率
epsilon = 1.0 # ノイズの量をいじりたい場合、多分いらない
batch_size = 64
lr_actor = 1e-4
lr_critic = 1e-3
logger_interval = 10
weight_decay = 1e-2
env = gym.make('Pendulum-v0')
num_state = env.observation_space.shape
num_action = env.action_space.shape
max_steps = env.spec.max_episode_steps
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
actorNet = ActorNetwork(num_state, num_action).to(device)
criticNet = CriticNetwork(num_state, num_action).to(device)
optimizer_actor = optim.Adam(actorNet.parameters(), lr=lr_actor)
optimizer_critic = optim.Adam(criticNet.parameters(), lr=lr_critic, weight_decay=weight_decay)
replay_buffer = ReplayBuffer(capacity=memory_capacity)
agent = DDPG(actorNet, criticNet, optimizer_actor, optimizer_critic, replay_buffer, device, gamma, tau, epsilon, batch_size)
for episode in range(max_episodes):
observation = env.reset()
total_reward = 0
for step in range(max_steps):
action = agent.get_action(observation)
next_observation, reward, done, _ = env.step(action)
total_reward += reward
agent.add_memory(observation, action, next_observation, reward, done)
agent.train()
observation = next_observation
if done:
break
if episode % logger_interval == 0:
print("episode:{} total reward:{}".format(episode, total_reward))
for episode in range(3):
observation = env.reset()
env.render()
for step in range(max_steps):
action = agent.get_action(observation, greedy=True)
next_observation, reward, done, _ = env.step(action)
observation = next_observation
env.render()
if done:
break
env.close()
gym.makeの環境を変えれば他の環境での学習もできるはず。
結果
累積報酬と学習エピソードのグラフです。いい感じに学習できているのではないでしょうか。
ちゃんとたてられていますね。えらい。(gifは作り方がわからないので、載せられてないです)
おわりに
基本的に上のコードをそのまま一つファイルにまとめて実行してもらえれば、動作の確認ができると思います。実装で手一杯だったのでいくつか余分な変数などが残っています。Pendulum-v0であれば、GPUが利用可能な環境でなくてもcpuのみで学習はできます。ただ、少し学習が不安定になるときがあるのでそのときには再度実行してください。 機会があれば、他の手法の実装も行っていくつもりです。