概要
「ゼロから作る Deep Learning」(斎藤康毅著、O'Reilly Japan, 2016) という素晴らしい教科書を読んで深層学習の基礎を習得したので、身に付けた知識を足がかりに今度は深層強化学習の基礎を学ぼうと思い、深層強化学習が発展するきっかけになった DeepMind の論文
D. Hassabis et al., Human-level control through deep reinforcement learning, Nature, 518(7540):529-533, 2015
を読んで DQN を実装しました。この論文自体は 3 年前のもので、AI 研究の最先端から見れば既に大昔かも知れませんが、深層強化学習の発展はこの論文を端緒とするものであり、はじめに取り組むものとしてはちょうど良いと思います。
「ゼロから作る Deep Learning」の、外部のライブラリ等を用いずに「ゼロから」実装するという方針に倣い、tensorflow や chainer といった深層学習用のフレームワークは用いずに、Numpy だけを使って Python で実装しました。GPU を使った高速化は行っていません。手製の単純な gridworld 的ゲームをプレイするエージェントを DQN のアルゴリズムによって訓練するという内容です。(このゲームは非常に単純なので、うまくプレイするエージェントは by hand で簡単に設計できますが、今回の目的はこのゲームをプレイするエージェントを作ること自体ではなく、DQN のアルゴリズムを実装し、それを用いることで強化学習の問題を解く例を実際に「ゼロから」作ってみる、ということにあります。)
DQN について
強化学習の問題設定 (エージェントが行動を行う環境、その可能な状態の集合 $S$、エージェントが取りうる行動の集合 $A$、エージェントの行動に対して環境が状態をどのように確率的に遷移させるか、そして、エージェントがその行動に対して受け取り、その最大化を目指す報酬の決まり方の設定、など) が与えられると、マルコフ判断過程 (Markov decision process) が定まり、特に、ベルマン作用素と呼ばれる、関数空間 $\mathbb{R}^{S\times A}$ に作用する作用素が定まります。ベルマン作用素は不動点 $Q^{\ast}$ を持ち、状態 $s\in S$ に対して行動 $a^{\ast}(s)=\operatorname{max}_{a\in A}Q^{\ast}(s,a)$ を取ることが最適な行動戦略となることが知られています。
適当に与えた関数を調整することによりこの不動点 $Q^{\ast}$ に近似していくプロセスは Q-learning と呼ばれています。近似のために用いる関数をディープなニューラルネットワーク (Deep Q-Network) によって表し、そのパラメターの調整によってこのプロセスを行うのが deep Q-learning であり、そのネットワーク (およびそれを訓練するアルゴリズム) が DQN です。
実装
手製の単純な gridworld 的ゲームを用意し、それをより上手くプレイするための行動戦略を DQN のアルゴリズムを用いて習得させます。
ゲームの詳細
縦 5 行、横 6 列の 30 個のマス目からなる盤面上を、プレイヤーと二人の敵、一人のゴールが移動していくことで進行するゲームです。
- プレイヤーの初期位置は第 1 列か第 2 列のいずれかのマス目、二人の敵はそれぞれ第 3 列および 第 6 列のいずれかのマス目、ゴールの初期位置は第 4 列または第 5 列のいずれかのマス目としてゲームが開始します。
- プレイヤーは上下左右に動くことができ、敵とゴールは上下のみに動きます。
- ただし、最も上/下の行にいる状態から「上/下に移動」することは、同じ列内で最も下/上の行に移動することを意味するものとします。また、最も右/左の列にいる状態から「右/左に移動」することは、同じ行内で最も左/右の列に移動することを意味するものとします。(つまり、盤面として $(\mathbb{Z}\times\mathbb{Z})/(5\mathbb{Z}\times 6\mathbb{Z})$ を考えることと同じです。)
- プレイヤーは自らの周囲 (左上、真上、右上、真左、真右、左下、真下、右下) のマス目のみを見ることができ、そこに敵またはゴールがいるかどうかの情報を受け取ります。(ただし、最も左の列にいる場合の「左」は最も右の列のことであり、最も上の行から見た「上」は最も下の行のこと、他も同様。)
- また、盤面は 4 つのエリア (第 1,2 列、第 3 列、第 4,5 列、第 6 列) に区切られており、プレイヤーは自分がどのエリアにいるかの情報も受け取ります。
プレイヤーは、移動によって敵のいずれかとぶつかった場合は -1 の、ゴールとぶつかった場合は +1 の報酬を受け取り、それを最終成績としてゲームが終了します。プレイヤーの移動によって敵、ゴールのどちらにもぶつからなかった場合は報酬は 0 です。定められた最大行動可能回数内に敵またはゴールのどちらにもぶつからなかった場合は最終成績を 0 としてゲームが終了します。
このゲームの状態の例は以下のようなものです:
column 1 | column 2 | column 3 | column 4 | column 5 | column 6 |
---|---|---|---|---|---|
player | $\circ$ | . | . | . | $\circ$ |
$\circ$ | $\circ$ | . | . | goal | $\circ$ |
. | . | . | . | . | enemy 2 |
. | . | enemy 1 | . | . | . |
$\circ$ | $\circ$ | . | . | . | $\circ$ |
この状態においては、「$\circ$」で示されたマス目のみがプレイヤーには見えています。また、プレイヤーは自分が 4 つのエリア (columns 1,2; column 3; columns 4,5; column 6) のどれにいるかの情報を受け取ります。Goal と enemies は上下のみに移動しますが、プレイヤーは上下左右に移動できます。プレイヤーは、enemies を避け、動く goal に到達することを目指します。
下記に掲載した実装においては、このゲームの環境を与える「GridWorld」クラスのインスタンスが、ゲームの状態を numpy 配列として保持し、このゲームをプレイするエージェント (「QAgentforGW」クラスのインスタンス) は「GridWorld」のインスタンスが持っているゲーム状態を表す numpy 配列を受けて、「自分の見える範囲」のみの情報が反映されたデータを sensing 関数によって作ります。sensing 関数が返すのは $(s_1,s_2,s_3)$ という三つ組で、$s_1$ と $s_2$ は長さ 8 の one-hot vectors, $s_3$ については長さ 4 の one-hot vector です。 $s_1$ および $s_2$ の 8 つの entries はそれぞれプレイヤーが見ている左上、真上、右上、真左、真右、左下、真下、右下に対応し、enemy および goal がいるマス目だけが 1 となります (自分が見える周囲のどのマス目にも enemy および goal がいない場合は 0 vector となります)。長さ 4 の one-hot vector $s_3$ は自分が 4 つのエリアのどれにいるかを示します。
DQN のアルゴリズムによって、これらの one-hot vectors の意味を理解し、適切な行動を取れるようにエージェントを訓練することが今回の目的となります。
訓練の詳細
このゲームをエージェントとしてプレイする「QAgentforGW」クラスのインスタンスは、状態 s から sensing を施してできる三つ組 $(s_1,s_2,s_3)$ を入力してベクトル
Q(s_1,s_2,s_3)=(Q(s_1,s_2,s_3)_a)_{a\in A}\in\mathbb{R}^A
を出力する (これは、ペア $((s_1,s_2,s_3),a)\in S\times A)$ に対してスカラー値 $Q(s_1,s_2,s_3)_a$ を返すことと同じことです) 関数 $Q$ を持ち、$Q(s_1,s_2,s_3)_a$ が最大となる行動 $a$ を選びます。関数 $Q$ はニューラルネットワーク「qnet」とそのパラメター「qnetparams」によって決定されます。このパラメターを、原論文の Algorithm 1 をに従って更新していくことによって学習が行われます。
今回はこの「qnet」として「DQN」クラスが与えるニューラルネットワークを持たせた 「QAgtentforGW」のインスタンスを、「DQNtrainer」クラスのインスタンスによって訓練します。ニューラルネットワークの初期パラメターは正規分布 (平均 0, 標準偏差は各レイヤごとに定める) からランダムにサンプルすることで与えます。原論文とは異なり、パラメターの更新は学習率 0.1 単純な SDG によって行います。ただし、error clipping を行うことや $\epsilon$ を線形に減衰させることなどは原論文と同じです。
(「DQNtrainer」クラスは、このゲーム以外の環境およびエージェントにも (それらを定めるクラスが、今回扱った「GridWorld」クラスおよび「QAgentforGW」クラスと同じインターフェースを持つ限り) 対応できるように実装されています。使用するゲーム環境を決め、deep Q-network を設計し、それによって計算される $Q$ 関数を持ったエージェントを入れて、適切なハイパーパラメターを設定すれば、単純な SGD なので効率的とは限りませんが、訓練は行えます。)
コード
以下にコードを載せます。
import numpy as np
# Game environment.
class GridWorld:
def __init__(self,batch_size=1,maxsteps=30):
self.batch_size=batch_size
self.state=np.zeros([batch_size,4,2])
self.state[:,0,0]=np.random.choice(5,batch_size)# Player's y coordinate.
self.state[:,0,1]=0+np.random.choice(2,batch_size)# Player's x coordinate.
self.state[:,1,0]=np.random.choice(5,batch_size)# Enemy1's y coordinate.
self.state[:,1,1]=2# Enemy1's x coordinate.
self.state[:,2,0]=np.random.choice(5,batch_size)# Enemy2's y coordinate.
self.state[:,2,1]=5# Enemy2's x coordinate.
self.state[:,3,0]=np.random.choice(5,batch_size)# Goal's y coordinate.
self.state[:,3,1]=3+np.random.choice(2,batch_size)# Goal's x coordinates.
self.step=np.zeros(batch_size)
self.score=np.zeros(batch_size)
self.flag=(np.ones(batch_size)).astype(np.bool)
self.maxsteps=maxsteps
def transition(self,action):
self.state[self.flag,0,0]=(self.state[self.flag,0,0]-action[self.flag,0]+action[self.flag,1])%5
self.state[self.flag,0,1]=(self.state[self.flag,0,1]-action[self.flag,2]+action[self.flag,3])%6
self.state[self.flag,1,0]=(self.state[self.flag,1,0]+(-1)**np.random.choice(2,np.sum(self.flag)))%5
self.state[self.flag,2,0]=(self.state[self.flag,2,0]+(-1)**np.random.choice(2,np.sum(self.flag)))%5
self.state[self.flag,3,0]=(self.state[self.flag,3,0]+(-1)**np.random.choice(2,np.sum(self.flag)))%5
flagenm1=np.prod(self.state[:,0,:]==self.state[:,1,:],axis=1)
flagenm2=np.prod(self.state[:,0,:]==self.state[:,2,:],axis=1)
flaggoal=np.prod(self.state[:,0,:]==self.state[:,3,:],axis=1)
self.score[self.flag]=flaggoal[self.flag]-flagenm1[self.flag]-flagenm2[self.flag]
self.flag[flagenm1.astype(np.bool)]=False
self.flag[flagenm2.astype(np.bool)]=False
self.flag[flaggoal.astype(np.bool)]=False
self.step[self.flag]+=1
self.flag[self.step==self.maxsteps]=False
def initialize(self,flag=None):
if flag==None:
flag=np.ones(self.flag.size).astype(np.bool)
self.state[flag,0,0]=np.random.choice(5,np.sum(flag))
self.state[flag,0,1]=0+np.random.choice(2,np.sum(flag))
self.state[flag,1,0]=np.random.choice(5,np.sum(flag))
self.state[flag,1,1]=2
self.state[flag,2,0]=np.random.choice(5,np.sum(flag))
self.state[flag,2,1]=5
self.state[flag,3,0]=np.random.choice(5,np.sum(flag))
self.state[flag,3,1]=3+np.random.choice(2,np.sum(flag))
self.score[flag]=0
self.step[flag]=0
self.flag[flag]=True
def sampleplays(self):
samples={}
samples['s_t']=self.state.copy()
samples['r_t']=-self.score.copy()
samples['a_t']=np.zeros([self.batch_size,4])
samples['a_t'][np.arange(0,self.batch_size,1),np.random.choice(4,self.batch_size)]=1
self.transition(samples['a_t'])
samples['r_t']+=self.score.copy()
samples['s_t+1']=self.state.copy()
samples['flag_t+1']=self.flag.copy()
for i in np.arange(1,self.maxsteps,1):
flag=self.flag.copy()
state=self.state[flag].copy()
samples['s_t']=np.vstack((samples['s_t'],state.copy()))
reward=-self.score[flag].copy()
a=np.zeros([self.batch_size,4])
idx=np.random.choice(4,self.batch_size)
a[flag,idx[flag]]=1
samples['a_t']=np.vstack((samples['a_t'],a[flag]))
self.transition(a)
reward+=self.score[flag].copy()
samples['r_t']=np.append(samples['r_t'],reward.copy())
samples['s_t+1']=np.vstack((samples['s_t+1'],self.state[flag].copy()))
samples['flag_t+1']=np.append(samples['flag_t+1'],self.flag[flag].copy())
self.initialize()
return samples
# DQN agent to play the GridWorld game.
class QAgentforGW:
def __init__(self,qnet,qnetparams=None):
self.qnet=qnet
if qnetparams==None:
self.qnetparams=qnet.initialparams()
else:
self.qnetparams=qnetparams
self.projector=Projection()
self.Q=(lambda s:self.qnet.forward(s,self.qnetparams))
self.qvalues=None
self.qvalue=None
def sensing(self,state,flag):
player=state[flag,0,:].copy()
enemy1=state[flag,1,:].copy()
enemy2=state[flag,2,:].copy()
goal=state[flag,3,:].copy()
vert=np.zeros_like(player)
vert[:,0]=1
horz=np.zeros_like(player)
horz[:,1]=1
esensor=np.zeros([np.sum(flag),8])
esensor[:,0]=np.prod((player-vert)%np.array([5,6])==enemy1,axis=1)+np.prod((player-vert)%np.array([5,6])==enemy2,axis=1)
esensor[:,1]=np.prod((player+vert)%np.array([5,6])==enemy1,axis=1)+np.prod((player+vert)%np.array([5,6])==enemy2,axis=1)
esensor[:,2]=np.prod((player-horz)%np.array([5,6])==enemy1,axis=1)+np.prod((player-horz)%np.array([5,6])==enemy2,axis=1)
esensor[:,3]=np.prod((player+horz)%np.array([5,6])==enemy1,axis=1)+np.prod((player+horz)%np.array([5,6])==enemy2,axis=1)
esensor[:,4]=np.prod((player-vert-horz)%np.array([5,6])==enemy1,axis=1)+np.prod((player-vert-horz)%np.array([5,6])==enemy2,axis=1)
esensor[:,5]=np.prod((player-vert+horz)%np.array([5,6])==enemy1,axis=1)+np.prod((player-vert+horz)%np.array([5,6])==enemy2,axis=1)
esensor[:,6]=np.prod((player+vert-horz)%np.array([5,6])==enemy1,axis=1)+np.prod((player+vert-horz)%np.array([5,6])==enemy2,axis=1)
esensor[:,7]=np.prod((player+vert+horz)%np.array([5,6])==enemy1,axis=1)+np.prod((player+vert+horz)%np.array([5,6])==enemy2,axis=1)
gsensor=np.zeros([np.sum(flag),8])
gsensor[:,0]=np.prod((player-vert)%np.array([5,6])==goal,axis=1)
gsensor[:,1]=np.prod((player+vert)%np.array([5,6])==goal,axis=1)
gsensor[:,2]=np.prod((player-horz)%np.array([5,6])==goal,axis=1)
gsensor[:,3]=np.prod((player+horz)%np.array([5,6])==goal,axis=1)
gsensor[:,4]=np.prod((player-vert-horz)%np.array([5,6])==goal,axis=1)
gsensor[:,5]=np.prod((player-vert+horz)%np.array([5,6])==goal,axis=1)
gsensor[:,6]=np.prod((player+vert-horz)%np.array([5,6])==goal,axis=1)
gsensor[:,7]=np.prod((player+vert+horz)%np.array([5,6])==goal,axis=1)
areainf=np.zeros([np.sum(flag),4])
areainf[:,0]=(player[:,1]<2).astype(np.float)
areainf[:,1]=(player[:,1]==2).astype(np.float)
areainf[:,2]=(player[:,1]>2).astype(np.float)*(player[:,1]<5).astype(np.float)
areainf[:,3]=(player[:,1]==5).astype(np.float)
return esensor,gsensor,areainf
def action(self,state,flag,idx=None):
s=self.sensing(state,flag)
self.qvalues=self.Q(s)
if idx==None:
idx=np.zeros(flag.size).astype(np.int)
idx[flag]=np.argmax(self.qvalues,axis=1)
a=np.zeros([flag.size,4])
a[flag,idx[flag]]=1
self.qvalue=self.projector.forward(self.qvalues,idx[flag])
return a
def explore(self,state,flag,epsilon=0):
s=self.sensing(state,flag)
self.qvalues=self.Q(s)
a=np.zeros([flag.size,4])
idx=np.random.choice(4,np.sum(flag))
u=np.random.uniform(size=np.sum(flag))
idx[epsilon<u]=np.argmax(self.qvalues[epsilon<u],axis=1)
a[flag,idx]=1
self.qvalue=self.projector.forward(self.qvalues,idx)
return a
class DQN:
def __init__(self):
self.layers={}
self.layers['w2v1']=Linear()
self.layers['w2v2']=Linear()
self.layers['w2v3']=Linear()
self.layers['afa1']=Affine()
self.layers['afa2']=Affine()
self.layers['adda']=Addition()
self.layers['tnha']=Tanh()
self.layers['gtva']=Gater()
self.layers['mlta']=Multiplication()
self.layers['tnhb']=Tanh()
self.layers['gtvb']=Gater()
self.layers['mltb']=Multiplication()
self.layers['addc']=Addition()
self.layers['ltaf']=Affine()
def forward(self,s,params):
z={}
z['w2v1']=self.layers['w2v1'].forward(s[0],params['w2v1'])
z['w2v2']=self.layers['w2v2'].forward(s[1],params['w2v2'])
z['w2v3']=self.layers['w2v3'].forward(s[2],params['w2v3'])
z['afa1']=self.layers['afa1'].forward(z['w2v1'],params['afa1'])
z['afa2']=self.layers['afa2'].forward(z['w2v2'],params['afa2'])
z['adda']=self.layers['adda'].forward(z['afa1'],z['afa2'])
z['tnha']=self.layers['tnha'].forward(z['adda'])
z['gtva']=self.layers['gtva'].forward(z['w2v1'],z['w2v2'],z['w2v3'],params['gtva'])
z['mlta']=self.layers['mlta'].forward(z['tnha'],z['gtva'])
z['tnhb']=self.layers['tnhb'].forward(z['w2v3'])
z['gtvb']=self.layers['gtvb'].forward(z['w2v1'],z['w2v2'],z['w2v3'],params['gtvb'])
z['mltb']=self.layers['mltb'].forward(z['tnhb'],z['gtvb'])
z['addc']=self.layers['addc'].forward(z['mlta'],z['mltb'])
z['ltaf']=self.layers['ltaf'].forward(z['addc'],params['ltaf'])
return z['ltaf']
def gradient(self,E):
grad={}
Edict={}
Edict['ltaf']=self.layers['ltaf'].backprop(E)
Edict['addc']=self.layers['addc'].backprop(Edict['ltaf'][0])
Edict['mlta']=self.layers['mlta'].backprop(Edict['addc'][0])
Edict['mltb']=self.layers['mltb'].backprop(Edict['addc'][1])
Edict['tnha']=self.layers['tnha'].backprop(Edict['mlta'][0])
Edict['tnhb']=self.layers['tnhb'].backprop(Edict['mltb'][0])
Edict['adda']=self.layers['adda'].backprop(Edict['tnha'])
Edict['afa1']=self.layers['afa1'].backprop(Edict['adda'][0])
Edict['afa2']=self.layers['afa2'].backprop(Edict['adda'][1])
Edict['gtva']=self.layers['gtva'].backprop(Edict['mlta'][1])
Edict['gtvb']=self.layers['gtvb'].backprop(Edict['mltb'][1])
Edict['w2v1']=self.layers['w2v1'].backprop(Edict['gtva'][0]+Edict['gtvb'][0]+Edict['afa1'][0])
Edict['w2v2']=self.layers['w2v2'].backprop(Edict['gtva'][1]+Edict['gtvb'][1]+Edict['afa2'][0])
Edict['w2v3']=self.layers['w2v3'].backprop(Edict['gtva'][2]+Edict['gtvb'][2]+Edict['tnhb'])
grad['w2v1']=Edict['w2v1'][1]
grad['w2v2']=Edict['w2v2'][1]
grad['w2v3']=Edict['w2v3'][1]
grad['gtva']=Edict['gtva'][3]
grad['gtvb']=Edict['gtvb'][3]
grad['afa1']=Edict['afa1'][1]
grad['afa2']=Edict['afa2'][1]
grad['ltaf']=Edict['ltaf'][1]
return grad
def initialparams(self):
params={}
for key in self.layers.keys():
params[key]={}
params['w2v1']['W']=np.random.randn(8,40)
params['w2v2']['W']=np.random.randn(8,40)
params['w2v3']['W']=np.random.randn(4,12)
params['afa1']['W']=np.sqrt(1.0/40.0)*np.random.randn(40,12)
params['afa1']['b']=np.zeros(12)
params['afa2']['W']=np.sqrt(1.0/40.0)*np.random.randn(40,12)
params['afa2']['b']=np.zeros(12)
params['ltaf']['W']=np.sqrt(1.0/12.0)*np.random.randn(12,4)
params['ltaf']['b']=np.zeros(4)
params['gtva']['1']=np.sqrt(1.0/40.0)*np.random.randn(40,12)
params['gtva']['2']=np.sqrt(1.0/40.0)*np.random.randn(40,12)
params['gtva']['3']=np.sqrt(1.0/12.0)*np.random.randn(12,12)
params['gtvb']['1']=np.sqrt(1.0/40.0)*np.random.randn(40,12)
params['gtvb']['2']=np.sqrt(1.0/40.0)*np.random.randn(40,12)
params['gtvb']['3']=np.sqrt(1.0/12.0)*np.random.randn(12,12)
return params
class DQNtrainer:
def __init__(self,iter_num,target_update_frequency,batchsize,memorycapacity,gamma,agent,environment,params_update_rate=0.1,epsilon_annealing_rate=1.0/1000.0):
self.M=iter_num
self.C=target_update_frequency
self.batchsize=batchsize
self.gamma=gamma
self.N=memorycapacity
self.agent=agent
self.targetparams={}
for layer in self.agent.qnetparams.keys():
self.targetparams[layer]={}
for key in self.agent.qnetparams[layer].keys():
self.targetparams[layer][key]=self.agent.qnetparams[layer][key].copy()
self.env=environment
self.memory={}
for i in range(20):
self.memory[i]={}
D=self.env.sampleplays()
self.memory[i]['s_t']=D['s_t'].copy()
self.memory[i]['a_t']=D['a_t'].copy()
self.memory[i]['r_t']=D['r_t'].copy()
self.memory[i]['s_t+1']=D['s_t+1'].copy()
self.memory[i]['flag_t+1']=D['flag_t+1'].copy()
while self.memory[i]['r_t'].size<self.N:
D=self.env.sampleplays()
self.memory[i]['s_t']=np.vstack((self.memory[i]['s_t'],D['s_t']))# Game states should be numpy arrays. (Any shape is acceptable but axis 0 should be reserved for batching.)
self.memory[i]['a_t']=np.vstack((self.memory[i]['a_t'],D['a_t']))# Player actions should be numpy arrays (of any shape, but typically of one-hot vectors. Axis 0 should be reserved for batching.)
self.memory[i]['r_t']=np.append(self.memory[i]['r_t'],D['r_t'])
self.memory[i]['s_t+1']=np.vstack((self.memory[i]['s_t+1'],D['s_t+1']))
self.memory[i]['flag_t+1']=np.append(self.memory[i]['flag_t+1'],D['flag_t+1'])
if self.memory[i]['r_t'].size>self.N:
break
self.pur=params_update_rate
self.ear=epsilon_annealing_rate
def train(self):
epsilon=1.0
t=0
for i in range(self.M):
self.env.initialize()
while np.sum(self.env.flag)>0:
flag=self.env.flag.copy()
memoryboxidx=np.random.choice(20)
labels=np.random.choice(self.memory[memoryboxidx]['r_t'].size,np.sum(flag))# Choosing memories to replace with new ones.
self.memory[memoryboxidx]['s_t'][labels]=self.env.state[flag].copy()
a=self.agent.explore(self.env.state,self.env.flag,epsilon=epsilon)
self.memory[memoryboxidx]['a_t'][labels]=a[flag].copy()
self.env.transition(a)
self.memory[memoryboxidx]['r_t'][labels]=self.env.score[flag].copy()
self.memory[memoryboxidx]['s_t+1'][labels]=self.env.state[flag].copy()
memoryboxidx=np.random.choice(20)
labels=np.random.choice(self.memory[memoryboxidx]['r_t'].size,self.batchsize)
state=self.memory[memoryboxidx]['s_t+1'][labels].copy()
self.agent.Q=(lambda s:self.agent.qnet.forward(s,self.targetparams))
self.agent.action(state,np.ones(self.memory[memoryboxidx]['r_t'][labels].size).astype(np.bool))
y=self.memory[memoryboxidx]['r_t'][labels]+self.gamma*self.memory[memoryboxidx]['flag_t+1'][labels]*self.agent.qvalue
self.agent.Q=(lambda s:self.agent.qnet.forward(s,self.agent.qnetparams))
losslayer=MeanSquareErrorwithErrorClipping()
state=self.memory[memoryboxidx]['s_t'][labels].copy()
self.agent.action(state,np.ones(self.memory[memoryboxidx]['r_t'][labels].size).astype(np.bool),idx=np.argmax(self.memory[memoryboxidx]['a_t'][labels],axis=1))
loss=losslayer.forward(self.agent.qvalue,y)
E=1.0
E=losslayer.backprop(E)
E=self.agent.projector.backprop(E)
grad=self.agent.qnet.gradient(E)
for layer in self.agent.qnetparams.keys():
for key in self.agent.qnetparams[layer].keys():
self.agent.qnetparams[layer][key]-=self.pur*grad[layer][key]
if t%self.C==0:
self.targetparams[layer][key]=self.agent.qnetparams[layer][key].copy()
t+=1
epsilon=max(0.1,1-i*self.ear)
print("Average score for the "+str(i)+"-th episode: "+str(np.sum(self.env.score)/float(self.env.score.size)))
return self.agent
class Projection:
def __init__(self):
self.idx=None
self.inputshape=None
def forward(self,z,idx):
self.idx=idx.copy()
self.inputshape=z.shape
return z[np.arange(0,self.inputshape[0],1),idx]
def backprop(self,E):
Jac_z=np.zeros(self.inputshape)
Jac_z[np.arange(0,self.inputshape[0],1),self.idx]=1
return Jac_z*E.reshape(-1,1)
class MeanSquareErrorwithErrorClipping:
def __init__(self):
self.N=None
self.error=None
def forward(self,z,y):
self.N=z.size
self.error=y-z
return (1.0/self.N)*(np.sum(self.error[np.abs(self.error)<=1]**2)+np.sum(self.error[self.error>1])-np.sum(self.error[self.error<-1]))
def backprop(self,E):
E=np.ones_like(self.error)*E/float(self.N)
u=np.zeros_like(self.error)
u[self.error>1]=1
u[self.error<-1]=-1
u[np.abs(self.error)<=1]=2*self.error[np.abs(self.error)<=1].copy()
return -u*E
class Affine:
def __init__(self):
self.z=None
self.W=None
self.b=None
def forward(self,z,params):
self.z=z.copy()
self.W=params['W'].copy()
self.b=params['b'].copy()
return np.dot(self.z,self.W)+self.b
def backprop(self,E):
Jac_zE=np.dot(E,self.W.T)
Jac_WE=np.dot(self.z.T,E)
Jac_bE=np.sum(E,axis=0)
Jac_params={}
Jac_params['W']=Jac_WE
Jac_params['b']=Jac_bE
return Jac_zE,Jac_params
class Linear:
def __init__(self):
self.z=None
self.W=None
def forward(self,z,params):
self.z=z.copy()
self.W=params['W'].copy()
return np.dot(z,self.W)
def backprop(self,E):
Jac_zE=np.dot(E,self.W.T)
Jac_WE=np.dot(self.z.T,E)
Jac_params={}
Jac_params['W']=Jac_WE
return Jac_zE,Jac_params
class Gater:
def __init__(self):
self.s1=None
self.s2=None
self.s3=None
self.W1=None
self.W2=None
self.W3=None
self.sigm=Sigmoid()
def forward(self,s1,s2,s3,params):
self.s1=s1.copy()
self.s2=s2.copy()
self.s3=s3.copy()
self.W1=params['1'].copy()
self.W2=params['2'].copy()
self.W3=params['3'].copy()
return self.sigm.forward(np.dot(s1,self.W1)+np.dot(s2,self.W2)+np.dot(s3,self.W3))
def backprop(self,E):
E=self.sigm.backprop(E)
Jac_s1E=np.dot(E,self.W1.T)
Jac_s2E=np.dot(E,self.W2.T)
Jac_s3E=np.dot(E,self.W3.T)
Jac_params={}
Jac_params['1']=np.dot(self.s1.T,E)
Jac_params['2']=np.dot(self.s2.T,E)
Jac_params['3']=np.dot(self.s3.T,E)
return Jac_s1E,Jac_s2E,Jac_s3E,Jac_params
class ReLU:
def __init__(self):
self.mask=None
def forward(self,z):
self.mask=z>0
return z*self.mask
def backprop(self,E):
return self.mask*E
class Sigmoid:
def __init__(self):
self.y=None
def forward(self,z):
self.y=1/(1+np.exp(-z))
return self.y
def backprop(self,E):
return (1-self.y)*self.y*E
class Tanh:
def __init__(self):
self.y=None
def forward(self,z):
self.y=np.tanh(z)
return self.y
def backprop(self,E):
return (1-self.y**2)*E
class Addition:
def __init__(self):
pass
def forward(self,z1,z2):
return z1+z2
def backprop(self,E):
return 1*E,1*E
class Multiplication:
def __init__(self):
self.z1=None
self.z2=None
def forward(self,z1,z2):
self.z1=z1.copy()
self.z2=z2.copy()
return z1*z2
def backprop(self,E):
Jac_z1E=self.z2*E
Jac_z2E=self.z1*E
return Jac_z1E,Jac_z2E
# Function to play the GridWorld game with chosen agent.
def playGW(agent,batch_size=100,maxsteps=30):
env=GridWorld(batch_size,maxsteps)
footpath=np.zeros([batch_size,5,6]).astype(np.int)
for t in range(env.maxsteps):
if agent=='random':
a=np.zeros([batch_size,4])
idx=np.random.choice(4,batch_size)
a[env.flag,idx[env.flag]]=1
else:
a=agent.action(env.state,env.flag)
env.transition(a)
footpath[np.arange(0,batch_size,1),env.state[np.arange(0,batch_size,1),0,0].astype(np.int),env.state[np.arange(0,batch_size,1),0,1].astype(np.int)]+=1
print(env.score.reshape(batch_size,-1))
print(footpath)
return np.sum(env.score)/float(batch_size)
# Demonstration.
print("Scores and footpaths of sample random plays:")
avscore=playGW(agent='random')
print("Average score:")
print(avscore)
print("Performance of a DQN agent before training:")
agentbeforetraining=QAgentforGW(qnet=DQN())
print("Scores and footpaths:")
avscore=playGW(agentbeforetraining)
print("Average score:")
print(avscore)
trainer=DQNtrainer(iter_num=1000,target_update_frequency=1500,batchsize=100,memorycapacity=3240,gamma=0.999,agent=agentbeforetraining,environment=GridWorld(batch_size=500))# 3240 is 10-times the number of all possible agent-sensed states: 3240=10*(9*9*4).
print("Training begins:")
trainedagent=trainer.train()
print("Training done! Now showing the performance of the trained agent:")
print("Scores and footpaths:")
avscore=playGW(trainedagent)
print("Average score:")
print(avscore)
実行結果
コードを実行すると、はじめに、完全にランダムに行動を選ぶことでゲームをプレイした場合の成績とプレイヤーの行動した経路が表示されます。(行動経路は、定められた最大行動可能回数 (ここでは 30) のうちに各マス目をプレイヤーが何回通ったかを示す $5\times 6$ の表として表示されます。敵またはゴールにぶつかったマス目には $(\text{最大行動可能回数})-(\text{実際に要した行動回数})$ が加算されて表示されます。Column 3 または 6 に一つだけ高い数字のマス目があればそこで敵にぶつかった可能性が高く、column 4 または 5 に一つだけ高い数字のマス目があればそこでゴールに到達したであろうと想像できます。) バッチ処理を行なっており、100 個のプレイの結果が一斉に表示されます。ゴールに到達するよりも敵にぶつかることの方が多く、成績の平均は大体 $-0.5$ から $-0.4$ 程度のようです。
ランダムなプレイの結果の次に、「QAgentforGW」のインスタンスで与えられるエージェントが訓練を受ける前にゲームをプレイした結果が表示されます。パラメターの初期値によって結果は異なり、ランダムなプレイと同様の結果を示す場合もあれば、ひたすら同じ行動を繰り返しているような場合もあります。(後者の場合、例えば常に上または下に移動することだけを繰り返すようなケースでは、敵にもゴールにもぶつからないため、成績は全て 0 となります。)
その次に訓練が開始されます。訓練中のプレイの平均成績が表示されながら訓練が進行します。訓練が完了するまで 3, 4 分程度かかります。
最後に訓練を経たエージェントがゲームをプレイした結果が表示されます。敵にぶつかって終わるプレイはほぼ無くなり、ゴールに到達して終わるものが圧倒的に多く、平均成績は大体 0.8 から 0.9 程度になります。