Introduction
This article is the Day 20 entry of the Deep Learningやっていき Advent Calendar 2017.
What I really wanted to do was implement the paper I covered in Distributed Prioritized Experience Replayを読んだので解説してみる (a Qiita article), but implementing the multi-threaded distributed processing was a pain, so for now I'd like to present this implementation, which I did manage to finish.
I'm still working on that one as well, and I'll write another article once it's done.
Environment
- Chainer 3.2.0
- Windows 10
- Some Intel Core i7
- Jupyter Notebook
My Chainer environment was still v1.0, a relic from the Cretaceous period, so I took this opportunity to upgrade it. While I was at it, I also upgraded cuDNN and CUDA.
What is Prioritized Experience Replay?
In short, instead of sampling experiences uniformly from the replay buffer, Prioritized Experience Replay samples each experience with probability roughly proportional to the magnitude of its TD error, so transitions the network currently predicts poorly get replayed more often.
For the details, the following article is a very helpful reference:
これから強化学習を勉強する人のための「強化学習アルゴリズム・マップ」と、実装例まとめ
That article explains an implementation using Keras.
This time I reimplemented it in Chainer, using that article as a reference.
Sorry for yet another rehash of the topic.
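As a rough illustration of the sampling rule (my own minimal NumPy sketch, not code from that article or from the implementation below; the arrays are placeholder data):

import numpy as np

# Each stored experience gets priority |TD error| + eps; experiences are then
# sampled with probability proportional to their priority, so high-error
# transitions are replayed more often than under uniform sampling.
td_errors = np.array([0.5, 0.1, 2.0, 0.05])   # placeholder |TD error| values
priorities = np.abs(td_errors) + 1e-4         # eps keeps every probability positive
probs = priorities / priorities.sum()
batch_idx = np.random.choice(len(td_errors), size=2, p=probs, replace=False)
print(batch_idx)  # indices to replay, biased toward large TD error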
Implementation
I uploaded the notebook to GitHub in .ipynb format, so feel free to use it as a reference. The listing below is that notebook exported as a .py file (the # In[n]: markers are the cell boundaries).
prioritized_experience_replay_DQN_chainer_v3.py
# coding: utf-8
# In[2]:
import gym
import numpy as np
import time
import random
import matplotlib.pyplot as plt
from collections import deque
import chainer
from chainer import iterators
from chainer import optimizers
from chainer import cuda
from chainer import Variable
import chainer.links as L
import chainer.functions as F
from matplotlib import animation
from matplotlib import pyplot as plt
get_ipython().magic('matplotlib nbagg')
# In[3]:
env = gym.make('CartPole-v0')
num_episodes = 100  # total number of episodes
max_number_of_steps = 200  # number of steps per episode
goal_average_reward = 195  # training is considered finished once the mean reward exceeds this
num_consecutive_iterations = 5  # number of recent episodes averaged when judging completion
total_reward_vec = np.zeros(num_consecutive_iterations)  # stores the reward of each episode
gamma = 0.99  # discount factor
islearned = 0  # flag set once training has finished
hidden_size = 8  # number of neurons in each hidden layer of the Q-network
learning_rate = 0.001  # learning rate of the Q-network
memory_size = 5000  # size of the replay buffer
batch_size = 32  # size of the mini-batch used to update the Q-network
n_epoch = 1
NUM_OF_STATES = env.observation_space.shape[0]
NUM_OF_ACTIONS = env.action_space.n
gpu_device = 0
cuda.get_device(gpu_device).use()
# In[4]:
class QNetwork(chainer.Chain):
    def __init__(self, _units=hidden_size, _out=NUM_OF_ACTIONS):
        super(QNetwork, self).__init__()
        with self.init_scope():
            self.l1 = L.Linear(None, _units)
            self.l2 = L.Linear(_units, _units)
            self.l3 = L.Linear(_units, _out)

    def __call__(self, x, target):
        x = Variable(np.array(x).reshape((batch_size, -1)).astype(np.float32))
        target = Variable(np.array(target).reshape((batch_size, -1)).astype(np.float32))
        h1 = F.relu(self.l1(x))
        h2 = F.relu(self.l2(h1))
        y = self.l3(h2)
        return F.mean(F.huber_loss(y, target, delta=1.0))
        # return F.huber_loss(y, target, delta=1.0)
        # return F.mean_squared_error(y, target)

    def predict(self, x):
        x = Variable(np.array(x).reshape((1, -1)).astype(np.float32))
        h1 = F.relu(self.l1(x))
        h2 = F.relu(self.l2(h1))
        y = self.l3(h2)
        return y.data
# In[5]:
# GPU variant of QNetwork (not instantiated in the run below).
xp = cuda.cupy  # assumed alias: this class expects xp to be cupy (not defined in the original notebook)

class QNetwork_GPU(chainer.Chain):
    def __init__(self, _units=hidden_size, _out=NUM_OF_ACTIONS):
        super(QNetwork_GPU, self).__init__()
        with self.init_scope():
            self.l1 = L.Linear(None, _units)
            self.l2 = L.Linear(_units, _units)
            self.l3 = L.Linear(_units, _out)

    def __call__(self, x, target):
        x = Variable(xp.array(x).reshape((batch_size, -1)).astype(xp.float32))
        target = Variable(xp.array(target).reshape((batch_size, -1)).astype(xp.float32))
        h1 = F.relu(self.l1(x))
        h2 = F.relu(self.l2(h1))
        y = self.l3(h2)
        return F.mean(F.huber_loss(y, target, delta=1.0))
        # return F.huber_loss(y, target, delta=1.0)
        # return F.mean_squared_error(y, target)

    def predict(self, x):
        x = Variable(xp.array(x).reshape((1, -1)).astype(xp.float32))
        h1 = F.relu(self.l1(x))
        h2 = F.relu(self.l2(h1))
        y = self.l3(h2)
        return y.data
# In[6]:
class Memory:
    ## each experience is stored as (state, action, reward, next_state, TD-error)
    def __init__(self, max_size=1000):
        self.buffer = deque(maxlen=max_size)

    def add(self, experience, mainQN, targetQN, gamma):
        TDerror = self.get_TDerror(experience, mainQN, targetQN, gamma)
        (state, action, reward, next_state) = experience
        experience = (state, action, reward, next_state, TDerror)
        self.buffer.append(experience)

    def len(self):
        return len(self.buffer)

    def sample(self, batch_size):
        # uniform sampling without replacement (used for the first few episodes)
        idx = np.random.choice(np.arange(len(self.buffer)), size=batch_size, replace=False)
        return [self.buffer[ii][:4] for ii in idx]

    def get_TDerror(self, experience, mainQN, targetQN, gamma):
        (state, action, reward, next_state) = experience
        next_action = np.argmax(mainQN.predict(next_state)[0])
        target = reward + gamma * targetQN.predict(next_state)[0][next_action]
        TDerror = target - targetQN.predict(state)[0][action]
        return TDerror

    def update_TDerror(self, mainQN, targetQN, gamma):
        # recompute the TD error of the stored experiences with the current networks
        for i in range(0, (self.len() - 1)):
            (state, action, reward, next_state) = self.buffer[i][:4]
            next_action = np.argmax(mainQN.predict(next_state)[0])
            target = reward + gamma * targetQN.predict(next_state)[0][next_action]
            TDerror = target - targetQN.predict(state)[0][action]
            self.buffer[i] = (state, action, reward, next_state, TDerror)

    def get_sum_absolute_TDerror(self):
        sum_absolute_TDerror = 0
        for i in range(0, (self.len() - 1)):
            sum_absolute_TDerror += abs(self.buffer[i][4]) + 0.0001  # small constant keeps every priority positive
        return sum_absolute_TDerror
# In[7]:
class Actor:
    def get_action(self, state, episode, targetQN):
        epsilon = 0.001 + 0.9 / (1.0 + episode)
        if epsilon <= np.random.uniform(0, 1):
            nextQ = targetQN.predict(state)[0]
            action = np.argmax(nextQ)
        else:
            action = random.randint(0, NUM_OF_ACTIONS - 1)
        return action
# In[8]:
def replay(mainQN, targetQN, optimizer, memory, gamma):
    # plain (uniform) experience replay
    inputs = np.zeros((batch_size, 4))
    targets = np.zeros((batch_size, 2))
    mini_batch = memory.sample(batch_size)
    for i, (_state, _action, _reward, _next_state) in enumerate(mini_batch):
        inputs[i] = _state
        target = _reward
        if not (_next_state.reshape(-1) == np.zeros(NUM_OF_STATES)).all():
            nextQ = mainQN.predict(_next_state)[0]
            next_action = np.argmax(nextQ)  # select the action with the largest Q-value
            target = _reward + gamma * targetQN.predict(_next_state)[0][next_action]
        targets[i] = mainQN.predict(_state)
        targets[i][_action] = target
    mainQN.zerograds()
    loss = mainQN(inputs, targets)
    loss.backward()
    optimizer.update()

def prioritized_experience_replay(mainQN, targetQN, optimizer, memory, gamma):
    for epoch in range(n_epoch):
        inputs = np.zeros((batch_size, 4))
        targets = np.zeros((batch_size, 2))
        # sample experiences with probability proportional to |TD error|:
        # draw uniform random numbers in [0, sum of priorities) and walk the
        # cumulative sum of priorities until it exceeds each random number
        sum_absolute_TDerror = memory.get_sum_absolute_TDerror()
        generatedrand_list = np.random.uniform(0, sum_absolute_TDerror, batch_size)
        generatedrand_list = np.sort(generatedrand_list)
        batch_memory = []
        idx = -1
        tmp_sum_absolute_TDerror = 0
        for (i, randnum) in enumerate(generatedrand_list):
            while tmp_sum_absolute_TDerror < randnum:
                idx += 1
                tmp_sum_absolute_TDerror += abs(memory.buffer[idx][4]) + 0.0001
            batch_memory.append(memory.buffer[idx][:4])
        for i, (_state, _action, _reward, _next_state) in enumerate(batch_memory):
            inputs[i] = _state
            target = _reward
            if not (_next_state.reshape(-1) == np.zeros(NUM_OF_STATES)).all():
                nextQ = mainQN.predict(_next_state)[0]
                next_action = np.argmax(nextQ)  # select the action with the largest Q-value
                target = _reward + gamma * targetQN.predict(_next_state)[0][next_action]
            targets[i] = mainQN.predict(_state)
            targets[i][_action] = target
        mainQN.zerograds()
        loss = mainQN(inputs, targets)
        loss.backward()
        optimizer.update()
# In[ ]:
mainQN = QNetwork()
targetQN = QNetwork()
memory = Memory(max_size=memory_size)
actor = Actor()

optimizer = optimizers.Adam(alpha=learning_rate)
optimizer.setup(mainQN)

frames = []
losses = []
for episode in range(num_episodes + 1):
    env.reset()
    state, reward, done, _ = env.step(env.action_space.sample())  # take one random step to get an initial state
    # state = np.reshape(state, [1, 4])
    episode_reward = 0
    for t in range(max_number_of_steps):
        if (islearned) or (episode == num_episodes):
            frames.append(env.render(mode='rgb_array'))
        action = actor.get_action(state, episode, targetQN)
        next_state, reward, done, info = env.step(action)
        # next_state = np.reshape(next_state, [1, 4])

        # set the reward
        if done:
            next_state = np.zeros(state.shape)
            if t < goal_average_reward:
                reward = -1
            else:
                reward = 1
        else:
            reward = 0
        episode_reward += 1

        memory.add((state, action, reward, next_state), mainQN, targetQN, gamma)  # update the replay buffer
        state = next_state

        if (memory.len() > batch_size) and not islearned:
            if (episode < 10):
                # warm up with uniform sampling for the first few episodes
                replay(mainQN, targetQN, optimizer, memory, gamma)
            else:
                prioritized_experience_replay(mainQN, targetQN, optimizer, memory, gamma)

        if done:
            # mainQN.copyparams(targetQN)
            targetQN = mainQN
            memory.update_TDerror(mainQN, targetQN, gamma)
            total_reward_vec = np.hstack((total_reward_vec[1:], episode_reward))  # record the reward
            print('%d Episode finished after %f time steps / mean %f' % (episode, t + 1, total_reward_vec.mean()))
            losses.append(total_reward_vec.mean())
            break

    # decide whether training is done from the mean reward over recent episodes
    if total_reward_vec.mean() >= goal_average_reward:
        if (islearned):
            break
        print('Episode %d train agent successfully!' % episode)
        islearned = 1
# In[ ]:
plt.cla()
ax = plt.gca()
ax.invert_yaxis()
plt.plot(losses)
# In[ ]:
plt.cla()
fig = plt.gcf()
patch = plt.imshow(frames[0])
plt.axis('off')
def animate(i):
    patch.set_data(frames[i])
anim = animation.FuncAnimation(fig, animate, frames = len(frames), interval=50)
anim
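The last cell just builds the animation object; with %matplotlib nbagg it should play back in the interactive figure in the notebook. If you also want to keep the rendered run as a file, something along these lines should work (my addition, not part of the notebook; the filename is arbitrary and the gif writer requires ImageMagick):

anim.save('cartpole_per_dqn.gif', writer='imagemagick', fps=30)  # arbitrary filename; needs ImageMagick installed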
Impressions
I love Chainer.