以下の記事に書いたアルゴリズムを実際にコードにしたので置いておきます。
詳しい解説や結果は書くのが面倒になってしまいました...
AIさんにやらせればすぐに出ると思います。
結果
今回対象にした目的関数(等高線)と探索結果(ポイント)です。
x1方向に勾配が急峻(重要)、x2方向にはなだらか(重要でない)目的関数になっています。
この目的関数に対して、30回の探索試行を行った結果が散布図になります。
下のグラフは、10回目の探索におけるICEを用いた各パラメータ、各サンプルに対する予測曲線です。
x1の予測曲線の最大幅(MPDE)はおよそ15。
一方でx2は、最大でもおよそ4ほどです。
今回、このMPDEに対する閾値は10と設定しており、x1は重要である、x2は重要でないと判断され、x1のみの探索を11回目で行います。
このように目的関数の予想変化幅で探索空間を選択できるのがMPDE-BOの良い部分です。

準備
上記のアルゴリズムをPythonで実装してみます。
論文ではGPy(GPyTorch?)を使っているみたいですが、ここでは以下を理由にBoTorchを使用します。
- より実用的なMerten5/2+ARDカーネルと組み合わせて使いたい
- 最近よく使われる(イメージ)
自分が使ってみたい
使用バージョン
pyproject.toml
requires-python = ">=3.13"
dependencies = [
"torch>=2.0",
"botorch>=0.10",
"gpytorch>=1.11",
"numpy>=1.26",
]
コード
論文のアルゴリズムの図と対応するため、一部冗長な箇所があります。
import torch
from botorch.models import SingleTaskGP
from botorch.fit import fit_gpytorch_mll
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.acquisition import LogExpectedImprovement, AcquisitionFunction
from botorch.optim import optimize_acqf
from gpytorch.kernels import MaternKernel, ScaleKernel
# ==========================================
# 1. 目的関数 (Gaussian Mixture)
# ==========================================
class GaussianMixtureFunction:
def __init__(self, n_dense=1, n_sparse=1, n_peak=4, seed=7):
self.dim = n_dense + n_sparse
torch.manual_seed(seed)
# 混合ガウス分布のパラメータ生成 (double precision)
self.centers = torch.rand(n_peak, self.dim, dtype=torch.double)
self.amplitudes = (torch.rand(n_peak, dtype=torch.double) + 0.5) * 10
self.sigma = (torch.rand(n_peak, self.dim, dtype=torch.double) + 0.5) * 1.5
# 次元の感度重み付け (Dense次元は重く、Sparse次元は軽く)
self.weights = torch.zeros(self.dim, dtype=torch.double)
self.weights[:n_dense] = 30.0 # Sensitive
self.weights[n_dense:] = 1.0 # Insensitive
def __call__(self, X):
# X: (batch, dim)
if X.ndim == 1:
X = X.unsqueeze(0)
X = X.to(dtype=torch.double) # Ensure double
# 各ピークとの距離計算 -> 重み付け -> ガウス和
weighted_sq_diff = (
self.weights * ((X.unsqueeze(1) - self.centers) / self.sigma).pow(2)
).sum(dim=-1)
responses = (self.amplitudes * torch.exp(-weighted_sq_diff)).sum(dim=-1)
return responses.unsqueeze(-1)
# ==========================================
# 2. スパースモデリング (ICE / MPDE)
# ==========================================
def compute_mpde(model, X, dim, resolution=50):
"""各次元のMPDE (Maximum Partial Dependence Effect) を計算
MPDEは、各サンプルのICE曲線における変動幅(max-min)の最大値として定義される。
"""
mpdes = torch.zeros(dim, dtype=torch.double)
n_samples = X.shape[0]
# 予測用グリッド: (n_samples, resolution)
grid = (
torch.linspace(0, 1, resolution, dtype=torch.double)
.expand(n_samples, resolution)
.to(X)
)
for i in range(dim):
# i番目の次元だけグリッド値を入れ、他は固定して予測 (ICEの定義)
X_ice = X.unsqueeze(1).expand(n_samples, resolution, dim).clone()
X_ice[:, :, i] = grid
X_ice = X_ice.reshape(-1, dim)
with torch.no_grad():
posterior = model.posterior(X_ice)
ice_values = posterior.mean.reshape(n_samples, resolution)
# 1. 各サンプルごとのICE曲線の変動幅 (max - min) を計算
min_vals = ice_values.min(dim=1).values
max_vals = ice_values.max(dim=1).values
diffs = max_vals - min_vals
# 2. 変動幅の最大値をMPDEとする
mpdes[i] = diffs.max()
return mpdes
# ==========================================
# 3. 獲得関数の部分空間最適化
# ==========================================
class DenseSubspaceAcquisition(AcquisitionFunction):
"""Dense次元のみ最適化し、Sparse次元は固定値で埋めるラッパー"""
def __init__(self, model, base_acq, dense_indices, full_dim):
super().__init__(model)
self.base_acq = base_acq
self.dense_indices = dense_indices
self.full_dim = full_dim
self.fixed_val = 0.5 # Sparse次元の埋め値
def forward(self, X_dense):
# X_dense: (batch, q, n_dense) -> X_full: (batch, q, full_dim)
batch_size, q = X_dense.shape[:2]
X_full = torch.full(
(batch_size, q, self.full_dim), self.fixed_val, dtype=torch.double
).to(X_dense)
for i, idx in enumerate(self.dense_indices):
X_full[..., idx] = X_dense[..., i]
return self.base_acq(X_full)
# ==========================================
# 4. メインループ
# ==========================================
def main():
# 最適化の設定
n_dense, n_sparse = 1, 1 # 要不要の次元数
dim = n_dense + n_sparse
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
objective = GaussianMixtureFunction(n_dense, n_sparse) # 目的関数の定義
n_trials = 30 # 試行回数
# 初期データ (2点) - double precision
train_X = torch.rand(2, dim, dtype=torch.double).to(device)
train_Y = objective(train_X).to(device)
# 閾値設定
ard_threshold = 100.0
mpde_threshold = 10.0
# 初期モデルを構築
covar_module = ScaleKernel(MaternKernel(nu=2.5, ard_num_dims=dim))
model = SingleTaskGP(train_X, train_Y, covar_module=covar_module)
mll = ExactMarginalLogLikelihood(model.likelihood, model)
fit_gpytorch_mll(mll)
for t in range(n_trials):
# 1. 長さスケール計算: ARDカーネルのパラメータを更新します。
lengthscales = model.covar_module.base_kernel.lengthscale.squeeze().detach()
# 2. MPDE計算: 各パラメータの影響度を物理量スケールで計算します。
mpdes = compute_mpde(model, train_X, dim)
# 3. 仕分け: MPDE閾値を使って、パラメータの要不要(Active / Inactive)を判定します。
dense_indices = [
i
for i in range(dim)
if lengthscales[i] < ard_threshold and mpdes[i] > mpde_threshold
]
sparse_indices = [i for i in range(dim) if i not in dense_indices]
print(
f"Trial {t+1}: Best Y = {train_Y.max().item():.4f}, Dense Dims = {dense_indices}"
)
# 4. 候補点算出: 「必要(Active)」と判定されたパラメータ空間だけで獲得関数を計算し、次の候補点を探します。
next_X = torch.zeros(1, dim, dtype=torch.double).to(device)
if dense_indices:
acq = DenseSubspaceAcquisition(
model=model,
base_acq=LogExpectedImprovement(model, best_f=train_Y.max()),
dense_indices=dense_indices,
full_dim=dim,
)
candidates, _ = optimize_acqf(
acq_function=acq,
bounds=torch.stack(
[
torch.zeros(len(dense_indices), dtype=torch.double),
torch.ones(len(dense_indices), dtype=torch.double),
]
).to(device),
q=1,
num_restarts=5,
raw_samples=20,
)
next_X[0, dense_indices] = candidates[0]
# 5. ランダム化: 「不要(Inactive)」とされたパラメータについては、ランダムサンプリングを行います
if sparse_indices:
next_X[0, sparse_indices] = torch.rand(
len(sparse_indices), dtype=torch.double
).to(device)
# 6. 実験・計測: 目的関数を計算し、新しいデータを取得します。
new_Y = objective(next_X)
train_X = torch.cat([train_X, next_X])
train_Y = torch.cat([train_Y, new_Y])
# 7. モデル更新: GPモデルを再構築します。
covar_module = ScaleKernel(MaternKernel(nu=2.5, ard_num_dims=dim))
model = SingleTaskGP(train_X, train_Y, covar_module=covar_module)
mll = ExactMarginalLogLikelihood(model.likelihood, model)
fit_gpytorch_mll(mll)
if __name__ == "__main__":
main()
