0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

研究者の直感で探索空間を削れるベイズ最適化手法【コード】

0
Posted at

以下の記事に書いたアルゴリズムを実際にコードにしたので置いておきます。

詳しい解説や結果は書くのが面倒になってしまいました...
AIさんにやらせればすぐに出ると思います。

結果

今回対象にした目的関数(等高線)と探索結果(ポイント)です。
x1方向に勾配が急峻(重要)、x2方向にはなだらか(重要でない)目的関数になっています。
この目的関数に対して、30回の探索試行を行った結果が散布図になります。

探索履歴.png

下のグラフは、10回目の探索におけるICEを用いた各パラメータ、各サンプルに対する予測曲線です。
x1の予測曲線の最大幅(MPDE)はおよそ15。
一方でx2は、最大でもおよそ4ほどです。
今回、このMPDEに対する閾値は10と設定しており、x1は重要である、x2は重要でないと判断され、x1のみの探索を11回目で行います。
このように目的関数の予想変化幅で探索空間を選択できるのがMPDE-BOの良い部分です。
ice.png

準備

上記のアルゴリズムをPythonで実装してみます。
論文ではGPy(GPyTorch?)を使っているみたいですが、ここでは以下を理由にBoTorchを使用します。

  • より実用的なMerten5/2+ARDカーネルと組み合わせて使いたい
  • 最近よく使われる(イメージ)
  • 自分が使ってみたい

使用バージョン

pyproject.toml
requires-python = ">=3.13"
dependencies = [
	"torch>=2.0",
	"botorch>=0.10",
	"gpytorch>=1.11",
	"numpy>=1.26",
]

コード

論文のアルゴリズムの図と対応するため、一部冗長な箇所があります。

import torch
from botorch.models import SingleTaskGP
from botorch.fit import fit_gpytorch_mll
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.acquisition import LogExpectedImprovement, AcquisitionFunction
from botorch.optim import optimize_acqf
from gpytorch.kernels import MaternKernel, ScaleKernel


# ==========================================
# 1. 目的関数 (Gaussian Mixture)
# ==========================================
class GaussianMixtureFunction:
    def __init__(self, n_dense=1, n_sparse=1, n_peak=4, seed=7):
        self.dim = n_dense + n_sparse
        torch.manual_seed(seed)

        # 混合ガウス分布のパラメータ生成 (double precision)
        self.centers = torch.rand(n_peak, self.dim, dtype=torch.double)
        self.amplitudes = (torch.rand(n_peak, dtype=torch.double) + 0.5) * 10
        self.sigma = (torch.rand(n_peak, self.dim, dtype=torch.double) + 0.5) * 1.5

        # 次元の感度重み付け (Dense次元は重く、Sparse次元は軽く)
        self.weights = torch.zeros(self.dim, dtype=torch.double)
        self.weights[:n_dense] = 30.0  # Sensitive
        self.weights[n_dense:] = 1.0  # Insensitive

    def __call__(self, X):
        # X: (batch, dim)
        if X.ndim == 1:
            X = X.unsqueeze(0)
        X = X.to(dtype=torch.double)  # Ensure double

        # 各ピークとの距離計算 -> 重み付け -> ガウス和
        weighted_sq_diff = (
            self.weights * ((X.unsqueeze(1) - self.centers) / self.sigma).pow(2)
        ).sum(dim=-1)
        responses = (self.amplitudes * torch.exp(-weighted_sq_diff)).sum(dim=-1)
        return responses.unsqueeze(-1)


# ==========================================
# 2. スパースモデリング (ICE / MPDE)
# ==========================================
def compute_mpde(model, X, dim, resolution=50):
    """各次元のMPDE (Maximum Partial Dependence Effect) を計算

    MPDEは、各サンプルのICE曲線における変動幅(max-min)の最大値として定義される。
    """
    mpdes = torch.zeros(dim, dtype=torch.double)
    n_samples = X.shape[0]

    # 予測用グリッド: (n_samples, resolution)
    grid = (
        torch.linspace(0, 1, resolution, dtype=torch.double)
        .expand(n_samples, resolution)
        .to(X)
    )

    for i in range(dim):
        # i番目の次元だけグリッド値を入れ、他は固定して予測 (ICEの定義)
        X_ice = X.unsqueeze(1).expand(n_samples, resolution, dim).clone()
        X_ice[:, :, i] = grid
        X_ice = X_ice.reshape(-1, dim)

        with torch.no_grad():
            posterior = model.posterior(X_ice)
            ice_values = posterior.mean.reshape(n_samples, resolution)

        # 1. 各サンプルごとのICE曲線の変動幅 (max - min) を計算
        min_vals = ice_values.min(dim=1).values
        max_vals = ice_values.max(dim=1).values
        diffs = max_vals - min_vals

        # 2. 変動幅の最大値をMPDEとする
        mpdes[i] = diffs.max()

    return mpdes


# ==========================================
# 3. 獲得関数の部分空間最適化
# ==========================================
class DenseSubspaceAcquisition(AcquisitionFunction):
    """Dense次元のみ最適化し、Sparse次元は固定値で埋めるラッパー"""

    def __init__(self, model, base_acq, dense_indices, full_dim):
        super().__init__(model)
        self.base_acq = base_acq
        self.dense_indices = dense_indices
        self.full_dim = full_dim
        self.fixed_val = 0.5  # Sparse次元の埋め値

    def forward(self, X_dense):
        # X_dense: (batch, q, n_dense) -> X_full: (batch, q, full_dim)
        batch_size, q = X_dense.shape[:2]
        X_full = torch.full(
            (batch_size, q, self.full_dim), self.fixed_val, dtype=torch.double
        ).to(X_dense)

        for i, idx in enumerate(self.dense_indices):
            X_full[..., idx] = X_dense[..., i]

        return self.base_acq(X_full)


# ==========================================
# 4. メインループ
# ==========================================
def main():
    # 最適化の設定
    n_dense, n_sparse = 1, 1  # 要不要の次元数
    dim = n_dense + n_sparse
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    objective = GaussianMixtureFunction(n_dense, n_sparse)  # 目的関数の定義

    n_trials = 30  # 試行回数

    # 初期データ (2点) - double precision
    train_X = torch.rand(2, dim, dtype=torch.double).to(device)
    train_Y = objective(train_X).to(device)

    # 閾値設定
    ard_threshold = 100.0
    mpde_threshold = 10.0

    # 初期モデルを構築
    covar_module = ScaleKernel(MaternKernel(nu=2.5, ard_num_dims=dim))
    model = SingleTaskGP(train_X, train_Y, covar_module=covar_module)
    mll = ExactMarginalLogLikelihood(model.likelihood, model)
    fit_gpytorch_mll(mll)

    for t in range(n_trials):
        # 1. 長さスケール計算: ARDカーネルのパラメータを更新します。
        lengthscales = model.covar_module.base_kernel.lengthscale.squeeze().detach()

        # 2. MPDE計算: 各パラメータの影響度を物理量スケールで計算します。
        mpdes = compute_mpde(model, train_X, dim)

        # 3. 仕分け: MPDE閾値を使って、パラメータの要不要(Active / Inactive)を判定します。
        dense_indices = [
            i
            for i in range(dim)
            if lengthscales[i] < ard_threshold and mpdes[i] > mpde_threshold
        ]
        sparse_indices = [i for i in range(dim) if i not in dense_indices]

        print(
            f"Trial {t+1}: Best Y = {train_Y.max().item():.4f}, Dense Dims = {dense_indices}"
        )

        # 4. 候補点算出: 「必要(Active)」と判定されたパラメータ空間だけで獲得関数を計算し、次の候補点を探します。
        next_X = torch.zeros(1, dim, dtype=torch.double).to(device)
        if dense_indices:
            acq = DenseSubspaceAcquisition(
                model=model,
                base_acq=LogExpectedImprovement(model, best_f=train_Y.max()),
                dense_indices=dense_indices,
                full_dim=dim,
            )
            candidates, _ = optimize_acqf(
                acq_function=acq,
                bounds=torch.stack(
                    [
                        torch.zeros(len(dense_indices), dtype=torch.double),
                        torch.ones(len(dense_indices), dtype=torch.double),
                    ]
                ).to(device),
                q=1,
                num_restarts=5,
                raw_samples=20,
            )
            next_X[0, dense_indices] = candidates[0]

        # 5. ランダム化: 「不要(Inactive)」とされたパラメータについては、ランダムサンプリングを行います
        if sparse_indices:
            next_X[0, sparse_indices] = torch.rand(
                len(sparse_indices), dtype=torch.double
            ).to(device)

        # 6. 実験・計測: 目的関数を計算し、新しいデータを取得します。
        new_Y = objective(next_X)
        train_X = torch.cat([train_X, next_X])
        train_Y = torch.cat([train_Y, new_Y])

        # 7. モデル更新: GPモデルを再構築します。
        covar_module = ScaleKernel(MaternKernel(nu=2.5, ard_num_dims=dim))
        model = SingleTaskGP(train_X, train_Y, covar_module=covar_module)
        mll = ExactMarginalLogLikelihood(model.likelihood, model)
        fit_gpytorch_mll(mll)


if __name__ == "__main__":
    main()

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?