LoginSignup
0
2

M1 MACで深層強化学習(スペック的には厳しい)

Posted at

記事の概要

m1 mac miniを用いて、深層強化学習を行うための手順をまとめます。
ただ、ある程度大きいニューラルネットワークの場合は、mpsで処理をするよりも、CPUで処理した方が処理が早くなるということが分かったので、メモリの小さいmacの場合はオススメはしません。(メモリ8Gでディープラーニングは流石に厳しい。。。)

環境

OS:macOS Ventura 13.2.1
プロセッサー:M1
memory:8G
python:3.9.12

環境構築

以下のコマンドでpython仮想環境を作成する。

python -m venv myenv

python仮想環境をactivateする。

source myenv/bin/activate

pytorch等のライブラリをインストールする。

pip install torch torchvision torchaudio
pip install matplotlib
pip install IPython

環境確認

以下のプログラムを実行し、「Using device: mps」と表示されることを確認する。

import torch

# GPUが利用可能かどうかをチェック
is_cuda_available = torch.cuda.is_available()

# Apple Siliconの場合、mpsデバイスが利用可能かどうかをチェック
is_mps_available = torch.backends.mps.is_available()

print(f"CUDA is available: {is_cuda_available}")
print(f"MPS is available: {is_mps_available}")

# 利用可能なデバイスを表示(mpsまたはcpu)
device = torch.device("mps" if is_mps_available else "cpu")
print(f"Using device: {device}")

深層強化学習用のプログラムを実行する。

以下の記事プログラムを流用し、m1 macで深層強化学習を行えるように修正したプログラムを実行する。
(Windowsからは「★この行のみを修正」のみを修正して実行可能)

import copy
from collections import deque
import random
import numpy as np
import matplotlib.pyplot as plt
from IPython import display
import torch
import torch.nn as nn
import torch.optim as optimizers
from torch.optim.lr_scheduler import StepLR
import torch.nn.functional as F
import time  # 時間計測のために追加

# GPUが利用可能か確認し、利用可能なら使用する
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu") # ★この行のみを修正
print(f"Using device: {device}")

class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.buffer = deque(maxlen=buffer_size)
        self.batch_size = batch_size
    
    def add(self, state, action, reward, next_state, done):
        # stateがタプルの場合、最初の要素のみを取り出す
        if isinstance(state, tuple):
            state = state[0]
        if isinstance(next_state, tuple):
            next_state = next_state[0]
        data = (state, action, reward, next_state, done)
        self.buffer.append(data)
    
    def __len__(self):
        return len(self.buffer)
    
    def get_batch(self):
        data = random.sample(self.buffer, self.batch_size)
        state, action, reward, next_state, done = zip(*data)
        return (torch.tensor(np.array(state), dtype=torch.float32),
                torch.tensor(action),
                torch.tensor(reward, dtype=torch.float32),
                torch.tensor(np.array(next_state), dtype=torch.float32),
                torch.tensor(done, dtype=torch.float32))

class QNet(nn.Module):
    def __init__(self, action_size):
        super().__init__()
        self.fc1 = nn.Linear(9, 512)
        self.dropout1 = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(512, 512)
        self.dropout2 = nn.Dropout(p=0.2)
        self.fc3 = nn.Linear(512, 256)
        self.dropout3 = nn.Dropout(p=0.2)
        self.fc4 = nn.Linear(256, 128)
        self.dropout4 = nn.Dropout(p=0.2)
        self.fc5 = nn.Linear(128, 64)
        self.dropout5 = nn.Dropout(p=0.2)
        self.fc6 = nn.Linear(64, action_size)
    
    def forward(self, x):
        x = F.leaky_relu(self.fc1(x), 0.01)
        x = self.dropout1(x)
        x = F.leaky_relu(self.fc2(x), 0.01)
        x = self.dropout2(x)
        x = F.leaky_relu(self.fc3(x), 0.01)
        x = self.dropout3(x)
        x = F.leaky_relu(self.fc4(x), 0.01)
        x = self.dropout4(x)
        x = F.leaky_relu(self.fc5(x), 0.01)
        x = self.dropout5(x)
        x = self.fc6(x)
        return x

class DQNAgent:
    def __init__(self):
        self.gamma = 0.98
        self.lr = 0.0005
        self.epsilon_start = 1  # 初期値
        self.epsilon_end = 0.01   # 最小値
        self.epsilon_decay = 0.9995  # 減衰率
        self.epsilon = self.epsilon_start  # 現在のepsilon値
        self.buffer_size = 10000
        self.batch_size = 32
        self.action_size = 4 # 'up', 'down', 'left', 'right'
        
        self.replay_buffer = ReplayBuffer(self.buffer_size, self.batch_size)
        self.qnet = QNet(self.action_size).to(device)
        self.qnet_target = QNet(self.action_size).to(device)
        self.optimizer = optimizers.Adam(self.qnet.parameters(), self.lr, weight_decay=1e-5)
        self.scheduler = StepLR(self.optimizer, step_size=100, gamma=self.gamma)  # 100ステップごとに学習率を0.98倍にする
    
    def sync_qnet(self):
        self.qnet_target.load_state_dict(self.qnet.state_dict())
    
    def get_action(self, state):
        print('state:', state)
        # stateがNumPy配列の場合、そのまま使用
        if isinstance(state, np.ndarray):
            state_array = state
        # stateがリストの場合、NumPy配列に変換
        elif isinstance(state, list):
            state_array = np.array(state)
        # stateがタプル形式の場合(NumPy配列と辞書)、最初の要素を使用
        elif isinstance(state, tuple) and isinstance(state[0], np.ndarray):
            state_array = state[0]
        # それ以外の場合はエラー
        else:
            raise TypeError(f"Unrecognized state format: {state}")
        
        # state_array = np.array(state) #tuple型をリスト型に変換
        state_tensor = torch.from_numpy(state_array.flatten()).float().unsqueeze(0).to(device)  # NumPy配列をTensorに変換
        if np.random.rand() < self.epsilon:
            print('random mode')
            return np.random.choice(self.action_size)
        else:
            print('qnet mode')
            qs = self.qnet(state_tensor)
            return qs.argmax().item()  # .item() を追加してPythonの数値に変換
    
    def update(self, state, action, reward, next_state, done):
        self.replay_buffer.add(state, action, reward, next_state, done)
        if len(self.replay_buffer) < self.batch_size:
            return None  # バッチサイズに達していない場合はNoneを返す
        
        state, action, reward, next_state, done = self.replay_buffer.get_batch()
        state = state.view(self.batch_size, -1).to(device) # バッチ処理のために、状態をフラット化
        action = action.to(device)
        reward = reward.to(device)
        next_state = next_state.view(self.batch_size, -1).to(device) # バッチ処理のために、状態をフラット化
        done = done.to(device)
        
        # ネットワークの出力と損失の計算
        qs = self.qnet(state)
        q = qs.gather(1, action.unsqueeze(1)).squeeze(1)
        
        next_qs = self.qnet_target(next_state)
        next_q = next_qs.max(1)[0]
        target = reward + (1 - done) * self.gamma * next_q
        loss = F.mse_loss(q, target)
        
        # バックプロパゲーション
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss  # 損失の値を返す
    
    # 学習率のスケジューラを更新する関数
    def update_scheduler(self):
        self.scheduler.step()

class TileSlidePuzzleEnv:
    def __init__(self, size=3):
        self.size = size
        self.board = np.zeros((self.size, self.size), dtype=int)
        self.last_state = None  # 前回の状態を保存する変数
        self.step_count = 0  # ステップカウンターを追加
        self.reset()
    
    def reset(self):
        # ゴール状態を設定
        self.board = np.arange(self.size**2).reshape(self.size, self.size)
        self.last_state = None  # 前回の状態をリセット
        self.step_count = 0  # ステップカウンターをリセット
        
        # 一定回数のランダムな動きでタイルを混ぜる
        for _ in range(50):  # 例えば50回のランダムな動き
            action = np.random.choice(4)
            self.slide_tile(action)
        
        return self.board.flatten()
    
    def step(self, action):
        # タイルをスライドさせるロジックを実装
        # actionはタイルを移動する方向(例: 0=上, 1=下, 2=左, 3=右)
        print('action:', action)
        
        # ステップカウントを更新
        self.step_count += 1
        
        # 現在の状態を保存
        current_state = self.board.flatten()
        
        moved = self.slide_tile(action)
        self.display()
        
        # 次の状態(フラット化されたボード)
        next_state = self.board.flatten()
        
        # 報酬の計算
        if not moved:
            reward = -1
        elif self.last_state is not None and np.array_equal(self.last_state, next_state):
            reward = -1  # 2回前と同じ動きの場合
        elif self.step_count > 100:
            reward = -0.1  # 100ステップを超えた場合
        else:
            reward = self.calculate_reward() * 0.1
        
        # ゲーム終了条件のチェック
        done = self.is_solved()
        if done:
            reward = 10
        
        print('reward:', reward)
        
        # 状態の更新
        self.last_state = current_state
        
        # 追加情報(空の辞書を返す)
        info = {}
        
        # next_state, reward, done, infoを返す
        return next_state, reward, done, info
    
    def slide_tile(self, direction):
        # 空白タイルの位置を見つける
        x, y = np.where(self.board == 0)
        x, y = int(x), int(y)
        
        moved = False  # タイルが動いたかどうかを追跡する変数
        
        if direction == 0 and x < self.size - 1:
            self.board[x, y], self.board[x+1, y] = self.board[x+1, y], self.board[x, y]
            moved = True
        elif direction == 1 and x > 0:
            self.board[x, y], self.board[x-1, y] = self.board[x-1, y], self.board[x, y]
            moved = True
        elif direction == 2 and y < self.size - 1:
            self.board[x, y], self.board[x, y+1] = self.board[x, y+1], self.board[x, y]
            moved = True
        elif direction == 3 and y > 0:
            self.board[x, y], self.board[x, y-1] = self.board[x, y-1], self.board[x, y]
            moved = True
        
        return moved
    
    def calculate_reward(self):
        # 報酬の計算ロジック
        # 例:正しい位置にあるタイルの数を報酬とする
        correct_tiles = np.sum(self.board == np.arange(self.size**2).reshape(self.size, self.size))
        return correct_tiles
    
    def is_solved(self):
        # パズルが解かれたかどうかをチェック
        return np.array_equal(self.board, np.arange(self.size**2).reshape(self.size, self.size))
    
    def display(self):
        # パズルの現在の状態を表示
        print(self.board)
    
    def render(self):
        print(self.board)

episodes = 20000
sync_interval = 20
limit_slide_count = 500

# タイルスライドパズル環境のインスタンス化
env = TileSlidePuzzleEnv()
agent = DQNAgent()

# 各エピソードの報酬を記録するリスト
reward_history = []
loss_history = []
start_time = time.time()  # 開始時刻を記録

for episode in range(episodes):
    print('----- ----- episode:', episode, ' ----- -----')
    state = env.reset()
    done = False
    total_reward = 0
    total_loss = 0
    slide_count = 0
    step_count = 0
    
    while not done:
        slide_count += 1
        print('----- ----- episode:', episode, ' ----- -----')
        print('slide _count:', slide_count)
        
        # 行動をランダムで選択
        action = agent.get_action(state)
        
        # 行動後の状態を変数に代入
        next_state, reward, done, info = env.step(action)[:4]
        
        loss = agent.update(state, action, reward, next_state, done)
        
        if loss is not None:
            total_loss += loss.item()  # 損失の合計を更新
        
        state = next_state
        total_reward += reward
        step_count += 1
        
        if done:
            total_reward = 10000
        print('total_reward:', total_reward)
        
        if slide_count >= limit_slide_count:
            done = True
    
    # エピソード終了時にepsilonを更新
    agent.epsilon = max(agent.epsilon_end, agent.epsilon_decay * agent.epsilon)
    
    # エピソード終了時にスケジューラを更新
    agent.update_scheduler()
    
    if episode % sync_interval == 0:
        agent.sync_qnet()
    
    reward_history.append(total_reward)
    loss_history.append(total_loss / step_count if step_count > 0 else 0)
    print(f"Episode {episode}: Total Reward = {total_reward}, Average Loss = {loss_history[-1]}")

end_time = time.time()  # エピソードの終了時刻を記録
episode_duration = end_time - start_time  # エピソードの実行時間を計算
print(f"Duration = {episode_duration:.2f} seconds")
print(reward_history)

# エピソード番号のリストを生成
episodes = list(range(len(reward_history)))

# チェックポイントの保存
torch.save(agent.qnet.state_dict(), "dqn_tile_slide_model.pth")

# グラフの作成
plt.figure(figsize=(10, 6))
plt.plot(episodes, reward_history, marker='o')
plt.title('Episode vs Total Reward')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.grid(True)
plt.show()
0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2