LoginSignup
6
12

More than 5 years have passed since last update.

ゼロから作る A3C

Posted at

概要

深層強化学習技術の基礎を学ぶ一環として、前回の記事では DQN をゼロから (すなわち、外部のライブラリや既成の深層学習フレームワークを用いずに) 実装しましたが、今回はより強力な深層強化学習アルゴリズムである A3C を、同様にゼロから実装します。原論文に従ってアルゴリズムを実装し、在庫管理問題に適用して結果を見ます。また、DQN と比べたときに A3C の方が優れている理由の一つとして、連続的な行動空間を伴う強化学習課題に対しても使うことができるという点があるため、同じ課題を離散的、連続的な行動選択によってこなす二種類のエージェントをそれぞれ訓練します。

(今回扱った在庫管理問題自体は、非線形のニューラルネットや A3C を用いなくても解くことができるでしょう。しかし、この記事の趣旨は A3C というアルゴリズムを実装し、それを用いて強化学習問題を解く例を実際にゼロから作ってみるという点にあり、手早く実験結果をみることができる例として在庫管理問題を選んでいます。ここで掲載している A3C の実装は、他の強化学習問題に対しても利用できるように書いてあります。)

A3C について

A3C は DeepMind の論文

V. Mnih et al., Asynchronous methods for deep reinforcement learning, in: Proceedings of ICML 2016, pp. 1928-1937, 2016

で提案された深層強化学習アルゴリズムです。登場して 2 年ほど経ちましたが、最近でも広く使われている印象があります。

状態空間 $S$ および行動空間 $A$ を持つ強化学習問題に対して、DQN ではベルマン作用素の不動点として定まる関数 $Q^\ast\in\mathbb{R}^{S\times A}$ をニューラルネットワークで近似し、状態 $s\in S$ に対して $Q^\ast(s,a)$ が最大となるような行動 $a\in A$ を選択することによって行動していました。特に、DQN においてはニューラルネットワークが計算するのは $Q^\ast$ という抽象的な関数であり、それをより正確に近似するためにパラメターの調整を行うことで訓練していました。対して、A3C においては、状態 $s\in S$ に対してエージェントがそれに従って行動を選択する $A$ 上の確率分布 $\pi(-|s)$ (つまり、状態 $s$ において行動 $a$ を $\pi(a|s)$ の確率で選択する) をニューラルネットワークによって直接計算し、その確率分布の定まり方を調整していくことによって訓練を行います。

(DQN においては、$Q^\ast(s,a)$ が最大となる $a$ を決定する必要があり、(全ての $a^{\prime}\in A$ に対して $Q^{\ast}(a^{\prime},s)$ を見る以外の方法がなければ) $A$ が無限集合のときには計算できなくなります。これに対して、A3C においては (より一般に、方策勾配法 (policy gradient method) に分類される強化学習アルゴリズムにおいては) 確率分布 $\pi(-|s)$ を決定するだけで良く、全ての $a\in A$ に対して $\pi(a|s)$ を直接計算する必要は必ずしもないため、$A$ が無限集合の場合でも使用することができます。)

より具体的には、A3C においては

  • 状態 $s$ から先、現在の行動方策に従って行動を行なった場合に得られると見込まれる、その後の総収益の推定値 $V(s)$ を出力する関数 $V(-)$;
  • 状態 $s$ において、エージェントがそこからのサンプリングによって行動を選択する確率分布 $\pi(-|s)$ を与える関数 $\pi(-|-)$;

をそれぞれ計算する二種類のニューラルネットワークをエージェントに備えます。そして、実際に行動することで状態 $s$ 以降に得られた総収益を $R$ とおき、$V(s)$ と $R$ の差 $R-V(s)$ を考えます。この差がどれだけ $0$ に近いかということが、現在の行動方策の下での $s$ から先の総収益の推定をどれだけ正確に行えているかを測る指標に他ならないため、$V(s)$ を出力するネットワークは $(R-V(s))^2$ を最小化する方向にパラメターを更新します。つまり、$V$ を計算するニューラルネットワークのパラメターを $\theta$ と書くとき、勾配 $\operatorname{grad}_{\theta}(R-V(s))^2$ を逆伝播によって計算し、この逆方向に $\theta$ を更新します。

また、$R-V(s)$ の値は、エージェントが実際に取った行動が、現行採用している行動則と照らし合わせてどれだけ良いものかを評価する指標にもなっています。例えば $R-V(s)$ の値が正の場合、現在の行動則の下で見込まれる収益よりも実際には高い収益を得たということで、エージェントが実際に行なった行動は現在の行動則よりもより良いものだと考えられるため、状態 $s$ において実際に取った行動 $a$ について、その選択をさらに encourage するために、行動確率 $\pi(a|s)$ を増大させる方向にパラメターを更新します。逆に、$R-V(s)$ の値が負の場合、エージェントが実際に行った行動は現在の行動則よりも悪いものだと考えられるため、状態 $s$ において実際に取った行動 $a$ について、その選択を discourage するために、行動確率 $\pi(a|s)$ を減少させる方向にパラメターを更新します。これらの更新は、$\pi(-|s)$ を計算するニューラルネットワークのパラメターを $\omega$ とおくとき、$(R-V(s))\operatorname{grad}_{\omega}\operatorname{log}\pi(a|s)$ の方向に $\omega$ を更新することで一纏めに行うことができます。

これらの更新を繰り返していくことで、関数 $V(s)$ は現行の行動則の下で状態 $s$ 以降に得られると見込まれる総収益をより正確に推定できるようになっていき、それと並行して、確率分布 $\pi(-|s)$ によって与えられる現行の行動則それ自体も、$s$ 以降の総収益の見込み $V(s)$ と照らし合わせてさらに良い結果が得られる行動をエージェントが選択しやすくなるように調整されていくのです。

ある状態以降の実際の総収益 $R$ を得るために、課題を終えるまでエージェントを動かすことが現実的でない場合は、$t$ ステップ目の状態 $s_t$ から $k$ ステップ先までは実際に得られた即時報酬の和を用いるが、それ以降得られるであろう総収益については $V(s_{t+k})$ をもってその推定値とし、$k$ ステップ先までの即時報酬の和に $V(s_{t+k})$ を加算した値によって $R$ を代替します (bootstrapping)。

A3C では、エージェントの行動および上記の更新の過程を複数の非同期なスレッドによって並列に行います。

いくつかの注意

行動空間が連続な場合

行動空間 $A$ が連続的な空間 (e.g. $\mathbb{R}^n$) の場合、$\pi(-|s)$ は、エージェントがそれに従って行動を選択する $A$ 上の確率分布の確率密度関数とします。例えば $A=\mathbb{R}^n$ の場合、平均 $\mu(s)$ と分散 $\sigma^2(s)$ をニューラルネットワークによって計算し (分散は正のスカラー値である必要がありますが、一般の実数値を取る関数に softplus 関数 $z\mapsto\operatorname{log}(1+e^z)$ を合成するなどして与えることが多いようです)、それらによって決まる正規分布からサンプリングすることで行動を決定するのが標準的なやり方のようです。つまりこの場合、平均 $\mu$、分散 $\sigma^2$ の正規分布の密度関数を $f(a,\mu,\sigma^2)$ と書くと、$\pi(a|s)=f(a,\mu(s),\sigma^2(s))$ となります。そして、$\operatorname{log}\pi(a|s)=\operatorname{log}f(a,\mu(s),\sigma^2(s))$ の勾配は $\mu(s)$ と $\sigma^2(s)$ を出力するネットワークの逆伝播に $\partial\operatorname{log}f/\partial\mu$ および $\partial\operatorname{log}f/\partial\sigma^2$ をそれぞれ入力して足すことで計算できます。

ネットワークの共有部分

$V(s)$ と $\pi(-|s)$ を出力するネットワークは、途中までは一つのネットワークを共有し、その後ろにそれぞれ固有の出力部をつなげたものを用いることも多いようです。その場合、共有部分のパラメター $\nu$ は、

\kappa\operatorname{grad}_{\nu}(R-V(s))^2-\eta(R-V(s))\operatorname{grad}_{\nu}\operatorname{log}\pi(a|s)

を用いて更新を行います。(ここで $\kappa,\eta>0$ は両者の balancing を行う係数であり、$\pi(a|s)$ の方に負号が付いているのは、gradient descent として辻褄を合わせるためです。)

Entropy regularization

行動の選択に多様性を持たせるために、エントロピーを考えてその勾配を更新において考慮に入れることも多いようです。 エントロピーは $H(\pi(-|s))=-\int_A\pi(a|s)\operatorname{log}\pi(a|s)da$ (行動空間 $A$ が離散的な場合は $\int_A\cdots da$ は $\sum_{a\in A}\cdots$ に読み替えます) で定義され、その値が大きいほど行動の選択の多様性が増すことになります。行動空間が有限、離散的で $\pi$ ネットワークの出力部に softmax 関数を用いた場合や、行動空間が連続でも $\pi(-|s)$ を正規分布の確率密度関数とした場合のように、$H$ の Jacobian を具体的に計算できる場合はそれを $\pi$ ネットワークの逆伝播に入れることで $\operatorname{grad}_{\omega}H(\pi(-|s))$ を計算できます。エントロピーを考慮に入れる場合、$\pi$ ネットワークのパラメターは

\eta(R-V(s))\operatorname{grad}_{\omega}\operatorname{log}\pi(a|s)+\beta\operatorname{grad}_{\omega}H(\pi(-|s))

($\eta,\beta>0$) を用いて更新を行います。

教師あり学習との組み合わせ

以下のようなやり方で A3C による強化学習を教師あり学習によって補完することもできるかも知れません。

教師エージェントとして使えるような、ある程度良い成績を示したデータ (人間によるゲームのプレイのリプレイなど) が利用できる場合、教師エージェントが採用している行動則の下で状態 $s$ 以降に見込まれる総収益を推定する関数 $V(s)$ を、教師あり学習によって訓練することができます。そのようにして事前に用意した $V(s)$ を用いて、上述のように $(R-V(s))$ によってその行動を評価することで、被訓練エージェントの行動を教師エージェントに似せていくように訓練することができます (この段階では被訓練エージェントの $\pi$ ネットワークのパラメターのみ更新し、$V$ ネットワークは事前に用意したもののままとします)。十分に教師エージェントと類似した行動が取れるようになったのち、通常の A3C アルゴリズムに乗せて訓練を行えば、教師エージェントの性能を上回るエージェントへと成長していくのではと期待できます。

この方法は例えば、課題を終了して初めて勝ち/負け/引き分け等の結果が得られるような、sparse な報酬の与えられ方をする強化学習課題において助けとなるかも知れません。

実装

問題設定

今回の実装では、「速習 強化学習」(C. Szepesvari 著、小山田創哲他訳、共立出版、2017) の第 1 章、例 1 で記述されている在庫管理問題を解くエージェントを A3C によって訓練します。

エージェントは現在の在庫量を見て新たに発注する量を決定します。1 単位以上の発注を行う場合、固定された初期費用 $K>0$ がかかり、発注量の 1 単位あたり $c>0$ の費用がかかります。商品が売れると 1 単位あたり $p>0$ の収入が入りますが、売れずに在庫として残った分については 1 単位あたり $h>0$ の費用がかかります。商品がどれだけ売れるかは、固定された同一の確率分布に従って毎回独立に確率的に決まります。

今回の実装では、初期在庫量 0 から始めて、発注量の決定を 30 回し終えた時点で終了として、それまでに得られた利益の総計を成績とします。最大在庫量は 99 とします。

A3C は離散的な行動空間を伴う強化学習課題についても、連続的な行動空間を伴う強化学習課題についても、どちらにも利用することができるため、今回の実装では発注量を整数値単位で決定する (つまり、行動空間 $=\{0,1,2,\ldots,99\}\subset\mathbb{Z}$ とする) バージョンと、発注量を実数値で決定する (つまり、行動空間 $=(0,99)\subset\mathbb{R}$ とする) バージョンを共に扱います。両バージョンともに、売れる量は実数値で決まり、状態空間 $=[0,99]\subset\mathbb{R}$ としています。

実装について

ゼロから作るという方針の下、外部のライブラリや既成の深層学習フレームワークは用いず、numpy と並列処理用の threading、グラフ描画用の matplotlib のみを用いて、Python で実装しました。(A3C が出てきたばかりの頃の記事などを読んでいると、深層学習フレームワークをマルチスレッド動かすために工夫をしなければならず、その点に A3C を実装する上での面倒があったようです。その意味で、特段高速化する必要性があるのでなければ、アルゴリズムを実装するだけなら今回のようにゼロから作ったほうがむしろ手軽かも知れません。)

発注量を整数値単位で決めるエージェント (「Agent_Ex1_Discrete」) については、$V$ および $\pi$ ネットワークは途中まで共通のネットワーク (「SharedBody」) を用い、その後ろに繋げた $10\times 1$ 行列の affine レイヤを $V$ ネットワークの出力部として、また、$10\times 100$ 行列の affine レイヤに softmax を施す処理を $\pi$ ネットワークの出力部として、それぞれ用いています。(affine レイヤは、行ベクトルに行列を右からかけることで処理を行うものとしています。)

発注量を実数値で決めるエージェント (「Agent_Ex1_Continuous」) については、共通ネットワーク (「SharedBody」) の後ろに繋げた、

  • $10\times 1$ 行列の affine レイヤを出力部とする $V$ ネットワーク;
  • $10\times 1$ 行列の affine レイヤを出力部とし、1 次元正規分布の平均を出力する $\mu$ ネットワーク;
  • $10\times 1$ 行列の affine レイヤに softplus 関数を合成したものを出力部とし、1 次元正規分布の分散を出力する $\sigma^2$ ネットワーク;

を用意し、行動の決定は平均 $\mu(s)$、分散 $\sigma^2(s)$ の 1 次元正規分布からサンプリングすることで行い、選ばれた実数値に sigmoid 関数を施して $99$ 倍してできる値 $\in(0,99)$ を発注量とします。

A3C アルゴリズムの実装の主要部分は「A3C」クラスであり、スレッド数分のエージェントと環境を与えるインスタンスのリスト、および幾つかのハイパーパラメターを指定して、訓練を担うインスタンスを生成します。今回は在庫管理問題用のエージェントおよび環境でデモを行っていますが、エージェントおよび環境を与えるクラスが今回用いたものと共通のインターフェースを持って実装されていれば、ほかの強化学習問題についてもそのまま使うことができます。

訓練の詳細

訓練の詳細は離散的な行動空間を用いるエージェントの訓練も、連続的な行動空間を用いるエージェントの訓練も、共通して以下のようになっています。

パラメターの更新は原論文と同様に Shared RMSProp を用いて行い、entropy regularization も $\beta=0.01$ として行います。原論文と同様に $V$ および $\pi$ ネットワークの間でパラメター共有を行い、$\eta=0.0007,\kappa=\eta/2$ とおいています。原論文と同様に、$\eta$ は訓練の過程で線形に減衰させます。$t_{max}=5$ とし、16 のスレッドを用います。総計で 90000 steps が経過するまで訓練が継続します。

コード

以下にコードを載せます。

a3c.py
import numpy as np
import matplotlib.pylab as plt
import threading

def softmax(z):
    c=np.max(z)
    return np.exp(z-c)/np.sum(np.exp(z-c),axis=1).reshape(-1,1)

# Reinforcement learning task described in Szepesvari's textbook, Example 1.
class Environment_Ex1:
    def __init__(self,batchsize=1,stepsperepisode=30,M=100-1,K=10,c=1,p=3,h=2,mu_d=30,sigma2_d=15):
        self.batchsize=batchsize
        self.state=np.zeros(batchsize)
        self.step=np.zeros(batchsize)
        self.score=np.zeros(batchsize)
        self.flag=np.ones(batchsize).astype(np.bool)
        self.M=M
        self.K=K
        self.c=c
        self.p=p
        self.h=h
        self.stepsperepisode=stepsperepisode
        self.mu_d=mu_d
        self.sigma2_d=sigma2_d
    def transition(self,action):
        x=self.state.copy()
        if action.shape[1]==1:
            a=self.M*Sigmoid().forward(action.flatten())
        else:
            a=np.argmax(action,axis=1)
        d=np.maximum(0,np.minimum(self.M,np.sqrt(self.sigma2_d)*np.random.randn(self.batchsize)+self.mu_d))
        self.state[self.flag]=np.maximum(np.minimum(x+a,self.M)-d,0)[self.flag]
        self.score[self.flag]+=(-self.K*(a>=1)-self.c*np.maximum(0,np.minimum(x+a,self.M)-x)-self.h*x+self.p*np.maximum(0,np.minimum(x+a,self.M)-self.state))[self.flag]
        self.step[self.flag]+=1
        self.flag[self.step>=self.stepsperepisode]=False
    def initialize(self,flag=None):
        if flag==None:
            flag=np.ones(self.state.size).astype(np.bool)
        self.state[flag]*=0
        self.step[flag]*=0
        self.score[flag]*=0
        self.flag[flag]=np.ones(np.sum(flag)).astype(np.bool)

# Agent to play the above RL task with finite, discrete action space.
class Agent_Ex1_Discrete:
    def __init__(self,M=100):
        self.sharedbody=SharedBody()
        self.vhead=Affine()
        self.pihead=Affine()
        self.V=None
        self.probdist=None
        self.a=None
        self.H=None
        self.M=M
    def action(self,state,params):
        s=state.reshape(-1,1).copy()
        latentvect=self.sharedbody.forward(s,params['sharedbody'])
        self.V=self.vhead.forward(latentvect,params['vhead'])
        preprobdist=self.pihead.forward(latentvect,params['pihead'])
        self.probdist=softmax(preprobdist)
        self.a=np.zeros([s.shape[0],self.M])
        u=np.random.uniform(size=(s.shape[0],1))
        idx=np.sum(np.cumsum(self.probdist,axis=1)<u,axis=1).astype(np.int)
        self.a[np.arange(0,s.shape[0],1),idx]=1
        self.H=-np.sum(self.probdist*np.log(self.probdist),axis=1).reshape(-1,1)
        return self.a
    def grads(self,E1,E2,E3):
        Edict={}
        grads={}
        Edict['pihead']=self.pihead.backprop(E1*(self.a-self.probdist))
        Edict['vhead']=self.vhead.backprop(E2*np.ones([self.a.shape[0],1]))
        Edict['sharedbody']=self.sharedbody.backprop(Edict['vhead'][0]+Edict['pihead'][0])
        grads['sharedbody']=Edict['sharedbody'][1]
        grads['vhead']=Edict['vhead'][1]
        grads['pihead']=Edict['pihead'][1]
        EdictH={}
        gradsH={}
        EdictH['pihead']=self.pihead.backprop(-E3*self.probdist*(self.H+np.log(self.probdist)))
        EdictH['vhead']=self.vhead.backprop(np.zeros([self.a.shape[0],1]))
        EdictH['sharedbody']=self.sharedbody.backprop(EdictH['pihead'][0])
        gradsH['sharedbody']=EdictH['sharedbody'][1]
        gradsH['vhead']=EdictH['vhead'][1]
        gradsH['pihead']=EdictH['pihead'][1]
        return grads,gradsH
    def initialparams(self):
        params={}
        params['sharedbody']={}
        params['vhead']={}
        params['pihead']={}
        params['sharedbody']['W1']=np.random.randn(1,10)
        params['sharedbody']['b1']=np.zeros(10)
        params['sharedbody']['W2']=np.random.randn(10,10)/np.sqrt(10)
        params['sharedbody']['b2']=np.zeros(10)
        params['vhead']['W']=np.random.randn(10,1)/np.sqrt(10)
        params['vhead']['b']=np.zeros(1)
        params['pihead']['W']=np.random.randn(10,self.M)/np.sqrt(10)
        params['pihead']['b']=np.zeros(self.M)
        return params

# Agent to play the above RL task with continuous action space.
class Agent_Ex1_Continuous:
    def __init__(self):
        self.sharedbody=SharedBody()
        self.vhead=Affine()
        self.muhead=Affine()
        self.sigma2head=Sigma2Head()
        self.V=None
        self.mu=None
        self.sigma2=None
        self.a=None
        self.H=None
    def action(self,state,params):
        s=state.reshape(-1,1).copy()
        latentvect=self.sharedbody.forward(s,params['sharedbody'])
        self.V=self.vhead.forward(latentvect,params['vhead'])
        self.mu=self.muhead.forward(latentvect,params['muhead'])
        self.sigma2=self.sigma2head.forward(latentvect,params['sigma2head'])
        self.a=self.mu+np.random.randn(s.shape[0],1)*self.sigma2**0.5
        self.H=0.5*(np.log(2*np.pi*self.sigma2)+1)
        return self.a
    def grads(self,E1,E2,E3):
        Edict={}
        grads={}
        Edict['sigma2head']=self.sigma2head.backprop(E1*(((self.a-self.mu)**2)/(2*self.sigma2**2)-0.5/self.sigma2))
        Edict['muhead']=self.muhead.backprop(E1*(self.a-self.mu)/self.sigma2)
        Edict['vhead']=self.vhead.backprop(E2*np.ones([self.a.shape[0],1]))
        Edict['sharedbody']=self.sharedbody.backprop(Edict['vhead'][0]+Edict['muhead'][0]+Edict['sigma2head'][0])
        grads['sharedbody']=Edict['sharedbody'][1]
        grads['vhead']=Edict['vhead'][1]
        grads['muhead']=Edict['muhead'][1]
        grads['sigma2head']=Edict['sigma2head'][1]
        EdictH={}
        gradsH={}
        EdictH['sigma2head']=self.sigma2head.backprop(0.5*E3/self.sigma2)
        EdictH['muhead']=self.muhead.backprop(np.zeros([self.a.shape[0],1]))
        EdictH['vhead']=self.vhead.backprop(np.zeros([self.a.shape[0],1]))
        EdictH['sharedbody']=self.sharedbody.backprop(EdictH['sigma2head'][0])
        gradsH['sharedbody']=EdictH['sharedbody'][1]
        gradsH['vhead']=EdictH['vhead'][1]
        gradsH['muhead']=EdictH['muhead'][1]
        gradsH['sigma2head']=EdictH['sigma2head'][1]
        return grads,gradsH
    def initialparams(self):
        params={}
        params['sharedbody']={}
        params['vhead']={}
        params['muhead']={}
        params['sigma2head']={}
        params['sharedbody']['W1']=np.random.randn(1,10)
        params['sharedbody']['b1']=np.zeros(10)
        params['sharedbody']['W2']=np.random.randn(10,10)/np.sqrt(10)
        params['sharedbody']['b2']=np.zeros(10)
        params['vhead']['W']=np.random.randn(10,1)/np.sqrt(10)
        params['vhead']['b']=np.zeros(1)
        params['muhead']['W']=np.random.randn(10,1)/np.sqrt(10)
        params['muhead']['b']=np.zeros(1)
        params['sigma2head']['W']=np.random.randn(10,1)/np.sqrt(10)
        params['sigma2head']['b']=np.zeros(1)
        return params

class Sigma2Head:
    def __init__(self):
        self.layers={}
        self.layers['aff']=Affine()
        self.presigma2=None
    def forward(self,s,params):
        self.presigma2=self.layers['aff'].forward(s,{'W':params['W'],'b':params['b']})
        return np.log(1+np.exp(self.presigma2))
    def backprop(self,E):
        return self.layers['aff'].backprop(E*np.exp(self.presigma2)/(1+np.exp(self.presigma2)))

class SharedBody:
    def __init__(self):
        self.layers={}
        self.layers['aff1']=Affine()
        self.layers['tanh']=Tanh()
        self.layers['aff2']=Affine()
        self.layers['sigm']=Sigmoid()
    def forward(self,s,params):
        z={}
        z['aff1']=self.layers['aff1'].forward(s,{'W':params['W1'],'b':params['b1']})
        z['tanh']=self.layers['tanh'].forward(z['aff1'])
        z['aff2']=self.layers['aff2'].forward(z['tanh'],{'W':params['W2'],'b':params['b2']})
        z['sigm']=self.layers['sigm'].forward(z['aff2'])
        return z['sigm']
    def backprop(self,E):
        grad={}
        Edict={}
        Edict['sigm']=self.layers['sigm'].backprop(E)
        Edict['aff2']=self.layers['aff2'].backprop(Edict['sigm'])
        Edict['tanh']=self.layers['tanh'].backprop(Edict['aff2'][0])
        Edict['aff1']=self.layers['aff1'].backprop(Edict['tanh'])
        grad['W1'],grad['b1']=Edict['aff1'][1]['W'],Edict['aff1'][1]['b']
        grad['W2'],grad['b2']=Edict['aff2'][1]['W'],Edict['aff2'][1]['b']
        return Edict['aff1'][0],grad

class Affine:
    def __init__(self):
        self.z=None
        self.W=None
        self.b=None
    def forward(self,z,params):
        self.z=z.copy()
        self.W=params['W'].copy()
        self.b=params['b'].copy()
        return np.dot(self.z,self.W)+self.b
    def backprop(self,E):
        Jac_zE=np.dot(E,self.W.T)
        Jac_WE=np.dot(self.z.T,E)
        Jac_bE=np.sum(E,axis=0)
        Jac_params={}
        Jac_params['W']=Jac_WE
        Jac_params['b']=Jac_bE
        return Jac_zE,Jac_params

class ReLU:
    def __init__(self):
        self.mask=None
    def forward(self,z):
        self.mask=z>0
        return z*self.mask
    def backprop(self,E):
        return self.mask*E

class Sigmoid:
    def __init__(self):
        self.y=None
    def forward(self,z):
        self.y=1/(1+np.exp(-z))
        return self.y
    def backprop(self,E):
        return (1-self.y)*self.y*E

class Tanh:
    def __init__(self):
        self.y=None
    def forward(self,z):
        self.y=np.tanh(z)
        return self.y
    def backprop(self,E):
        return (1-self.y**2)*E

# Global computation in the A3C algorithm.
class A3C:
    def __init__(self,t_max,T_max,agents,environments,params=None,gamma=0.99,alpha=0.99,learningrate=7e-4,vweight=0.5,entropyweight=0.01,epsilon=0.1,epoch=100):
        self.t_max=t_max
        self.T_max=T_max
        self.N=len(agents)
        self.gamma=gamma
        self.lr=learningrate
        self.kappa=vweight
        self.beta=entropyweight
        self.epsilon=epsilon
        self.c=epoch
        self.scorehistory=np.array([])
        if params==None:
            self.params=agents[0].initialparams()
        else:
            self.params=params
        self.g={}
        for key1 in self.params.keys():
            self.g[key1]={}
            for key2 in self.params[key1].keys():
                self.g[key1][key2]=np.zeros_like(self.params[key1][key2])
        self.alpha=alpha
        self.T=0
        self.threadinstances={}
        for i in range(self.N):
            self.threadinstances[i]=A3Cthread(agents[i],environments[i])
        self.accumulatedscores=0.0
        self.numberofsamples=0
    def update(self,threadinstance):
        while self.T<self.T_max:
            eta=-self.T*self.lr/float(self.T_max)+self.lr
            threadgrads,cnt,ifterminated,threadscore=threadinstance.grads(self.params,self.gamma,self.t_max,self.kappa,self.beta)
            for key1 in self.params.keys():
                for key2 in self.params[key1].keys():
                    self.g[key1][key2]*=self.alpha
                    self.g[key1][key2]+=(1-self.alpha)*threadgrads[key1][key2]**2
                    self.params[key1][key2]-=eta*threadgrads[key1][key2]/np.sqrt(self.g[key1][key2]+self.epsilon)
            self.T+=cnt
            self.accumulatedscores+=ifterminated*threadscore
            self.numberofsamples+=ifterminated
            if self.numberofsamples>self.c:
                self.scorehistory=np.append(self.scorehistory,self.accumulatedscores/float(max(1,self.numberofsamples)))
                self.accumulatedscores*=0
                self.numberofsamples*=0
                print(str(self.T)+" steps done with average score:")
                print(self.scorehistory[-1])
    def train(self):
        threads=[]
        for i in range(self.N):
            threads.append(threading.Thread(target=self.update,args=(self.threadinstances[i],)))
        for i in range(self.N):
            threads[i].start()
        for i in range(self.N):
            threads[i].join()
        return self.params

# Local computation for each A3C thread.
class A3Cthread:
    def __init__(self,agent,environment):
        self.agent=agent
        self.environment=environment
        self.t=0
        self.vlast=None
        self.alast=None
    def grads(self,globalparams,gamma,t_max,kappa=0.5,beta=0.01):
        params={}
        grads={}
        for key1 in globalparams.keys():
            params[key1]={}
            grads[key1]={}
            for key2 in globalparams[key1].keys():
                params[key1][key2]=globalparams[key1][key2].copy()
                grads[key1][key2]=np.zeros_like(globalparams[key1][key2])
        gradstoaccumulate={}
        rewards={}
        vvalues={}
        t_start=self.t
        while bool((self.t-t_start<t_max)*(np.sum(self.environment.flag)>0)):
            rewards[self.t]=-self.environment.score.copy()
            if bool((self.t>0)*(self.t==t_start)):
                a=self.alast.copy()
                vvalues[self.t]=self.vlast.copy()
            else:
                a=self.agent.action(self.environment.state,params)
                vvalues[self.t]=self.agent.V.copy()
            gradstoaccumulate[self.t]=self.agent.grads(1,2*kappa,beta)
            self.environment.transition(a)
            rewards[self.t]+=self.environment.score.copy()
            self.t+=1
        self.alast=self.agent.action(self.environment.state,params)
        self.vlast=self.agent.V.copy()
        R=np.sum(self.environment.flag)*self.vlast
        for i in range(self.t-t_start):
            R=gamma*R+rewards[self.t-i-1]
            for key1 in globalparams.keys():
                for key2 in globalparams[key1].keys():
                    grads[key1][key2]-=np.sum(R-vvalues[self.t-i-1])*gradstoaccumulate[self.t-i-1][0][key1][key2]+gradstoaccumulate[self.t-i-1][1][key1][key2]
        threadscore=np.sum(self.environment.score.copy())
        ifterminated=bool(1-np.sum(self.environment.flag))
        if ifterminated:
            self.environment.initialize()
        return grads,self.t-t_start,ifterminated,threadscore

# Demonstrating the A3C implementation.

# Showing how the agent with finite, discrete action space performs before training.
testagent=Agent_Ex1_Discrete()
untrainedparams=testagent.initialparams()
testenvironment=Environment_Ex1(batchsize=1000)
while np.sum(testenvironment.flag)>0:
    a=testagent.action(testenvironment.state,untrainedparams)
    testenvironment.transition(a)
print("Average performance of the agent with finite, discrete action space before training:")
print(np.sum(testenvironment.score)/float(testenvironment.batchsize))
# Training the agent.
agents=[Agent_Ex1_Discrete()]
environments=[Environment_Ex1()]
for i in range(15):
    agents.append(Agent_Ex1_Discrete())
    environments.append(Environment_Ex1())
trainer=A3C(t_max=5,T_max=90000,agents=agents,environments=environments)
print("Training the agent by the A3C algorithm:")
trainedparams=trainer.train()
scorehistory_discrete=trainer.scorehistory.copy()
# Showing how the A3C-trained agent with finite, discrete action space performs.
testagent=Agent_Ex1_Discrete()
testenvironment=Environment_Ex1(batchsize=1000)
while np.sum(testenvironment.flag)>0:
    a=testagent.action(testenvironment.state,trainedparams)
    testenvironment.transition(a)
print("Average performance of the A3C-trained agent with finite, discrete action space:")
print(np.sum(testenvironment.score)/float(testenvironment.batchsize))

# Showing how the agent with continuous action space performs before training.
testagent=Agent_Ex1_Continuous()
untrainedparams=testagent.initialparams()
testenvironment=Environment_Ex1(batchsize=1000)
while np.sum(testenvironment.flag)>0:
    a=testagent.action(testenvironment.state,untrainedparams)
    testenvironment.transition(a)
print("Average performance of the agent with continuous action space before training:")
print(np.sum(testenvironment.score)/float(testenvironment.batchsize))
# Training the agent.
agents=[Agent_Ex1_Continuous()]
environments=[Environment_Ex1()]
for i in range(15):
    agents.append(Agent_Ex1_Continuous())
    environments.append(Environment_Ex1())
trainer=A3C(t_max=5,T_max=90000,agents=agents,environments=environments)
print("Training the agent by the A3C algorithm:")
trainedparams=trainer.train()
scorehistory_continuous=trainer.scorehistory.copy()
# Showing how the A3C-trained agent with continuous action space performs.
testagent=Agent_Ex1_Continuous()
testenvironment=Environment_Ex1(batchsize=1000)
while np.sum(testenvironment.flag)>0:
    a=testagent.action(testenvironment.state,trainedparams)
    testenvironment.transition(a)
print("Average performance of the A3C-trained agent with continuous action space:")
print(np.sum(testenvironment.score)/float(testenvironment.batchsize))

# Showing the learning curves.
plt.plot(np.arange(0,scorehistory_discrete.size,1),scorehistory_discrete,linestyle="--",label="agent with discrete action space")
plt.plot(np.arange(0,scorehistory_continuous.size,1),scorehistory_continuous,label="agent with continuous action space")
plt.xlabel("epochs")
plt.ylabel("Average score")
plt.title("Average performances of the agents with discrete and continuous action spaces over the course of A3C training")
plt.legend()
plt.show()

実行結果

コードを実行すると、はじめに発注量を整数値単位で決定するエージェントの、訓練前の平均成績 (1000 episodes の成績の平均) が表示されます。大体 $-2500$ から $-500$ 程度の負の値であり、大きな損失を出してしまっているということです。訓練が開始されると、100 episodes ごとに平均の成績が表示されながら訓練が進行して行きます。2, 3 分程度で訓練が終了し、訓練後のエージェントの平均成績 (1000 episodes の成績の平均) が表示されます。大体 $200$ から $1200$ 程度の正の値となり、しっかりと利益を上げられるように学習できたことが分かります。

その後、今度は発注量を実数値で決定するエージェントについて同様の処理が実行されます。訓練前の平均成績は大体 $-2500$ から $-500$ 程度の負の値であり、大きな損失を出しているわけですが、2, 3 分程度の訓練を終えた後の平均成績は大体 $500$ から $1200$ 程度の正の値となり、こちらについても利益を上げられるように学習できていることが分かります。

最後に、訓練開始から終了までの成績の推移を示すグラフが、発注量を整数値単位で決めるエージェント、実数値で決めるエージェントの 2 つを並べて表示されます。大抵の場合、発注量を実数値で決めるエージェントの方がより急激に学習が進む印象があります。

6
12
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
12