記事の概要
m1 mac miniを用いて、深層強化学習を行うための手順をまとめます。
ただ、ある程度大きいニューラルネットワークの場合は、mpsで処理をするよりも、CPUで処理した方が処理が早くなるということが分かったので、メモリの小さいmacの場合はオススメはしません。(メモリ8Gでディープラーニングは流石に厳しい。。。)
環境
OS:macOS Ventura 13.2.1
プロセッサー:M1
memory:8G
python:3.9.12
環境構築
以下のコマンドでpython仮想環境を作成する。
python -m venv myenv
python仮想環境をactivateする。
source myenv/bin/activate
pytorch等のライブラリをインストールする。
pip install torch torchvision torchaudio
pip install matplotlib
pip install IPython
環境確認
以下のプログラムを実行し、「Using device: mps」と表示されることを確認する。
import torch
# GPUが利用可能かどうかをチェック
is_cuda_available = torch.cuda.is_available()
# Apple Siliconの場合、mpsデバイスが利用可能かどうかをチェック
is_mps_available = torch.backends.mps.is_available()
print(f"CUDA is available: {is_cuda_available}")
print(f"MPS is available: {is_mps_available}")
# 利用可能なデバイスを表示(mpsまたはcpu)
device = torch.device("mps" if is_mps_available else "cpu")
print(f"Using device: {device}")
深層強化学習用のプログラムを実行する。
以下の記事プログラムを流用し、m1 macで深層強化学習を行えるように修正したプログラムを実行する。
(Windowsからは「★この行のみを修正」のみを修正して実行可能)
import copy
from collections import deque
import random
import numpy as np
import matplotlib.pyplot as plt
from IPython import display
import torch
import torch.nn as nn
import torch.optim as optimizers
from torch.optim.lr_scheduler import StepLR
import torch.nn.functional as F
import time # 時間計測のために追加
# GPUが利用可能か確認し、利用可能なら使用する
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu") # ★この行のみを修正
print(f"Using device: {device}")
class ReplayBuffer:
def __init__(self, buffer_size, batch_size):
self.buffer = deque(maxlen=buffer_size)
self.batch_size = batch_size
def add(self, state, action, reward, next_state, done):
# stateがタプルの場合、最初の要素のみを取り出す
if isinstance(state, tuple):
state = state[0]
if isinstance(next_state, tuple):
next_state = next_state[0]
data = (state, action, reward, next_state, done)
self.buffer.append(data)
def __len__(self):
return len(self.buffer)
def get_batch(self):
data = random.sample(self.buffer, self.batch_size)
state, action, reward, next_state, done = zip(*data)
return (torch.tensor(np.array(state), dtype=torch.float32),
torch.tensor(action),
torch.tensor(reward, dtype=torch.float32),
torch.tensor(np.array(next_state), dtype=torch.float32),
torch.tensor(done, dtype=torch.float32))
class QNet(nn.Module):
def __init__(self, action_size):
super().__init__()
self.fc1 = nn.Linear(9, 512)
self.dropout1 = nn.Dropout(p=0.2)
self.fc2 = nn.Linear(512, 512)
self.dropout2 = nn.Dropout(p=0.2)
self.fc3 = nn.Linear(512, 256)
self.dropout3 = nn.Dropout(p=0.2)
self.fc4 = nn.Linear(256, 128)
self.dropout4 = nn.Dropout(p=0.2)
self.fc5 = nn.Linear(128, 64)
self.dropout5 = nn.Dropout(p=0.2)
self.fc6 = nn.Linear(64, action_size)
def forward(self, x):
x = F.leaky_relu(self.fc1(x), 0.01)
x = self.dropout1(x)
x = F.leaky_relu(self.fc2(x), 0.01)
x = self.dropout2(x)
x = F.leaky_relu(self.fc3(x), 0.01)
x = self.dropout3(x)
x = F.leaky_relu(self.fc4(x), 0.01)
x = self.dropout4(x)
x = F.leaky_relu(self.fc5(x), 0.01)
x = self.dropout5(x)
x = self.fc6(x)
return x
class DQNAgent:
def __init__(self):
self.gamma = 0.98
self.lr = 0.0005
self.epsilon_start = 1 # 初期値
self.epsilon_end = 0.01 # 最小値
self.epsilon_decay = 0.9995 # 減衰率
self.epsilon = self.epsilon_start # 現在のepsilon値
self.buffer_size = 10000
self.batch_size = 32
self.action_size = 4 # 'up', 'down', 'left', 'right'
self.replay_buffer = ReplayBuffer(self.buffer_size, self.batch_size)
self.qnet = QNet(self.action_size).to(device)
self.qnet_target = QNet(self.action_size).to(device)
self.optimizer = optimizers.Adam(self.qnet.parameters(), self.lr, weight_decay=1e-5)
self.scheduler = StepLR(self.optimizer, step_size=100, gamma=self.gamma) # 100ステップごとに学習率を0.98倍にする
def sync_qnet(self):
self.qnet_target.load_state_dict(self.qnet.state_dict())
def get_action(self, state):
print('state:', state)
# stateがNumPy配列の場合、そのまま使用
if isinstance(state, np.ndarray):
state_array = state
# stateがリストの場合、NumPy配列に変換
elif isinstance(state, list):
state_array = np.array(state)
# stateがタプル形式の場合(NumPy配列と辞書)、最初の要素を使用
elif isinstance(state, tuple) and isinstance(state[0], np.ndarray):
state_array = state[0]
# それ以外の場合はエラー
else:
raise TypeError(f"Unrecognized state format: {state}")
# state_array = np.array(state) #tuple型をリスト型に変換
state_tensor = torch.from_numpy(state_array.flatten()).float().unsqueeze(0).to(device) # NumPy配列をTensorに変換
if np.random.rand() < self.epsilon:
print('random mode')
return np.random.choice(self.action_size)
else:
print('qnet mode')
qs = self.qnet(state_tensor)
return qs.argmax().item() # .item() を追加してPythonの数値に変換
def update(self, state, action, reward, next_state, done):
self.replay_buffer.add(state, action, reward, next_state, done)
if len(self.replay_buffer) < self.batch_size:
return None # バッチサイズに達していない場合はNoneを返す
state, action, reward, next_state, done = self.replay_buffer.get_batch()
state = state.view(self.batch_size, -1).to(device) # バッチ処理のために、状態をフラット化
action = action.to(device)
reward = reward.to(device)
next_state = next_state.view(self.batch_size, -1).to(device) # バッチ処理のために、状態をフラット化
done = done.to(device)
# ネットワークの出力と損失の計算
qs = self.qnet(state)
q = qs.gather(1, action.unsqueeze(1)).squeeze(1)
next_qs = self.qnet_target(next_state)
next_q = next_qs.max(1)[0]
target = reward + (1 - done) * self.gamma * next_q
loss = F.mse_loss(q, target)
# バックプロパゲーション
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss # 損失の値を返す
# 学習率のスケジューラを更新する関数
def update_scheduler(self):
self.scheduler.step()
class TileSlidePuzzleEnv:
def __init__(self, size=3):
self.size = size
self.board = np.zeros((self.size, self.size), dtype=int)
self.last_state = None # 前回の状態を保存する変数
self.step_count = 0 # ステップカウンターを追加
self.reset()
def reset(self):
# ゴール状態を設定
self.board = np.arange(self.size**2).reshape(self.size, self.size)
self.last_state = None # 前回の状態をリセット
self.step_count = 0 # ステップカウンターをリセット
# 一定回数のランダムな動きでタイルを混ぜる
for _ in range(50): # 例えば50回のランダムな動き
action = np.random.choice(4)
self.slide_tile(action)
return self.board.flatten()
def step(self, action):
# タイルをスライドさせるロジックを実装
# actionはタイルを移動する方向(例: 0=上, 1=下, 2=左, 3=右)
print('action:', action)
# ステップカウントを更新
self.step_count += 1
# 現在の状態を保存
current_state = self.board.flatten()
moved = self.slide_tile(action)
self.display()
# 次の状態(フラット化されたボード)
next_state = self.board.flatten()
# 報酬の計算
if not moved:
reward = -1
elif self.last_state is not None and np.array_equal(self.last_state, next_state):
reward = -1 # 2回前と同じ動きの場合
elif self.step_count > 100:
reward = -0.1 # 100ステップを超えた場合
else:
reward = self.calculate_reward() * 0.1
# ゲーム終了条件のチェック
done = self.is_solved()
if done:
reward = 10
print('reward:', reward)
# 状態の更新
self.last_state = current_state
# 追加情報(空の辞書を返す)
info = {}
# next_state, reward, done, infoを返す
return next_state, reward, done, info
def slide_tile(self, direction):
# 空白タイルの位置を見つける
x, y = np.where(self.board == 0)
x, y = int(x), int(y)
moved = False # タイルが動いたかどうかを追跡する変数
if direction == 0 and x < self.size - 1:
self.board[x, y], self.board[x+1, y] = self.board[x+1, y], self.board[x, y]
moved = True
elif direction == 1 and x > 0:
self.board[x, y], self.board[x-1, y] = self.board[x-1, y], self.board[x, y]
moved = True
elif direction == 2 and y < self.size - 1:
self.board[x, y], self.board[x, y+1] = self.board[x, y+1], self.board[x, y]
moved = True
elif direction == 3 and y > 0:
self.board[x, y], self.board[x, y-1] = self.board[x, y-1], self.board[x, y]
moved = True
return moved
def calculate_reward(self):
# 報酬の計算ロジック
# 例:正しい位置にあるタイルの数を報酬とする
correct_tiles = np.sum(self.board == np.arange(self.size**2).reshape(self.size, self.size))
return correct_tiles
def is_solved(self):
# パズルが解かれたかどうかをチェック
return np.array_equal(self.board, np.arange(self.size**2).reshape(self.size, self.size))
def display(self):
# パズルの現在の状態を表示
print(self.board)
def render(self):
print(self.board)
episodes = 20000
sync_interval = 20
limit_slide_count = 500
# タイルスライドパズル環境のインスタンス化
env = TileSlidePuzzleEnv()
agent = DQNAgent()
# 各エピソードの報酬を記録するリスト
reward_history = []
loss_history = []
start_time = time.time() # 開始時刻を記録
for episode in range(episodes):
print('----- ----- episode:', episode, ' ----- -----')
state = env.reset()
done = False
total_reward = 0
total_loss = 0
slide_count = 0
step_count = 0
while not done:
slide_count += 1
print('----- ----- episode:', episode, ' ----- -----')
print('slide _count:', slide_count)
# 行動をランダムで選択
action = agent.get_action(state)
# 行動後の状態を変数に代入
next_state, reward, done, info = env.step(action)[:4]
loss = agent.update(state, action, reward, next_state, done)
if loss is not None:
total_loss += loss.item() # 損失の合計を更新
state = next_state
total_reward += reward
step_count += 1
if done:
total_reward = 10000
print('total_reward:', total_reward)
if slide_count >= limit_slide_count:
done = True
# エピソード終了時にepsilonを更新
agent.epsilon = max(agent.epsilon_end, agent.epsilon_decay * agent.epsilon)
# エピソード終了時にスケジューラを更新
agent.update_scheduler()
if episode % sync_interval == 0:
agent.sync_qnet()
reward_history.append(total_reward)
loss_history.append(total_loss / step_count if step_count > 0 else 0)
print(f"Episode {episode}: Total Reward = {total_reward}, Average Loss = {loss_history[-1]}")
end_time = time.time() # エピソードの終了時刻を記録
episode_duration = end_time - start_time # エピソードの実行時間を計算
print(f"Duration = {episode_duration:.2f} seconds")
print(reward_history)
# エピソード番号のリストを生成
episodes = list(range(len(reward_history)))
# チェックポイントの保存
torch.save(agent.qnet.state_dict(), "dqn_tile_slide_model.pth")
# グラフの作成
plt.figure(figsize=(10, 6))
plt.plot(episodes, reward_history, marker='o')
plt.title('Episode vs Total Reward')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.grid(True)
plt.show()