1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

WindowsでのDQN(Deep Q-learning Network)

Last updated at Posted at 2023-12-03

この動画で使っているプログラムを、こちらの記事に転記しています。

Google ColabとWindowsで実行できるプログラムを作成しています。
この記事ではWIndowsで実行できるプログラムを記載しています。
Google Colabで実行できるプログラムは、↓↓↓この記事に転記しています。

OpenAI GYM(CartPole)

CartPoleを強化学習するためのプログラムです。

import copy
from collections import deque
import random
import numpy as np
import gym
import matplotlib.pyplot as plt
from IPython import display
import torch
import torch.nn as nn
import torch.optim as optimizers
import torch.nn.functional as F
import time  # 時間計測のために追加

class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.buffer = deque(maxlen=buffer_size)
        self.batch_size = batch_size
    
    def add(self, state, action, reward, next_state, done):
        # stateがタプルの場合、最初の要素のみを取り出す
        if isinstance(state, tuple):
            state = state[0]
        if isinstance(next_state, tuple):
            next_state = next_state[0]
        data = (state, action, reward, next_state, done)
        self.buffer.append(data)
    
    def __len__(self):
        return len(self.buffer)
    
    def get_batch(self):
        data = random.sample(self.buffer, self.batch_size)
        state, action, reward, next_state, done = zip(*data)
        
        return (torch.tensor(np.array(state), dtype=torch.float32),
                torch.tensor(action),
                torch.tensor(reward, dtype=torch.float32),
                torch.tensor(np.array(next_state), dtype=torch.float32),
                torch.tensor(done, dtype=torch.float32))

class QNet(nn.Module):
    def __init__(self, action_size):
        # super(QNet, self).__init__()
        super().__init__()
        self.fc1 = nn.Linear(4, 128)  # CartPoleの状態は4次元
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_size)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class DQNAgent:
    def __init__(self):
        self.gamma = 0.98
        self.lr = 0.0005
        self.epsilon = 0.1
        self.buffer_size = 10000
        self.batch_size = 32
        self.action_size = 2
        
        self.replay_buffer = ReplayBuffer(self.buffer_size, self.batch_size)
        self.qnet = QNet(self.action_size).cuda()
        self.qnet_target = QNet(self.action_size).cuda()
        self.optimizer = optimizers.Adam(self.qnet.parameters(), self.lr)
        # self.optimizer.setup(self.qnet)
    
    def sync_qnet(self):
        # self.qnet_target = copy.deepcopy(self.qnet)
        self.qnet_target.load_state_dict(self.qnet.state_dict())
    
    def get_action(self, state):
        print('state:', state)
        # stateがNumPy配列の場合、そのまま使用
        if isinstance(state, np.ndarray):
            state_array = state
        # stateがリストの場合、NumPy配列に変換
        elif isinstance(state, list):
            state_array = np.array(state)
        # stateがタプル形式の場合(NumPy配列と辞書)、最初の要素を使用
        elif isinstance(state, tuple) and isinstance(state[0], np.ndarray):
            state_array = state[0]
        # それ以外の場合はエラー
        else:
            raise TypeError(f"Unrecognized state format: {state}")
        
        # state_array = np.array(state) #tuple型をリスト型に変換
        state_tensor = torch.from_numpy(state_array).float().unsqueeze(0).cuda()  # NumPy配列をTensorに変換
        if np.random.rand() < self.epsilon:
            return np.random.choice(self.action_size)
        else:
            qs = self.qnet(state_tensor)
            return qs.argmax().item()  # .item() を追加してPythonの数値に変換
    
    def update(self, state, action, reward, next_state, done):
        self.replay_buffer.add(state, action, reward, next_state, done)
        if len(self.replay_buffer) < self.batch_size:
            return
        
        state, action, reward, next_state, done = self.replay_buffer.get_batch()
        state = state.cuda()
        action = action.cuda()
        reward = reward.cuda()
        next_state = next_state.cuda()
        done = done.cuda()
        
        # ネットワークの出力と損失の計算
        qs = self.qnet(state)
        q = qs.gather(1, action.unsqueeze(1)).squeeze(1)
        
        next_qs = self.qnet_target(next_state)
        next_q = next_qs.max(1)[0]
        target = reward + (1 - done) * self.gamma * next_q
        loss = F.mse_loss(q, target)
        
        # バックプロパゲーション
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

episodes = 300
sync_interval = 20

env = gym.make('CartPole-v1', render_mode='rgb_array')
agent = DQNAgent()

# 各エピソードの報酬を記録するリスト
reward_history = []
loss_history = []
start_time = time.time()  # 開始時刻を記録

for episode in range(episodes):
    print('episode:', episode)
    state = env.reset()
    done = False
    total_reward = 0
    total_loss = 0
    step_count = 0
    
    while not done:
        # 行動をランダムで選択
        action = agent.get_action(state)
        
        # 行動後の状態を変数に代入
        next_state, reward, done, info = env.step(action)[:4]
        
        loss = agent.update(state, action, reward, next_state, done)
        
        if loss is not None:
            total_loss += loss.item()  # 損失の合計を更新
        
        state = next_state
        total_reward += reward
        step_count += 1
        
        # ゲームの状態を描画
        env.render()
    
    if episode % sync_interval == 0:
        agent.sync_qnet()
    
    reward_history.append(total_reward)
    loss_history.append(total_loss / step_count if step_count > 0 else 0)
    print(f"Episode {episode}: Total Reward = {total_reward}, Average Loss = {loss_history[-1]}")

end_time = time.time()  # エピソードの終了時刻を記録
episode_duration = end_time - start_time  # エピソードの実行時間を計算
print(f"Duration = {episode_duration:.2f} seconds")
print(reward_history)

# チェックポイントの保存
torch.save(agent.qnet.state_dict(), "dqn_cartpole_model.pth")
env.close()

OpenAI GYM(CartPole)評価用

CartPoleを評価するためのプログラムです。

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

class QNet(nn.Module):
    def __init__(self, action_size):
        super().__init__()
        self.fc1 = nn.Linear(4, 128)  # CartPoleの状態は4次元
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_size)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def load_model(model_path):
    model = QNet(action_size=2).cuda()
    model.load_state_dict(torch.load(model_path))
    model.eval()
    return model

def evaluate_model(model, num_episodes=100):
    env = gym.make('CartPole-v1', render_mode='rgb_array')
    total_rewards = []
    
    for _ in range(num_episodes):
        state = env.reset()
        done = False
        total_reward = 0
        
        while not done:
            # stateがタプルの場合、最初の要素を取り出す
            if isinstance(state, tuple):
                state_array = state[0]
            else:
                state_array = state
            
            state_tensor = torch.from_numpy(state_array).float().unsqueeze(0).cuda()
            action = model(state_tensor).max(1)[1].item()
            state, reward, done, _ = env.step(action)[:4]
            total_reward += reward
            
            env.render()
        
        total_rewards.append(total_reward)
    
    env.close()
    return total_rewards

# モデルのロード
model_path = "dqn_cartpole_model.pth"
model = load_model(model_path)

# モデルの評価
rewards = evaluate_model(model, num_episodes=100)
min_reward = min(rewards)
max_reward = max(rewards)
avg_reward = np.mean(rewards)
print(f"Minimum Reward: {min_reward}")
print(f"Maximum Reward: {max_reward}")
print(f"Average Reward: {avg_reward}")

8パズル

8パズルをプレイするためのプログラムです。

class TileSlidePuzzleEnv:
    def __init__(self, size=3):
        self.size = size
        self.board = np.zeros((self.size, self.size), dtype=int)
        self.reset()
    
    def reset(self):
        # ゴール状態を設定してからランダムに混ぜる
        self.board = np.arange(self.size**2).reshape(self.size, self.size)
        for _ in range(50):  # 例えば50回のランダムな動き
            action = np.random.choice(4)
            self.slide_tile(action)
        return self.board.flatten()
    
    def step(self, action):
        moved = self.slide_tile(action)
        next_state = self.board.flatten()
        reward = self.calculate_reward()
        done = self.is_solved()
        info = {}
        return next_state, reward, done, info
    
    def slide_tile(self, direction):
        x, y = np.where(self.board == 0)
        x, y = int(x), int(y)
        if direction == 0 and x < self.size - 1:  # 上にスライド
            self.board[x, y], self.board[x+1, y] = self.board[x+1, y], self.board[x, y]
        elif direction == 1 and x > 0:  # 下にスライド
            self.board[x, y], self.board[x-1, y] = self.board[x-1, y], self.board[x, y]
        elif direction == 2 and y < self.size - 1:  # 左にスライド
            self.board[x, y], self.board[x, y+1] = self.board[x, y+1], self.board[x, y]
        elif direction == 3 and y > 0:  # 右にスライド
            self.board[x, y], self.board[x, y-1] = self.board[x, y-1], self.board[x, y]
        return True
    
    def calculate_reward(self):
        correct_tiles = np.sum(self.board == np.arange(self.size**2).reshape(self.size, self.size))
        return correct_tiles
    
    def is_solved(self):
        return np.array_equal(self.board, np.arange(self.size**2).reshape(self.size, self.size))
    
    def display(self):
        print(self.board)
    
    def user_play(self):
        self.reset()
        self.display()
        while not self.is_solved():
            move = input("Move (0:Up, 1:Down, 2:Left, 3:Right): ")
            if move.isdigit() and int(move) in [0, 1, 2, 3]:
                self.step(int(move))
                self.display()
            else:
                print("Invalid move. Please enter 0, 1, 2, or 3.")
        print("Congratulations! Puzzle solved.")

# ゲームの実行
env = TileSlidePuzzleEnv()
env.user_play()

PytorchによるDQN(8パズル)

8パズルを強化学習するためのプログラムです。

import copy
from collections import deque
import random
import numpy as np
import matplotlib.pyplot as plt
from IPython import display
import torch
import torch.nn as nn
import torch.optim as optimizers
import torch.nn.functional as F
import time  # 時間計測のために追加

# GPUが利用可能か確認し、利用可能なら使用する
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.buffer = deque(maxlen=buffer_size)
        self.batch_size = batch_size
    
    def add(self, state, action, reward, next_state, done):
        # stateがタプルの場合、最初の要素のみを取り出す
        if isinstance(state, tuple):
            state = state[0]
        if isinstance(next_state, tuple):
            next_state = next_state[0]
        data = (state, action, reward, next_state, done)
        self.buffer.append(data)
    
    def __len__(self):
        return len(self.buffer)
    
    def get_batch(self):
        data = random.sample(self.buffer, self.batch_size)
        state, action, reward, next_state, done = zip(*data)
        return (torch.tensor(np.array(state), dtype=torch.float32),
                torch.tensor(action),
                torch.tensor(reward, dtype=torch.float32),
                torch.tensor(np.array(next_state), dtype=torch.float32),
                torch.tensor(done, dtype=torch.float32))

class QNet(nn.Module):
    def __init__(self, action_size):
        super().__init__()
        self.fc1 = nn.Linear(9, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 128)
        self.fc4 = nn.Linear(128, action_size)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x

class DQNAgent:
    def __init__(self):
        self.gamma = 0.98
        self.lr = 0.0005
        self.epsilon = 0.2 # 0.2にすると10回に2回の確率でランダムに実行する。
        self.buffer_size = 10000
        self.batch_size = 32
        self.action_size = 4 # 'up', 'down', 'left', 'right'
        
        self.replay_buffer = ReplayBuffer(self.buffer_size, self.batch_size)
        self.qnet = QNet(self.action_size).to(device)
        self.qnet_target = QNet(self.action_size).to(device)
        self.optimizer = optimizers.Adam(self.qnet.parameters(), self.lr)
    
    def sync_qnet(self):
        self.qnet_target.load_state_dict(self.qnet.state_dict())
    
    def get_action(self, state):
        print('state:', state)
        # stateがNumPy配列の場合、そのまま使用
        if isinstance(state, np.ndarray):
            state_array = state
        # stateがリストの場合、NumPy配列に変換
        elif isinstance(state, list):
            state_array = np.array(state)
        # stateがタプル形式の場合(NumPy配列と辞書)、最初の要素を使用
        elif isinstance(state, tuple) and isinstance(state[0], np.ndarray):
            state_array = state[0]
        # それ以外の場合はエラー
        else:
            raise TypeError(f"Unrecognized state format: {state}")
        
        # state_array = np.array(state) #tuple型をリスト型に変換
        state_tensor = torch.from_numpy(state_array.flatten()).float().unsqueeze(0).to(device)  # NumPy配列をTensorに変換
        if np.random.rand() < self.epsilon:
            print('random mode')
            return np.random.choice(self.action_size)
        else:
            print('qnet mode')
            qs = self.qnet(state_tensor)
            return qs.argmax().item()  # .item() を追加してPythonの数値に変換
    
    def update(self, state, action, reward, next_state, done):
        self.replay_buffer.add(state, action, reward, next_state, done)
        if len(self.replay_buffer) < self.batch_size:
            return
        
        state, action, reward, next_state, done = self.replay_buffer.get_batch()
        state = state.view(self.batch_size, -1).to(device) # バッチ処理のために、状態をフラット化
        action = action.to(device)
        reward = reward.to(device)
        next_state = next_state.view(self.batch_size, -1).to(device) # バッチ処理のために、状態をフラット化
        done = done.to(device)
        
        # ネットワークの出力と損失の計算
        qs = self.qnet(state)
        q = qs.gather(1, action.unsqueeze(1)).squeeze(1)
        
        next_qs = self.qnet_target(next_state)
        next_q = next_qs.max(1)[0]
        target = reward + (1 - done) * self.gamma * next_q
        loss = F.mse_loss(q, target)
        
        # バックプロパゲーション
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss  # 損失の値を返す

class TileSlidePuzzleEnv:
    def __init__(self, size=3):
        self.size = size
        self.board = np.zeros((self.size, self.size), dtype=int)
        self.reset()
    
    def reset(self):
        # nums = np.arange(1, self.size**2)
        # np.random.shuffle(nums)
        # self.board = np.insert(nums, 0, 0).reshape(self.size, self.size)
        # return self.board.flatten()
        
        # ゴール状態を設定
        self.board = np.arange(self.size**2).reshape(self.size, self.size)
        
        # 一定回数のランダムな動きでタイルを混ぜる
        for _ in range(50):  # 例えば50回のランダムな動き
            action = np.random.choice(4)
            self.slide_tile(action)
        
        return self.board.flatten()
    
    def step(self, action):
        # タイルをスライドさせるロジックを実装
        # actionはタイルを移動する方向(例: 0=上, 1=下, 2=左, 3=右)
        print('action:', action)
        moved = self.slide_tile(action)
        self.display()
        
        # 次の状態(フラット化されたボード)
        next_state = self.board.flatten()
        
        # 報酬の計算(例:正しい位置にあるタイルの数)
        reward = self.calculate_reward()
        print('reward:', reward)
        
        # ゲーム終了条件のチェック
        done = self.is_solved()
        
        # 追加情報(空の辞書を返す)
        info = {}
        
        # next_state, reward, done, infoを返す
        return next_state, reward, done, info
    
    def slide_tile(self, direction):
        # 空白タイルの位置を見つける
        x, y = np.where(self.board == 0)
        x, y = int(x), int(y)
        
        if direction == 0 and x < self.size - 1:
            self.board[x, y], self.board[x+1, y] = self.board[x+1, y], self.board[x, y]
        elif direction == 1 and x > 0:
            self.board[x, y], self.board[x-1, y] = self.board[x-1, y], self.board[x, y]
        elif direction == 2 and y < self.size - 1:
            self.board[x, y], self.board[x, y+1] = self.board[x, y+1], self.board[x, y]
        elif direction == 3 and y > 0:
            self.board[x, y], self.board[x, y-1] = self.board[x, y-1], self.board[x, y]
    
    def calculate_reward(self):
        # 報酬の計算ロジック
        # 例:正しい位置にあるタイルの数を報酬とする
        correct_tiles = np.sum(self.board == np.arange(self.size**2).reshape(self.size, self.size))
        return correct_tiles
    
    def is_solved(self):
        # パズルが解かれたかどうかをチェック
        return np.array_equal(self.board, np.arange(self.size**2).reshape(self.size, self.size))
    
    def display(self):
        # パズルの現在の状態を表示
        print(self.board)
    
    def render(self):
        print(self.board)

episodes = 10000
sync_interval = 20
limit_slide_count = 200

# タイルスライドパズル環境のインスタンス化
env = TileSlidePuzzleEnv()
agent = DQNAgent()

# 各エピソードの報酬を記録するリスト
reward_history = []
loss_history = []
start_time = time.time()  # 開始時刻を記録

for episode in range(episodes):
    print('----- ----- episode:', episode, ' ----- -----')
    state = env.reset()
    done = False
    total_reward = 0
    total_loss = 0
    slide_count = 0
    step_count = 0
    
    while not done:
        slide_count += 1
        print('----- ----- episode:', episode, ' ----- -----')
        print('slide _count:', slide_count)
        
        # 行動をランダムで選択
        action = agent.get_action(state)
        
        # 行動後の状態を変数に代入
        next_state, reward, done, info = env.step(action)[:4]
        
        loss = agent.update(state, action, reward, next_state, done)
        
        if loss is not None:
            total_loss += loss.item()  # 損失の合計を更新
        
        state = next_state
        total_reward += reward
        step_count += 1
        
        if done:
            total_reward = 10000
        print('total_reward:', total_reward)
        
        if slide_count >= limit_slide_count:
            done = True
    
    if episode % sync_interval == 0:
        agent.sync_qnet()
    
    reward_history.append(total_reward)
    loss_history.append(total_loss / step_count if step_count > 0 else 0)
    print(f"Episode {episode}: Total Reward = {total_reward}, Average Loss = {loss_history[-1]}")

end_time = time.time()  # エピソードの終了時刻を記録
episode_duration = end_time - start_time  # エピソードの実行時間を計算
print(f"Duration = {episode_duration:.2f} seconds")
print(reward_history)

# エピソード番号のリストを生成
episodes = list(range(len(reward_history)))

# チェックポイントの保存
torch.save(agent.qnet.state_dict(), "dqn_tile_slide_model.pth")

# グラフの作成
plt.figure(figsize=(10, 6))
plt.plot(episodes, reward_history, marker='o')
plt.title('Episode vs Total Reward')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.grid(True)
plt.show()
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?