前回の続きです
参考文献
「強化学習と深層学習 C言語によるシミュレーション」2017/10/14
(著)小高 知宏
文献元
準備
console
pip install numpy
ソースコード
sample.py
#/********************************************/
#/* 強化学習とNNの例題プログラム */
#/********************************************/
import numpy as np
#/****************/
#/* main()関数 */
#/****************/
def main():
#/* 記号定数の定義 */
#/*強化学習関連*/
GENMAX =50000 #/*学習の繰り返し回数*/
STATENO =64 #/*状態の数*/
ACTIONNO =4 #/*行動の数*/
ALPHA =0.1 #/*学習係数*/
GAMMA =0.9 #/*割引率*/
EPSILON =0.3 #/*行動選択のランダム性を決定*/
REWARD =1 #/*ゴール到達時の報酬*/
GOAL =54 #/*状態54がゴール状態*/
UP =0 #/*上方向の行動*/
DOWN =1 #/*下方向の行動*/
LEFT =2 #/*左方向の行動*/
RIGHT =3 #/*右方向の行動*/
LEVEL =512 #/*1試行における最大ステップ数*/
#/*ニューラルネット関連*/
#/*畳み込み演算関連*/
IMAGESIZE =8 #/*入力画像の1辺のピクセル数*/
F_SIZE =3 #/*畳み込みフィルタのサイズ*/
F_NO =2 #/*フィルタの個数*/
POOLOUTSIZE =3 #/*プーリング層の出力のサイズ*/
#/*全結合増関連*/
INPUTNO =18 #/*入力層のセル数*/
HIDDENNO =6 #/*中間層のセル数*/
OUTPUTNO =4 #/*出力層のセル数*/
NNALPHA =1 #/*学習係数*/
#/*強化学習関連*/
i=0
s=snext=0#/*現在の状態と、次の状態*/
t=0#/*時刻*/
a=0#/*行動*/
#/*ニューラルネット関連*/
#/*畳み込み演算関連*/
filter_=np.full((F_NO,F_SIZE,F_SIZE),0.)#/*畳み込みフィルタ*/
#/*全結合層関連*/
wh=np.full((HIDDENNO,INPUTNO+1 ),0.)#/*中間層の重み*/
wo=np.full((OUTPUTNO,HIDDENNO+1),0.)#/*出力層の重み*/
e =np.full((INPUTNO+OUTPUTNO, ),0.)#/*学習データセット*/
hi=np.full((HIDDENNO+1, ),0.)#/*中間層の出力*/
o =np.full((OUTPUTNO, ),0.)#/*出力*/
count_=0 #/*繰り返し回数のカウンタ*/
#/*乱数の初期化*/
np.random.seed(None);
#/*畳み込みフィルタの初期化*/
filter_=initfilter(filter_,F_NO,F_SIZE)
#/*重みの初期化*/
wh=initwh(wh,HIDDENNO,INPUTNO)#/*中間層の重みの初期化*/
wo=initwo(wo,OUTPUTNO,HIDDENNO)#/*出力層の重みの初期化*/
printweight(wh,wo)
#/*学習の本体*/
for i in range(GENMAX):
if i%1000==0:
print("step",i)
s=0#/*行動の初期状態*/
for t in range(LEVEL):#/*最大ステップ数まで繰り返す*/
#/*行動選択*/
a=selecta(s,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,HIDDENNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,ACTIONNO,UP,DOWN,LEFT,RIGHT,EPSILON)
print("s,a",s,a)
snext=nexts(s,a)
#/*Q値の更新*/
#/*ネットワーク入力データeの設定*/
e=set_e_by_s(s,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO)
e[INPUTNO+a]=updateq(s,snext,a,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,HIDDENNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,GOAL,ALPHA,REWARD,GAMMA,ACTIONNO,UP,DOWN,LEFT,RIGHT)#/*行動*/
#/*順方向の計算*/
o[a]=forward(wh,wo[a],hi,e,HIDDENNO,INPUTNO)
#/*出力層の重みの調整*/
wo[a]=olearn(wo[a],hi,e,o[a],a,NNALPHA,HIDDENNO,INPUTNO)
#/*中間層の重みの調整*/
wh=hlearn(wh,wo[a],hi,e,o[a],a,NNALPHA,HIDDENNO,INPUTNO)
#/*行動aによって次の状態snextに遷移*/
s=snext
#/*ゴールに到達したら初期状態に戻る*/
if s==GOAL:
break
#/*Q値の出力*/
printqvalue(wh,wo,hi,filter_,INPUTNO,OUTPUTNO,HIDDENNO,F_NO,F_SIZE,STATENO,ACTIONNO,IMAGESIZE,POOLOUTSIZE)
#/******************/
#/* 下請け関数 */
#/* 強化学習関連 */
#/******************/
#/******************/
#/*calcqvalue()関数*/
#/*Q値の計算 */
#/******************/
def calcqvalue(wh,wo,hi,e,s,a,UP,DOWN,LEFT,RIGHT,HIDDENNO,INPUTNO):
#/*移動できない方向へのQ値を0にする*/
if(s<=7)and(a==UP):
return 0#/*最上段ではUP方向に進まない*/
if(s>=56)and(a==DOWN):
return 0#/*最下段ではDOWN方向に進まない*/
if(s%8==0)and(a==LEFT):
return 0#/*左端ではLEFT方向に進まない*/
if(s%8==7)and(a==RIGHT):
return 0#/*右端ではRIGHT方向に進まない*/
#/*移動できる場合のQ値*/
return forward(wh,wo,hi,e,HIDDENNO,INPUTNO)
#/****************************/
#/* updateq()関数 */
#/* Q値を更新する */
#/****************************/
def updateq(s,snext,a,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,HIDDENNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,GOAL,ALPHA,REWARD,GAMMA,ACTIONNO,UP,DOWN,LEFT,RIGHT):
qv=0.#/*更新されるQ値*/
qvalue_sa=0.#/*現在のQ値*/
qvalue_snexta=0.#/*次の状態での最大Q値*/
e=np.full((INPUTNO+OUTPUTNO,),0.)
#/*現在状態sでのQ値を求める*/
#/*ネットワーク入力データeの設定*/
e=set_e_by_s(s,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO)
qvalue_sa=calcqvalue(wh,wo[a],hi,e,s,a,UP,DOWN,LEFT,RIGHT,HIDDENNO,INPUTNO)#/*行動a*/
#/*次の状態snextでの最大Q値を求める*/
#/*ネットワーク入力データeの設定*/
e=set_e_by_s(snext,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO)
u=set_a_by_q(snext,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,ACTIONNO,UP,DOWN,LEFT,RIGHT,HIDDENNO)
qvalue_snexta=calcqvalue(wh,wo[u],hi,e,snext,u,UP,DOWN,LEFT,RIGHT,HIDDENNO,INPUTNO)
#/*Q値の更新*/
if snext==GOAL:#/*報酬が付与される場合*/
qv=qvalue_sa+ALPHA*(REWARD-qvalue_sa) ;
else:#/*報酬なし*/
qv=qvalue_sa+ALPHA*(GAMMA*qvalue_snexta-qvalue_sa)
return qv
#/****************************/
#/* selecta()関数 */
#/* 行動を選択する */
#/****************************/
def selecta(s,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,HIDDENNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,ACTIONNO,UP,DOWN,LEFT,RIGHT,EPSILON):
a=0#/*選択された行動*/
e=np.full((INPUTNO+OUTPUTNO,),0.)
#/*ニューラルネットへの入力設定*/
e=set_e_by_s(s,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO)
#/*ε-greedy法による行動選択*/
if frand()<EPSILON:
#/*ランダムに行動*/
a=rand03()
while(calcqvalue(wh,wo[a],hi,e,s,a,UP,DOWN,LEFT,RIGHT,HIDDENNO,INPUTNO)==0):
a=rand03()#/*移動できない方向ならやり直し*/
else:
#/*Q値最大値を選択*/
a=set_a_by_q(s,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,ACTIONNO,UP,DOWN,LEFT,RIGHT,HIDDENNO)
return a
#/****************************/
#/* set_a_by_q()関数 */
#/* Q値最大値を選択 */
#/****************************/
def set_a_by_q(s,wh,wo,hi,filter_,INPUTNO,OUTPUTNO,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,ACTIONNO,UP,DOWN,LEFT,RIGHT,HIDDENNO):
maxq=0#/*Q値の最大値候補*/
maxaction=0#/*Q値最大に対応する行動*/
i=0
e=np.full((INPUTNO+OUTPUTNO,),0.)
#/*ネットワーク入力データeの設定*/
e=set_e_by_s(s,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO)
for i in range(ACTIONNO):
if calcqvalue(wh,wo[i],hi,e,s,i,UP,DOWN,LEFT,RIGHT,HIDDENNO,INPUTNO) > maxq:
maxq=calcqvalue(wh,wo[i],hi,e,s,i,UP,DOWN,LEFT,RIGHT,HIDDENNO,INPUTNO)#/*最大値の更新*/
maxaction=i#/*対応する行動*/
return maxaction
#/****************************/
#/* nexts()関数 */
#/*行動によって次の状態に遷移*/
#/****************************/
def nexts(s,a):
next_s_value=np.array([-8,8,-1,1])#/*行動aに対応して次の状態に遷移するための加算値*/
return s+next_s_value[a]
#/****************************/
#/* printqvalue()関数 */
#/* Q値を出力する */
#/****************************/
def printqvalue(wh,wo,hi,filter_,INPUTNO,OUTPUTNO,HIDDENNO,F_NO,F_SIZE,STATENO,ACTIONNO,IMAGESIZE,POOLOUTSIZE):
e=np.full((INPUTNO+OUTPUTNO,),0.)
for i in range(STATENO):
for j in range(ACTIONNO):
e=set_e_by_s(i,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO)
print("frwrd ",forward(wh,wo[j],hi,e,HIDDENNO,INPUTNO))
#/****************************/
#/* frand()関数 */
#/*0〜1の実数を返す乱数関数 */
#/****************************/
def frand():
#/*乱数の計算*/
return np.random.sample()
#/****************************/
#/* rand03()関数 */
#/* 0〜3の値を返す乱数関数 */
#/****************************/
def rand03():
#/*乱数の最大値を除く*/
#/*乱数の計算*/
return np.random.randint(0,4)
#/**************************/
#/* 下請け関数 */
#/* ニューラルネット関連 */
#/**************************/
#/**********************/
#/* initwh()関数 */
#/*中間層の重みの初期化*/
#/**********************/
def initwh(wh,HIDDENNO,INPUTNO):
#/*乱数による重みの決定*/
for i in range(HIDDENNO):
for j in range(INPUTNO+1):
wh[i][j]=drnd()
return wh
#/**********************/
#/* initwo()関数 */
#/*出力層の重みの初期化*/
#/**********************/
def initwo(wo,OUTPUTNO,HIDDENNO):
#/*繰り返しの制御*/
#/*乱数による重みの決定*/
for i in range(OUTPUTNO):
for j in range(HIDDENNO+1):
wo[i][j]=drnd()
return wo
#/**********************/
#/* forward()関数 */
#/* 順方向の計算 */
#/**********************/
def forward(wh,wo,hi,e,HIDDENNO,INPUTNO):
i=j=0#/*繰り返しの制御*/
u=0.#/*重み付き和の計算*/
o=0.#/*出力の計算*/
#/*hiの計算*/
for i in range(HIDDENNO):
u=0 #/*重み付き和を求める*/
j=np.arange(INPUTNO)
u=(e[j]*wh[i][j]).sum()
u-=wh[i][j[-1]+1]#/*しきい値の処理*/
hi[i]=f(u)
#/*出力oの計算*/
o=0
i=np.arange(HIDDENNO)
o=(hi[i]*wo[i]).sum()
o-=wo[i[-1]+1]#/*しきい値の処理*/
return f(o)
#/**********************/
#/* olearn()関数 */
#/* 出力層の重み学習 */
#/**********************/
def olearn(wo,hi,e,o,k,NNALPHA,HIDDENNO,INPUTNO):
i=0#/*繰り返しの制御*/
d=0.#/*重み計算に利用*/
d=(e[INPUTNO+k]-o)*o*(1-o)#/*誤差の計算*/
i=np.arange(HIDDENNO)
wo[i]+=NNALPHA*hi[i]*d#/*重みの学習*/
wo[i[-1]+1]+=NNALPHA*(-1.0)*d#/*しきい値の学習*/
return wo
#/**********************/
#/* hlearn()関数 */
#/* 中間層の重み学習 */
#/**********************/
def hlearn(wh,wo,hi,e,o,k,NNALPHA,HIDDENNO,INPUTNO):
i=j=0#/*繰り返しの制御*/
dj=0.#/*中間層の重み計算に利用*/
for j in range(HIDDENNO):#/*中間層の各セルjを対象*/
dj=hi[j]*(1-hi[j])*wo[j]*(e[INPUTNO+k]-o)*o*(1-o)
i=np.arange(INPUTNO)#/*i番目の重みを処理*/
wh[j][i]+=NNALPHA*e[i]*dj
wh[j][i[-1]+1]+=NNALPHA*(-1.0)*dj#/*しきい値の学習*/
return wh
#/**********************/
#/* printweight()関数 */
#/* 結果の出力 */
#/**********************/
def printweight(wh,wo):
print("wh",wh)
print("wo",wo)
#/*******************/
#/* f()関数 */
#/* シグモイド関数 */
#/*******************/
def f(u):
return 1.0/(1.0+np.exp(-u))
#/*************************/
#/* drand()関数 */
#/*-1から1の間の乱数を生成*/
#/*************************/
def drnd():
rndno=(np.random.sample(1)*2)-1#/*-1から1の間の乱数を生成*/
return rndno
#/**********************/
#/* initfilter()関数 */
#/* フィルタの初期化 */
#/**********************/
def initfilter(filter_,F_NO,F_SIZE):
for i in range(F_NO):
for j in range(F_SIZE):
for k in range(F_SIZE):
filter_[i][j][k]=drnd()
return filter_
#/**********************/
#/* conv()関数 */
#/* 畳み込みの計算 */
#/**********************/
def conv(filter_,e,convout,F_SIZE,IMAGESIZE):
i=j=0#/*繰り返しの制御用*/
startpoint=np.longlong(F_SIZE/2)#/*畳み込み範囲の下限*/
for i in range(startpoint,IMAGESIZE-startpoint,1):
for j in range(startpoint,IMAGESIZE-startpoint,1):
convout[i][j]=calcconv(filter_,e,i,j,F_SIZE)
return convout
#/**********************/
#/* calcconv()関数 */
#/* フィルタの適用 */
#/**********************/
def calcconv(filter_,e,i,j,F_SIZE):
m=n=0#/*繰り返しの制御用*/
sum_=0.#/*和の値*/
for m in range(F_SIZE):
for n in range(F_SIZE):
sum_+=e[np.longlong(i-F_SIZE/2+m)][np.longlong(j-F_SIZE/2+n)]*filter_[m][n]
return sum_
#/**********************/
#/* pool()関数 */
#/* プーリングの計算 */
#/**********************/
def pool(convout,poolout,IMAGESIZE,POOLOUTSIZE):
i=j=0#/*繰り返しの制御*/
for i in range(POOLOUTSIZE):
for j in range(POOLOUTSIZE):
poolout[i][j]=calcpooling(convout,i*2+1,j*2+1,IMAGESIZE)
return poolout
#/**********************/
#/* calcpooling()関数 */
#/* 平均値プーリング */
#/**********************/
def calcpooling(convout,x,y,IMAGESIZE):
m=n=0#/*繰り返しの制御用*/
ave=0.#/*平均値*/
for m in range(x,x+2,1):
for n in range(y,y+2,1):
ave+=convout[m][n]
return ave/4.0
#/************************************/
#/* set_e_by_s()関数 */
#/* 畳み込みを用いたNN入力データの設定 */
#/************************************/
def set_e_by_s(s,filter_,e,F_NO,F_SIZE,IMAGESIZE,POOLOUTSIZE,OUTPUTNO):
i=j=m=n=0#/*繰り返しの制御用*/
image=np.full((IMAGESIZE,IMAGESIZE),0.)#/*入力データ*/
convout=np.full((IMAGESIZE,IMAGESIZE),0.)#/*畳み込み出力*/
poolout=np.full((POOLOUTSIZE,POOLOUTSIZE),0.)#/*出力データ*/
#/*畳み込み部への入力の設定*/
for i in range(IMAGESIZE):
for j in range(IMAGESIZE):
if (i+j*IMAGESIZE)==s:
image[i][j]=1
else:
image[i][j]=0
for j in range(F_NO):#/*フィルタ毎の繰り返し*/
#/*畳み込みの計算*/
convout=conv(filter_[j],image,convout,F_SIZE,IMAGESIZE)
#/*プーリングの計算*/
poolout=pool(convout,poolout,IMAGESIZE,POOLOUTSIZE)
#/*プーリング出力を全結合層の入力へコピー*/
for m in range(POOLOUTSIZE):
for n in range(POOLOUTSIZE):
#print(e.shape,poolout.shape,j,m,n,j*POOLOUTSIZE*POOLOUTSIZE+POOLOUTSIZE*m+n)
e[j*POOLOUTSIZE*POOLOUTSIZE+POOLOUTSIZE*m+n]=poolout[m][n]
for l in range(OUTPUTNO):
e[POOLOUTSIZE*POOLOUTSIZE*F_NO+l]=0#/*教師データ部のクリア*/
return e
main()