The programs used in this video are transcribed in this article.
I have prepared versions that run on Google Colab and on Windows.
This article lists the programs that run on Windows.
The programs that run on Google Colab are transcribed in the article below. ↓↓↓
OpenAI GYM (CartPole)
This is a program for training a DQN agent on CartPole with reinforcement learning.
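Before running it, the following minimal check (a sketch, not part of the program itself; it assumes gym 0.26 or later and a CUDA-capable GPU, since the training code calls .cuda()) can be used to confirm that your environment matches what the program expects.
# Minimal environment check (a sketch; assumes gym>=0.26 and a CUDA GPU)
import gym
import torch

env = gym.make('CartPole-v1')
obs, info = env.reset()           # newer gym returns (observation, info)
print(obs.shape)                  # (4,) -> matches nn.Linear(4, 128) in the program below
print(torch.cuda.is_available())  # the training code below calls .cuda()
env.close()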
import copy
from collections import deque
import random
import numpy as np
import gym
import matplotlib.pyplot as plt
from IPython import display
import torch
import torch.nn as nn
import torch.optim as optimizers
import torch.nn.functional as F
import time  # added for time measurement
class ReplayBuffer:
def __init__(self, buffer_size, batch_size):
self.buffer = deque(maxlen=buffer_size)
self.batch_size = batch_size
def add(self, state, action, reward, next_state, done):
        # if state is a tuple (obs, info), keep only the observation
if isinstance(state, tuple):
state = state[0]
if isinstance(next_state, tuple):
next_state = next_state[0]
data = (state, action, reward, next_state, done)
self.buffer.append(data)
def __len__(self):
return len(self.buffer)
def get_batch(self):
data = random.sample(self.buffer, self.batch_size)
state, action, reward, next_state, done = zip(*data)
return (torch.tensor(np.array(state), dtype=torch.float32),
torch.tensor(action),
torch.tensor(reward, dtype=torch.float32),
torch.tensor(np.array(next_state), dtype=torch.float32),
torch.tensor(done, dtype=torch.float32))
class QNet(nn.Module):
def __init__(self, action_size):
# super(QNet, self).__init__()
super().__init__()
        self.fc1 = nn.Linear(4, 128)  # the CartPole state is 4-dimensional
self.fc2 = nn.Linear(128, 128)
self.fc3 = nn.Linear(128, action_size)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
class DQNAgent:
def __init__(self):
self.gamma = 0.98
self.lr = 0.0005
self.epsilon = 0.1
self.buffer_size = 10000
self.batch_size = 32
self.action_size = 2
self.replay_buffer = ReplayBuffer(self.buffer_size, self.batch_size)
self.qnet = QNet(self.action_size).cuda()
self.qnet_target = QNet(self.action_size).cuda()
self.optimizer = optimizers.Adam(self.qnet.parameters(), self.lr)
# self.optimizer.setup(self.qnet)
def sync_qnet(self):
# self.qnet_target = copy.deepcopy(self.qnet)
self.qnet_target.load_state_dict(self.qnet.state_dict())
def get_action(self, state):
print('state:', state)
        # if state is already a NumPy array, use it as-is
        if isinstance(state, np.ndarray):
            state_array = state
        # if state is a list, convert it to a NumPy array
        elif isinstance(state, list):
            state_array = np.array(state)
        # if state is a tuple (NumPy array, info dict), use the first element
        elif isinstance(state, tuple) and isinstance(state[0], np.ndarray):
            state_array = state[0]
        # otherwise raise an error
        else:
            raise TypeError(f"Unrecognized state format: {state}")
        state_tensor = torch.from_numpy(state_array).float().unsqueeze(0).cuda()  # convert the NumPy array to a Tensor
if np.random.rand() < self.epsilon:
return np.random.choice(self.action_size)
else:
qs = self.qnet(state_tensor)
            return qs.argmax().item()  # .item() converts the result to a Python number
def update(self, state, action, reward, next_state, done):
self.replay_buffer.add(state, action, reward, next_state, done)
if len(self.replay_buffer) < self.batch_size:
return
state, action, reward, next_state, done = self.replay_buffer.get_batch()
state = state.cuda()
action = action.cuda()
reward = reward.cuda()
next_state = next_state.cuda()
done = done.cuda()
        # compute the network outputs and the loss
qs = self.qnet(state)
q = qs.gather(1, action.unsqueeze(1)).squeeze(1)
next_qs = self.qnet_target(next_state)
next_q = next_qs.max(1)[0]
target = reward + (1 - done) * self.gamma * next_q
loss = F.mse_loss(q, target)
        # backpropagation
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss  # return the loss so the training loop can log it
episodes = 300
sync_interval = 20
env = gym.make('CartPole-v1', render_mode='rgb_array')
agent = DQNAgent()
# lists to record the total reward and average loss of each episode
reward_history = []
loss_history = []
start_time = time.time()  # record the start time
for episode in range(episodes):
print('episode:', episode)
state = env.reset()
done = False
total_reward = 0
total_loss = 0
step_count = 0
while not done:
        # choose an action with the epsilon-greedy policy
        action = agent.get_action(state)
        # take a step; newer gym returns (obs, reward, terminated, truncated, info)
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated  # treat time-limit truncation as the end of the episode too
loss = agent.update(state, action, reward, next_state, done)
if loss is not None:
            total_loss += loss.item()  # accumulate the loss
state = next_state
total_reward += reward
step_count += 1
        # render the environment (returns an RGB array with render_mode='rgb_array')
env.render()
if episode % sync_interval == 0:
agent.sync_qnet()
reward_history.append(total_reward)
loss_history.append(total_loss / step_count if step_count > 0 else 0)
print(f"Episode {episode}: Total Reward = {total_reward}, Average Loss = {loss_history[-1]}")
    end_time = time.time()  # record the current time
    episode_duration = end_time - start_time  # elapsed time since training started
print(f"Duration = {episode_duration:.2f} seconds")
print(reward_history)
# save a checkpoint of the trained network
torch.save(agent.qnet.state_dict(), "dqn_cartpole_model.pth")
env.close()
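If you also want a learning curve for CartPole, the same plotting code used at the end of the 8-puzzle script below can be reused; here is a minimal sketch based on the reward_history list recorded above.
# Optional: plot the learning curve (a sketch; mirrors the 8-puzzle script below)
plt.figure(figsize=(10, 6))
plt.plot(range(len(reward_history)), reward_history, marker='o')
plt.title('Episode vs Total Reward')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.grid(True)
plt.show()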
OpenAI GYM (CartPole) evaluation
This is a program for evaluating the trained CartPole model.
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
class QNet(nn.Module):
def __init__(self, action_size):
super().__init__()
        self.fc1 = nn.Linear(4, 128)  # the CartPole state is 4-dimensional
self.fc2 = nn.Linear(128, 128)
self.fc3 = nn.Linear(128, action_size)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
def load_model(model_path):
model = QNet(action_size=2).cuda()
model.load_state_dict(torch.load(model_path))
model.eval()
return model
def evaluate_model(model, num_episodes=100):
env = gym.make('CartPole-v1', render_mode='rgb_array')
total_rewards = []
for _ in range(num_episodes):
state = env.reset()
done = False
total_reward = 0
while not done:
            # if state is a tuple (obs, info), use only the observation
            if isinstance(state, tuple):
                state_array = state[0]
else:
state_array = state
state_tensor = torch.from_numpy(state_array).float().unsqueeze(0).cuda()
action = model(state_tensor).max(1)[1].item()
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated  # stop at termination or at the 500-step time limit
total_reward += reward
env.render()
total_rewards.append(total_reward)
env.close()
return total_rewards
# load the trained model
model_path = "dqn_cartpole_model.pth"
model = load_model(model_path)
# evaluate the model over 100 episodes
rewards = evaluate_model(model, num_episodes=100)
min_reward = min(rewards)
max_reward = max(rewards)
avg_reward = np.mean(rewards)
print(f"Minimum Reward: {min_reward}")
print(f"Maximum Reward: {max_reward}")
print(f"Average Reward: {avg_reward}")
8-puzzle
This is a program for playing the 8-puzzle manually.
import numpy as np

class TileSlidePuzzleEnv:
def __init__(self, size=3):
self.size = size
self.board = np.zeros((self.size, self.size), dtype=int)
self.reset()
def reset(self):
        # set up the goal state, then scramble it with random moves
self.board = np.arange(self.size**2).reshape(self.size, self.size)
        for _ in range(50):  # e.g. 50 random moves
action = np.random.choice(4)
self.slide_tile(action)
return self.board.flatten()
def step(self, action):
moved = self.slide_tile(action)
next_state = self.board.flatten()
reward = self.calculate_reward()
done = self.is_solved()
info = {}
return next_state, reward, done, info
def slide_tile(self, direction):
        # find the position of the blank (0) tile
        x, y = np.where(self.board == 0)
        x, y = int(x[0]), int(y[0])
        if direction == 0 and x < self.size - 1:  # slide the tile below the blank upward
            self.board[x, y], self.board[x+1, y] = self.board[x+1, y], self.board[x, y]
        elif direction == 1 and x > 0:  # slide the tile above the blank downward
            self.board[x, y], self.board[x-1, y] = self.board[x-1, y], self.board[x, y]
        elif direction == 2 and y < self.size - 1:  # slide the tile right of the blank to the left
            self.board[x, y], self.board[x, y+1] = self.board[x, y+1], self.board[x, y]
        elif direction == 3 and y > 0:  # slide the tile left of the blank to the right
            self.board[x, y], self.board[x, y-1] = self.board[x, y-1], self.board[x, y]
        return True
def calculate_reward(self):
correct_tiles = np.sum(self.board == np.arange(self.size**2).reshape(self.size, self.size))
return correct_tiles
def is_solved(self):
return np.array_equal(self.board, np.arange(self.size**2).reshape(self.size, self.size))
def display(self):
print(self.board)
def user_play(self):
self.reset()
self.display()
while not self.is_solved():
move = input("Move (0:Up, 1:Down, 2:Left, 3:Right): ")
if move.isdigit() and int(move) in [0, 1, 2, 3]:
self.step(int(move))
self.display()
else:
print("Invalid move. Please enter 0, 1, 2, or 3.")
print("Congratulations! Puzzle solved.")
# run the game
env = TileSlidePuzzleEnv()
env.user_play()
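If you only want to check that the environment works without typing moves by hand, a scripted run with random actions is enough. This is a sketch that reuses the TileSlidePuzzleEnv class above.
# Smoke test with random moves (a sketch; no keyboard input required)
env = TileSlidePuzzleEnv()
state = env.reset()
for _ in range(20):
    state, reward, done, info = env.step(np.random.choice(4))
    env.display()
    if done:
        print("Solved by chance!")
        break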
DQN with PyTorch (8-puzzle)
This is a program for training a DQN agent on the 8-puzzle with reinforcement learning.
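The network input in this program is simply the 3x3 board flattened into a length-9 vector of raw tile numbers. The following sketch (for illustration only) shows that encoding and why the first layer is nn.Linear(9, 256).
# How a board becomes a network input (a sketch)
import numpy as np
import torch

board = np.arange(9).reshape(3, 3)                # goal state: tiles 0..8
state = board.flatten()                           # shape (9,)
x = torch.from_numpy(state).float().unsqueeze(0)  # shape (1, 9) -> fits nn.Linear(9, 256)
print(x.shape)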
import copy
from collections import deque
import random
import numpy as np
import matplotlib.pyplot as plt
from IPython import display
import torch
import torch.nn as nn
import torch.optim as optimizers
import torch.nn.functional as F
import time  # added for time measurement
# check whether a GPU is available and use it if so
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
class ReplayBuffer:
def __init__(self, buffer_size, batch_size):
self.buffer = deque(maxlen=buffer_size)
self.batch_size = batch_size
def add(self, state, action, reward, next_state, done):
        # if state is a tuple (obs, info), keep only the observation
if isinstance(state, tuple):
state = state[0]
if isinstance(next_state, tuple):
next_state = next_state[0]
data = (state, action, reward, next_state, done)
self.buffer.append(data)
def __len__(self):
return len(self.buffer)
def get_batch(self):
data = random.sample(self.buffer, self.batch_size)
state, action, reward, next_state, done = zip(*data)
return (torch.tensor(np.array(state), dtype=torch.float32),
torch.tensor(action),
torch.tensor(reward, dtype=torch.float32),
torch.tensor(np.array(next_state), dtype=torch.float32),
torch.tensor(done, dtype=torch.float32))
class QNet(nn.Module):
def __init__(self, action_size):
super().__init__()
self.fc1 = nn.Linear(9, 256)
self.fc2 = nn.Linear(256, 128)
self.fc3 = nn.Linear(128, 128)
self.fc4 = nn.Linear(128, action_size)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = F.relu(self.fc3(x))
x = self.fc4(x)
return x
class DQNAgent:
def __init__(self):
self.gamma = 0.98
self.lr = 0.0005
        self.epsilon = 0.2  # with 0.2, a random action is taken roughly 2 times out of 10
self.buffer_size = 10000
self.batch_size = 32
self.action_size = 4 # 'up', 'down', 'left', 'right'
self.replay_buffer = ReplayBuffer(self.buffer_size, self.batch_size)
self.qnet = QNet(self.action_size).to(device)
self.qnet_target = QNet(self.action_size).to(device)
self.optimizer = optimizers.Adam(self.qnet.parameters(), self.lr)
def sync_qnet(self):
self.qnet_target.load_state_dict(self.qnet.state_dict())
def get_action(self, state):
print('state:', state)
        # if state is already a NumPy array, use it as-is
        if isinstance(state, np.ndarray):
            state_array = state
        # if state is a list, convert it to a NumPy array
        elif isinstance(state, list):
            state_array = np.array(state)
        # if state is a tuple (NumPy array, info dict), use the first element
        elif isinstance(state, tuple) and isinstance(state[0], np.ndarray):
            state_array = state[0]
        # otherwise raise an error
        else:
            raise TypeError(f"Unrecognized state format: {state}")
        state_tensor = torch.from_numpy(state_array.flatten()).float().unsqueeze(0).to(device)  # convert the flattened board to a Tensor
if np.random.rand() < self.epsilon:
print('random mode')
return np.random.choice(self.action_size)
else:
print('qnet mode')
qs = self.qnet(state_tensor)
            return qs.argmax().item()  # .item() converts the result to a Python number
def update(self, state, action, reward, next_state, done):
self.replay_buffer.add(state, action, reward, next_state, done)
if len(self.replay_buffer) < self.batch_size:
return
state, action, reward, next_state, done = self.replay_buffer.get_batch()
        state = state.view(self.batch_size, -1).to(device)  # flatten the states for batch processing
        action = action.to(device)
        reward = reward.to(device)
        next_state = next_state.view(self.batch_size, -1).to(device)  # flatten the states for batch processing
done = done.to(device)
        # compute the network outputs and the loss
qs = self.qnet(state)
q = qs.gather(1, action.unsqueeze(1)).squeeze(1)
next_qs = self.qnet_target(next_state)
next_q = next_qs.max(1)[0]
target = reward + (1 - done) * self.gamma * next_q
loss = F.mse_loss(q, target)
        # backpropagation
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
        return loss  # return the loss value
class TileSlidePuzzleEnv:
def __init__(self, size=3):
self.size = size
self.board = np.zeros((self.size, self.size), dtype=int)
self.reset()
def reset(self):
        # start from the goal state
        self.board = np.arange(self.size**2).reshape(self.size, self.size)
        # scramble the tiles with a fixed number of random moves (this keeps the board solvable)
        for _ in range(50):  # e.g. 50 random moves
action = np.random.choice(4)
self.slide_tile(action)
return self.board.flatten()
def step(self, action):
        # slide a tile; action is the direction to move (0=up, 1=down, 2=left, 3=right)
        print('action:', action)
        moved = self.slide_tile(action)
        self.display()
        # next state (the flattened board)
        next_state = self.board.flatten()
        # reward (here: the number of tiles in the correct position)
        reward = self.calculate_reward()
        print('reward:', reward)
        # check the termination condition
        done = self.is_solved()
        # extra info (an empty dict)
        info = {}
        # return next_state, reward, done, info
return next_state, reward, done, info
def slide_tile(self, direction):
        # find the position of the blank (0) tile
        x, y = np.where(self.board == 0)
        x, y = int(x[0]), int(y[0])
if direction == 0 and x < self.size - 1:
self.board[x, y], self.board[x+1, y] = self.board[x+1, y], self.board[x, y]
elif direction == 1 and x > 0:
self.board[x, y], self.board[x-1, y] = self.board[x-1, y], self.board[x, y]
elif direction == 2 and y < self.size - 1:
self.board[x, y], self.board[x, y+1] = self.board[x, y+1], self.board[x, y]
elif direction == 3 and y > 0:
self.board[x, y], self.board[x, y-1] = self.board[x, y-1], self.board[x, y]
def calculate_reward(self):
        # reward logic: count the tiles that are in their correct position
correct_tiles = np.sum(self.board == np.arange(self.size**2).reshape(self.size, self.size))
return correct_tiles
def is_solved(self):
        # check whether the puzzle has been solved
return np.array_equal(self.board, np.arange(self.size**2).reshape(self.size, self.size))
def display(self):
        # display the current state of the puzzle
print(self.board)
def render(self):
print(self.board)
episodes = 10000
sync_interval = 20
limit_slide_count = 200
# instantiate the tile-slide puzzle environment
env = TileSlidePuzzleEnv()
agent = DQNAgent()
# lists to record the total reward and average loss of each episode
reward_history = []
loss_history = []
start_time = time.time()  # record the start time
for episode in range(episodes):
print('----- ----- episode:', episode, ' ----- -----')
state = env.reset()
done = False
total_reward = 0
total_loss = 0
slide_count = 0
step_count = 0
while not done:
slide_count += 1
print('----- ----- episode:', episode, ' ----- -----')
        print('slide_count:', slide_count)
        # choose an action with the epsilon-greedy policy
        action = agent.get_action(state)
        # take the action and receive the next state, reward, and done flag
        next_state, reward, done, info = env.step(action)
loss = agent.update(state, action, reward, next_state, done)
if loss is not None:
            total_loss += loss.item()  # accumulate the loss
state = next_state
total_reward += reward
step_count += 1
if done:
total_reward = 10000
print('total_reward:', total_reward)
if slide_count >= limit_slide_count:
done = True
if episode % sync_interval == 0:
agent.sync_qnet()
reward_history.append(total_reward)
loss_history.append(total_loss / step_count if step_count > 0 else 0)
print(f"Episode {episode}: Total Reward = {total_reward}, Average Loss = {loss_history[-1]}")
    end_time = time.time()  # record the current time
    episode_duration = end_time - start_time  # elapsed time since training started
print(f"Duration = {episode_duration:.2f} seconds")
print(reward_history)
# generate the list of episode numbers (x-axis of the plot)
episodes = list(range(len(reward_history)))
# save a checkpoint of the trained network
torch.save(agent.qnet.state_dict(), "dqn_tile_slide_model.pth")
# create the learning-curve plot
plt.figure(figsize=(10, 6))
plt.plot(episodes, reward_history, marker='o')
plt.title('Episode vs Total Reward')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.grid(True)
plt.show()
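There is no separate evaluation program for the 8-puzzle model in this article, but the saved checkpoint can be tried out in the same way as the CartPole one. The following is a minimal greedy-rollout sketch that assumes the QNet, TileSlidePuzzleEnv, and device defined above are still available.
# Optional: greedy rollout with the saved checkpoint (a sketch)
model = QNet(action_size=4).to(device)
model.load_state_dict(torch.load("dqn_tile_slide_model.pth"))
model.eval()
env = TileSlidePuzzleEnv()
state = env.reset()
with torch.no_grad():
    for step in range(200):
        x = torch.from_numpy(state.astype(np.float32)).unsqueeze(0).to(device)
        action = model(x).argmax().item()
        state, reward, done, info = env.step(action)
        if done:
            print(f"Solved in {step + 1} moves")
            break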