LoginSignup
0
0

DQN(Deep Q-learning Network)とCNNを使った8パズル

Last updated at Posted at 2024-05-12

記事の概要

以下の記事で8パズルをクリアするためのDQNのプログラムを作成している。

上記のプログラムは、環境の情報をパラメータ(「0,1,2,3,4,5,6,7,8」のような形式)としてニューラルネットワークに共有し、ゲームを実行している。

今回の記事では、環境の情報を画像として保存し、その画像をCNNを持つニューラルネットワークに共有することで、ゲームを実行する。

DQNとは

DQN(Deep Q-learning Network)は、深層強化学習のアプローチで、ディープラーニングとQ学習を組み合わせたものを言う。

一般的なDQNのフローを以下に記載する。

DQNの特徴として、以下の3つをあげる。

1.Q値の更新

Q値はディープラーニング以前からある考え方で、既に経験したエピソードから報酬を最大化できるようにパラメータを更新していくプロセスを導入している。
DQNの場合は、更新対象はニューラルネットワークのパラメータとなる。

2.1つのニューラルネットワークと2つのモデル

ニューラルネットワークは1つだけ定義しますが、「①状態」をもとに「②行動」を選択する「エージェントモデル」と、エージェントモデルが行動した結果をもとに実際に学習を実施する「DQNモデル」で同じニューラルネットワークを利用し、インスタンス自体は別として保持する。

この前提で上記のフローを簡略化すると以下になる。

0.諸々初期化
1.「エージェントモデル」が状態から行動し、報酬を得る。
2.リプレイバッファに状態、行動、報酬、次状態を保存する。
3.リプレイバッファからサンプルでN件取得し、「DQNモデル」を学習される。
4.M回に1度、DQNモデルをエージェントモデルに反映させる。(「DQNモデル」→「エージェントモデル」)
5.終了するまで、1から4を繰り返す。

3.ε-greedy方策

また、ε-greedy方策の考え方として、エージェントモデルが選択する行動を、最初はランダム(ε値が高い)に行動を選択させ、エピソードの進捗によりランダムの確立を下げる(ε値が低い)ことで、学習の結果に基づいて行動を選択するようにする。

CNNとは

画像データを処理するためのニューラルネットワークで、カーネルという小さなフィルタを複数使って、特徴を抽出するコンボリューション層と、その抽出したデータの解像度を下げることで、汎用的に画像を処理するためのプーリング層を用いる。

全結合層からCNN(コンボリューション層+プーリング層)への変更

環境の情報をパラメータ(「0,1,2,3,4,5,6,7,8」のような形式)から画像に変更するに当たり、全結合層の一部(インプット側)をCNNに変更する。アウトプット側は全結合層のまま。

環境

OS:Windows 11
GPU:GeForce RTX 3060 laptop
CPU:i7-10750H
memory:16G
python:3.10.11
pytorch:2.1.2
CUDA:11.8
cuDNN:8.8

CNNを使った8パズルの深層強化学習

「一般的なDQNのフロー」として記載したフローはエピソードごとにリプレイバッファに保存しているが、今回のプログラムは、エピソードよりも細かいステップごとにリプレイバッファに保存している。

画像で処理するため、一度画像ファイルを保存することになる。
全てのエピソードを保存すると、とてつもない容量が必要になるため、1エピソード分の画像ファイルを保存する。

画像保存用のフォルダを作成するために、プログラムを実行するフォルダで以下のコマンドを実行する。

mkdir puzzle_states

ライブラリをインストールする。

pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu118
pip install numpy matplotlib Pillow

以下のプログラムを実行する。

import os
import copy
from collections import deque
import random
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optimizers
from torch.optim.lr_scheduler import StepLR
import torch.nn.functional as F
import time
import torchvision
from torchvision.utils import save_image
from PIL import Image, ImageDraw, ImageFont
from torchvision.transforms import ToTensor


# GPUが利用可能か確認し、利用可能なら使用する
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.buffer = deque(maxlen=buffer_size)
        self.batch_size = batch_size
    
    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def __len__(self):
        return len(self.buffer)
    
    def get_batch(self):
        sample = random.sample(self.buffer, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*sample)
        
        # リストをテンソルに変換し、すべての要素が適切に形状付けされていることを確認
        states = torch.stack(states).to(device).float()  # すべてのstatesがテンソルであることを確認し、スタックする
        actions = torch.tensor(actions, dtype=torch.long, device=device)
        rewards = torch.tensor(rewards, dtype=torch.float32, device=device)
        next_states = torch.stack(next_states).to(device).float()  # すべてのnext_statesがテンソルであることを確認し、スタックする
        dones = torch.tensor(dones, dtype=torch.float32, device=device)
        
        return states, actions, rewards, next_states, dones

class QNet(nn.Module):
    def __init__(self, action_size):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 8, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(32 * 4 * 4, 256)
        self.dropout1 = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(256, 128)
        self.dropout2 = nn.Dropout(p=0.2)
        self.fc3 = nn.Linear(128, 64)
        self.dropout3 = nn.Dropout(p=0.2)
        self.fc4 = nn.Linear(64, action_size)
    
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        x = torch.flatten(x, start_dim=1)
        x = F.leaky_relu(self.fc1(x), 0.01)
        x = self.dropout1(x)
        x = F.leaky_relu(self.fc2(x), 0.01)
        x = self.dropout2(x)
        x = F.leaky_relu(self.fc3(x), 0.01)
        x = self.dropout3(x)
        x = self.fc4(x)
        return x

class DQNAgent:
    def __init__(self, action_size=4, gamma=0.98, lr=0.0005, epsilon_decay=0.9995):
        self.gamma = gamma
        self.epsilon = 1.0  # Initial epsilon value
        self.epsilon_min = 0.01
        self.epsilon_decay = epsilon_decay
        self.replay_buffer = ReplayBuffer(10000, 32)
        self.qnet = QNet(action_size).to(device)
        self.qnet_target = copy.deepcopy(self.qnet)
        self.optimizer = optimizers.Adam(self.qnet.parameters(), lr, weight_decay=1e-5)
        self.scheduler = StepLR(self.optimizer, step_size=100, gamma=self.gamma)
    
    def sync_qnet(self):
        self.qnet_target.load_state_dict(self.qnet.state_dict())
    
    def get_action(self, state):
        state_tensor = state.unsqueeze(0).to(device)
        if random.random() < self.epsilon:
            return random.randrange(4)
        else:
            with torch.no_grad():
                return self.qnet(state_tensor).argmax().item()
    
    def update(self):
        if len(self.replay_buffer) < self.replay_buffer.batch_size:
            return None
        state, action, reward, next_state, done = self.replay_buffer.get_batch()
        qs = self.qnet(state)
        next_qs = self.qnet_target(next_state)
        max_next_qs = next_qs.max(1)[0]
        target_qs = reward + (1 - done) * self.gamma * max_next_qs
        action = action.unsqueeze(1)  # action テンソルを [batch_size, 1] に変換
        loss = F.mse_loss(qs.gather(1, action).squeeze(), target_qs)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()
    
    def update_scheduler(self):
        self.scheduler.step()

class TileSlidePuzzleEnv:
    def __init__(self, size=3):
        self.size = size
        self.board = np.zeros((self.size, self.size), dtype=int)
        self.last_state = None
        self.image_count = 0  # 画像ファイルのカウンター
        self.reset()
    
    def reset(self):
        self.board = np.arange(self.size**2).reshape(self.size, self.size)
        self.last_state = None
        self.step_count = 0
        self.image_count = 0  # 画像カウンターをリセット
        for _ in range(50):
            action = np.random.choice(4)
            self.slide_tile(action)
        image_path = self.save_state_as_image(self.board)
        return self.load_image_as_tensor(image_path)
    
    def step(self, action):
        moved = self.slide_tile(action)
        next_state = self.board.copy()
        reward = -1 if not moved else self.calculate_reward() * 0.1
        done = self.is_solved()
        if done:
            reward = 100
        image_path = self.save_state_as_image(next_state)  # 状態を画像として保存
        return self.load_image_as_tensor(image_path), reward, done, {}
    
    def save_state_as_image(self, state):
        tile_size = 12  # タイル一つあたりのサイズ
        image_size = tile_size * self.size
        image = Image.new('RGB', (image_size, image_size), color='white')
        draw = ImageDraw.Draw(image)
        font_path = "arial.ttf"
        font = ImageFont.truetype(font_path, 8) if os.path.exists(font_path) else ImageFont.load_default()
        
        for i in range(self.size):
            for j in range(self.size):
                x = j * tile_size
                y = i * tile_size
                value = state[i, j]
                text = str(value) if value != 0 else ""
                bbox = draw.textbbox((0, 0), text, font=font)
                text_width = bbox[2] - bbox[0]
                text_height = bbox[3] - bbox[1]
                draw.text((x + (tile_size - text_width) / 2, y + (tile_size - text_height) / 2), text, fill='black', font=font)
        
        file_path = f'puzzle_states/state_{self.image_count}.png'
        try:
            image.save(file_path)
        except PermissionError:
            # ファイル名を変更して再試行
            alternative_path = f'puzzle_states/state_{self.image_count}_alternative.png'
            try:
                image.save(alternative_path)
                print(f"Permission denied for original path, saved image to {alternative_path}")
            except Exception as e:
                print(f"Failed to save image on alternative path: {e}")
        except Exception as e:
            print(f"Failed to save image: {e}")
        
        self.image_count += 1
        return file_path
    
    def load_image_as_tensor(self, file_path):
        image = Image.open(file_path).convert('L')  # グレースケールに変換
        return ToTensor()(image)  # PIL Imageをテンソルに変換
    
    def slide_tile(self, direction):
        x, y = np.where(self.board == 0)
        x, y = int(x), int(y)
        moved = False
        if direction == 0 and x < self.size - 1:
            self.board[x, y], self.board[x+1, y] = self.board[x+1, y], self.board[x, y]
            moved = True
        elif direction == 1 and x > 0:
            self.board[x, y], self.board[x-1, y] = self.board[x-1, y], self.board[x, y]
            moved = True
        elif direction == 2 and y < self.size - 1:
            self.board[x, y], self.board[x, y+1] = self.board[x, y+1], self.board[x, y]
            moved = True
        elif direction == 3 and y > 0:
            self.board[x, y], self.board[x, y-1] = self.board[x, y-1], self.board[x, y]
            moved = True
        return moved
    
    def calculate_reward(self):
        correct_tiles = np.sum(self.board == np.arange(self.size**2).reshape(self.size, self.size))
        return correct_tiles
    
    def is_solved(self):
        return np.array_equal(self.board, np.arange(self.size**2).reshape(self.size, self.size))
    
episodes = 60000
sync_interval = 20
limit_step_count = 300

# タイルスライドパズル環境のインスタンス化
env = TileSlidePuzzleEnv()
agent = DQNAgent()

# 各エピソードの報酬を記録するリスト
reward_history = []
loss_history = []
start_time = time.time()  # 開始時刻を記録

for episode in range(episodes):
    print('----- ----- episode:', episode, ' ----- -----')
    state = env.reset()
    done = False
    total_reward = 0
    total_loss = 0
    step_count = 0
    
    while not done:
        step_count += 1
        
        # 行動をランダムで選択
        action = agent.get_action(state)
        next_state, reward, done, _ = env.step(action)
        agent.replay_buffer.add(state, action, reward, next_state, done)
        loss = agent.update()
        
        if loss:
            total_loss += loss
        
        state = next_state
        total_reward += reward
        
        if done:
            total_reward = 10000
        
        if step_count >= limit_step_count:
            done = True
    
    # エピソード終了時にepsilonを更新
    agent.epsilon = max(agent.epsilon_min, agent.epsilon_decay * agent.epsilon)
    
    # エピソード終了時にスケジューラを更新
    agent.update_scheduler()
    
    if episode % sync_interval == 0:
        agent.sync_qnet()
    
    reward_history.append(total_reward)
    loss_history.append(total_loss / step_count if step_count > 0 else 0)
    print(f"Episode {episode}: Total Reward = {total_reward}, Average Loss = {loss_history[-1]}")

end_time = time.time()  # エピソードの終了時刻を記録
episode_duration = end_time - start_time  # エピソードの実行時間を計算
print(f"Duration = {episode_duration:.2f} seconds")
print(reward_history)

# エピソード番号のリストを生成
episodes = list(range(len(reward_history)))

# チェックポイントの保存
torch.save(agent.qnet.state_dict(), "dqn_tile_slide_model.pth")

# グラフの作成
plt.figure(figsize=(10, 6))
plt.plot(episodes, reward_history, marker='o')
plt.title('Episode vs Total Reward')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.grid(True)
plt.show()

約5時間で学習は終了し、結果として以下のようなグラフが出力される。(DQNの性質上結果は毎回異なる)
縦軸が10000に達しているのが成功したエピソードで、30000エピソード目あたりからはほとんどミスしていないことが分かる。

Figure_1.png

学習モデルの検証

以下のプログラムで学習したモデルの性能を確認する。
学習済みモデル(「dqn_tile_slide_model.pth」)とpuzzle_statesフォルダが存在するフォルダで実行する必要あり。

import os
import copy
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw, ImageFont
from torchvision.transforms import ToTensor

# GPUが利用可能か確認し、利用可能なら使用する
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 同じQNetクラスとTileSlidePuzzleEnvクラスを使用
class QNet(nn.Module):
    def __init__(self, action_size):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 8, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(32 * 4 * 4, 256)
        self.dropout1 = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(256, 128)
        self.dropout2 = nn.Dropout(p=0.2)
        self.fc3 = nn.Linear(128, 64)
        self.dropout3 = nn.Dropout(p=0.2)
        self.fc4 = nn.Linear(64, action_size)
    
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        x = torch.flatten(x, start_dim=1)
        x = F.leaky_relu(self.fc1(x), 0.01)
        x = self.dropout1(x)
        x = F.leaky_relu(self.fc2(x), 0.01)
        x = self.dropout2(x)
        x = F.leaky_relu(self.fc3(x), 0.01)
        x = self.dropout3(x)
        x = self.fc4(x)
        return x

class TileSlidePuzzleEnv:
    def __init__(self, size=3):
        self.size = size
        self.board = np.zeros((self.size, self.size), dtype=int)
        self.last_state = None
        self.image_count = 0  # 画像ファイルのカウンター
        self.reset()
    
    def reset(self):
        self.board = np.arange(self.size**2).reshape(self.size, self.size)
        self.last_state = None
        self.step_count = 0
        self.image_count = 0  # 画像カウンターをリセット
        for _ in range(50):
            action = np.random.choice(4)
            self.slide_tile(action)
        image_path = self.save_state_as_image(self.board)
        return self.load_image_as_tensor(image_path)
    
    def step(self, action):
        moved = self.slide_tile(action)
        next_state = self.board.copy()
        reward = -1 if not moved else self.calculate_reward() * 0.1
        done = self.is_solved()
        if done:
            reward = 100
        image_path = self.save_state_as_image(next_state)  # 状態を画像として保存
        return self.load_image_as_tensor(image_path), reward, done, {}
    
    def save_state_as_image(self, state):
        tile_size = 12  # タイル一つあたりのサイズ
        image_size = tile_size * self.size
        image = Image.new('RGB', (image_size, image_size), color='white')
        draw = ImageDraw.Draw(image)
        font_path = "arial.ttf"
        font = ImageFont.truetype(font_path, 8) if os.path.exists(font_path) else ImageFont.load_default()
        
        for i in range(self.size):
            for j in range(self.size):
                x = j * tile_size
                y = i * tile_size
                value = state[i, j]
                text = str(value) if value != 0 else ""
                bbox = draw.textbbox((0, 0), text, font=font)
                text_width = bbox[2] - bbox[0]
                text_height = bbox[3] - bbox[1]
                draw.text((x + (tile_size - text_width) / 2, y + (tile_size - text_height) / 2), text, fill='black', font=font)
        
        file_path = f'puzzle_states/state_{self.image_count}.png'
        try:
            image.save(file_path)
        except PermissionError:
            # ファイル名を変更して再試行
            alternative_path = f'puzzle_states/state_{self.image_count}_alternative.png'
            try:
                image.save(alternative_path)
                print(f"Permission denied for original path, saved image to {alternative_path}")
            except Exception as e:
                print(f"Failed to save image on alternative path: {e}")
        except Exception as e:
            print(f"Failed to save image: {e}")
        
        self.image_count += 1
        return file_path
    
    def load_image_as_tensor(self, file_path):
        image = Image.open(file_path).convert('L')  # グレースケールに変換
        return ToTensor()(image)  # PIL Imageをテンソルに変換
    
    def slide_tile(self, direction):
        x, y = np.where(self.board == 0)
        x, y = int(x), int(y)
        moved = False
        if direction == 0 and x < self.size - 1:
            self.board[x, y], self.board[x+1, y] = self.board[x+1, y], self.board[x, y]
            moved = True
        elif direction == 1 and x > 0:
            self.board[x, y], self.board[x-1, y] = self.board[x-1, y], self.board[x, y]
            moved = True
        elif direction == 2 and y < self.size - 1:
            self.board[x, y], self.board[x, y+1] = self.board[x, y+1], self.board[x, y]
            moved = True
        elif direction == 3 and y > 0:
            self.board[x, y], self.board[x, y-1] = self.board[x, y-1], self.board[x, y]
            moved = True
        return moved
    
    def calculate_reward(self):
        correct_tiles = np.sum(self.board == np.arange(self.size**2).reshape(self.size, self.size))
        return correct_tiles
    
    def is_solved(self):
        return np.array_equal(self.board, np.arange(self.size**2).reshape(self.size, self.size))

def test_model(model, num_episodes=10):
    env = TileSlidePuzzleEnv()
    for episode in range(num_episodes):
        state = env.reset()
        done = False
        total_reward = 0
        steps = 0
        
        while not done:
            state_tensor = state.unsqueeze(0).to(device)
            with torch.no_grad():
                action = model(state_tensor).argmax().item()
            next_state, reward, done, _ = env.step(action)
            state = next_state
            total_reward += reward
            steps += 1
            if steps >= 50:  # 50ステップ以上で終了
                break
        
        print(f'Episode {episode + 1}: Steps = {steps}, Total Reward = {total_reward}, Solved = {done}')

# モデルのロード
model_path = "dqn_tile_slide_model.pth"
model = QNet(action_size=4).to(device)
model.load_state_dict(torch.load(model_path))
model.eval()

# テスト実行

test_model(model, num_episodes=5)

5回テスト(num_episodes=5)するプログラムを実行すると以下の結果となる。
「Solved = True」となっていることから、5回ともパズルがクリアできていることが分かる。

Episode 1: Steps = 9, Total Reward = 104.1, Solved = True
Episode 2: Steps = 6, Total Reward = 102.5, Solved = True
Episode 3: Steps = 14, Total Reward = 104.7, Solved = True
Episode 4: Steps = 11, Total Reward = 103.8, Solved = True
Episode 5: Steps = 8, Total Reward = 103.8, Solved = True
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0