こんにちは。今日は使えると便利な深層学習のハイパラメータチューニングである「Ray Tune」の使い方をまとめていきます。
基本的にRayTuneの公式ドキュメントからコードや説明文を引用させていただきますが、後半では、実際にRNNモデルをあてはめてみるとどうなるかを考えてみようと思います。
Ray Tuneの構造
以下のような流れで結果が返されます。
-
探索範囲:Search Spaces
チューニングしたいハイパラメータを「探索範囲(Search Spaces)」として定義 -
目的関数:Trainables
チューニングしたい関数を定義 -
探索アルゴリズム:Search Algorithms
パラメータを効果的に最適化する「探索アルゴリズム(Search Algorithms)」を選択 -
スケジューラ:Schedulers
(必要に応じて)探索を打ち切り試行スピードを早めるための「スケジューラ(Scheduler)」を使う -
試行:Trials
「関数(Trainable)」、「検索アルゴリズム(Search Algorithms)」、および「スケジューラ(Scheduler)」を「チューナー(Tuner)」に渡し、実験を実行し、「試行(Trial)」を作成 -
結果取得:Analysis
「チューナー(Tuner)」は、実験結果を確認するための「結果表(ResultGrid)」を返す
RayTuneの構成要素は上記のとおりですが、実際にどのようなコードで動いていくのかを深掘りしていきます!
Ray Tuneの流れ
1. 探索範囲:Search Spaces
まずは、ハイパラメータの探索範囲を定義します。各試行において、探索範囲からランダムにパラメータの組み合わせをサンプルしていき、試行した中で最もパフォーマンスが優れたものを見つけ出します。
以下はサンプリング方法を指定する関数の例です。
config = {
"uniform": tune.uniform(-5, -1), # -5から-1の間の一様分布に従う浮動小数点数
"quniform": tune.quniform(3.2, 5.4, 0.2), # 0.2の倍数に丸める
"loguniform": tune.loguniform(1e-4, 1e-1), # 対数空間で一様分布に従う浮動小数点数
"qloguniform": tune.qloguniform(1e-4, 1e-1, 5e-5), # 0.00005の倍数に丸める
"randn": tune.randn(10, 2), # 平均10、標準偏差2の正規分布
"qrandn": tune.qrandn(10, 2, 0.2), # 0.2の倍数に丸める
"randint": tune.randint(-9, 15), # -9から15の間のランダムな整数
"qrandint": tune.qrandint(-21, 12, 3), # 3の倍数に丸める(12を含む)
"lograndint": tune.lograndint(1, 10), # 対数空間でランダムな整数
"qlograndint": tune.qlograndint(1, 10, 2), # 2の倍数に丸める
"choice": tune.choice(["a", "b", "c"]), # これらの選択肢の中からランダムに1つ選ぶ
"func": tune.sample_from(
lambda spec: spec.config.uniform * 0.01
), # 他の値に依存する
"grid": tune.grid_search([32, 64, 128]), # これらの値すべてを試す
}
ポイント
-
config
とは辞書。そのため、["キー"]を指定して対応する”値”を取り出すことができる -
tune.sample_from
:探索空間内で動的に値を生成する -
spec.config.uniform
:spec
は現在の試行のパラメータ設定を格納したオブジェクトであり、uniform
は別のキーに設定された値を参照している(例:uniform
= 3 の場合、func
= 3 * 0.01 = 0.03 になる)
実際には以下のように使います。
config = {
"l1": tune.sample_from(lambda _: 2**np.random.randint(2, 9)), # 2から8のランダムな整数を2のべき乗として計算
"l2": tune.sample_from(lambda _: 2**np.random.randint(2, 9)),
"lr": tune.loguniform(1e-4, 1e-1), # 学習率を対数スケールの一様分布に従ってサンプリング
"batch_size": tune.choice([2, 4, 8, 16]) # ランダムに1つ選ぶ
}
2. 目的関数:Trainables
Trainable
は日本語で直感的に理解しにくいですが、Tuneの実行に渡すことができるオブジェクトです。
定義の仕方はFunction APIとClass APIがありますが、前者が一般的に推奨されています。
from ray import train
def objective(x, a, b): # 目的関数を定義する
return a * (x ** 0.5) + b
def trainable(config): # Trainable関数に"config"辞書を渡す
for x in range(20): # 20回の反復でスコアを計算する
score = objective(x, config["a"], config["b"])
session.report({"score": score}) # スコアをTuneに送信する
ポイント
-
session.report
:ハイパーパラメータチューニング時の試行(trial)ごとの進捗や結果をRay Tuneに報告。これにより、スケジューラや結果の分析で利用できる
3. 探索アルゴリズム:Search Algorithms
- トレーニングプロセスのハイパーパラメータを最適化するために、探索アルゴリズム(Search Algorithm) を使用してハイパーパラメータを構成します。探索アルゴリズムを指定しない場合、Tuneはデフォルトでランダムサーチを使用することになります
- 例えば、
bayesian-optimization
パッケージを使用したシンプルなベイズ最適化をTuneで利用するには、BayesOptSearch
を使用してアルゴリズムを定義し、search_alg
引数をtune.TuneConfig
に渡すという流れになります -
tune
の書き方は5.試行:Trialsに別途記載します。
from ray.tune.search.bayesopt import BayesOptSearch
from ray import train
## ベイズ最適化アルゴリズムを定義
algo = BayesOptSearch(random_search_steps=4)
tuner = tune.Tuner(
trainable,
tune_config=tune.TuneConfig(
~
search_alg=algo, # 【ここ!】探索アルゴリズムを設定
~
),
run_config=train.RunConfig(
~
),
param_space=search_space, # 探索範囲
)
result_grid = tuner.fit()
4. スケジューラ:Schedulers
- スケジューラを指定しない場合、
Tune
はデフォルトでFIFO(First-In-First-Out)スケジューラを使用することになります(検索アルゴリズムによって選択された順に試行を実行し、早期停止を行わないもの) - スケジューラは実行中の試行を停止、一時停止、またはハイパーパラメータを調整することで、ハイパーパラメータチューニングを大幅に高速化することができます。ただし、探索アルゴリズムとは異なり、スケジューラは評価するハイパーパラメータの構成を選択することはできません
from ray.tune.schedulers import HyperBandScheduler
# HyperBandスケジューラを作成し、スコアを最大化する
hyperband = HyperBandScheduler(metric="score", mode="max")
tuner = tune.Tuner(
trainable,
tune_config=tune.TuneConfig(
~
scheduler=hyperband, # 【ここ!】スケジューラの設定
~
),
param_space=search_space, # 探索範囲
)
result_grid = tuner.fit()
ポイント
- 一番簡単なスケジューラは
ASHAScheduler
というもの。パフォーマンスの低い試行を積極的に終了させる(リソースの効率的利用)。また、試行ごとに独立して評価されるため、リソースが空くとすぐに新しい試行を開始できる(非同期処理) - スケジューラを使用する際、以下のような互換性の問題が発生する可能性がある
- 一部のスケジューラは特定の検索アルゴリズムと併用できない
- 一部のスケジューラでは、チェックポイント機能を実装する必要がある
5. 試行:Trials
tune.TuneConfig
の設定
## TuneConfigを設定
tune_config = tune.TuneConfig(
metric="loss",
mode="min",
num_samples=10, # 試行回数
search_alg=algo
)
-
探索アルゴリズムや試行数、チューニングのモード(最小化または最大化)などを指定
-
metric
:最適化の対象となる指標を指定する(例: "loss" や "accuracy") -
mode
:指標を「最小化」または「最大化」するかを指定(例: "min" または "max") -
num_samples
:試行(trial)の数を指定。デフォルトは 1 -
search_alg
:探索アルゴリズム(例: BayesOptSearch, OptunaSearch) -
time_budget_s
:実験に費やす最大時間(秒単位)
-
RunConfig
の設定
from ray.train import RunConfig
## RunConfigを設定
run_config = RunConfig(
stop={"training_iteration": 10}, # 10イテレーションで停止用
local_dir="./ray_results", # 結果を保存するディレクトリ
verbose=1, # 実行中のログ詳細度
)
- リソースの割り当てやトレーニングの終了条件など、実行時の詳細設定を行う部分
-
stop
:トレーニングを停止する条件を辞書形式で指定
例: {"training_iteration": 10}(10回のイテレーションで停止) -
local_dir
:結果を保存するディレクトリを指定 -
verbose
:実行中のログの詳細度を指定(例: 0, 1, 2)
-
上記を合わせると...
tuner = tune.Tuner(
trainable=trainable,
tune_config=tune.TuneConfig(
metric="test_loss",
mode="min",
num_samples=30,
search_alg=algo,
),
run_config=RunConfig(
stop={"training_iteration": 20},
local_dir="./ray_results",
verbose=1,
),
param_space=config,
)
result_grid = tuner.fit()
6. 結果取得:Analysis
- 実行直後であれば、
result_grid
オブジェクトをメモリ内で保持しており、結果を分析するのは簡単です。ただし、別のタイミングで結果を分析したい場合は、保存された実験結果からresult_grid
を再取得する必要があります - この場合、保存された
Tuner
の復元を行い、実験ディレクトリを指定して結果を読み込むことができます(実験ディレクトリのパスは、~/ray_results/{exp_name} のような形式) - なお、
RunConfig
で実験名(exp_name)を指定しない場合、自動生成されることになり、この場合、実験名はログに記録されています
result_grid = tuner.fit()
# 最良の結果を取得
best_result = result_grid.get_best_result() # 最良の結果オブジェクトを取得
best_config = best_result.config # 最良の試行のハイパーパラメータ
best_logdir = best_result.path # 最良の試行の結果が保存されているディレクトリ
best_checkpoint = best_result.checkpoint # 最良の試行のチェックポイント
best_metrics = best_result.metrics # 最良の試行の最終結果
best_result_df = best_result.metrics_dataframe # 最良の結果をpandasのデータフレームとして取得
results_df = results.get_dataframe() # 各試行の最終結果を含むpandasのデータフレームとして取得
maxscore_df = results.get_dataframe(filter_metric="score", filter_mode="max") # 特定のスコアやモードに基づいた結果をデータフレームとして取得
実験名を指定するのが簡単
実際は、実験名を指定して管理するのが簡単だと思います。
# 最初にディレクトリを作成する
experiment_path = os.path.join(storage_path, exp_name)
os.makedirs(experiment_path, exist_ok=True) # すでにディレクトリが存在している場合はスキップ
ポイント
-
experiment_path
の指定
実験結果が保存されているディレクトリを指定する。このディレクトリには、各試行の結果やログが格納されている- 設定例:
exp_name = "RNN_tune_XXXXXX"
- 設定例:
storage_path = "./ray_results"
、もしcolab環境で実行していてGoogledrive上に保存したいならstorage_path = "/content/drive/My Drive/ray_results"
- 設定例:
# Tunerの設定
run_config=tune.RunConfig(
local_dir=experiment_path, # 結果保存先を指定
)
# 保存されたTunerを復元
restored_tuner = tune.Tuner.restore(experiment_path, trainable=trainable)
# 結果を取得
result_grid = restored_tuner.get_results()
ポイント
-
Tuner
の復元
tune.Tuner.restore()
を使って、指定した実験ディレクトリからTunerを復元する
復元時には、最初の実験で使用したtrainable関数
を指定 - 結果の取得
復元したTuner
からresult_grid
を取得する。このオブジェクトを使って、結果の分析を行う
RNNモデルでRayTuneを使ってみる
今までは簡単なコードで例を示してきましたが、今からは実際のデータでRNNモデルを構築するときにRayTuneをどのように組み込んでいけばよいかを考えていきます。
事前設定
チューニングするしないに関わらずこの部分は共通です。
ランダムシードの設定
import random
# 乱数シード設定
def set_seed(seed):
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
データローダーを作成する関数の定義
今回は、訓練データとテストデータあり、それぞれ目的変数と説明変数に分けられているとします。
def create_dataloaders(batch_size):
train_dataset = TensorDataset(train_sequences, train_labels)
test_dataset = TensorDataset(test_sequences, test_labels)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
return train_loader, test_loader
ポイント
-
TensorDataset
:データとそれに対応するラベルを保存 -
Dataloader
:データセットからバッチごとにデータを取り出してモデルに供給する-
shuffle=True
:ランダムに抽出
-
モデルの定義
PyTorchにはRNNモジュールが含まれており、自分で構築する必要がありません。
class RNNModel(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers, dropout=0.0):
super(RNNModel, self).__init__()
self.rnn = nn.RNN(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout
)
self.fc = nn.Linear(hidden_size, output_size)
def forward(self, x):
out, _ = self.rnn(x) # 各時刻での隠れ層の出力(batch_size, seq_length, hidden_size)と最後の時刻の隠れ状態(使用しない)を出力
out = self.fc(out[:, -1, :]) # 最後の時間ステップを出力
return out
ポイント
-
super()
:nnモジュール(親クラス)を継承
RayTuneを組み込む
- 通常はこのあと、例:
model = RNNModel(input_size, hidden_size, output_size, num_layers, dropout)
でクラスのインスタンス化を行い、各パラメータの値を定義します - RayTuneではこの各値を幅を持たせて設定することで、探索してくれるようにします
- なお、今回のチューニングはColabのGPUを使いました、その際、どのGPUが使われているかをチェックするコードはこちら
import torch
print("GPU Available:", torch.cuda.is_available())
print("GPU Name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No GPU")
ライブラリのインポート
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import StepLR
from ray.tune.schedulers import ASHAScheduler
import optuna
from ray.tune.search.optuna import OptunaSearch
from ray.air import RunConfig, session
from ray.train import ScalingConfig
from ray.tune import ExperimentAnalysis
import os
from sklearn.metrics import mean_squared_error, r2_score
探索範囲
# ハイパーパラメータ探索範囲
config = {
"hidden_size": tune.choice([64, 128, 256]),
"learning_rate": tune.loguniform(1e-4, 1e-2),
"dropout": tune.uniform(0.0, 0.5),
"batch_size": tune.choice([64, 128]),
"num_epochs": 30
}
最適化アルゴリズム
今回はOptunaSearchを使用してみます。
# Optuna定義
algo = OptunaSearch(metric="test_loss", mode="min")
スケジューラ
# 試行スケジューラ定義
ray_scheduler = ASHAScheduler(
max_t=30,
grace_period=5,
reduction_factor=2
)
ポイント
-
max_t
:試行ごとの最大エポック数 ※ 引数time_attr
のデフォルトはtraining_iteration
が使われ、エポック数が基準に進捗が評価される -
grace_period
:最低エポック数 -
reduction_facter
:試行の削減率を設定(2の場合、次のエポックではパフォーマンスの低い試行が半分ずつ停止され、4の場合、試行数が4分の1になる)
モデルを呼び出す
def train_rnn_model(config):
set_seed(42)
device = "cuda" if torch.cuda.is_available() else "cpu" # GPUが利用可能なら使用
train_loader, test_loader = create_dataloaders(config["batch_size"])
model = RNNModel(
input_size=6, # 特徴量数に合わせる
hidden_size=config["hidden_size"],
output_size=1, # 出力サイズ
num_layers=1, # ここを変える
dropout=config["dropout"]
).to(device) # モデルをGPUに送る
ポイント
-
.to(device)
:GPU上に配置しておく -
create_dataloaders
:事前に作成したデータローダーへ変換する関数をしよう
評価指標などを定義
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=config["learning_rate"])
scheduler = StepLR(optimizer, step_size=5, gamma=0.5)
訓練モード
for epoch in range(config["num_epochs"]):
model.train()
train_loss = 0.0
train_predictions, train_targets = [], []
for batch_x, batch_y in train_loader:
batch_x, batch_y = batch_x.to(device), batch_y.to(device) # 入力データをGPUに送る
optimizer.zero_grad()
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
train_loss += loss.item()
train_predictions.append(outputs.cpu()) # CPUに戻す
train_targets.append(batch_y.cpu())
train_loss /= len(train_loader)
train_predictions = torch.cat(train_predictions).detach().numpy() # 勾配追跡を無効化
train_targets = torch.cat(train_targets).numpy()
train_rmse = np.sqrt(mean_squared_error(train_targets, train_predictions))
ポイント
-
データをCPUに送る
- batch_x(入力データ)と batch_y(対応するラベル)を .to(device) でGPUに送る
-
モデルを実行
-
model(batch_x)
: -
criterion(outputs, batch_y)
:モデルの出力(outputs)とターゲット(batch_y)を損失関数(criterion)に渡して損失を計算
-
-
損失のバックプロパゲーション
-
loss.backward()
:損失に基づいてモデルの各パラメータの勾配を計算 -
optimizer.step()
:勾配を基にモデルのパラメータを更新
-
-
CPUに戻す
-
.cpu()
:GPUにあるテンソル(outputs や batch_y)をCPU上に移動 - CPU上に戻すことで、後続の処理(例: 評価指標の計算や保存)が可能
-
-
結果の後処理
-
torch.cat
:ミニバッチごとに保存した出力やターゲットを1つのテンソルに結合 -
.detach()
:勾配計算を無効化(メモリ節約のため) -
.numpy()
:PyTorchのテンソルをNumPy配列に変換
-
検証モード
model.eval()
test_loss = 0.0
test_predictions, test_targets = [], []
with torch.no_grad():
for batch_x, batch_y in test_loader:
batch_x, batch_y = batch_x.to(device), batch_y.to(device)
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
test_loss += loss.item()
test_predictions.append(outputs.cpu()) # CPUに戻す
test_targets.append(batch_y.cpu())
test_loss /= len(test_loader)
test_predictions = torch.cat(test_predictions).numpy()
test_targets = torch.cat(test_targets).numpy()
test_rmse = np.sqrt(mean_squared_error(test_targets, test_predictions))
test_r2 = r2_score(test_targets, test_predictions)
ポイント
- 訓練モードとの違いは損失のバックプロパゲーションがないこと
- テストデータでr2乗決定係数を算出
- エポックの最後に学習率スケジューラを更新
Lossの最小値でベストモデルを更新
if test_loss < best_test_loss:
best_test_loss = test_loss
best_epoch = epoch
best_model_state = model.state_dict()
ポイント
-
best_model_state = model.state_dict()
:ベストモデルの重みを保存
結果を返す
# 結果を報告
session.report({"train_loss": train_loss, "train_rmse": train_rmse,
"test_loss": test_loss, "test_rmse": test_rmse,
"test_r2": test_r2,
"epoch": epoch, "best_test_loss": best_test_loss, "best_epoch":best_epoch})
リソース設定
scaling_config = ScalingConfig(
num_workers=1, # 試行ごとに使用するワーカー数
use_gpu=True, # GPUを使用
trainer_resources={"CPU": 1}, # トレーナーが使用するリソース
)
- Colabでは1つのGPUしか提供されないため、num_workers=2のように指定しても、GPUリソースが共有されるだけで並列に試行を行うことはできない(1つの試行がGPUを専有する形で実行する方が効率的)
- ワーカーがモデルをトレーニングしている間、トレーナーはデータの分割、タスクの監視、結果の集約などを行う → トレーナーに一つのCPUを割り当てる
試行
tuner = tune.Tuner(
trainable=train_rnn_model,
tune_config=tune.TuneConfig(
metric="test_loss",
mode="min",
num_samples=30,
search_alg=algo,
),
run_config=RunConfig(
storage_path=experiment_path,
verbose=1,
),
param_space=config,
)
result_grid = tuner.fit()
結果を取得
先ほど記載したresult_grid.get_best_result()
ではなく、今回はExperimentAnalysis
クラスを使用して取得してみます。
experiment_file = "/content/drive/My Drive/ray_results/RNN_tune_1/train_gru_model_2025-XX-XX_XX-XX-XX/experiment_state-2025-XX-XX_XX-XX-XX.json" # 例
analysis = ExperimentAnalysis(experiment_file)
# 最良試行のデータフレームを取得
best_trial = analysis.get_best_trial(metric="test_loss", mode="min")
best_trial_df = analysis.trial_dataframes[best_trial.trial_id]
# test_loss が最小のエポックを取得
best_epoch_idx = best_trial_df["test_loss"].idxmin()
best_epoch = best_trial_df.loc[best_epoch_idx]
# 同じエポックの test_rmse, test_r2 を取得
best_epoch_loss = best_epoch["test_loss"]
best_epoch_rmse = best_epoch["test_rmse"]
best_epoch_r2 = best_epoch["test_r2"]
# 結果を表示
print(f"Best Epoch: {best_epoch['epoch']}")
print(f"Best Test Loss: {best_epoch_loss}")
print(f"Best Test RMSE: {best_epoch_rmse}")
print(f"Best Test R²: {best_epoch_r2}")
ポイント
-
result_grid.get_best_result()
- 最良試行(best trial)のみ結果を取得し、実験終了直後にすぐ利用可能だが、他の試行の情報には直接アクセスできない
- ハイパーパラメータ(best_result.config)
- メトリクス(best_result.metrics)
- チェックポイント(best_result.checkpoint)
# 例
best_result = result_grid.get_best_result()
print(best_result.config) # 最良試行のハイパーパラメータ
print(best_result.metrics) # 最良試行の最終結果
-
ExperimentAnalysis
- ExperimentAnalysis は、実験全体の結果を総合的に解析する
- 全試行の結果をデータフレームで取得(analysis.results_df)
- 最良試行のメトリクス解析:各試行のメトリクス(例: test_loss)の統計情報(min, max, mean)にアクセス
- 再解析:実験後、結果ディレクトリ(experiment_path)から結果を再解析可能
# 例
analysis = ExperimentAnalysis(experiment_path)
best_trial = analysis.best_trial
print(best_trial.metric_analysis["test_loss"]["min"]) # 最良試行の最小test_loss
print(analysis.results_df.head()) # 全試行の結果
可視化
plt.figure(figsize=(10, 6))
plt.plot(best_trial_df["epoch"], best_trial_df["train_loss"], label="Train Loss")
plt.plot(best_trial_df["epoch"], best_trial_df["test_loss"], label="Test Loss")
plt.title("Loss Learning Curve(best)")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()
実際に精度は向上するの?
電力消費データを使ってRNN、LSTM、GRUモデルを構築し、一つ一つ値を変えながらハイパラメータチューニングを行った結果、GRUモデルの「Test Loss: 0.0027、Test R2:0.913」がベストな精度でした。
今回、RayTuneを使用して同様に3つのモデルでハイパラメータチューニングを行った結果、GRUモデルで「Test Loss:0.0025、Test R2:0.920」を出すことができましたので、使った効果はありそうです!
以上です。読んでいただきありがとうございました。