はじめに
深層学習と強化学習を組み合わせた Deep Q Network、通称DQNでOpenAI GymのClassic controlを解くプログラムを作ってみました。
今回はその実装について紹介したいと思います。
DQN自体については
の記事がとてもわかりやすく、私もこちらで紹介されている論文やGitHubのコードを参考に実装しました。
強化学習やDQNの理論を知りたい方はこちらをご参考ください。
DQN"もどき"?
Deep Q Networkという名前からも分かる通り、DQNは強化学習の1つであるQ学習を多層ニューラルネットで関数近似します。
それに加え、下記の3つの手法を取り入れて初めてDQNと呼べるみたいです。
- Experience Replay
- Fixed Target Q-Network
- 報酬のClipping
今回私が実装した手法は1,2のみで、3の報酬のClippingは行いませんでした。つまり正確に言うとDQNではないのです。なのでDQN"もどき"と題させていただきました。
OpenAI Gymについて
強化学習のための環境を簡単に構築できるオープンソースのプラットフォームです。
pythonのライブラリとなっており、
$ pip install gym
で簡単にインストールできます。
詳しくは公式サイトをご覧ください。
実装
では実装したコードの紹介をしたいと思います。
ここでお見せするコードは省略している部分もあるので、全体はこちらからご確認下さい。
ニューラルネットワーク
Chainerを使って実装しました。
100ユニットが3層、活性化関数はLeaky ReLUにしました。
class Neuralnet(Chain):
def __init__(self, n_in, n_out):
super(Neuralnet, self).__init__(
L1 = L.Linear(n_in, 100),
L2 = L.Linear(100, 100),
L3 = L.Linear(100, 100),
Q_value = L.Linear(100, n_out, initialW=np.zeros((n_out, 100), dtype=np.float32))
)
def Q_func(self, x):
h = F.leaky_relu(self.L1(x))
h = F.leaky_relu(self.L2(h))
h = F.leaky_relu(self.L3(h))
h = self.Q_value(h)
return h
Agent
強化学習のエージェント部分の実装です。
パラメータ設定
強化学習を行う上でのパラメーターを定義しています。
先ほど紹介したニューラルネットワークも読み込んだ状態数と行動数に合わせて定義しています。そしてFixed Target Q-Networkのため、作成したQ関数をdeepcopyしておきます。つまりQ関数が2つになるわけですね。初めはこの部分が理解できず苦労しました・・・
class Agent():
def __init__(self, n_st, n_act, seed):
self.n_act = n_act
self.model = Neuralnet(n_st, n_act)
self.target_model = copy.deepcopy(self.model)
self.optimizer = optimizers.Adam()
self.optimizer.setup(self.model)
self.memory = deque()
self.loss = 0
self.step = 0
self.gamma = 0.99 # 割引率
self.mem_size = 1000 # Experience Replayのために覚えておく経験の数
self.batch_size = 100 # Experience Replayの際のミニバッチの大きさ
self.train_freq = 10 # ニューラルネットワークの学習間隔
self.target_update_freq = 20 # ターゲットネットワークの同期間隔
# ε-greedy
self.epsilon = 1 # εの初期値
self.epsilon_decay = 0.005 # εの減衰値
self.epsilon_min = 0 # εの最小値
self.exploration = 1000 # εを減衰し始めるまでのステップ数(今回はメモリーが貯まるまで)
経験の蓄積
Experience Replayの為
- 状態: st
- 行動: act
- 報酬: r
- 次の状態: st_dash
- エピソード終了の有無: ep_end
の5つの要素を経験としてタプルにしてmemoryに保存しています。最初に定義したmemoryサイズを超えると先に入れたものからトコロテン式に捨てられる形になっています。
最初memoryはただのリストにしていたのですが、両端のappendやpopを拘束で行えるdequeなるものが存在すると聞いたので、そちらを利用させていただきました。
def stock_experience(self, st, act, r, st_dash, ep_end):
self.memory.append((st, act, r, st_dash, ep_end))
if len(self.memory) > self.mem_size:
self.memory.popleft()
Experience Replay
DQNで重要な手法の1つであるExperience Replayの実装部分です。
貯めてきたmemoryをシャッフルし、定義したミニバッチサイズで切り出して学習していきます。
def suffle_memory(self):
mem = np.array(self.memory)
return np.random.permutation(mem)
def parse_batch(self, batch):
st, act, r, st_dash, ep_end = [], [], [], [], []
for i in xrange(self.batch_size):
st.append(batch[i][0])
act.append(batch[i][1])
r.append(batch[i][2])
st_dash.append(batch[i][3])
ep_end.append(batch[i][4])
st = np.array(st, dtype=np.float32)
act = np.array(act, dtype=np.int8)
r = np.array(r, dtype=np.float32)
st_dash = np.array(st_dash, dtype=np.float32)
ep_end = np.array(ep_end, dtype=np.bool)
return st, act, r, st_dash, ep_end
def experience_replay(self):
mem = self.suffle_memory()
perm = np.array(xrange(len(mem)))
for start in perm[::self.batch_size]:
index = perm[start:start+self.batch_size]
batch = mem[index]
st, act, r, st_d, ep_end = self.parse_batch(batch)
self.model.zerograds()
loss = self.forward(st, act, r, st_d, ep_end)
loss.backward()
self.optimizer.update()
Q関数更新部分
ニューラルネットワークを使ったQ関数の更新部分です。
次の状態(st_dash)のQ値の最大値を計算する部分ではコピーしたQ関数(self.target_model.Q_func)を使うところが重要です。
def forward(self, st, act, r, st_dash, ep_end):
s = Variable(st)
s_dash = Variable(st_dash)
Q = self.model.Q_func(s)
tmp = self.target_model.Q_func(s_dash)
tmp = list(map(np.max, tmp.data))
max_Q_dash = np.asanyarray(tmp, dtype=np.float32)
target = np.asanyarray(copy.deepcopy(Q.data), dtype=np.float32)
for i in xrange(self.batch_size):
target[i, act[i]] = r[i] + (self.gamma * max_Q_dash[i]) * (not ep_end[i])
loss = F.mean_squared_error(Q, Variable(target))
return loss
ここでlossを計算する際に、Q値とtargetの差を-1~1にクリップすると学習が早くなるようですが、勉強不足で理論が理解できなかった為実装できませんでした(くやしい...
行動を返す
学習したQ関数に従って入力された状態の時に取るべき行動を返す部分です。行動選択の手法はε-greedyを使っています。
def get_action(self, st):
if np.random.rand() < self.epsilon:
return np.random.randint(0, self.n_act)
else:
s = Variable(st)
Q = self.model.Q_func(s)
Q = Q.data[0]
a = np.argmax(Q)
return np.asarray(a, dtype=np.int8)
学習を進める
memoryが十分に溜まったら学習を進める部分です。
毎回stepを刻んでおり、一定周期でtarget用のQ関数を同期しています。
また、ある程度探索を終えると毎step毎にεが減衰していきます。
def reduce_epsilon(self):
if self.epsilon > self.epsilon_min and self.exploration < self.step:
self.epsilon -= self.epsilon_decay
def train(self):
if len(self.memory) >= self.mem_size:
if self.step % self.train_freq == 0:
self.experience_replay()
self.reduce_epsilon()
if self.step % self.target_update_freq == 0:
self.target_model = copy.deepcopy(self.model)
self.step += 1
実行部分
Classic controlの環境名を入れると状態数や行動数を勝手に判断してくれるように作ってみました
ちょっとごちゃごちゃして逆にわかりづらくなっちゃったかもしれません^^;
def main(env_name):
env = gym.make(env_name)
view_path = "./video/" + env_name
n_st = env.observation_space.shape[0]
if type(env.action_space) == gym.spaces.discrete.Discrete:
# CartPole-v0, Acrobot-v0, MountainCar-v0
n_act = env.action_space.n
action_list = range(0, n_act)
elif type(env.action_space) == gym.spaces.box.Box:
# Pendulum-v0
action_list = [np.array([a]) for a in [-2.0, 2.0]]
n_act = len(action_list)
agent = Agent(n_st, n_act, seed)
env.monitor.start(view_path, video_callable=None, force=True, seed=seed)
for i_episode in xrange(1000):
observation = env.reset()
for t in xrange(200):
env.render()
state = observation.astype(np.float32).reshape((1,n_st))
act_i = agent.get_action(state)
action = action_list[act_i]
observation, reward, ep_end, _ = env.step(action)
state_dash = observation.astype(np.float32).reshape((1,n_st))
agent.stock_experience(state, act_i, reward, state_dash, ep_end)
agent.train()
if ep_end:
break
env.monitor.close()
実行結果
OpenAI Gymに結果をアップロードしました
AcrobotやPendulumはなかなか良い結果だと思いますが、CartPoleが微妙ですね。
target用Q関数の更新頻度やεの減衰の大きさ、最適化手法などによって結果は色々と変わってくるみたいです。面白い!
おわりに
今後はAtariのゲームで試したいです。その時は報酬のClippingも考える必要がありそうですね。正規化やDropOutとかも考慮したほうがいいのかな?