python 機械学習 強化学習 エラー
解決したいこと
ここに解決したい内容を記載してください。
例)
pythonでネズミが動いて餌を多く獲得できるように学習する機械学習を行なっています。
発生している問題・エラー
ValueError: shapes (20,) and (1,5) not aligned: 20 (dim 0) != 1 (dim 0)
該当するソースコード
abstract.py
import pygame
import random
class abstract:
def __init__(self):
# 移動先の可否を表すpath_listを取得
self.path = self.path_list()
# 行動回数の初期化
self.count = 0
# 移動先の座標を表すmove_listを取得
self.move = self.move_list()
# エピソードの終了を判定するdoneの初期化
self.done = False
#画面のサイズの設定
self.window_size = (320, 320)
# Pygameを初期化
pygame.init()
# 描画画面の作成
self.screen = pygame.display.set_mode(self.window_size)
#グリッドサイズを設定
self.grid_size = 64
#ネズミ, チーズ, 背景の画像のパスをそれぞれ設定
self.agent_img_path = 'images/agent.png'
self.reward_path = 'images/cheese.png'
self.bg_img_path = 'images/background.png'
#各種画像の設定
self.agent = pygame.image.load(self.agent_img_path)
self.agent = pygame.transform.smoothscale(self.agent,(self.grid_size,self.grid_size))
self.p_reward = pygame.image.load(self.reward_path)
self.p_reward = pygame.transform.smoothscale(self.p_reward,(self.grid_size,self.grid_size))
self.img_bg = pygame.image.load(self.bg_img_path)
#背景画像の左上と右下の角の座標を取得
self.rect_bg = self.img_bg.get_rect()
#ネズミ(=エージェント)とチーズ(=報酬)の初期位置を設定
self.a_pos = 1
self.r_pos = 1
def path_list(self):
"""
各部屋から移動可能な部屋を表すlistを返す
"""
self.effect_path_list = [
[1,1,1,1,1],
[1,1,0,0,0],
[1,0,1,0,0],
[1,0,0,1,0],
[1,0,0,0,1]]
return self.effect_path_list
def move_list(self):
"""
各部屋の座標を返す
"""
self.move = [
[160,160],
[220,90],
[90,220],
[220,220],
[90,90]]
return self.move
def step(self,action):
"""
一回分の行動を実行
"""
# 行動回数を更新
self.count += 1
# エージェントの行動がvalidかをcheck関数で確認
self.a_pos, self.r_pos, self.reward = self.check(action)
# カウントが100に達したらdone = True
if self.count==100:
self.done = True
# 移動した場所の番号と、報酬と、doneを返す
return self.a_pos, self.reward, self.done
def check(self, tar_pos):
"""
チェックした移動先と新しい報酬の座標と報酬を返す
"""
# 報酬を初期化
self.reward = 0
# もしエージェントの選択した部屋へ移動不可能であったら
if self.path[self.a_pos][tar_pos]==0:
return self.a_pos, self.r_pos, self.reward
# もしエージェントの選択した部屋へ移動可能であり、移動先に報酬があったら
#報酬を獲得したので、報酬は+1した上で、位置は新たな場所に更新
elif tar_pos==self.r_pos:
return tar_pos, self.r_pos%4+1, self.reward+1.
# もしエージェントの選択した部屋へ移動可能であり、移動先に報酬がなかったら
else:
return tar_pos, self.r_pos, self.reward
def render(self):
"""
ゲームを画面に表示する
"""
#self.moveかネズミの移動先の座標を取得
self.agent_pos = self.move[self.a_pos]
#self.moveかチーズの移動先の座標を取得
self.reward_pos = self.move[self.r_pos]
#背景画像を表示
self.screen.blit(self.img_bg, self.rect_bg)
#チーズを画面に表示
self.screen.blit(self.p_reward, (self.reward_pos[0]-self.grid_size/2, self.reward_pos[1]-self.grid_size/2))
#ネズミを画面に表示
self.screen.blit(self.agent, (self.agent_pos[0]-self.grid_size/2, self.agent_pos[1]-self.grid_size/2))
#画面を更新
pygame.display.flip()
def reset(self):
"""
一定回数実行したら、環境をリセットする。
"""
#チーズの座標を初期位置に戻す
self.r_pos = 1
#ネズミの座標を初期位置に戻す
self.a_pos = 1
return self.a_pos
policy_gradient.py
import sys, os
import numpy as np
import _pickle as pickle
class RMSProp:
def __init__(self,lr,decay_rate=0.99,):
self.lr=lr
self.decay_rate=decay_rate
def step(self,params,grads,rmsprop):
for i in range(len(params)):
rmsprop[i]=self.decay_rate*rmsprop[i]+(1-self.decay_rate)*grads[i]**2
params[i]+=self.lr*grads[i]/(np.sqrt(rmsprop[i])+1e-5)
grads[i]*=0
class SoftmaxWithLoss:
def __init__(self):
self.params=[]
self.grads=[]
def forward(self,x):
y=self.softmax(x)
return y
def softmax(self,x):
if x.ndim == 2:
x = x - x.max(axis=1, keepdims=True)
x = np.exp(x)
x /= x.sum(axis=1, keepdims=True)
elif x.ndim == 1:
x = x - np.max(x)
x = np.exp(x) / np.sum(np.exp(x))
return x
def backward(self,discounted_epr,epdlogp):
discounted_epr -= np.mean(discounted_epr)
discounted_epr /= np.std(discounted_epr)
epdlogp *= discounted_epr
return epdlogp
def reset(self):
None
class FullyConnected:
def __init__(self,w):
self.params=[w]
self.grads=[np.zeros_like(w)]
self.rmsprop=[np.zeros_like(w)]
self.x=[]
def forward(self,x):
w=self.params[0]
if len(self.x)!=0:
self.x=np.vstack([self.x,np.array(x)])
else:
self.x=np.array(x)
out=np.dot(x,w)
return out
def backward(self,dout):
w=self.params[0]
dx=np.dot(dout,w.T)
dw=np.dot(self.x.T,dout)
self.grads[0]+=dw
self.x=[]
#self.grads.reshape(dw.shape)
return dx
def reset(self):
self.x=[]
class ReLu:
def __init__(self):
self.params=[]
self.grads=[]
self.rmsprop=[]
self.mask=[]
def forward(self,x):
mask=(x<=0)
if len(self.mask)!=0:
self.mask=np.vstack([self.mask,mask])
else:
self.mask=mask
out=x
out[mask]=0
return out
def backward(self,dout):
dout[self.mask]=0
dx=dout
self.mask=[]
return dx
def reset(self):
self.mask=[]
class network:
def __init__(self,input_size,midde,output_size):
I,M,O=input_size,midde,output_size
w1=np.random.randn(I,M)/np.sqrt(I)
w2=np.random.randn(M,O)/np.sqrt(M)
#-------------------------調整するパラメータ---------------------------------------
self.gamma=0.9 #保管した報酬を時系列で関連づける
self.decay_rate=0.99 #RMSpropの移動平均をとる際のパラメータ
self.batch_size=0 #バッチサイズ
self.optimizer=RMSProp(lr=0.1,decay_rate=self.decay_rate) #学習率のパラメータ
#--------------------------------------------------------------------------------------------
self.epsilon=0.
self.drs=[]
self.dlogps=[]
self.layers=[
FullyConnected(w1),
ReLu(),
FullyConnected(w2)
]
self.loss_layer=SoftmaxWithLoss()
self.params,self.grads,self.rmsprop=[],[],[]
for layer in self.layers:
self.params+=layer.params
self.grads+=layer.grads
self.rmsprop+=layer.rmsprop
def forward(self,x):
for layer in self.layers:
x=layer.forward(x)
y=self.loss_layer.forward(x)
return y
def backward(self):
#誤差逆伝播して勾配を計算し、勾配をエージェントに保存する
epdlogp = np.vstack(self.dlogps)
epr = np.vstack(self.drs)
self.dlogps,self.drs=[],[]
discounted_epr = self.discount_rewards(epr)
dout=self.loss_layer.backward(discounted_epr,epdlogp)
for layer in reversed(self.layers):
dout=layer.backward(dout)
def reset(self):
self.dlogps,self.drs=[],[]
for layer in self.layers:
layer.reset()
def discount_rewards(self, r):
discounted_r = np.zeros_like(r)
running_add = 0
for t in reversed(range(0, r.size)):
running_add = running_add * self.gamma + r[t]
discounted_r[t] = running_add
return discounted_r
def update(self):
self.optimizer.step(self.params,self.grads,self.rmsprop)
def record_reward(self,reward):
#行動で得られた報酬を保管
self.drs.append(reward)
def select_action(self, aprob):
if np.random.random() < self.epsilon:
action = np.random.choice(range(len(aprob)))
else:
action = np.random.choice(range(len(aprob)), p=aprob)
y = [0] * 5
y[action] = 1
#決定した行動とニューラルネットの出力の差を格納
self.dlogps.append(y - aprob)
return action
def save_model(self,filename):
with open(filename,"wb") as f:
pickle.dump(self.params,f)
main.py
import pygame
from policy_gradient import network
from abstract import abstract
env=abstract()
#----パラメータの調整--------
#ノード数のパラメータ
agent=network(10,20,5)
#---------------------------
reward_sum=0
running_reward = None
prev_x = 0
episode=0
observation=env.reset()
while True:
env.render()
cur_x=observation
x = [0] * 10
x[cur_x] = 1
x[5+prev_x] = 1
prev_x = cur_x
#ニューラルネットに行動を予測させ、確率で行動を決定
aprob=agent.forward(x)
action = agent.select_action(aprob)
#環境に行動を入力し、新たな状況と報酬とチーズを取得できたかどうかを取得
observation,reward,done=env.step(action)
#累積した報酬を記録し、今回の行動で得られた報酬を保管する
reward_sum += reward
agent.record_reward(reward)
#100回繰り返した後、新たなエピソードに移る
if done:
episode+=1
if reward_sum != 0:
#誤差逆伝播して勾配を計算し、勾配をエージェントに保存する
agent.backward()
#繰り返したエピソードが10単位で溜まったら、重みを更新する
if episode%10==0:
agent.update()
#running_rewardの中身が空なら累計した報酬をそのまま保管し、そうでなければ移動平均を求める
running_reward = reward_sum if running_reward is None \
else running_reward * 0.99 + reward_sum * 0.01
print ('resetting env. episode reward total was %f. running mean: %f'
% (reward_sum, running_reward))
#100回行動した後の処理を終えたら累積した報酬と環境をリセット
reward_sum = 0
observation=env.reset() # reset env
if reward != 0: # エピソード内でチーズを得られた場合、その結果を表示する
print (('ep %d: game finished, reward: %f'
% (episode, reward)) + \
('' if reward == -1 else ' !!!!!!!!'))
pygame.display.update()
pygame.quit()
0 likes