0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

botorch入門3 モデル作成3

Last updated at Posted at 2025-05-19

前回に引き続きモデルについて説明していきます。

今回は深層ガウス過程や深層カーネル学習などです。
公式の実装がないため独自の実装ですのでご了承ください

gpytorchの実装をもとにしています。

業務で使いたいなどあれば、公式の実装をおまちください。

致命的なミス・モデル改善に関するご意見があればコメントいただけますとありがたいです。
こういった話をする相手がいないのでお気軽に議論したいと思っています

12. DeepGP

概要:
DeepGP は多層構造(レイヤー)を持つガウス過程で、BoTorch には標準では存在しませんが、非線形かつ階層的な関係をモデリング可能な自作実装として応用されています。

主な用途:

  • 非定常・複雑な現象の表現力向上
  • 潜在空間の学習や表現学習の応用

特徴:

  • GPを複数重ねた構造(レイヤー構成)で表現力が高い
  • 内部的に潜在変数を介して非線形写像を学習
  • トレーニングはELBOによる近似推論を用い、通常のGPより難易度は高い

次のような関数からデータ点をサンプリングしてモデルの作成を行います。
まずは単目的モデルを作成します。

# import gpytorch
from torch import Tensor
from gpytorch.means import ConstantMean, LinearMean
from gpytorch.kernels import MaternKernel, ScaleKernel
from gpytorch.variational import VariationalStrategy, CholeskyVariationalDistribution
from gpytorch.distributions import MultivariateNormal, MultitaskMultivariateNormal
from gpytorch.models.deep_gps import DeepGPLayer, DeepGP
from gpytorch.likelihoods import MultitaskGaussianLikelihood
from botorch.models.gpytorch import GPyTorchModel
from botorch.models.utils.gpytorch_modules import (
    get_covar_module_with_dim_scaled_prior,
    get_gaussian_likelihood_with_lognormal_prior,
)
from gpytorch.mlls import DeepApproximateMLL, VariationalELBO
from botorch.posteriors.gpytorch import GPyTorchPosterior
class DeepGPHiddenLayer(DeepGPLayer):
    """
    Deep Gaussian Processの隠れ層を表現するクラス。

    Args:
        input_dims (int): 入力次元数。
        output_dims (Optional[int]): 出力次元数。スカラ出力の場合はNone。
        num_inducing (int): 誘導点の数。デフォルトは128。
        mean_type (str): 平均関数の種類('constant'または'linear')。デフォルトは'constant'"""
    def __init__(
        self,
        input_dims,
        output_dims=None,
        num_inducing=128,
        mean_type='constant'
    ):
        """
        初期化メソッド。

        誘導点、変分分布、変分戦略、平均関数、カーネルを設定します。

        Args:
            input_dims (int): 入力次元数。
            output_dims (Optional[int]): 出力次元数。
            num_inducing (int): 誘導点の数。
            mean_type (str): 平均関数の種類。
        """
        # 誘導点の初期化
        inducing_points = torch.randn(
            num_inducing, input_dims
        ) if output_dims is None else torch.randn(
            output_dims, num_inducing, input_dims
        )
        
        # バッチ形状の設定
        batch_shape = torch.Size([] if output_dims is None else [output_dims])

        # 変分分布と変分戦略の設定
        variational_distribution = CholeskyVariationalDistribution(
            num_inducing_points=num_inducing, batch_shape=batch_shape
        )
        variational_strategy = VariationalStrategy(
            self, inducing_points, variational_distribution, learn_inducing_locations=True
        )

        # 親クラスの初期化
        super().__init__(variational_strategy, input_dims, output_dims)

        # 平均関数の設定
        self.mean_module = ConstantMean() if mean_type == 'constant' else LinearMean(input_dims)
        
        # RBFカーネル(Maternカーネル)の設定
        self.covar_module = ScaleKernel(
            MaternKernel(nu=2.5, batch_shape=batch_shape, ard_num_dims=input_dims),
            batch_shape=batch_shape
        )

    def forward(self, x):
        """
        フォワードパスで多変量正規分布を計算する。

        Args:
            x (Tensor): 入力テンソル。

        Returns:
            MultivariateNormal: 平均と共分散を持つ多変量正規分布。
        """
        mean_x = self.mean_module(x)  # 平均の計算
        covar_x = self.covar_module(x)  # 共分散の計算
        return MultivariateNormal(mean_x, covar_x)
class DeepGPModel(DeepGP, GPyTorchModel):
    """
    Deep Gaussian Processモデルクラス。

    複数の隠れ層を持つ深層ガウス過程を実現します。

    Args:
        train_X (Tensor): 訓練データの入力。
        train_Y (Tensor): 訓練データの出力。
        train_Yvar (Optional[Tensor]): 観測ノイズの分散。デフォルトはNone。
        outcome_transform (Union[str, Standardize, None]): 出力変換。デフォルトは"DEFAULT"。
        list_hidden_dims (list): 隠れ層の次元リスト。デフォルトは[10, 10]。
    """
    def __init__(
        self,
        train_X,
        train_Y,
        train_Yvar=None,
        likelihood=None,
        outcome_transform=None,
        list_hidden_dims=[10],
    ):
        """
        モデルの初期化。

        訓練データに基づいて隠れ層を構築し、出力変換および尤度を設定します。

        Args:
            train_X (Tensor): 訓練データの特徴量。
            train_Y (Tensor): 訓練データの観測値。
            train_Yvar (Optional[Tensor]): 観測ノイズの分散。
            likelihood (Optional): モデルの尤度関数。
            outcome_transform (Union[str, Standardize, None]): 出力変換。
            list_hidden_dims (list): 隠れ層の次元リスト。
        """
        super().__init__()

        # 入力次元数と出力次元数を取得
        input_dim = train_X.shape[-1]
        num_outputs = train_Y.shape[-1]

        # 入力データの検証
        self._validate_tensor_args(X=train_X, Y=train_Y, Yvar=train_Yvar)

        # 出力変換の設定
        if outcome_transform is None:
            outcome_transform = Standardize(m=train_Y.shape[-1], batch_shape=train_X.shape[:-2])
        
        if outcome_transform is not None:
            train_Y, train_Yvar = outcome_transform(train_Y, train_Yvar)
        
        self._validate_tensor_args(X=train_X, Y=train_Y, Yvar=train_Yvar)

        _, self._aug_batch_shape = self.get_batch_dimensions(train_X=train_X, train_Y=train_Y)

        self.train_inputs = train_X
        self.train_targets = train_Y

        # 隠れ層の構築
        self.hidden_layer = torch.nn.Sequential()
        input_dims = input_dim
        for i, dim in enumerate(list_hidden_dims):
            self.hidden_layer.add_module(
                f"hidden{i}", 
                DeepGPHiddenLayer(input_dims=input_dims, output_dims=dim, mean_type="linear")
            )
            input_dims = dim  # 次の層の入力次元を更新

        # 最終層の設定
        self.last_layer = DeepGPHiddenLayer(
            input_dims=list_hidden_dims[-1],
            output_dims=None if num_outputs == 1 else num_outputs,
            mean_type="constant"
        )

        # モデルの出力数と尤度を設定
        self._num_outputs = num_outputs
        if likelihood is None:
            self.likelihood = (
                get_gaussian_likelihood_with_lognormal_prior(batch_shape=self._aug_batch_shape)
                if num_outputs == 1
                else MultitaskGaussianLikelihood(num_tasks=num_outputs)
            )
        else:
            self.likelihood = likelihood

        # 出力変換を設定
        if outcome_transform is not None:
            self.outcome_transform = outcome_transform

    def forward(self, inputs, batch_mean=True):
        """
        入力データを処理し、多変量正規分布を計算。

        Args:
            inputs (Tensor): モデルへの入力データ。
            batch_mean (bool): バッチ全体の平均を計算するかどうか。デフォルトはTrue。

        Returns:
            MultivariateNormal: 平均と共分散を持つ多変量正規分布。
        """
        h = self.hidden_layer(inputs)  # 隠れ層を通過
        output = self.last_layer(h)  # 最終層を通過

        # 平均と共分散を計算
        mean_x = output.mean.mean(0)
        covar_x = output.covariance_matrix.mean(0)

        if batch_mean:
            if self._num_outputs > 1:
                return MultitaskMultivariateNormal(mean_x, covar_x)
            else:
                return MultivariateNormal(mean_x, covar_x)
        else:
            return output
        return output

    @property
    def num_outputs(self) -> int:
        """
        モデルの出力次元数を取得。

        Returns:
            int: 出力次元数。
        """
        return self._num_outputs

    @staticmethod
    def get_batch_dimensions(
        train_X: Tensor, train_Y: Tensor
    ) -> tuple[torch.Size, torch.Size]:
        """
        入力データのバッチ形状と、出力次元を含む拡張バッチ形状を取得。

        Args:
            train_X (Tensor): 訓練データの特徴量。
            train_Y (Tensor): 訓練データの観測値。

        Returns:
            Tuple[torch.Size, torch.Size]:
                - 入力バッチ形状。
                - 出力次元を含む拡張バッチ形状。
        """
        input_batch_shape = train_X.shape[:-2]
        aug_batch_shape = input_batch_shape
        num_outputs = train_Y.shape[-1]
        if num_outputs > 1:
            aug_batch_shape += torch.Size([num_outputs])
        return input_batch_shape, aug_batch_shape
def fit_deepgp_mll(mll, lr=0.01, epoch=50):
    """
    モデルの周辺対数尤度 (Marginal Log Likelihood, MLL) を最適化して、DeepGPモデルを訓練します。

    Args:
        mll (gpytorch.mlls.MarginalLogLikelihood): 訓練対象の周辺対数尤度オブジェクト。
    """
    # モデルと尤度を訓練モードに設定
    mll.model.train()
    mll.model.likelihood.train()

    # Adamオプティマイザの設定(学習率: 0.01)
    optimizer = torch.optim.Adam(mll.model.parameters(), lr=lr)

    y_tensor = mll.model.train_targets
    x_tensor = mll.model.train_inputs
    
    # 訓練ループ
    for i in range(epoch):
        optimizer.zero_grad()  # 勾配の初期化
        output = mll.model.forward(x_tensor, False)  # モデルの出力を計算
        # 損失を計算 (出力次元数に応じて処理を分岐)
        if (len(y_tensor.shape)>1)&(y_tensor.shape[-1] > 1):
            loss = -mll(output, y_tensor)  # 多変量の場合
        else:
            loss = -mll(output, y_tensor.view(-1))  # 単変量の場合

        loss.backward()  # 逆伝播で勾配を計算
        optimizer.step()  # パラメータの更新
def blackbox(x):
    return np.sin(x*2)+np.cos(x/2-2)-x*0.1+np.random.rand(len(x),1)

x = np.arange(0,10,0.1).reshape(-1,1)
y = blackbox(x)

plt.plot(x, y);
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_84_1.png

train_x = torch.rand(10,1, dtype=torch.float32)*10
train_y = blackbox(train_x).to(torch.float32)
bounds = torch.stack([
    torch.min(train_x, dim=0)[0],
    torch.max(train_x, dim=0)[0]
])

# 入力データを正規化
train_x_scaled = normalize(train_x, bounds=bounds).to(torch.float32)

# モデルの作成
model = DeepGPModel(
    train_x_scaled,
    train_y,
    outcome_transform=Standardize(
        m=train_y.size(-1)
    )
)

# モデルの最尤推定を行う
mll = DeepApproximateMLL(
    VariationalELBO(
        model.likelihood,
        model,
        len(train_x),
        beta=.01
    )
)

fit_deepgp_mll(mll)
x = torch.arange(0,10,0.1).reshape(-1,1)
x_norm = normalize(x, bounds)

pred = model.posterior(x_norm)
pred_mean = pred.mean.detach().numpy().ravel()
pred_std = pred.variance.sqrt().detach().numpy().ravel()

plt.plot(x.ravel(), pred_mean, color='orange')
plt.fill_between(x.ravel(), pred_mean+pred_std, pred_mean-pred_std, color='orange', alpha=.4)
plt.scatter(train_x, train_y, color='red')
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_87_1.png

次に多目的を考えます。

def blackbox1(x):
    return np.sin(x[:, 0:1]*2) + np.cos(x[:, 0:1]/2 - 2) - x[:, 0:1]*0.1 + np.random.rand(len(x), 1)

def blackbox2(x):
    return -0.8*np.sin(x[:, 0:1]*2) - 1.2*np.cos(x[:, 0:1]/2 - 2) - x[:, 0:1]*0.15 + np.random.rand(len(x), 1)

x = np.arange(0,10,0.1).reshape(-1,1)
y1 = blackbox1(x)
y2 = blackbox2(x)
plt.plot(y1)
plt.plot(y2)

output_89_1.png

train_x = torch.rand(30,1, dtype=torch.float32)*10
train_y = torch.cat([blackbox1(train_x), blackbox2(train_x)], dim=1).to(torch.float32)
bounds = torch.stack([
    torch.min(train_x, dim=0)[0],
    torch.max(train_x, dim=0)[0]
])

# 入力データを正規化
train_x_scaled = normalize(train_x, bounds=bounds).to(torch.float32)

# モデルの作成
model = DeepGPModel(
    train_x_scaled,
    train_y,
    outcome_transform=Standardize(
        m=train_y.size(-1)
    )
)

# モデルの最尤推定を行う
mll = DeepApproximateMLL(
    VariationalELBO(
        model.likelihood,
        model,
        len(train_x),
        beta=.01
    )
)

fit_deepgp_mll(mll)
x = torch.arange(0,10,0.1).reshape(-1,1)
x_norm = normalize(x, bounds)

pred = model.posterior(x_norm)
pred_mean = pred.mean.detach().numpy()
pred_std = pred.variance.sqrt().detach().numpy()

plt.plot(x.ravel(), pred_mean[:,0], color='orange')
plt.fill_between(x.ravel(), pred_mean[:,0]+pred_std[:,0], pred_mean[:,0]-pred_std[:,0], color='orange', alpha=.4)

plt.plot(x.ravel(), pred_mean[:,1], color='green')
plt.fill_between(x.ravel(), pred_mean[:,1]+pred_std[:,1], pred_mean[:,1]-pred_std[:,1], color='green', alpha=.4)

plt.scatter(train_x, train_y[:,0], color='red')
plt.scatter(train_x, train_y[:,1], color='purple')
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_92_1.png

最後に連続値+カテゴリ値を考えます。

from botorch.models.kernels.categorical import CategoricalKernel
from botorch.utils.transforms import normalize_indices
from gpytorch.constraints import GreaterThan
class DeepMixedGPHiddenLayer(DeepGPLayer):
    """
    混合型データ(カテゴリカルデータと連続データ)に対応したDeep Gaussian Processの隠れ層クラス。

    Args:
        input_dims (int): 入力次元数。
        output_dims (Optional[int]): 出力次元数。スカラー出力の場合はNone。
        ord_dims (Sequence[int]): 連続データの次元インデックス。
        cat_dims (Sequence[int]): カテゴリカルデータの次元インデックス。
        aug_batch_shape (torch.Size): カーネルのバッチ形状。
        num_inducing (int): 誘導点の数。デフォルトは128。
        mean_type (str): 平均関数の種類('constant'または'linear')。デフォルトは'constant'"""
    def __init__(
        self,
        input_dims,
        output_dims,
        ord_dims,
        cat_dims,
        num_inducing=128,
        mean_type='constant',
        input_data=None,
    ):
        """
        初期化メソッド。

        隠れ層の誘導点、変分分布、カーネル関数を設定します。

        Args:
            input_dims (int): 入力の次元数。
            output_dims (Optional[int]): 出力の次元数。
            ord_dims (Sequence[int]): 連続データの次元インデックス。
            cat_dims (Sequence[int]): カテゴリカルデータの次元インデックス。
            num_inducing (int): 誘導点の数。
            mean_type (str): 平均関数の種類。
        """        
        # 誘導点の初期化
        inducing_points = (
            torch.randn(num_inducing, input_dims) if output_dims is None
            else torch.randn(output_dims, num_inducing, input_dims)
        )
        
        for i in range(len(cat_dims)):
            if output_dims is None:
                inducing_points[:,cat_dims_ind[i]] = torch.tensor(
                    np.random.choice(np.unique(input_data[:,cat_dims[i]]), (num_inducing))
                )
            else:
                inducing_points[:,:,cat_dims[i]] = torch.tensor(
                    np.random.choice(np.unique(input_data[:,cat_dims[i]]), (output_dims, num_inducing))
                )
        batch_shape = torch.Size([] if output_dims is None else [output_dims])

        # 変分分布の初期化
        variational_distribution = CholeskyVariationalDistribution(
            num_inducing_points=num_inducing, batch_shape=batch_shape
        )

        # 変分戦略の設定
        variational_strategy = VariationalStrategy(
            self,
            inducing_points,
            variational_distribution,
            learn_inducing_locations=True,
        )

        # 親クラスの初期化
        super().__init__(variational_strategy, input_dims, output_dims)
        
        # 平均関数の選択
        self.mean_module = (
            ConstantMean() if mean_type == 'constant' else LinearMean(input_dims)
        )

        # カーネル関数の構築(混合データに対応)
        if len(ord_dims) == 0:
            # 連続データがなく、カテゴリカルデータのみの場合
            self.covar_module = ScaleKernel(
                CategoricalKernel(
                    batch_shape=batch_shape,
                    ard_num_dims=len(cat_dims),
                    lengthscale_constraint=GreaterThan(1e-6),
                )
            )
        else:
            # 連続データとカテゴリカルデータの両方が存在する場合
            cont_kernel_factory = get_covar_module_with_dim_scaled_prior

            # 和カーネル(連続データ + カテゴリカルデータ)
            sum_kernel = ScaleKernel(
                cont_kernel_factory(
                    batch_shape=batch_shape,
                    ard_num_dims=len(ord_dims),
                    active_dims=ord_dims,
                )
                + ScaleKernel(
                    CategoricalKernel(
                        batch_shape=batch_shape,
                        ard_num_dims=len(cat_dims),
                        active_dims=cat_dims,
                        lengthscale_constraint=GreaterThan(1e-6),
                    )
                )
            )

            # 積カーネル(連続データ * カテゴリカルデータ)
            prod_kernel = ScaleKernel(
                cont_kernel_factory(
                    batch_shape=batch_shape,
                    ard_num_dims=len(ord_dims),
                    active_dims=ord_dims,
                )
                * CategoricalKernel(
                    batch_shape=batch_shape,
                    ard_num_dims=len(cat_dims),
                    active_dims=cat_dims,
                    lengthscale_constraint=GreaterThan(1e-6),
                )
            )

            # 和カーネルと積カーネルを合成
            self.covar_module = sum_kernel + prod_kernel

    def forward(self, x):
        """
        入力データに対して平均関数と共分散関数を適用し、多変量正規分布を計算します。

        Args:
            x (Tensor): 入力テンソル。

        Returns:
            MultivariateNormal: 平均と共分散を持つ多変量正規分布。
        """
        mean_x = self.mean_module(x)  # 平均の計算
        covar_x = self.covar_module(x)  # 共分散の計算
        return MultivariateNormal(mean_x, covar_x)
class DeepMixedGPModel(DeepGP, GPyTorchModel):
    """
    Deep Gaussian Processモデル(混合データ対応)。

    このモデルは、カテゴリデータと連続データの混在した入力を扱う深層ガウス過程モデルを実現します。

    Args:
        train_X (Tensor): 訓練データの入力。
        train_Y (Tensor): 訓練データの出力。
        cat_dims (Sequence[int]): 入力のカテゴリ次元のインデックス。
        train_Yvar (Optional[Tensor]): 観測ノイズの分散。デフォルトはNone。
        outcome_transform (Union[str, Standardize, None]): 出力変換。デフォルトは"DEFAULT"。
        hidden_dim (int): 隠れ層の次元数。デフォルトは10。
    """
    def __init__(
        self,
        train_X,
        train_Y,
        cat_dims,
        train_Yvar=None,
        likelihood=None,
        outcome_transform=None,
        hidden_dim=8
    ):
        """
        モデルの初期化。

        訓練データに基づいて、隠れ層や出力層を設定します。

        Args:
            train_X (Tensor): 訓練データの特徴量。
            train_Y (Tensor): 訓練データの観測値。
            cat_dims (Sequence[int]): 入力のカテゴリ次元のインデックス。
            train_Yvar (Optional[Tensor]): 観測ノイズの分散。
            likelihood (Optional): モデルの尤度関数。
            outcome_transform (Union[str, Standardize, None]): 出力変換。
            hidden_dim (int): 隠れ層の次元数。
        """
        super().__init__()
        input_dim = train_X.shape[-1]
        num_outputs = train_Y.shape[-1]

        # 入力データの検証
        self._validate_tensor_args(X=train_X, Y=train_Y, Yvar=train_Yvar)

        # 出力変換を適用(デフォルトで標準化を使用)
        if outcome_transform is None:
            outcome_transform = Standardize(
                m=train_Y.shape[-1], batch_shape=train_X.shape[:-2]
            )
        if outcome_transform is not None:
            train_Y, train_Yvar = outcome_transform(train_Y, train_Yvar)
        self._validate_tensor_args(X=train_X, Y=train_Y, Yvar=train_Yvar)

        self.train_inputs = train_X
        self.train_targets = train_Y

        # カテゴリ次元が指定されていない場合にエラーをスロー
        if len(cat_dims) == 0:
            raise ValueError("カテゴリ次元を指定する必要があります (cat_dims)。")

        # 入力データの次元を正規化して、連続データとカテゴリカルデータを分ける
        self._ignore_X_dims_scaling_check = cat_dims
        _, self._aug_batch_shape = self.get_batch_dimensions(train_X=train_X, train_Y=train_Y)

        d = train_X.shape[-1]
        cat_dims = normalize_indices(indices=cat_dims, d=d)
        ord_dims = sorted(set(range(d)) - set(cat_dims))

        # 入力層の定義(混合データ対応)
        self.input_layer = DeepMixedGPHiddenLayer(
            input_dims=input_dim,
            output_dims=hidden_dim,
            ord_dims=ord_dims,
            cat_dims=cat_dims,
            num_inducing=128,
            mean_type="linear",
            input_data=train_X
        )
        
        # 最終層の定義
        self.last_layer = DeepGPHiddenLayer(
            input_dims=hidden_dim,
            output_dims=None if num_outputs == 1 else num_outputs,
            mean_type="constant",
        )

        # モデルの出力数と尤度を設定
        self._num_outputs = num_outputs
        if likelihood is None:
            if num_outputs == 1:
                self.likelihood = get_gaussian_likelihood_with_lognormal_prior(
                            batch_shape=self._aug_batch_shape
                        )
            else:
                self.likelihood = MultitaskGaussianLikelihood(num_tasks=num_outputs)
        else:
            self.likelihood = likelihood

        # 出力変換を設定
        if outcome_transform is not None:
            self.outcome_transform = outcome_transform

    def forward(self, inputs, batch_mean=True):
        """
        入力データを処理し、多変量正規分布を計算。

        Args:
            inputs (Tensor): モデルへの入力データ。
            batch_mean (bool): バッチ全体の平均を計算するかどうか。デフォルトはTrue。

        Returns:
            MultivariateNormal: 平均と共分散を持つ多変量正規分布。
        """
        # 入力層を通過
        h = self.input_layer(inputs)
        # 最終層を通過して出力を取得
        output = self.last_layer(h)

        # 平均と共分散を計算
        mean_x = output.mean.mean(0)
        covar_x = output.covariance_matrix.mean(0)

        # バッチ平均を計算
        if batch_mean:
            if self._num_outputs > 1:
                return MultitaskMultivariateNormal(mean_x, covar_x)
            else:
                return MultivariateNormal(mean_x, covar_x)
        else:
            return output

    @property
    def num_outputs(self) -> int:
        """
        モデルの出力次元数を取得。

        Returns:
            int: 出力次元数。
        """
        return self._num_outputs

    @staticmethod
    def get_batch_dimensions(
        train_X: Tensor, train_Y: Tensor
    ) -> tuple[torch.Size, torch.Size]:
        """
        入力データのバッチ形状と、出力次元を含む拡張バッチ形状を取得。

        Args:
            train_X (Tensor): 訓練データの特徴量 (n x d または batch_shape x n x d)。
            train_Y (Tensor): 訓練データの観測値 (n x m または batch_shape x n x m)。

        Returns:
            Tuple[torch.Size, torch.Size]:
                - 入力バッチ形状。
                - 出力次元を含む拡張バッチ形状 (input_batch_shape x (m))。
        """
        input_batch_shape = train_X.shape[:-2]
        aug_batch_shape = input_batch_shape
        num_outputs = train_Y.shape[-1]
        if num_outputs > 1:
            aug_batch_shape += torch.Size([num_outputs])
        return input_batch_shape, aug_batch_shape
def blackbox(x, noise=True):
    if type(x)==np.ndarray:
        return np.sin(x[:,[0]]*2)+np.cos(x[:,[0]]/2-2)+np.sign((x[:,[1]]==1).astype(int)-0.5)*x[:,[0]]*0.2 - 2*(x[:,[1]]==1).astype(int) + int(noise) * np.random.rand(len(x),1)
    else:
        return np.sin(x[:,[0]]*2)+np.cos(x[:,[0]]/2-2)+np.sign((x[:,[1]]==1).int()-0.5)*x[:,[0]]*0.2 - 2*(x[:,[1]]==1).int() + int(noise) * np.random.rand(len(x),1)

x = np.arange(0,10,0.1).reshape(-1,1)
y1=blackbox(np.concatenate([x,np.zeros_like(x)], axis=1), noise=False)
y2=blackbox(np.concatenate([x,np.ones_like(x)], axis=1), noise=False)

plt.plot(x[:,0], y1);
plt.plot(x[:,0], y2);
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_97_1.png

train_x = torch.cat([torch.rand(30,1, dtype=torch.float32)*10, torch.tensor(np.random.choice([0,1], (30,1)))], dim=1)
train_y = blackbox(train_x).to(torch.float32)
bounds = torch.stack([
    torch.min(train_x, dim=0)[0],
    torch.max(train_x, dim=0)[0]
])

# 入力データを正規化
train_x_scaled = normalize(train_x, bounds=bounds).to(torch.float32)

# モデルの作成
model = DeepMixedGPModel(
    train_x_scaled,
    train_y,
    cat_dims=[1],
    outcome_transform=Standardize(
        m=train_y.size(-1)
    )
)

# モデルの最尤推定を行う
mll = DeepApproximateMLL(
    VariationalELBO(
        model.likelihood,
        model,
        len(train_x),
        beta=.01
    )
)

fit_deepgp_mll(mll)
x = torch.arange(0,10,0.1).reshape(-1,1)
x1 = torch.cat([x, torch.zeros_like(x)], axis=1)
x2 = torch.cat([x, torch.ones_like(x)], axis=1)
x_norm1 = normalize(x1, bounds)
x_norm2 = normalize(x2, bounds)

pred1 = model.posterior(x_norm1)
pred1_mean = pred1.mean.detach().numpy().ravel()
pred1_std = pred1.variance.sqrt().detach().numpy().ravel()

pred2 = model.posterior(x_norm2)
pred2_mean = pred2.mean.detach().numpy().ravel()
pred2_std = pred2.variance.sqrt().detach().numpy().ravel()

plt.plot(x, pred1_mean, color='orange')
plt.fill_between(x.ravel(), pred1_mean+pred1_std, pred1_mean-pred1_std, color='orange', alpha=.4)

plt.plot(x, pred2_mean, color='green')
plt.fill_between(x.ravel(), pred2_mean+pred2_std, pred2_mean-pred2_std, color='green', alpha=.4)

plt.scatter(train_x[:,0], train_y, color='red')
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_100_1.png

13. Deep Kernel Learning (DKL)

概要:
DeepKernelGP は、ニューラルネットワークで特徴量変換を行った後にガウス過程を適用する手法で、深層特徴抽出器とGPの組み合わせとして機能します。

主な用途:

  • 高次元画像・テキストなどの複雑特徴抽出
  • データの表現を学習しつつ不確実性も扱いたい場面

特徴:

  • 特徴抽出器(CNN, MLPなど) + GP の構成
  • 通常のカーネルでは捉えにくいパターンを抽出
  • PyTorchとの連携性が高く、BoTorchでも自作で拡張可能

次のような関数からデータ点をサンプリングしてモデルの作成を行います。
ディープカーネルとして4層のMLPを考えます。まずは単目的モデルを作成します。

import torch.nn as nn
from gpytorch.utils.grid import ScaleToBounds
from gpytorch.models import ExactGP
from gpytorch.means import ConstantMean, MultitaskMean
from gpytorch.kernels import ScaleKernel, RBFKernel, MultitaskKernel
from botorch.models.gpytorch import BatchedMultiOutputGPyTorchModel
class LargeFeatureExtractor(torch.nn.Sequential):
    """
    LargeFeatureExtractorは、入力次元から指定された出力次元までの特徴を抽出するための多層パーセプトロン(MLP)です。
    
    Attributes:
        input_dim (int): 入力の特徴量の次元数
        output_dim (int): 出力の特徴量の次元数
        hidden_dims (list[int]): 隠れ層の次元数。デフォルトは[100, 50, 10]。
    """

    def __init__(self, input_dim, output_dim):
        """
        LargeFeatureExtractorのコンストラクタ。
        
        Args:
            input_dim (int): 入力データの次元数
            output_dim (int): 出力データの次元数
            hidden_dims (list[int], optional): 隠れ層の次元数。デフォルトは[100, 50, 10]。
        """
        super(LargeFeatureExtractor, self).__init__()

        # 最初の線形変換 (入力次元 -> 隠れ層1)
        self.add_module('linear1', torch.nn.Linear(input_dim, input_dim*10))
        # ReLU 活性化関数
        self.add_module('relu1', torch.nn.ReLU())

        # 2番目の線形変換 (隠れ層1 -> 隠れ層2)
        self.add_module('linear2', torch.nn.Linear(input_dim*10, input_dim*5))
        # ReLU 活性化関数
        self.add_module('relu2', torch.nn.ReLU())

        # 3番目の線形変換 (隠れ層2 -> 隠れ層3)
        self.add_module('linear3', torch.nn.Linear(input_dim*5, input_dim*2))
        # ReLU 活性化関数
        self.add_module('relu3', torch.nn.ReLU())

        # 最後の線形変換 (隠れ層3 -> 出力層)
        self.add_module('linear4', torch.nn.Linear(input_dim*2, output_dim))
class DeepKernel(ExactGP):
    """
    DeepKernelModelは、深層学習を組み合わせたガウス過程モデルです。
    特徴抽出器(Deep Feature Extractor)を使用して入力データを変換し、その後ガウス過程回帰に基づいて予測を行います。

    Attributes:
        mean_module (gpytorch.modules.mean.Mean): 平均モジュール
        covar_module (gpytorch.kernels.Kernel): 共分散モジュール
        feature_extractor (LargeFeatureExtractor): 特徴抽出器
        scale_to_bounds (ScaleToBounds): 入力データを[-1, 1]の範囲にスケーリング
    """

    def __init__(self, train_x, train_y, likelihood):
        """
        DeepKernelModelのコンストラクタ。

        Args:
            train_x (Tensor): 学習データの特徴量
            train_y (Tensor): 学習データのターゲット値
            likelihood (gpytorch.likelihoods.Likelihood): 尤度関数
        """
        super(DeepKernel, self).__init__(train_x, train_y, likelihood)
        
        # 入力データの次元数を取得
        input_dim = train_x.size(-1)
        num_outputs = train_y.shape[-1] if (len(train_y.shape)>1)&(train_y.shape[-1]!=1) else None
        self.num_outputs = train_y.shape[-1] if (len(train_y.shape)>1)&(train_y.shape[-1]!=1) else 1
        batch_shape = torch.Size([] if num_outputs is None else [num_outputs])

        # 定数平均関数を設定
        self.mean_module = ConstantMean(batch_shape=batch_shape)
        if num_outputs is None:
            self.mean_module = ConstantMean(batch_shape=batch_shape)
            self.covar_module = ScaleKernel(
                    RBFKernel(
                        batch_shape=batch_shape,
                         ard_num_dims=train_x.size(1)
                    ),
                    batch_shape=batch_shape)
        else:
            self.mean_module = MultitaskMean(
                ConstantMean(), num_tasks=train_y.shape[-1]
            )
            self.covar_module = MultitaskKernel(
                        ScaleKernel(RBFKernel(ard_num_dims=train_x.size(1))),
                num_tasks=train_y.shape[-1])

        # 特徴抽出器を初期化 (データの次元数を使用)
        self.feature_extractor = LargeFeatureExtractor(
            input_dim=input_dim,
            output_dim=input_dim
        )

        # 入力データを[-1, 1]の範囲にスケーリングするためのモジュール
        self.scale_to_bounds = ScaleToBounds(-1., 1.)

    def forward(self, inputs):
        """
        モデルの順伝播を定義します。入力データを特徴抽出器を通して処理し、ガウス過程による予測を行います。

        Args:
            x (Tensor): 入力データ

        Returns:
            MultivariateNormal: ガウス過程の予測結果(平均と共分散)
        """
        # データを特徴抽出器で変換
        projected_x = self.feature_extractor(inputs)

        # スケーリングを適用
        projected_x = self.scale_to_bounds(projected_x)

        # 変換されたデータに基づいて平均と共分散を計算
        mean_x = self.mean_module(projected_x)
        covar_x = self.covar_module(projected_x)

        # 平均と共分散からガウス過程の分布を作成
        if self.num_outputs == 1:
            return MultivariateNormal(mean_x, covar_x)
        else:
            return MultitaskMultivariateNormal(mean_x, covar_x)
class DeepKernelGPModel(DeepGP, GPyTorchModel):
    """
    Deep Gaussian Processモデルクラス。

    このモデルは複数の隠れ層を持つ深層ガウス過程を実現します。

    Args:
        train_X (Tensor): 訓練データの入力。
        train_Y (Tensor): 訓練データの出力。
        train_Yvar (Optional[Tensor]): 観測ノイズの分散。デフォルトはNone。
        outcome_transform (Union[str, Standardize, None]): 出力変換。デフォルトは"DEFAULT"。
        list_hidden_dims (list): 隠れ層の次元リスト。デフォルトは[10, 10]。
    """
    def __init__(
        self,
        train_X,
        train_Y,
        train_Yvar=None,
        likelihood=None,
        outcome_transform=None
    ):
        super().__init__()

        # 入力次元数と出力次元数を取得
        input_dim = train_X.shape[-1]
        num_outputs = train_Y.shape[-1] if (len(train_Y.shape)>1)&(train_Y.shape[-1]!=1) else None
        # モデルの出力数と尤度を設定
        self._num_outputs = train_Y.shape[-1] if (len(train_Y.shape)>1)&(train_Y.shape[-1]!=1) else 1
        
        # 入力データの検証
        self._validate_tensor_args(X=train_X, Y=train_Y, Yvar=train_Yvar)

        # 出力変換の適用(デフォルトで標準化を使用)
        if outcome_transform is None:
            outcome_transform = Standardize(
                m=num_outputs, batch_shape=train_X.shape[:-2]
            )
        if outcome_transform is not None:
            train_Y, train_Yvar = outcome_transform(train_Y, train_Yvar)
        self._validate_tensor_args(X=train_X, Y=train_Y, Yvar=train_Yvar)

        if train_Y.shape[-1] == 1:
            train_Y = train_Y.ravel()

        train_X = train_X.to(torch.float32)
        train_Y = train_Y.to(torch.float32)
        
        self.train_inputs = train_X
        self.train_targets = train_Y

        if likelihood is None:
            if num_outputs is None:
                self.likelihood = GaussianLikelihood()  # 単変量の場合
            else:
                self.likelihood = MultitaskGaussianLikelihood(num_tasks=train_Y.shape[-1])  # 多変量の場合

        else:
            self.likelihood = likelihood

        self.deepkernel = DeepKernel(train_X, train_Y, self.likelihood)

        # 出力変換を設定
        if outcome_transform is not None:
            self.outcome_transform = outcome_transform

    def forward(self, inputs):
        """
        入力データを処理し、多変量正規分布を計算。

        Args:
            inputs (Tensor): モデルへの入力データ。
            batch_mean (bool): バッチ全体の平均を計算するかどうか。デフォルトはTrue。

        Returns:
            MultivariateNormal: 平均と共分散を持つ多変量正規分布。
        """
        return self.deepkernel(inputs)

    def predict(self, inputs):
        """
        モデルによる予測を計算。

        Args:
            x (Tensor): 予測対象の入力データ。

        Returns:
            Tuple[Tensor, Tensor]: 予測の平均と分散。
        """
        preds = self.likelihood(self(inputs))
        y_pred = preds.mean
        y_var = preds.variance
        y_pred, y_var = self.outcome_transform.untransform(y_pred, y_var)
        return y_pred, y_var

    @property
    def num_outputs(self) -> int:
        """
        モデルの出力次元数を取得。

        Returns:
            int: 出力次元数。
        """
        return self._num_outputs
def fit_deepkernel_mll(mll, lr=0.01, epoch=50):
    """
    モデルの周辺対数尤度 (Marginal Log Likelihood, MLL) を最適化して、DeepGPモデルを訓練します。

    Args:
        mll (gpytorch.mlls.MarginalLogLikelihood): 訓練対象の周辺対数尤度オブジェクト。
    """
    # モデルと尤度を訓練モードに設定
    mll.model.train()
    mll.model.likelihood.train()

    # Adamオプティマイザの設定(学習率: 0.01)
    optimizer = torch.optim.Adam([
        {'params': mll.model.deepkernel.feature_extractor.parameters()},
        {'params': mll.model.deepkernel.covar_module.parameters()},
        {'params': mll.model.deepkernel.mean_module.parameters()},
        {'params': mll.model.deepkernel.likelihood.parameters()},
    ], lr=lr)

    y_tensor = mll.model.train_targets
    x_tensor = mll.model.train_inputs
    
    # 訓練ループ
    for i in range(epoch):
        optimizer.zero_grad()  # 勾配の初期化
        output = mll.model.forward(x_tensor)  # モデルの出力を計算
        # 損失を計算 (出力次元数に応じて処理を分岐)
        if (len(y_tensor.shape)>1)&(y_tensor.shape[-1]>1):
            loss = -mll(output, y_tensor)  # 多変量の場合
        else:
            loss = -mll(output, y_tensor.view(-1))  # 単変量の場合

        loss.backward()  # 逆伝播で勾配を計算
        optimizer.step()  # パラメータの更新
def blackbox(x):
    return np.sin(x*2)+np.cos(x/2-2)-x*0.1+np.random.rand(len(x),1)

x = np.arange(0,10,0.1).reshape(-1,1)
y = blackbox(x)

plt.plot(x, y);
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_107_1.png

train_x = torch.rand(50,1, dtype=torch.float32)*10
train_y = blackbox(train_x).to(torch.float32)
bounds = torch.stack([
    torch.min(train_x, dim=0)[0],
    torch.max(train_x, dim=0)[0]
])

# 入力データを正規化
train_x_scaled = normalize(train_x, bounds=bounds).to(torch.float32)

# モデルの作成
model = DeepKernelGPModel(
    train_x_scaled,
    train_y,
    outcome_transform=Standardize(
        m=train_y.size(-1)
    )
)

# モデルの最尤推定を行う
mll = ExactMarginalLogLikelihood(
    model.likelihood,
    model
)

fit_deepkernel_mll(mll, lr=1e-2, epoch=300)
x = torch.arange(0,10,0.1).reshape(-1,1)
x_norm = normalize(x, bounds)

pred = model.posterior(x_norm)
pred_mean = pred.mean.detach().numpy().ravel()
pred_std = pred.variance.sqrt().detach().numpy().ravel()

plt.plot(x.ravel(), pred_mean, color='orange')
plt.fill_between(x.ravel(), pred_mean+pred_std, pred_mean-pred_std, color='orange', alpha=.4)
plt.scatter(train_x, train_y, color='red')
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_110_1.png

次に多目的を考えます。

def blackbox1(x):
    return np.sin(x[:, 0:1]*2) + np.cos(x[:, 0:1]/2 - 2) - x[:, 0:1]*0.1 + np.random.rand(len(x), 1)

def blackbox2(x):
    return -0.8*np.sin(x[:, 0:1]*2) - 1.2*np.cos(x[:, 0:1]/2 - 2) - x[:, 0:1]*0.15 + np.random.rand(len(x), 1)

x = np.arange(0,10,0.1).reshape(-1,1)
y1 = blackbox1(x)
y2 = blackbox2(x)
plt.plot(y1)
plt.plot(y2)

output_111_1.png

train_x = torch.rand(40,1, dtype=torch.float32)*10
train_y = torch.cat([blackbox1(train_x), blackbox2(train_x)], dim=1).to(torch.float32)
bounds = torch.stack([
    torch.min(train_x, dim=0)[0],
    torch.max(train_x, dim=0)[0]
])

# 入力データを正規化
train_x_scaled = normalize(train_x, bounds=bounds).to(torch.float32)

# モデルの作成
model = DeepKernelGPModel(
    train_x_scaled,
    train_y,
    outcome_transform=Standardize(
        m=train_y.size(-1)
    )
)

# モデルの最尤推定を行う
mll = ExactMarginalLogLikelihood(
    model.likelihood,
    model
)

fit_deepkernel_mll(mll, lr=1e-2, epoch=300)
x = torch.arange(0,10,0.1).reshape(-1,1)
x_norm = normalize(x, bounds)

pred = model.posterior(x_norm)
pred_mean = pred.mean.detach().numpy()
pred_std = pred.variance.sqrt().detach().numpy()

plt.plot(x.ravel(), pred_mean[:,0], color='orange')
plt.fill_between(x.ravel(), pred_mean[:,0]+pred_std[:,0], pred_mean[:,0]-pred_std[:,0], color='orange', alpha=.4)

plt.plot(x.ravel(), pred_mean[:,1], color='green')
plt.fill_between(x.ravel(), pred_mean[:,1]+pred_std[:,1], pred_mean[:,1]-pred_std[:,1], color='green', alpha=.4)

plt.scatter(train_x, train_y[:,0], color='red')
plt.scatter(train_x, train_y[:,1], color='purple')
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_114_1.png

最後に連続値+カテゴリ値のモデルを作成します。

class DeepKernelMixed(BatchedMultiOutputGPyTorchModel, ExactGP):
    """
    DeepKernelModelは、深層学習を組み合わせたガウス過程モデルです。
    特徴抽出器(Deep Feature Extractor)を使用して入力データを変換し、その後ガウス過程回帰に基づいて予測を行います。

    Attributes:
        mean_module (gpytorch.modules.mean.Mean): 平均モジュール
        covar_module (gpytorch.kernels.Kernel): 共分散モジュール
        feature_extractor (LargeFeatureExtractor): 特徴抽出器
        scale_to_bounds (ScaleToBounds): 入力データを[-1, 1]の範囲にスケーリング
    """

    def __init__(self, train_x, train_y, cat_dims, likelihood):
        """
        DeepKernelModelのコンストラクタ。

        Args:
            train_x (Tensor): 学習データの特徴量
            train_y (Tensor): 学習データのターゲット値
            likelihood (gpytorch.likelihoods.Likelihood): 尤度関数
        """
        super(DeepKernelMixed, self).__init__(train_x, train_y, likelihood)

        # 入力データの次元数を取得
        data_dim = train_x.size(-1)
        self._num_outputs = train_y.shape[-1] if (len(train_y.shape)>1)&(train_y.shape[-1]!=1) else 1

        # カテゴリ次元が指定されていない場合にエラーをスロー
        if len(cat_dims) == 0:
            raise ValueError(
                "カテゴリ次元を指定する必要があります (cat_dims)。"
            )

        # 次元の正規化と分割
        self._ignore_X_dims_scaling_check = cat_dims
        _, aug_batch_shape = self.get_batch_dimensions(train_X=train_x, train_Y=train_y)
        # aug_batch_shape = torch.Size([1])
        aug_batch_shape = train_x.shape[:-2]

        d = train_x.shape[-1]
        self.cat_dims = cat_dims
        cat_dims = normalize_indices(indices=cat_dims, d=d)
        ord_dims = sorted(set(range(d)) - set(cat_dims))

        self.feature_extractor = LargeFeatureExtractor(
            input_dim=len(ord_dims),
            output_dim=len(ord_dims)
        )

        # 平均関数の選択
        if self._num_outputs==1:
            self.mean_module = ConstantMean(batch_shape=aug_batch_shape)
        else:
            self.mean_module = MultitaskMean(
                ConstantMean(), num_tasks=train_y.shape[-1]
            )

        # カーネル関数の構築(混合データに対応)
        if len(ord_dims) == 0:
            # 連続データがなく、カテゴリカルデータのみの場合
            self.covar_module = ScaleKernel(
                CategoricalKernel(
                    batch_shape=aug_batch_shape,
                    ard_num_dims=len(cat_dims),
                    lengthscale_constraint=GreaterThan(1e-6),
                )
            )
        else:
            # 連続データとカテゴリカルデータの両方が存在する場合
            cont_kernel_factory = get_covar_module_with_dim_scaled_prior

            # 和カーネル
            sum_kernel = ScaleKernel(
                cont_kernel_factory(
                    batch_shape=aug_batch_shape,
                    ard_num_dims=len(ord_dims),
                    active_dims=ord_dims,
                )
                + ScaleKernel(
                    CategoricalKernel(
                        batch_shape=aug_batch_shape,
                        ard_num_dims=len(cat_dims),
                        active_dims=cat_dims,
                        lengthscale_constraint=GreaterThan(1e-6),
                    )
                )
            )

            # 積カーネル
            prod_kernel = ScaleKernel(
                cont_kernel_factory(
                    batch_shape=aug_batch_shape,
                    ard_num_dims=len(ord_dims),
                    active_dims=ord_dims,
                )
                * CategoricalKernel(
                    batch_shape=aug_batch_shape,
                    ard_num_dims=len(cat_dims),
                    active_dims=cat_dims,
                    lengthscale_constraint=GreaterThan(1e-6),
                )
            )

        # 和カーネルと積カーネルを合成
        if self._num_outputs==1:
            self.covar_module = sum_kernel + prod_kernel
        else:
            self.covar_module = MultitaskKernel(
                sum_kernel + prod_kernel,
                num_tasks=train_y.shape[-1]
            )

        # 入力データを[-1, 1]の範囲にスケーリングするためのモジュール
        self.scale_to_bounds = ScaleToBounds(-1., 1.)

    def forward(self, x):
        """
        モデルの順伝播を定義します。入力データを特徴抽出器を通して処理し、ガウス過程による予測を行います。

        Args:
            x (Tensor): 入力データ

        Returns:
            MultivariateNormal: ガウス過程の予測結果(平均と共分散)
        """
        # 抜き出した列を取得
        extracted_columns = x[..., self.cat_dims]
        
        # 残りの列を取得
        remaining_columns = x[..., [i for i in range(x.size(-1)) if i not in self.cat_dims]]

        # データを特徴抽出器で変換
        projected_x = self.feature_extractor(remaining_columns)

        # スケーリングを適用
        projected_x = self.scale_to_bounds(projected_x)

        # 抜き出した列を戻す処理
        restored_tensor = remaining_columns.clone()  # 残りのテンソルをコピーして初期化
        for idx, col_idx in enumerate(self.cat_dims):
            restored_tensor = torch.cat((projected_x[..., :col_idx],
                                         extracted_columns[..., idx:idx+1],
                                         projected_x[..., col_idx:]), dim=len(x.shape)-1)
        
        # 変換されたデータに基づいて平均と共分散を計算
        mean_x = self.mean_module(restored_tensor)
        covar_x = self.covar_module(restored_tensor)
        # 平均と共分散からガウス過程の分布を作成
        # return  gpytorch.distributions.MultitaskMultivariateNormal(mean_x, covar_x)
        if self._num_outputs==1:
            return MultivariateNormal(mean_x, covar_x)
        else:
            return MultitaskMultivariateNormal(mean_x, covar_x)
class DeepKernelMixedGPModel(DeepGP, GPyTorchModel):
    """
    Deep Gaussian Processモデルクラス。

    このモデルは複数の隠れ層を持つ深層ガウス過程を実現します。

    Args:
        train_X (Tensor): 訓練データの入力。
        train_Y (Tensor): 訓練データの出力。
        train_Yvar (Optional[Tensor]): 観測ノイズの分散。デフォルトはNone。
        outcome_transform (Union[str, Standardize, None]): 出力変換。デフォルトは"DEFAULT"。
        list_hidden_dims (list): 隠れ層の次元リスト。デフォルトは[10, 10]。
    """
    def __init__(
        self,
        train_X,
        train_Y,
        train_Yvar=None,
        cat_dims=None,
        likelihood=None,
        outcome_transform=None,
        list_hidden_dims=[10, 10],
    ):
        super().__init__()

        # 入力次元数と出力次元数を取得
        input_dim = train_X.shape[-1]
        num_outputs = train_Y.shape[-1] if (len(train_Y.shape)>1)&(train_Y.shape[-1]!=1) else None
        # モデルの出力数と尤度を設定
        self._num_outputs = train_Y.shape[-1] if (len(train_Y.shape)>1)&(train_Y.shape[-1]!=1) else 1
        
        # 入力データの検証
        self._validate_tensor_args(X=train_X, Y=train_Y, Yvar=train_Yvar)

        # 出力変換の適用(デフォルトで標準化を使用)
        if outcome_transform is None:
            outcome_transform = Standardize(
                m=train_Y.shape[-1], batch_shape=train_X.shape[:-2]
            )
        else:
            train_Y, train_Yvar = outcome_transform(train_Y, train_Yvar)
        self._validate_tensor_args(X=train_X, Y=train_Y, Yvar=train_Yvar)

        if train_Y.shape[-1] == 1:
            train_Y = train_Y.ravel()

        train_X = torch.tensor(train_X, dtype=torch.float32)
        train_Y = torch.tensor(train_Y, dtype=torch.float32)

        self.train_inputs = train_X
        self.train_targets = train_Y

        if likelihood is None:
            if num_outputs is None:
                self.likelihood = GaussianLikelihood()  # 単変量の場合
            else:
                self.likelihood = MultitaskGaussianLikelihood(num_tasks=train_Y.shape[-1])  # 多変量の場合

        else:
            self.likelihood = likelihood

        self.deepkernel = DeepKernelMixed(train_X, train_Y, cat_dims, self.likelihood)

        # 出力変換を設定
        if outcome_transform is not None:
            self.outcome_transform = outcome_transform

    def forward(self, inputs):
        """
        入力データを処理し、多変量正規分布を計算。

        Args:
            inputs (Tensor): モデルへの入力データ。
            batch_mean (bool): バッチ全体の平均を計算するかどうか。デフォルトはTrue。

        Returns:
            MultivariateNormal: 平均と共分散を持つ多変量正規分布。
        """
        return self.deepkernel(inputs)

    def predict(self, inputs):
        """
        モデルによる予測を計算。

        Args:
            x (Tensor): 予測対象の入力データ。

        Returns:
            Tuple[Tensor, Tensor]: 予測の平均と分散。
        """
        preds = self.likelihood(self.forward(inputs))
        y_pred = preds.mean
        y_var = preds.variance
        y_pred, y_var = self.outcome_transform.untransform(y_pred, y_var)
        return y_pred, y_var

    @property
    def num_outputs(self) -> int:
        """
        モデルの出力次元数を取得。

        Returns:
            int: 出力次元数。
        """
        return self._num_outputs
def blackbox(x, noise=True):
    if type(x)==np.ndarray:
        return np.sin(x[:,[0]]*2)+np.cos(x[:,[0]]/2-2)+np.sign((x[:,[1]]==1).astype(int)-0.5)*x[:,[0]]*0.2 - 2*(x[:,[1]]==1).astype(int) + int(noise) * np.random.rand(len(x),1)
    else:
        return np.sin(x[:,[0]]*2)+np.cos(x[:,[0]]/2-2)+np.sign((x[:,[1]]==1).int()-0.5)*x[:,[0]]*0.2 - 2*(x[:,[1]]==1).int() + int(noise) * np.random.rand(len(x),1)

x = np.arange(0,10,0.1).reshape(-1,1)
y1=blackbox(np.concatenate([x,np.zeros_like(x)], axis=1), noise=False)
y2=blackbox(np.concatenate([x,np.ones_like(x)], axis=1), noise=False)

plt.plot(x[:,0], y1);
plt.plot(x[:,0], y2);
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_117_1.png

train_x = torch.cat([torch.rand(30,1, dtype=torch.float32)*10, torch.tensor(np.random.choice([0,1], (30,1)))], dim=1)
train_y = blackbox(train_x).to(torch.float32)
bounds = torch.stack([
    torch.min(train_x, dim=0)[0],
    torch.max(train_x, dim=0)[0]
])

# 入力データを正規化
train_x_scaled = normalize(train_x, bounds=bounds).to(torch.float32)

# モデルの作成
model = DeepKernelMixedGPModel(
    train_x_scaled,
    train_y,
    cat_dims=[1],
    outcome_transform=Standardize(
        m=train_y.size(-1)
    )
)

# モデルの最尤推定を行う
mll = ExactMarginalLogLikelihood(
    model.likelihood,
    model
)

fit_deepkernel_mll(mll, lr=1e-2, epoch=300)
x = torch.arange(0,10,0.1).reshape(-1,1)
x1 = torch.cat([x, torch.zeros_like(x)], axis=1)
x2 = torch.cat([x, torch.ones_like(x)], axis=1)
x_norm1 = normalize(x1, bounds)
x_norm2 = normalize(x2, bounds)

pred1 = model.posterior(x_norm1)
pred1_mean = pred1.mean.detach().numpy().ravel()
pred1_std = pred1.variance.sqrt().detach().numpy().ravel()

pred2 = model.posterior(x_norm2)
pred2_mean = pred2.mean.detach().numpy().ravel()
pred2_std = pred2.variance.sqrt().detach().numpy().ravel()

plt.plot(x, pred1_mean, color='orange')
plt.fill_between(x.ravel(), pred1_mean+pred1_std, pred1_mean-pred1_std, color='orange', alpha=.4)

plt.plot(x, pred2_mean, color='green')
plt.fill_between(x.ravel(), pred2_mean+pred2_std, pred2_mean-pred2_std, color='green', alpha=.4)

plt.scatter(train_x[:,0], train_y, color='red')
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_120_1.png

14. DeepGP + DKL

DeepGPとDKを組み合わせたいと思います。

次のような関数からデータ点をサンプリングしてモデルの作成を行います。
まずは単目的モデルを作成します。

class DeepKernelDeepGPHiddenLayer(DeepGPHiddenLayer):
    """
    Deep Gaussian Processの隠れ層を表現するクラス。

    Args:
        input_dims (int): 入力次元数。
        output_dims (Optional[int]): 出力次元数。スカラ出力の場合はNone。
        num_inducing (int): 誘導点の数。デフォルトは128。
        mean_type (str): 平均関数の種類('constant'または'linear')。デフォルトは'constant'"""
    def __init__(
        self,
        input_dims,
        output_dims=None,
        num_inducing=128,
        mean_type='constant'
    ):
        super().__init__(input_dims, output_dims, num_inducing, mean_type)

        self.feature_extractor = LargeFeatureExtractor(
                input_dim=input_dims,
                output_dim=input_dims
        )
        self.scale_to_bounds = ScaleToBounds(-1., 1.)

    def forward(self, x):
        """
        フォワードパスで多変量正規分布を計算する。

        Args:
            x (Tensor): 入力テンソル。

        Returns:
            MultivariateNormal: 平均と共分散を持つ多変量正規分布。
        """
         # データを特徴抽出器で変換
        projected_x = self.feature_extractor(x)

        # スケーリングを適用
        projected_x = self.scale_to_bounds(projected_x)
        
        mean_x = self.mean_module(projected_x)
        covar_x = self.covar_module(projected_x)
        return MultivariateNormal(mean_x, covar_x)
class DeepKernelDeepGPModel(DeepGPModel):
    """
    Deep Gaussian Processモデルクラス。

    このモデルは複数の隠れ層を持つ深層ガウス過程を実現します。

    Args:
        train_X (Tensor): 訓練データの入力。
        train_Y (Tensor): 訓練データの出力。
        train_Yvar (Optional[Tensor]): 観測ノイズの分散。デフォルトはNone。
        outcome_transform (Union[str, Standardize, None]): 出力変換。デフォルトは"DEFAULT"。
        list_hidden_dims (list): 隠れ層の次元リスト。デフォルトは[10, 10]。
    """
    def __init__(
        self,
        train_X,
        train_Y,
        train_Yvar=None,
        likelihood = None,
        outcome_transform=None,
        list_hidden_dims=[10, 10],
    ):
        super().__init__(
            train_X,
            train_Y,
            train_Yvar,
            likelihood,
            outcome_transform,
            list_hidden_dims,
        )
        num_outputs = train_Y.shape[-1]
        # 最終層を定義
        self.last_layer = DeepKernelDeepGPHiddenLayer(
            input_dims=list_hidden_dims[-1],
            output_dims=None if num_outputs == 1 else num_outputs,
            mean_type="constant",  # 定数平均関数を使用
        )
def blackbox(x):
    return np.sin(x*2)+np.cos(x/2-2)-x*0.1+np.random.rand(len(x),1)

x = np.arange(0,10,0.1).reshape(-1,1)
y = blackbox(x)

plt.plot(x, y);
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_124_1.png

train_x = torch.rand(50,1, dtype=torch.float32)*10
train_y = blackbox(train_x).to(torch.float32)
bounds = torch.stack([
    torch.min(train_x, dim=0)[0],
    torch.max(train_x, dim=0)[0]
])

# 入力データを正規化
train_x_scaled = normalize(train_x, bounds=bounds).to(torch.float32)

# モデルの作成
model = DeepKernelDeepGPModel(
    train_x_scaled,
    train_y,
    outcome_transform=Standardize(
        m=train_y.size(-1)
    )
)

# モデルの最尤推定を行う
mll = DeepApproximateMLL(VariationalELBO(model.likelihood, model, len(train_X), beta=.01))
fit_deepgp_mll(mll, lr=1e-2, epoch=200)
x = torch.arange(0,10,0.1).reshape(-1,1)
x_norm = normalize(x, bounds)

pred = model.posterior(x_norm)
pred_mean = pred.mean.detach().numpy().ravel()
pred_std = pred.variance.sqrt().detach().numpy().ravel()

plt.plot(x.ravel(), pred_mean, color='orange')
plt.fill_between(x.ravel(), pred_mean+pred_std, pred_mean-pred_std, color='orange', alpha=.4)
plt.scatter(train_x, train_y, color='red')
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_127_1.png

次に多目的を考えます。

def blackbox1(x):
    return np.sin(x[:, 0:1]*2) + np.cos(x[:, 0:1]/2 - 2) - x[:, 0:1]*0.1 + np.random.rand(len(x), 1)

def blackbox2(x):
    return -0.8*np.sin(x[:, 0:1]*2) - 1.2*np.cos(x[:, 0:1]/2 - 2) - x[:, 0:1]*0.15 + np.random.rand(len(x), 1)

x = np.arange(0,10,0.1).reshape(-1,1)
y1 = blackbox1(x)
y2 = blackbox2(x)
plt.plot(y1)
plt.plot(y2)

output_128_1.png

train_x = torch.rand(40,1, dtype=torch.float32)*10
train_y = torch.cat([blackbox1(train_x), blackbox2(train_x)], dim=1).to(torch.float32)
bounds = torch.stack([
    torch.min(train_x, dim=0)[0],
    torch.max(train_x, dim=0)[0]
])

# 入力データを正規化
train_x_scaled = normalize(train_x, bounds=bounds).to(torch.float32)

# モデルの作成
model = DeepKernelDeepGPModel(
    train_x_scaled,
    train_y,
    outcome_transform=Standardize(
        m=train_y.size(-1)
    )
)

# モデルの最尤推定を行う
mll = DeepApproximateMLL(VariationalELBO(model.likelihood, model, len(train_X), beta=.01))
fit_deepgp_mll(mll, lr=1e-2, epoch=200)
x = torch.arange(0,10,0.1).reshape(-1,1)
x_norm = normalize(x, bounds)

pred = model.posterior(x_norm)
pred_mean = pred.mean.detach().numpy()
pred_std = pred.variance.sqrt().detach().numpy()

plt.plot(x.ravel(), pred_mean[:,0], color='orange')
plt.fill_between(x.ravel(), pred_mean[:,0]+pred_std[:,0], pred_mean[:,0]-pred_std[:,0], color='orange', alpha=.4)

plt.plot(x.ravel(), pred_mean[:,1], color='green')
plt.fill_between(x.ravel(), pred_mean[:,1]+pred_std[:,1], pred_mean[:,1]-pred_std[:,1], color='green', alpha=.4)

plt.scatter(train_x, train_y[:,0], color='red')
plt.scatter(train_x, train_y[:,1], color='purple')
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_131_1.png

最後に連続値+カテゴリ値を考えます。

class DeepKernelDeepMixedGPHiddenLayer(DeepMixedGPHiddenLayer):
    """
    混合型データ(カテゴリカルデータと連続データ)に対応したDeep Gaussian Processの隠れ層クラス。

    Args:
        input_dims (int): 入力次元数。
        output_dims (Optional[int]): 出力次元数。スカラー出力の場合はNone。
        ord_dims (Sequence[int]): 連続データの次元インデックス。
        cat_dims (Sequence[int]): カテゴリカルデータの次元インデックス。
        aug_batch_shape (torch.Size): カーネルのバッチ形状。
        num_inducing (int): 誘導点の数。デフォルトは128。
        mean_type (str): 平均関数の種類('constant'または'linear')。デフォルトは'constant'"""
    def __init__(
        self,
        input_dims,
        output_dims,
        ord_dims,
        cat_dims,
        num_inducing=128,
        mean_type='constant',
        input_data=None
    ):
        super().__init__(input_dims,output_dims,ord_dims,cat_dims,num_inducing,mean_type,input_data)

        self.feature_extractor = LargeFeatureExtractor(
            input_dim=len(ord_dims),
            output_dim=len(ord_dims)
        )
        self.scale_to_bounds = ScaleToBounds(-1., 1.)

        self.cat_dims = cat_dims
        self.ord_dims = ord_dims
        
    def forward(self, x):
        """
        入力データに対して平均関数と共分散関数を適用し、多変量正規分布を計算します。

        Args:
            x (Tensor): 入力テンソル。

        Returns:
            MultivariateNormal: 平均と共分散を持つ多変量正規分布。
        """
        # 抜き出した列を取得
        extracted_columns = x[..., self.cat_dims]
        
        # 残りの列を取得
        remaining_columns = x[..., [i for i in range(x.size(-1)) if i not in self.cat_dims]]

        # データを特徴抽出器で変換
        projected_x = self.feature_extractor(remaining_columns)

        # スケーリングを適用
        projected_x = self.scale_to_bounds(projected_x)

        # 抜き出した列を戻す処理
        restored_tensor = remaining_columns.clone()  # 残りのテンソルをコピーして初期化
        for idx, col_idx in enumerate(self.cat_dims):
            restored_tensor = torch.cat((projected_x[..., :col_idx],
                                         extracted_columns[..., idx:idx+1],
                                         projected_x[..., col_idx:]), dim=len(x.shape)-1)
    
        mean_x = self.mean_module(restored_tensor)
        covar_x = self.covar_module(restored_tensor)
        return MultivariateNormal(mean_x, covar_x)
class DeepKernelDeepMixedGPModel(DeepMixedGPModel):
    """
    Deep Gaussian Processモデル(混合データ対応)。

    このモデルは、カテゴリデータと連続データの混在した入力を扱う深層ガウス過程モデルを実現します。

    Args:
        train_X (Tensor): 訓練データの入力。
        train_Y (Tensor): 訓練データの出力。
        cat_dims (Sequence[int]): 入力のカテゴリ次元のインデックス。
        train_Yvar (Optional[Tensor]): 観測ノイズの分散。デフォルトはNone。
        outcome_transform (Union[str, Standardize, None]): 出力変換。デフォルトは"DEFAULT"。
        list_hidden_dims (list): 隠れ層の次元リスト。デフォルトは[10, 10]。
    """
    def __init__(
        self,
        train_X,
        train_Y,
        cat_dims,
        train_Yvar=None,
        likelihood=None,
        outcome_transform=None,
        hidden_dim=10,
    ):
        super().__init__(train_X,train_Y,cat_dims,train_Yvar,likelihood,outcome_transform,hidden_dim)

        input_dim = train_X.shape[-1]
        d = train_X.shape[-1]
        cat_dims = normalize_indices(indices=cat_dims, d=d)
        ord_dims = sorted(set(range(d)) - set(cat_dims))

        self.input_layer = DeepKernelDeepMixedGPHiddenLayer(
            input_dims=input_dim,
            output_dims=hidden_dim,
            ord_dims=ord_dims,
            cat_dims=cat_dims,
            num_inducing=128,
            mean_type="linear",
            input_data=train_X
        )
def blackbox(x, noise=True):
    if type(x)==np.ndarray:
        return np.sin(x[:,[0]]*2)+np.cos(x[:,[0]]/2-2)+np.sign((x[:,[1]]==1).astype(int)-0.5)*x[:,[0]]*0.2 - 2*(x[:,[1]]==1).astype(int) + int(noise) * np.random.rand(len(x),1)
    else:
        return np.sin(x[:,[0]]*2)+np.cos(x[:,[0]]/2-2)+np.sign((x[:,[1]]==1).int()-0.5)*x[:,[0]]*0.2 - 2*(x[:,[1]]==1).int() + int(noise) * np.random.rand(len(x),1)

x = np.arange(0,10,0.1).reshape(-1,1)
y1=blackbox(np.concatenate([x,np.zeros_like(x)], axis=1), noise=False)
y2=blackbox(np.concatenate([x,np.ones_like(x)], axis=1), noise=False)

plt.plot(x[:,0], y1);
plt.plot(x[:,0], y2);
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_135_1.png

train_x = torch.cat([torch.rand(30,1, dtype=torch.float32)*10, torch.tensor(np.random.choice([0,1], (30,1)))], dim=1)
train_y = blackbox(train_x).to(torch.float32)
bounds = torch.stack([
    torch.min(train_x, dim=0)[0],
    torch.max(train_x, dim=0)[0]
])

# 入力データを正規化
train_x_scaled = normalize(train_x, bounds=bounds).to(torch.float32)

# モデルの作成
model = DeepKernelDeepMixedGPModel(
    train_x_scaled,
    train_y,
    cat_dims=[1],
    outcome_transform=Standardize(
        m=train_y.size(-1)
    )
)

# モデルの最尤推定を行う
mll = DeepApproximateMLL(VariationalELBO(model.likelihood, model, len(train_X), beta=.01))
fit_deepgp_mll(mll, lr=1e-2, epoch=200)
x = torch.arange(0,10,0.1).reshape(-1,1)
x1 = torch.cat([x, torch.zeros_like(x)], axis=1)
x2 = torch.cat([x, torch.ones_like(x)], axis=1)
x_norm1 = normalize(x1, bounds)
x_norm2 = normalize(x2, bounds)

pred1 = model.posterior(x_norm1)
pred1_mean = pred1.mean.detach().numpy().ravel()
pred1_std = pred1.variance.sqrt().detach().numpy().ravel()

pred2 = model.posterior(x_norm2)
pred2_mean = pred2.mean.detach().numpy().ravel()
pred2_std = pred2.variance.sqrt().detach().numpy().ravel()

plt.plot(x, pred1_mean, color='orange')
plt.fill_between(x.ravel(), pred1_mean+pred1_std, pred1_mean-pred1_std, color='orange', alpha=.4)

plt.plot(x, pred2_mean, color='green')
plt.fill_between(x.ravel(), pred2_mean+pred2_std, pred2_mean-pred2_std, color='green', alpha=.4)

plt.scatter(train_x[:,0], train_y, color='red')
plt.xlabel('x', size=20)
plt.ylabel('y', size=20)

output_138_1.png

15. 分類GP(Classification GP)

概要:
BoTorch では分類タスクに特化した BernoulliLikelihood を用いて GP 分類モデルを構築することができます。ユーザー定義の ClassifierGP などで拡張的に実装されることが多いです。

主な用途:

  • 実験成功/失敗のモデリング
  • バイナリ分類による設計領域の分割

特徴:

  • 出力は [0,1] の確率として解釈される
  • 獲得関数も UCB などを分類用に拡張可能

次のように三角関数でサンプリングされたデータで、しきい値を決めて二値分類モデルを作成します。

import gpytorch
from gpytorch.models import ApproximateGP
from gpytorch.variational import CholeskyVariationalDistribution, VariationalStrategy
from gpytorch.likelihoods import BernoulliLikelihood
from gpytorch.mlls import VariationalELBO
from botorch.models.model import Model
from botorch.posteriors import Posterior
from botorch.posteriors.torch import TorchPosterior
from botorch.acquisition import AcquisitionFunction
from botorch.utils.transforms import t_batch_mode_transform
from torch.distributions import Bernoulli
from botorch.posteriors import Posterior
from typing import Optional
from torch import cdist

import torch
from torch import Tensor
from torch.nn import Module
from typing import Optional

import gpytorch
from gpytorch.models import ApproximateGP
from gpytorch.variational import CholeskyVariationalDistribution, VariationalStrategy
from gpytorch.likelihoods import BernoulliLikelihood
from gpytorch.mlls.variational_elbo import VariationalELBO
from gpytorch.distributions import MultivariateNormal

from botorch.models.model import Model
from botorch.models.utils import validate_input_scaling
from botorch.posteriors import Posterior
from botorch.utils.transforms import t_batch_mode_transform
from botorch.utils.transforms import unnormalize, normalize
from botorch.optim import optimize_acqf
import matplotlib.pyplot as plt

from botorch.sampling.get_sampler import GetSampler
from botorch.posteriors import Posterior
class SimpleBernoulliPosterior(Posterior):
    def __init__(self, mean: torch.Tensor, variance: torch.Tensor):
        self._mean = mean
        self._variance = variance

    @property
    def mean(self) -> torch.Tensor:
        return self._mean

    @property
    def variance(self) -> torch.Tensor:
        return self._variance

    @property
    def device(self):
        return self._mean.device

    @property
    def dtype(self):
        return self._mean.dtype

    def rsample(self, sample_shape: torch.Size = torch.Size([1])) -> torch.Tensor:
        return torch.distributions.Normal(self.mean, self.variance.sqrt()).rsample(sample_shape)

@GetSampler.register(SimpleBernoulliPosterior)
def get_sampler_for_simple_bernoulli(posterior: SimpleBernoulliPosterior, sample_shape: torch.Size, seed=None):
    # 従来の MCSampler 継承を避ける:BoTorch古い環境でも動作
    class SimpleBernoulliSampler:
        def __call__(self, posterior: Posterior) -> torch.Tensor:
            return posterior.rsample(sample_shape)

    return SimpleBernoulliSampler()

class GPClassificationModel(ApproximateGP):
    def __init__(self, train_x, train_y):
        num_inducing = min(train_x.shape[0], 20)  # 20点以下にする
        variational_distribution = CholeskyVariationalDistribution(num_inducing_points=num_inducing)
        inducing_points = train_x[:num_inducing].clone()

        variational_strategy = VariationalStrategy(
            self,
            inducing_points=inducing_points,
            variational_distribution=variational_distribution,
        )
        super().__init__(variational_strategy)

        self.mean_module = gpytorch.means.ConstantMean()
        self.covar_module = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel())
        
        self.train_inputs = train_x
        self.train_targets = train_y

    def forward(self, x):
        mean = self.mean_module(x)
        covar = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(mean, covar)


class ClassifierGP(Model):
    def __init__(
        self,
        train_X: Tensor,
        train_Y: Tensor,
        # input_transform: Optional[Module] = None,
        model: Optional[ApproximateGP] = None,
        likelihood: Optional[BernoulliLikelihood] = None,
    ):
        super().__init__()

        # if input_transform is not None:
        #     train_X = input_transform(train_X)
        # self.input_transform = input_transform

        self.train_inputs = train_X
        self.train_targets = train_Y

        self.model = model if model is not None else GPClassificationModel(train_X, train_Y)
        self.likelihood = likelihood if likelihood is not None else BernoulliLikelihood()

        self.to(train_X)

    def forward(self, X: Tensor):
        # if self.input_transform is not None:
        #     X = self.input_transform(X)
        return self.model(X)

    def posterior(
        self,
        X: Tensor,
        observation_noise: bool = False,
        **kwargs
    ) -> Posterior:
        self.model.eval()
        self.likelihood.eval()

        # if self.input_transform is not None:
        #     X = self.input_transform(X)

        preds = self.likelihood(self.model(X))
        p = preds.mean
        var = p * (1 - p)
        # var = preds.variance
        posterior = SimpleBernoulliPosterior(mean=p.unsqueeze(-1), variance=var.unsqueeze(-1))

        return posterior

    @property
    def num_outputs(self) -> int:
        return 1

def fit_classifier_mll(mll):
    mll.model.train()
    mll.likelihood.train()
    
    optimizer = torch.optim.Adam(mll.model.parameters(), lr=0.01)

    for i in range(300):
        optimizer.zero_grad()
        output = mll.model(mll.model.train_inputs)
        loss = -mll(output, mll.model.train_targets)
        loss.backward()
        optimizer.step()
train_X = (torch.rand(25, 1)-0.5)*4
# train_Y = torch.sinc(2*train_X) + 0.1 * torch.randn_like(train_X)
train_Y = torch.sin(3*train_X) + 0.1 * torch.randn_like(train_X)

train_Y = (train_Y > 0.3).float().squeeze()

plt.scatter(train_X,train_Y)

output_142_1.png

model = ClassifierGP(train_X, train_Y)
mll = VariationalELBO(model.likelihood, model.model, num_data=train_Y.shape[0])
fit_classifier_mll(mll)
xx = torch.arange(-2,2,0.1)
pred_mean = model.posterior(xx).mean.detach().numpy().ravel()
pred_std = model.posterior(xx).variance.sqrt().detach().numpy().ravel()

plt.plot(xx,pred_mean)
plt.fill_between(xx,pred_mean+pred_std,pred_mean-pred_std, alpha=.25)

output_144_1.png

これでモデル作成に関しては以上となります。
今回の実装では精度や安定性が不十分であったため改善の必要があります。
公式の実装を切望します。

ご意見あればコメントお願いします

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?