3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

強化学習ライブラリOpenAI Gymで独自環境を作って学んだ“やってみないと分からないこと”

Posted at

背景

強化学習に興味がありはしたものの、いろいろな講座や情報を読んでもどうもしっくりこない状態が続いていました。※後述の書籍を理解するベース知識を得るために必要だったと思ってます。
そんな中、O’Reilly社のゼロから作るDeep Learning ❹ 強化学習編(以降、「書籍」と表現します)を読みました。本書は題名の通り実際に強化学習のコーディングをしながら学んでいきます。恥ずかしながら出てきた数式などを完全に理解したとはいいがたい状態ですが、だいぶ理解が進みました。本書の最後の方でOpenAI Gymという強化学習を使って簡単なゲームをプレイさせるライブラリを使用しています。
コーディングの写経では、終わったのちに何か独自要素を加えて改造することで理解が深まります。独自に何か簡単なゲームを定義してプレイさせる事にしました。

想定

強化学習の基本知識を持つ人が、自分の様にOpenAI Gymで独自環境を作って学習したい時の参考の一つになればと思っています。

動作環境

  • WSL2
  • Python 3.10.12
  • gym 0.26.2

環境定義

環境とは、ゲームのルールです。書籍での題材の一つは「3×4マス上に爆弾、壁、ゴールがあり、最短距離でゴールを目指す」というものでした。今回はこれを複雑化します。倉庫番ゲームのルールを採用し「4×4のマスに荷物、ゴールがあり、プレイヤーは荷物を押す事でゴールに到達させる」事にします。荷物を押すというアクションがあることで、荷物の位置という状況が変わっていくことになります。

実装

必要な実装概略

  • gym環境
    • 処理に使用する変数の範囲定義
    • 指定したアクションを受け取り、学習に必要な情報を返却
    • 現時点での状況を描画
  • エージェント
    • 状況から次のアクションの決定
    • 学習処理

※最後に全体を記載しますが、この章では重要部分だけ説明します

gym環境

gym.Envクラスを継承し、定義した環境に基づきコーディングしていきます。以下の記事が大変分かりやすく参考にさせてもらいました。
OpenAI Gym用の環境自作方法と自作例を紹介!

処理に使用する変数の範囲定義

ここでは一部のみ記載します。※全体コードは最後に記載しています。

コンストラクタでの必須設定
    def __init__(self, start_is_random=True, agent_start=None, box_start=None, box_goal=None, max_steps=100):
        # ・・中略・・

        # エージェント(プレイヤー)が取りうる行動種類。上下左右移動なので4種類にします。
        self.action_space = spaces.Discrete(4)

        # 状況を表現する空間を定義。エージェントと荷物それぞれのX,Y座標を観測する
        # それぞれの座標は0からgrid_width-1, grid_height-1までの整数値
        # 例: [0, 1, 2, 3]は、エージェントが(0,1)にいて、荷物が(2,3)にいることを示す
        self.observation_space = gym.spaces.Box(
            low=np.int32([0,0,0,0]),
            high=np.int32([self.grid_width - 1, self.grid_height - 1, self.grid_width - 1, self.grid_height - 1]), dtype=np.int32)

        # 報酬の範囲[最小値と最大値]を定義。(-100から100を設定)
        self.reward_range = (self.fail_reward, self.goal_reward)

指定したアクションを受け取り、学習に必要な情報を返却

ここでは一部のみ記載します。※全体コードは最後に記載しています。

actionに対応する変数処理や報酬の返却
    def step(self, action):
        # ・・中略・・

        # 荷物が隣接していて、エージェントの移動方向に荷物がある場合
        if (next_agent_x, next_agent_y) == self.box_position:
            # 荷物の移動先を計算
            next_box_x += (next_agent_x - agent_x)
            next_box_y += (next_agent_y - agent_y)

            # 荷物がグリッド内に収まる場合のみ移動
            if 0 <= next_box_x <= self.max_x and 0 <= next_box_y <= self.max_y:
                self.box_position = (next_box_x, next_box_y)
                self.agent_position = (next_agent_x, next_agent_y)
        else:
            # 荷物がない場合はエージェントのみ移動
            self.agent_position = (next_agent_x, next_agent_y)

        # 観測を更新
        obs = np.float32([self.agent_position[0], self.agent_position[1], self.box_position[0], self.box_position[1]])

        # 成功
        if self.box_position == self.box_goal:
            return obs, self.goal_reward, True, {"status": "success"}

        # 失敗
        if self.step_count >= self.max_steps:
            return obs, self.fail_reward, True, {"status": "fail (max_steps)"}

現時点での状況を描画

ここではhuman(画面上で可視化するモード)で行っていますが、rgb_array(イメージバイト配列)に対応するとステップ毎の画像を保存して動画にするということも可能になるはずです。

pygameライブラリを使用します
    def render(self, mode='human'):
        # modeとしてhuman, rgb_array, ansiが選択可能

        def __render_grid():
            self.screen.fill((255,255,255))
            # グリッドを描画する関数
            for x in range(self.grid_width):
                for y in range(self.grid_height):
                    rect = pygame.Rect(x * 50, y * 50, 50, 50)
                    pygame.draw.rect(self.screen, (0, 0, 0), rect, 1)

        def __get_pixel_center(pos):
            # エージェントのピクセル座標を取得する関数
            return (
                pos[0] * self.grid_pixel + int(self.grid_pixel / 2),
                pos[1] * self.grid_pixel + int(self.grid_pixel / 2))

        def __get_draw_rect(pos):
            # エージェントのピクセル座標を取得する関数
            offset = int(self.grid_pixel / 4)
            return (pos[0] * self.grid_pixel + offset, pos[1] * self.grid_pixel + offset, offset * 2, offset * 2)

        # humanなら描画し, rgb_arrayならイメージバイト配列をreturnし, ansiなら文字列をreturnする
        if mode == 'human':
            __render_grid()
            pygame.draw.circle(self.screen, self.color_agent, __get_pixel_center(self.agent_position), int(self.grid_pixel / 3))
            pygame.draw.rect(self.screen,self.color_box,__get_draw_rect(self.box_position))
            pygame.draw.circle(self.screen, self.color_box_goal, __get_pixel_center(self.box_goal), int(self.grid_pixel / 3))
            pygame.display.update()
        else:
            super(WarehouseEnv, self).render(mode=mode)

エージェント

状況の情報から次にとるべきアクションを学習していく部分です。

書籍のコードをそのまま流用します。今回の実装では書籍を参考にしているので、書籍のシリーズ3で作成しているディープラーニングフレームワークdezeroを使用していますが、一般的なKeras-RLなどでも大丈夫だと思います。

重みを保存&再現できるよう改造した部分(n_actions引数も追加)
class Agent:
    def __init__(self, n_actions, pretrained=False, pretrained_path=None):
        # ・・中略・・
        if pretrained:
            self.pi = PolicyNet(n_actions)
            self.v = ValueNet()
            self.pi.load_weights(pretrained_path + "_policy_net.npz")
            self.v.load_weights(pretrained_path + "_value_net.npz")
        else:
            self.pi = PolicyNet(n_actions)
            self.v = ValueNet()
        # ・・中略・・

    def save_weights(self, path):
        self.pi.save_weights(path + "_policy_net")
        self.v.save_weights(path + "_value_net")
        # ・・後略・・

書籍中でCartPole-v1を題材にした時、学習後に再現していた時の例

env_play = gym.make("CartPole-v1", render_mode="human")
agent_play = Agent(2, pretrained=True, pretrained_path=weights_path)
state = env_play.reset()[0]
done = False
while not done:
    action, prob = agent_play.get_action(state)
    state, reward, done, info, _ = env_play.step(action)
env_play.close()

結果

ランダムにゴールなどを配置された状態でも最短距離でゴールできるモデルを作成し、プレイさせたかったですが、シンプルな配置の固定でも「ゴールすることがそれなりにある」レベルの学習までにとどまりました。
そのため、学習の間にゴールした時の動作履歴をチェックして、最短のものを保持しておいて、それを再現プレイするという事を行いました。以下その動画になります。

pygame_movie.gif

学べた事

  • 要素が一つ増えるだけで調整が大変になる
  • 報酬や最大ステップの調整が大事(無限ループして報酬得る方針をとりがち)
  • 強化学習にも色々手法があるのでタスクに応じて変えた方がよい(冒頭で紹介した書籍参考)
    • 今回は書籍の最後で使用していたActor-Critic手法をそのまま使用したが、おそらく今回定義した環境ではAlphaGoZeroで使用されているというモンテカルロ木探索手法を使った方がよかったと思われる
  • OpenAI Gymは強化学習の環境定義や可視化を行うが、学習自体は行わない

感じた事

  • 強化学習は何も考えないと相当な量の学習(試行回数)が必要になると思われる
    • ある程度成功している例を教師情報として使うことで計算量削減を行っていると思われる
  • 最近はここら辺の分野でもAIの進歩はすごいらしい
  • 今回の環境の様にゴールのみが最大の報酬を得るようなケースでは局所最適解に陥りやすいと思われる
    • この意味でも報酬の調整が大事だと思われる
    • 学習率などのチューニングも必須になる
    • 実は今回の中途半端な結果に至る程度でも報酬に関して結構試行錯誤した
  • 本来の倉庫番ゲームは一度置き場所に置いてもいったん外して通路を確保するとかの手順が必要になるケースもあり、2つ以上箱があるケースを解かせるのは相当大変になると思われる
  • 今回は同じところを通るとペナルティとする事である程度まともに動いたが、本来の倉庫番ゲームは何度も同じところを通るのは普通なので報酬設定は相当難しいはず

その他

タイトルはChatGPTに相談して出てきた候補を採用してみました。

mp4からgifに変換するプログラムはgithub copilotに作ってもらいました。

AWSのDeepRacerも強化学習と関係していますが、OpenAI Gymの環境サンプルCar Racingと似ていたりするかもしれません。知らんけど。

コード全体

ライブラリのインストールが必要です。

pip install gym
pip install dezero
コード全文
import numpy as np
import gym
from gym import spaces
import pygame

class WarehouseEnv(gym.Env):
    metadata = {'render.modes': ['human','ansi']}
    # グリッドのサイズ
    grid_width = 4
    grid_height = 4
    max_x = grid_width - 1
    max_y = grid_height - 1
    grid_pixel = 50
    color_agent = (0, 0, 255)
    color_box = (255, 0, 0)
    color_box_goal = (0, 255, 0)

    # エージェントの初期位置
    agent_start = None
    # 荷物の初期位置
    box_start = None
    # 荷物置き場の位置
    box_goal = None

    # エージェントの位置
    agent_position = None
    # 荷物の位置
    box_position = None
    # 経過ステップ
    step_count = 0
    # 通過回数保持
    step_count_list = np.zeros((grid_width, grid_height), dtype=np.int32)
    action_history = []

    # 荷物を置いたとき(成功)の報酬は1.0、荷物が置き場がある以外の壁にぶつかったとき(失敗)のペナルティは-1.0
    # 荷物を運んでゴールに近づいたら報酬0.2、遠ざかったら-0.2
    # エージェントが移動したときのペナルティは-0.01(最短距離)
    # 移動できないとき-0.1(壁に移動、箱が壁にあるなど)
    # 荷物に近づいたら報酬0.1、遠ざかったら-0.1、荷物近辺での移動は0(荷物の方に行かせる)
    goal_reward = 100.0
    fail_reward = -100.0
    agent_move_penalty = -0.1
    agent_box_reward = 0.5
    agent_box_penalty = -0.5
    # box_move_reward = 0.1
    # box_move_penalty = -0.1
    stay_penalty = -8

    def __init__(self, start_is_random=True, agent_start=None, box_start=None, box_goal=None, max_steps=100):
        """
        WarehouseEnvの初期化関数
        Args
            start_is_random: Trueなら初期位置をランダムにする。Trueの場合、reset()で初期位置を再設定する
            agent_start: エージェントの初期位置(start_is_randomがFalseのときのみ有効)
            box_start: 荷物の初期位置(start_is_randomがFalseのときのみ有効)
            box_goal: 荷物置き場の位置(start_is_randomがFalseのときのみ有効)
        """
        super(WarehouseEnv, self).__init__()
        self.screen = pygame.display.set_mode((self.grid_width * self.grid_pixel, self.grid_height * self.grid_pixel))
        self.start_is_random = start_is_random
        self.max_steps = max_steps
        if start_is_random:
            self._set_start(None, None, None)
        else:
            self._set_start(agent_start, box_start, box_goal)

        # エージェントが取りうる行動空間。上下左右移動
        self.action_space = spaces.Discrete(4)

        # エージェントが受け取りうる観測空間を定義。エージェントと荷物それぞれのX,Y座標を観測する
        # それぞれの座標は0からgrid_width-1, grid_height-1までの整数値
        # 例: [0, 1, 2, 3]は、エージェントが(0,1)にいて、荷物が(2,3)にいることを示す
        self.observation_space = spaces.Box(
            low=np.int32([0,0,0,0]),
            high=np.int32([self.grid_width - 1, self.grid_height - 1, self.grid_width - 1, self.grid_height - 1]), dtype=np.int32)

        # 報酬の範囲[最小値と最大値]を定義。
        self.reward_range = (self.fail_reward, self.goal_reward)

    def _set_start(self, agent_start=None, box_start=None, box_goal=None):
        # エージェントの初期位置を設定
        self.agent_start = agent_start if agent_start is not None else (np.random.randint(0, self.grid_width), np.random.randint(0, self.grid_height))
        # 荷物の初期位置を設定(壁にくっつけない。最初からゴールできない場合が出てきてしまう)
        self.box_start = box_start if box_start is not None else (np.random.randint(1, self.grid_width), np.random.randint(1, self.grid_height))
        # 荷物置き場の位置を設定
        self.box_goal = box_goal if box_goal is not None else (np.random.randint(0, self.grid_width), np.random.randint(0, self.grid_height))

        # 重複チェックとランダムにずらす処理(エージェントと荷物置き場は重なってよい)
        while self.agent_start == self.box_start or self.box_start == self.box_goal:
            if self.agent_start == self.box_start:
                self.agent_start = (np.random.randint(0, self.grid_width), np.random.randint(0, self.grid_height))
            if self.box_start == self.box_goal:
                self.box_goal = (np.random.randint(0, self.grid_width), np.random.randint(0, self.grid_height))

    def reset(self):
        # 環境を初期状態にする関数
        # 初期状態をreturnする
        if self.start_is_random:
            self._set_start(None, None, None)
        self.agent_position = self.agent_start
        self.box_position = self.box_start
        self.step_count = 0
        self.step_count_list = np.zeros((self.grid_width, self.grid_height), dtype=np.int32)
        self.step_count_list[self.agent_start[0], self.agent_start[1]] += 1
        self.action_history = []

        return np.float32([self.agent_position[0], self.agent_position[1], self.box_position[0], self.box_position[1]])

    def step(self, action):
        self.step_count += 1
        self.action_history.append(action)
        info = {}

        # 行動を受け取り行動後の状態をreturnする
        agent_x, agent_y = self.agent_position
        box_x, box_y = self.box_position
        box_goal_x, box_goal_y = self.box_goal

        # エージェントの移動先を計算
        if action == 0:  # up
            next_agent_x, next_agent_y = agent_x, max(agent_y - 1, 0)
        elif action == 1:  # down
            next_agent_x, next_agent_y = agent_x, min(agent_y + 1, self.max_y)
        elif action == 2:  # left
            next_agent_x, next_agent_y = max(agent_x - 1, 0), agent_y
        elif action == 3:  # right
            next_agent_x, next_agent_y = min(agent_x + 1, self.max_x), agent_y
        else:
            raise ValueError("Invalid action")
        
        next_box_x = box_x
        next_box_y = box_y

        # 最短距離考慮用ペナルティ
        reward = self.agent_move_penalty

        reward_infos = []
        # 荷物が隣接していて、エージェントの移動方向に荷物がある場合
        if (next_agent_x, next_agent_y) == self.box_position:
            # 荷物の移動先を計算
            next_box_x += (next_agent_x - agent_x)
            next_box_y += (next_agent_y - agent_y)

            # 荷物がグリッド内に収まる場合のみ移動
            if 0 <= next_box_x <= self.max_x and 0 <= next_box_y <= self.max_y:
                self.box_position = (next_box_x, next_box_y)
                self.agent_position = (next_agent_x, next_agent_y)
        else:
            # 荷物がない場合はエージェントのみ移動
            self.agent_position = (next_agent_x, next_agent_y)

        # 観測を更新
        obs = np.float32([self.agent_position[0], self.agent_position[1], self.box_position[0], self.box_position[1]])

        # 成功
        if self.box_position == self.box_goal:
            return obs, self.goal_reward, True, {"status": "success"}

        # 失敗
        if self.step_count >= self.max_steps:
            return obs, self.fail_reward, True, {"status": "fail (max_steps)"}

        is_fail = False
        next_box_x, next_box_y = self.box_position # 更新後に呼ぶ
        # 箱が壁にあって、置き場が同じ軸にない時はゴール不可なので失敗
        if (next_box_x == 0 and box_goal_x != 0) or (next_box_x == self.max_x and box_goal_x != self.max_x):
            is_fail = True
        if (next_box_y == 0 and box_goal_y != 0) or (next_box_y == self.max_y and box_goal_y != self.max_y):
            is_fail = True
        if is_fail:
            return obs, self.fail_reward, True, {"status": "fail (box_wall)"}

        # 継続
        if (next_agent_x, next_agent_y) == (agent_x, agent_y):
            # 動かなかったらペナルティ
            return obs, self.stay_penalty, False, {"status": "no move"}

        value_bef = self.__calc_order_value(agent_x, box_x, agent_y, box_y)
        value_next = self.__calc_order_value(next_agent_x, next_box_x, next_agent_y, next_box_y)
        if value_bef < value_next:
            # 位置評価が高くなったら報酬
            reward += value_next * 0.01
            reward_infos.append("position_value_reward")
        else:
            reward -= value_next * 0.1
            reward_infos.append("position_value_penalty")

        # エージェントが荷物に近づいたら報酬を得る
        if abs(next_agent_x - box_x) < abs(agent_x - box_x) or abs(next_agent_y - box_y) < abs(agent_y - box_y):
            reward += self.agent_box_reward
            reward_infos.append("agent_approach_box_reward")
        else:
            reward += self.agent_box_penalty
            reward_infos.append("agent_approach_box_penalty")

        reward -= self.step_count_list[next_agent_x, next_agent_y] # 通過回数をペナルティ
        self.step_count_list[next_agent_x, next_agent_y] += 1

        info["status"] = "continue"
        info["reward_infos"] = reward_infos
        return obs, reward, False, info

    def __calc_order_value(self, agent_pos_x: int, box_pos_x: int, agent_pos_y: int, box_pos_y: int):
        box_goal_x, box_goal_y = self.box_goal
        value_x = self.__calc_order_value_each(agent_pos_x, box_pos_x, box_goal_x)
        value_y = self.__calc_order_value_each(agent_pos_y, box_pos_y, box_goal_y)
        value_total = 0
        if value_x == 0:
            value_total += value_y
        if value_y == 0:
            value_total += value_x
        return value_total

    def __calc_order_value_each(self, agent_pos: int, box_pos: int, goal_pos: int):
        """
        エージェント、荷物、荷物置き場の順番の評価(順序逆は同一)
        [['agent', 'box', 'goal']] ・・・別軸で評価:(サイズが1)
        [['box', 'goal'], 'agent'] ・・・別軸で評価:(サイズが2、2つ目がagent)

        ['agent', 'box', 'goal']  ・・・理想順番:評価5(サイズが3、真ん中がbox)
        [['agent', 'box'], 'goal']・・・悪い順番:評価4(サイズが2、2つ目がgoal)悪い状態からの改善
        ['box', 'agent', 'goal']  ・・・悪い順番:評価3(サイズが3、真ん中がagent)
        [['agent', 'goal'], 'box']・・・悪い順番:評価2(サイズが2、2つ目がbox)最悪からの改善
        ['agent', 'goal', 'box']  ・・・悪い順番:評価1(サイズが3、真ん中がgoal)理想の順番にするための距離が遠い
        """
        order = self.__calc_order(agent_pos, box_pos, goal_pos)
        if len(order) == 1:
            return None
        elif len(order) == 2:
            if order[1] == 'agent':
                return None
            elif order[1] == 'box':
                return 2
            elif order[1] == 'goal':
                return 4
        else:
            if order[1] == 'agent':
                return 3
            elif order[1] == 'box':
                return 5
            elif order[1] == 'goal':
                return 1

    def __calc_order(self, agent_pos: int, box_pos: int, goal_pos: int):
        # エージェント、荷物、荷物置き場の順番を計算する関数
        if agent_pos == box_pos and box_pos == goal_pos:
            return [['agent', 'box', 'goal']]

        order = []
        if agent_pos == box_pos:
            order.append(['agent', 'box'])
            order.append('goal')
            return order

        if goal_pos == box_pos:
            order.append(['box', 'goal'])
            order.append('agent')
            return order

        if agent_pos == goal_pos:
            order.append(['agent', 'goal'])
            order.append('box')
            return order

        order.append({'agent': agent_pos})
        order.append({'box': box_pos})
        order.append({'goal': goal_pos})
        order = sorted(order, key=lambda x: list(x.values())[0])

        return order

    def render(self, mode='human'):
        # modeとしてhuman, rgb_array, ansiが選択可能

        def __render_grid():
            self.screen.fill((255,255,255))
            # グリッドを描画する関数
            for x in range(self.grid_width):
                for y in range(self.grid_height):
                    rect = pygame.Rect(x * 50, y * 50, 50, 50)
                    pygame.draw.rect(self.screen, (0, 0, 0), rect, 1)

        def __get_pixel_center(pos):
            # エージェントのピクセル座標を取得する関数
            return (
                pos[0] * self.grid_pixel + int(self.grid_pixel / 2),
                pos[1] * self.grid_pixel + int(self.grid_pixel / 2))

        def __get_draw_rect(pos):
            # エージェントのピクセル座標を取得する関数
            offset = int(self.grid_pixel / 4)
            return (pos[0] * self.grid_pixel + offset, pos[1] * self.grid_pixel + offset, offset * 2, offset * 2)

        # humanなら描画し, rgb_arrayならイメージバイト配列をreturnし, ansiなら文字列をreturnする
        if mode == 'human':
            __render_grid()
            pygame.draw.circle(self.screen, self.color_agent, __get_pixel_center(self.agent_position), int(self.grid_pixel / 3))
            pygame.draw.rect(self.screen,self.color_box,__get_draw_rect(self.box_position))
            pygame.draw.circle(self.screen, self.color_box_goal, __get_pixel_center(self.box_goal), int(self.grid_pixel / 3))
            pygame.display.update()
        else:
            super(WarehouseEnv, self).render(mode=mode)

# 以下、書籍のコードを使用しています。
# https://github.com/oreilly-japan/deep-learning-from-scratch-4/blob/master/ch09/actor_critic.py#L12-L76

import dezero.functions as F
import dezero.layers as L
from dezero import Model
from dezero import optimizers

class PolicyNet(Model):
    def __init__(self, n_actions=4):
        super().__init__()
        self.l1 = L.Linear(128)
        self.l2 = L.Linear(n_actions)

    def forward(self, x):
        y = F.relu(self.l1(x))
        y = F.softmax(self.l2(y))
        return y
    
class ValueNet(Model):
    def __init__(self):
        super().__init__()
        self.l1 = L.Linear(128)
        self.l2 = L.Linear(1)

    def forward(self, x):
        y = F.relu(self.l1(x))
        y = self.l2(y)
        return y

class Agent:
    def __init__(self, n_actions, pretrained=False, pretrained_path=None):
        self.gamma = 0.98
        self.lr_pi = 0.0002
        self.lr_v = 0.0005
        self.actions_size = n_actions

        if pretrained:
            self.pi = PolicyNet(n_actions)
            self.v = ValueNet()
            self.pi.load_weights(pretrained_path + "_policy_net.npz")
            self.v.load_weights(pretrained_path + "_value_net.npz")
        else:
            self.pi = PolicyNet(n_actions)
            self.v = ValueNet()
        self.optimizer_pi = optimizers.Adam(self.lr_pi)
        self.optimizer_pi.setup(self.pi)
        self.optimizer_v = optimizers.Adam(self.lr_v)
        self.optimizer_v.setup(self.v)

    def save_weights(self, path):
        self.pi.save_weights(path + "_policy_net")
        self.v.save_weights(path + "_value_net")

    def get_action(self, state):
        state = state[np.newaxis, :]
        probs = self.pi(state)
        probs = probs[0]
        action = np.random.choice(len(probs), p=probs.data)
        return action, probs[action]

    def update(self, state, action_prob, reward, next_state, done):
        state = state[np.newaxis, :]
        next_state = next_state[np.newaxis, :]

        target = reward + self.gamma * self.v(next_state) * (1 - done)
        target.unchain()
        v = self.v(state)
        loss_v = F.mean_squared_error(v, target)

        delta = target - v
        delta.unchain()
        loss_pi = -F.log(action_prob) * delta

        self.v.cleargrads()
        self.pi.cleargrads()
        loss_v.backward()
        loss_pi.backward()
        self.optimizer_v.update()
        self.optimizer_pi.update()

env = WarehouseEnv(start_is_random=False, agent_start=(0, 0), box_start=(2, 2), box_goal=(3, 3), max_steps=100)
state = env.reset()
done = False
# 
# # # 環境動作テスト
# while not done:
#     env.render(mode='human')
#     action = np.random.choice([0,1,2,3])
#     infos = env.step(action)
#     next_state, reward, done, info = infos
# env.close()
#
# 訓練モード
agent = Agent(4, pretrained=False)
reward_history = []
episodes = 300

best_steps = 100
best_action_history = []
for episode in range(episodes):
    state = env.reset()
    done = False
    total_reward = 0
    while not done:
        action, prob = agent.get_action(state)
        next_state, reward, done, info = env.step(action)
        if info["status"] == "success":
            if best_steps > env.step_count:
                best_steps = env.step_count
                best_action_history = env.action_history.copy()
        agent.update(state, prob, reward, next_state, done)
        # if episode == episodes - 1:
        #     print(f"Episode: {episode}, info:{info}, state:{state}, action:{action}, reward:{reward}, next_state:{next_state}")
        # if (episode + 1) % 300 == 0:
        #     env.render(mode='human')
        state = next_state
        total_reward += reward

    if episode % 100 == 0:
        print(f"Episode: {episode}, Total Reward: {total_reward}")
    reward_history.append(total_reward)

print(best_action_history)

import time

env.reset()
for action in best_action_history:
    env.render(mode='human')
    env.step(action)
    time.sleep(1.5)
env.render(mode='human')
time.sleep(1.5)

env.close
3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?