0
3

More than 1 year has passed since last update.

CNNを使った深層強化学習の実装

Posted at

前回の続きです

自己符号化器の実装

参考文献

「強化学習と深層学習 C言語によるシミュレーション」2017/10/14
(著)小高 知宏
文献元

準備

console
pip install numpy

ソースコード

sample.py
#/********************************************/
#/*   強化学習とNNの例題プログラム          */
#/********************************************/
import numpy as np
 
#/****************/
#/*  main()関数  */
#/****************/
def main():
 #/* 記号定数の定義             */
 #/*強化学習関連*/
 GENMAX   =50000 #/*学習の繰り返し回数*/
 STATENO  =64     #/*状態の数*/
 ACTIONNO =4      #/*行動の数*/
 ALPHA    =0.1    #/*学習係数*/
 GAMMA    =0.9    #/*割引率*/
 EPSILON  =0.3    #/*行動選択のランダム性を決定*/
 REWARD   =1      #/*ゴール到達時の報酬*/
 GOAL     =54     #/*状態54がゴール状態*/
 UP       =0      #/*上方向の行動*/
 DOWN     =1      #/*下方向の行動*/
 LEFT     =2      #/*左方向の行動*/
 RIGHT    =3      #/*右方向の行動*/
 LEVEL    =512    #/*1試行における最大ステップ数*/
 #/*ニューラルネット関連*/
 #/*畳み込み演算関連*/
 IMAGESIZE   =8 #/*入力画像の1辺のピクセル数*/
 F_SIZE      =3 #/*畳み込みフィルタのサイズ*/
 F_NO        =2 #/*フィルタの個数*/
 POOLOUTSIZE =3 #/*プーリング層の出力のサイズ*/
 #/*全結合増関連*/
 INPUTNO  =18   #/*入力層のセル数*/
 HIDDENNO =6    #/*中間層のセル数*/
 OUTPUTNO =4    #/*出力層のセル数*/
 NNALPHA  =1    #/*学習係数*/
 #/*強化学習関連*/
 i=0
 s=snext=0#/*現在の状態と、次の状態*/
 t=0#/*時刻*/
 a=0#/*行動*/
 #/*ニューラルネット関連*/
 #/*畳み込み演算関連*/
 filter_=np.full((F_NO,F_SIZE,F_SIZE),0.)#/*畳み込みフィルタ*/

 #/*全結合層関連*/
 wh=np.full((HIDDENNO,INPUTNO+1 ),0.)#/*中間層の重み*/
 wo=np.full((OUTPUTNO,HIDDENNO+1),0.)#/*出力層の重み*/
 e =np.full((INPUTNO+OUTPUTNO,  ),0.)#/*学習データセット*/
 hi=np.full((HIDDENNO+1,        ),0.)#/*中間層の出力*/
 o =np.full((OUTPUTNO,          ),0.)#/*出力*/
 count_=0 #/*繰り返し回数のカウンタ*/

 #/*乱数の初期化*/
 np.random.seed(None);

 #/*畳み込みフィルタの初期化*/
 filter_=initfilter(filter_,F_NO,F_SIZE)

 #/*重みの初期化*/
 wh=initwh(wh,HIDDENNO,INPUTNO)#/*中間層の重みの初期化*/
 wo=initwo(wo,OUTPUTNO,HIDDENNO)#/*出力層の重みの初期化*/
 printweight(wh,wo)
 
 #/*学習の本体*/
 for i in range(GENMAX):
  if i%1000==0:
   print("step",i)
  s=0#/*行動の初期状態*/
  for t in range(LEVEL):#/*最大ステップ数まで繰り返す*/
   #/*行動選択*/
   a=selecta(s,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,HIDDENNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,ACTIONNO,UP,DOWN,LEFT,RIGHT,EPSILON)
   print("s,a",s,a)
   snext=nexts(s,a)

   #/*Q値の更新*/
   #/*ネットワーク入力データeの設定*/
   e=set_e_by_s(s,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO)
   e[INPUTNO+a]=updateq(s,snext,a,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,HIDDENNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,GOAL,ALPHA,REWARD,GAMMA,ACTIONNO,UP,DOWN,LEFT,RIGHT)#/*行動*/
   #/*順方向の計算*/
   o[a]=forward(wh,wo[a],hi,e,HIDDENNO,INPUTNO)
   #/*出力層の重みの調整*/
   wo[a]=olearn(wo[a],hi,e,o[a],a,NNALPHA,HIDDENNO,INPUTNO)
   #/*中間層の重みの調整*/
   wh=hlearn(wh,wo[a],hi,e,o[a],a,NNALPHA,HIDDENNO,INPUTNO)
   #/*行動aによって次の状態snextに遷移*/
   s=snext
   #/*ゴールに到達したら初期状態に戻る*/
   if s==GOAL:
       break
  
  #/*Q値の出力*/
  printqvalue(wh,wo,hi,filter_,INPUTNO,OUTPUTNO,HIDDENNO,F_NO,F_SIZE,STATENO,ACTIONNO,IMAGESIZE,POOLOUTSIZE)


#/******************/
#/* 下請け関数  */
#/*  強化学習関連  */
#/******************/

#/******************/
#/*calcqvalue()関数*/
#/*Q値の計算       */
#/******************/
def calcqvalue(wh,wo,hi,e,s,a,UP,DOWN,LEFT,RIGHT,HIDDENNO,INPUTNO):
 #/*移動できない方向へのQ値を0にする*/
 if(s<=7)and(a==UP):
  return 0#/*最上段ではUP方向に進まない*/
 if(s>=56)and(a==DOWN):
  return 0#/*最下段ではDOWN方向に進まない*/  
 if(s%8==0)and(a==LEFT):
  return 0#/*左端ではLEFT方向に進まない*/
 if(s%8==7)and(a==RIGHT):
  return 0#/*右端ではRIGHT方向に進まない*/

 #/*移動できる場合のQ値*/
 return forward(wh,wo,hi,e,HIDDENNO,INPUTNO)

#/****************************/
#/*       updateq()関数      */
#/*      Q値を更新する      */
#/****************************/
def updateq(s,snext,a,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,HIDDENNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,GOAL,ALPHA,REWARD,GAMMA,ACTIONNO,UP,DOWN,LEFT,RIGHT):
 qv=0.#/*更新されるQ値*/
 qvalue_sa=0.#/*現在のQ値*/
 qvalue_snexta=0.#/*次の状態での最大Q値*/
 e=np.full((INPUTNO+OUTPUTNO,),0.)

 #/*現在状態sでのQ値を求める*/
 #/*ネットワーク入力データeの設定*/
 e=set_e_by_s(s,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO)
 qvalue_sa=calcqvalue(wh,wo[a],hi,e,s,a,UP,DOWN,LEFT,RIGHT,HIDDENNO,INPUTNO)#/*行動a*/

 #/*次の状態snextでの最大Q値を求める*/
 #/*ネットワーク入力データeの設定*/
 e=set_e_by_s(snext,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO)
 u=set_a_by_q(snext,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,ACTIONNO,UP,DOWN,LEFT,RIGHT,HIDDENNO)
 qvalue_snexta=calcqvalue(wh,wo[u],hi,e,snext,u,UP,DOWN,LEFT,RIGHT,HIDDENNO,INPUTNO)

 #/*Q値の更新*/
 if snext==GOAL:#/*報酬が付与される場合*/
   qv=qvalue_sa+ALPHA*(REWARD-qvalue_sa) ;
 else:#/*報酬なし*/
   qv=qvalue_sa+ALPHA*(GAMMA*qvalue_snexta-qvalue_sa) 
 return qv

#/****************************/
#/*        selecta()関数     */
#/*      行動を選択する     */
#/****************************/
def selecta(s,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,HIDDENNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,ACTIONNO,UP,DOWN,LEFT,RIGHT,EPSILON):
 a=0#/*選択された行動*/
 e=np.full((INPUTNO+OUTPUTNO,),0.)

 #/*ニューラルネットへの入力設定*/
 e=set_e_by_s(s,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO)
 #/*ε-greedy法による行動選択*/
 if frand()<EPSILON:
  #/*ランダムに行動*/
  a=rand03()
  while(calcqvalue(wh,wo[a],hi,e,s,a,UP,DOWN,LEFT,RIGHT,HIDDENNO,INPUTNO)==0):
   a=rand03()#/*移動できない方向ならやり直し*/
 else:
  #/*Q値最大値を選択*/
  a=set_a_by_q(s,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,ACTIONNO,UP,DOWN,LEFT,RIGHT,HIDDENNO)
 return a

#/****************************/
#/*    set_a_by_q()関数      */
#/*   Q値最大値を選択      */
#/****************************/
def set_a_by_q(s,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,ACTIONNO,UP,DOWN,LEFT,RIGHT,HIDDENNO):
 maxq=0#/*Q値の最大値候補*/
 maxaction=0#/*Q値最大に対応する行動*/
 i=0
 e=np.full((INPUTNO+OUTPUTNO,),0.)

 #/*ネットワーク入力データeの設定*/
 e=set_e_by_s(s,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO)
 for i in range(ACTIONNO):
  if calcqvalue(wh,wo[i],hi,e,s,i,UP,DOWN,LEFT,RIGHT,HIDDENNO,INPUTNO) > maxq:
   maxq=calcqvalue(wh,wo[i],hi,e,s,i,UP,DOWN,LEFT,RIGHT,HIDDENNO,INPUTNO)#/*最大値の更新*/
   maxaction=i#/*対応する行動*/
 
 return maxaction

#/****************************/
#/*    nexts()関数           */
#/*行動によって次の状態に遷移*/
#/****************************/
def nexts(s,a):
 next_s_value=np.array([-8,8,-1,1])#/*行動aに対応して次の状態に遷移するための加算値*/
 return s+next_s_value[a]

#/****************************/
#/*    printqvalue()関数     */
#/*    Q値を出力する        */
#/****************************/
def printqvalue(wh,wo,hi,filter_,INPUTNO,OUTPUTNO,HIDDENNO,F_NO,F_SIZE,STATENO,ACTIONNO,IMAGESIZE,POOLOUTSIZE):
 e=np.full((INPUTNO+OUTPUTNO,),0.)
 for i in range(STATENO):
  for j in range(ACTIONNO):
   e=set_e_by_s(i,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO)
   print("frwrd ",forward(wh,wo[j],hi,e,HIDDENNO,INPUTNO))

#/****************************/
#/*     frand()関数          */
#/*0〜1の実数を返す乱数関数  */
#/****************************/
def frand():
 #/*乱数の計算*/
 return np.random.sample()

#/****************************/
#/*     rand03()関数         */
#/*  0〜3の値を返す乱数関数 */
#/****************************/
def rand03():
 #/*乱数の最大値を除く*/
 #/*乱数の計算*/
 return np.random.randint(0,4)

#/**************************/
#/* 下請け関数          */
#/*  ニューラルネット関連  */
#/**************************/
#/**********************/
#/*    initwh()関数    */
#/*中間層の重みの初期化*/
#/**********************/
def initwh(wh,HIDDENNO,INPUTNO):
 #/*乱数による重みの決定*/ 
 for i in range(HIDDENNO):
  for j in range(INPUTNO+1):
   wh[i][j]=drnd()
 return wh
 
#/**********************/
#/*    initwo()関数    */
#/*出力層の重みの初期化*/
#/**********************/
def initwo(wo,OUTPUTNO,HIDDENNO):
 #/*繰り返しの制御*/
 #/*乱数による重みの決定*/
 for i in range(OUTPUTNO):
  for j in range(HIDDENNO+1):
    wo[i][j]=drnd()
 return wo
 
#/**********************/
#/*  forward()関数     */
#/*  順方向の計算      */
#/**********************/
def forward(wh,wo,hi,e,HIDDENNO,INPUTNO):
 i=j=0#/*繰り返しの制御*/
 u=0.#/*重み付き和の計算*/
 o=0.#/*出力の計算*/
 #/*hiの計算*/
 for i in range(HIDDENNO):
  u=0 #/*重み付き和を求める*/
  j=np.arange(INPUTNO)
  u=(e[j]*wh[i][j]).sum()
  u-=wh[i][j[-1]+1]#/*しきい値の処理*/
  hi[i]=f(u)
 #/*出力oの計算*/
 o=0
 i=np.arange(HIDDENNO)
 o=(hi[i]*wo[i]).sum()
 o-=wo[i[-1]+1]#/*しきい値の処理*/
 return f(o)
 
#/**********************/
#/*  olearn()関数      */
#/*  出力層の重み学習  */
#/**********************/
def olearn(wo,hi,e,o,k,NNALPHA,HIDDENNO,INPUTNO):
 i=0#/*繰り返しの制御*/
 d=0.#/*重み計算に利用*/
 d=(e[INPUTNO+k]-o)*o*(1-o)#/*誤差の計算*/
 i=np.arange(HIDDENNO)
 wo[i]+=NNALPHA*hi[i]*d#/*重みの学習*/
 wo[i[-1]+1]+=NNALPHA*(-1.0)*d#/*しきい値の学習*/
 return wo
 
#/**********************/
#/*  hlearn()関数      */
#/*  中間層の重み学習  */
#/**********************/
def hlearn(wh,wo,hi,e,o,k,NNALPHA,HIDDENNO,INPUTNO):
 i=j=0#/*繰り返しの制御*/
 dj=0.#/*中間層の重み計算に利用*/
 for j in range(HIDDENNO):#/*中間層の各セルjを対象*/
  dj=hi[j]*(1-hi[j])*wo[j]*(e[INPUTNO+k]-o)*o*(1-o)
  i=np.arange(INPUTNO)#/*i番目の重みを処理*/
  wh[j][i]+=NNALPHA*e[i]*dj
  wh[j][i[-1]+1]+=NNALPHA*(-1.0)*dj#/*しきい値の学習*/
 return wh
 
#/**********************/
#/*  printweight()関数 */
#/*   結果の出力       */
#/**********************/
def printweight(wh,wo):
 print("wh",wh)
 print("wo",wo)
 
#/*******************/
#/* f()関数         */
#/* シグモイド関数  */
#/*******************/
def f(u):
 return 1.0/(1.0+np.exp(-u))
 
#/*************************/
#/* drand()関数           */
#/*-1から1の間の乱数を生成*/
#/*************************/
def drnd():
 rndno=(np.random.sample(1)*2)-1#/*-1から1の間の乱数を生成*/
 return rndno

#/**********************/
#/*  initfilter()関数  */
#/*  フィルタの初期化 */
#/**********************/
def initfilter(filter_,F_NO,F_SIZE):
 for i in range(F_NO):
  for j in range(F_SIZE):
   for k in range(F_SIZE):
    filter_[i][j][k]=drnd()
 return filter_

#/**********************/
#/*  conv()関数        */
#/*  畳み込みの計算    */
#/**********************/
def conv(filter_,e,convout,F_SIZE,IMAGESIZE):
 i=j=0#/*繰り返しの制御用*/
 startpoint=np.longlong(F_SIZE/2)#/*畳み込み範囲の下限*/
 for i in range(startpoint,IMAGESIZE-startpoint,1):
  for j in range(startpoint,IMAGESIZE-startpoint,1):
   convout[i][j]=calcconv(filter_,e,i,j,F_SIZE)
 return convout

#/**********************/
#/*  calcconv()関数    */
#/*  フィルタの適用    */
#/**********************/
def calcconv(filter_,e,i,j,F_SIZE):
 m=n=0#/*繰り返しの制御用*/
 sum_=0.#/*和の値*/
 
 for m in range(F_SIZE):
  for n in range(F_SIZE):
   sum_+=e[np.longlong(i-F_SIZE/2+m)][np.longlong(j-F_SIZE/2+n)]*filter_[m][n]
  
 return sum_

#/**********************/
#/*  pool()関数        */
#/* プーリングの計算   */
#/**********************/
def pool(convout,poolout,IMAGESIZE,POOLOUTSIZE):
 i=j=0#/*繰り返しの制御*/
 for i in range(POOLOUTSIZE):
  for j in range(POOLOUTSIZE):
   poolout[i][j]=calcpooling(convout,i*2+1,j*2+1,IMAGESIZE)
 return poolout

#/**********************/
#/* calcpooling()関数  */
#/* 平均値プーリング   */
#/**********************/
def calcpooling(convout,x,y,IMAGESIZE):
 m=n=0#/*繰り返しの制御用*/
 ave=0.#/*平均値*/
 for m in range(x,x+2,1):
  for n in range(y,y+2,1):
   ave+=convout[m][n]
 return ave/4.0
 
#/************************************/
#/* set_e_by_s()関数                 */
#/* 畳み込みを用いたNN入力データの設定  */
#/************************************/
def set_e_by_s(s,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO):
 i=j=m=n=0#/*繰り返しの制御用*/
 image=np.full((IMAGESIZE,IMAGESIZE),0.)#/*入力データ*/
 convout=np.full((IMAGESIZE,IMAGESIZE),0.)#/*畳み込み出力*/
 poolout=np.full((POOLOUTSIZE,POOLOUTSIZE),0.)#/*出力データ*/
 
 #/*畳み込み部への入力の設定*/
 for i in range(IMAGESIZE):
  for j in range(IMAGESIZE):
   if (i+j*IMAGESIZE)==s:
    image[i][j]=1
   else:
    image[i][j]=0

 for j in range(F_NO):#/*フィルタ毎の繰り返し*/
  #/*畳み込みの計算*/
  convout=conv(filter_[j],image,convout,F_SIZE,IMAGESIZE)
  #/*プーリングの計算*/
  poolout=pool(convout,poolout,IMAGESIZE,POOLOUTSIZE)
  #/*プーリング出力を全結合層の入力へコピー*/
  for m in range(POOLOUTSIZE):
   for n in range(POOLOUTSIZE):
    #print(e.shape,poolout.shape,j,m,n,j*POOLOUTSIZE*POOLOUTSIZE+POOLOUTSIZE*m+n)
    e[j*POOLOUTSIZE*POOLOUTSIZE+POOLOUTSIZE*m+n]=poolout[m][n]
    for l in range(OUTPUTNO):
     e[POOLOUTSIZE*POOLOUTSIZE*F_NO+l]=0#/*教師データ部のクリア*/
 return e


main()

0
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
3