Online Learning for Nonparametric Models (Implementation)

Posted at 2018-12-23

Introduction

This article is a sequel to Online Learning for Nonparametric Models (Theory).

Referring to the following, we implement BSGD in a scikit-learn compliant form:

  • sklearn.linear_model.SGDRegressor
  • sklearn.svm.SVR

Implementation

Optimization method

Let us implement the function bsgd, which corresponds to BSGD. Using tqdm.tqdm and tqdm.trange lets us display the training progress.

bsgd
kernel_model.py
import sys
from typing import Tuple

import numpy as np
from sklearn.utils import gen_batches
from sklearn.utils import safe_indexing
from tqdm import tqdm
from tqdm import trange


def bsgd(
    X: np.ndarray,
    y: np.ndarray,
    sample_weight: np.ndarray,
    seed: int,
    support_vectors: np.ndarray,
    dual_coef: np.ndarray,
    intercept: float,
    t: int,
    alpha: float = 1e-04,
    coef0: float = 0.0,
    degree: int = 3,
    epsilon: float = 0.1,
    eta0: float = 0.01,
    fit_intercept: bool = True,
    gamma: float = None,
    kernel: str = 'linear',
    learning_rate: str = 'invscaling',
    loss: str = 'squared_loss',
    max_iter: int = 5,
    max_support: int = 100,
    power_t: float = 0.25,
    shuffle: bool = True,
    verbose: int = 0
) -> Tuple[np.ndarray, np.ndarray, float, int]:
    """Budgeted Stochastic Gradient Descent (BSGD).

    Parameters
    ----------
    X
        Training data.

    y
        Target values.

    sample_weight
        Weights applied to individual samples.

    seed
        Seed of the pseudo random number generator.

    support_vectors
        Initial support vectors.

    dual_coef
        Initial coefficients of support vectors.

    intercept
        Initial intercept term.

    t
        Initial round.

    alpha
        Regularization parameter.

    coef0
        Independent term in the polynomial (or sigmoid) kernel function.

    degree
        Degree of the polynomial kernel function.

    epsilon
        Epsilon in the epsilon-insensitive (or Huber) loss functions.

    eta0
        Initial learning rate.

    fit_intercept
        If True, fit the intercept.

    gamma
        Kernel coefficient for 'rbf', 'poly' and 'sigmoid'.

    kernel
        Kernel function to use.

    learning_rate
        Learning rate schedule.

    loss
        Loss function to use.

    max_iter
        Maximum number of epochs.

    max_support
        Maximum number of support vectors (a.k.a. budget).

    power_t
        Exponent for inverse scaling learning rate.

    shuffle
        If True, shuffle the data.

    verbose
        Verbosity level.

    Returns
    -------
    support_vectors
        Support vectors.

    dual_coef
        Coefficients of support vectors.

    intercept
        Intercept term.

    t
        Round.
    """

    random_state = np.random.RandomState(seed)
    loss_function = _get_loss_function(loss=loss, epsilon=epsilon)
    disable = verbose <= 0
    epochs = trange(max_iter, desc='epoch loop', disable=disable)
    n_samples = len(X)

    for _ in epochs:
        if shuffle:
            indices = np.arange(n_samples)

            random_state.shuffle(indices)

            X = safe_indexing(X, indices)
            y = safe_indexing(y, indices)
            sample_weight = safe_indexing(sample_weight, indices)

        # Recreate the batch iterator each epoch: gen_batches returns a
        # one-shot generator that would otherwise be exhausted after the
        # first epoch.
        generator = gen_batches(n_samples, 1)
        batches = tqdm(
            generator,
            desc='round loop',
            disable=disable,
            leave=False
        )

        for batch in batches:
            X_batch = safe_indexing(X, batch)
            y_batch = safe_indexing(y, batch)
            sample_weight_batch = safe_indexing(sample_weight, batch)
            y_score = _decision_function(
                X_batch,
                support_vectors,
                dual_coef,
                intercept,
                coef0=coef0,
                degree=degree,
                gamma=gamma,
                kernel=kernel
            )

            eta = _get_eta(
                t,
                eta0=eta0,
                learning_rate=learning_rate,
                power_t=power_t
            )

            # L2 regularization shrinks every dual coefficient each round.
            dual_coef -= eta * alpha * dual_coef

            # dloss expects scalar arguments; each batch holds one sample.
            dloss = loss_function.dloss(y_score[0], y_batch[0])
            update = -sample_weight_batch[0] * eta * dloss

            if update != 0.0:
                condition = np.all(support_vectors == X_batch, axis=1)
                indices = np.flatnonzero(condition)

                if indices.size > 0:
                    index = indices[0]
                    dual_coef[index] += update
                else:
                    support_vectors = np.append(
                        support_vectors,
                        X_batch,
                        axis=0
                    )
                    dual_coef = np.append(dual_coef, update)

                if fit_intercept:
                    intercept += update

                if verbose > 1:
                    tqdm.write('added a support vector', file=sys.stderr)

            n_SV = len(support_vectors)

            # Budget maintenance: once the budget is exceeded, remove the
            # support vector with the smallest absolute dual coefficient.
            if n_SV > max_support:
                abs_dual_coef = np.abs(dual_coef)
                removed = np.argmin(abs_dual_coef)
                mask = np.ones(n_SV, dtype=bool)
                mask[removed] = False
                support_vectors = support_vectors[mask]
                dual_coef = dual_coef[mask]

                if verbose > 1:
                    tqdm.write(
                        f'removed the {removed + 1}-th support vector',
                        file=sys.stderr
                    )

            t += 1

    return support_vectors, dual_coef, intercept, t

_get_loss_function is a function that obtains an instance of one of the loss function classes provided by scikit-learn: the squared loss, the $\varepsilon$-insensitive loss, or the Huber loss.

_get_loss_function
kernel_model.py
from sklearn.linear_model.sgd_fast import EpsilonInsensitive
from sklearn.linear_model.sgd_fast import Huber
from sklearn.linear_model.sgd_fast import LossFunction
from sklearn.linear_model.sgd_fast import SquaredLoss

LOSS_CLASSES = {
    'epsilon_insensitive': EpsilonInsensitive,
    'huber': Huber,
    'squared_loss': SquaredLoss
}


def _get_loss_function(
    loss: str = 'squared_loss',
    epsilon: float = 0.1
) -> LossFunction:

    loss_class = LOSS_CLASSES[loss]

    if loss in ['epsilon_insensitive', 'huber']:
        return loss_class(epsilon)

    return loss_class()
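
As a quick illustration of what these instances provide, here is a minimal sketch. It assumes the sgd_fast classes of scikit-learn around version 0.20, where dloss(p, y) returns the derivative of the loss with respect to the prediction p:

loss_function = _get_loss_function(loss='squared_loss')

# For the squared loss 0.5 * (p - y) ** 2, dloss(p, y) returns p - y.
print(loss_function.dloss(2.0, 1.0))  # 1.0

huber = _get_loss_function(loss='huber', epsilon=0.1)

# The Huber gradient is clipped to +/- epsilon outside the quadratic zone.
print(huber.dloss(2.0, 1.0))  # 0.1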

_get_eta is a function that obtains the learning rate.

_get_eta
kernel_model.py
def _get_eta(
    t: int,
    eta0: float = 0.01,
    learning_rate: str = 'invscaling',
    power_t: float = 0.25
) -> float:

    if learning_rate == 'constant':
        return eta0

    return eta0 / t ** power_t
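
For reference, a minimal check of the 'invscaling' schedule eta0 / t ** power_t (outputs shown approximately as comments):

for t in [1, 10, 100, 1000]:
    print(t, _get_eta(t, eta0=0.01, learning_rate='invscaling', power_t=0.25))
# t = 1:    0.01
# t = 10:   ~0.0056
# t = 100:  ~0.0032
# t = 1000: ~0.0018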

_decision_function is the function corresponding to the hypothesis $f(x) = \sum_i \alpha_i k(x_i, x) + b$. Using sklearn.metrics.pairwise.pairwise_kernels lets us specify an arbitrary kernel function.

_decision_function
kernel_model.py
import numpy as np
from sklearn.metrics.pairwise import pairwise_kernels


def _decision_function(
    X: np.ndarray,
    support_vectors: np.ndarray,
    dual_coef: np.ndarray,
    intercept: float,
    coef0: float = 0.0,
    degree: int = 3,
    gamma: float = None,
    kernel: str = 'linear'
) -> np.ndarray:

    n_SV = len(support_vectors)

    if n_SV == 0:
        n_samples = len(X)

        return np.zeros(n_samples)

    K = pairwise_kernels(
        X,
        support_vectors,
        coef0=coef0,
        degree=degree,
        filter_params=True,
        gamma=gamma,
        metric=kernel
    )

    return K @ dual_coef + intercept
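
As a minimal usage sketch, the decision function can be evaluated directly with a handful of support vectors. The numbers below are made up purely for illustration:

import numpy as np

support_vectors = np.array([[0.0, 0.0], [1.0, 1.0]])
dual_coef = np.array([0.5, -0.25])
X_new = np.array([[0.5, 0.5]])

# Computes K @ dual_coef + intercept with the RBF kernel.
y_score = _decision_function(
    X_new,
    support_vectors,
    dual_coef,
    1.0,  # intercept
    gamma=1.0,
    kernel='rbf'
)

print(y_score.shape)  # (1,)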

Estimator

Next, we implement BaseSGD, a scikit-learn compliant abstract base class that inherits from sklearn.base.BaseEstimator and abc.ABC. The _check_is_fitted method checks whether the model has already been fitted. The _check_params and _check_sample_weight methods validate that the parameters are not invalid. Finally, the _reset method reinitializes the attributes when refitting.

BaseSGD
kernel_model.py
from abc import ABC
from abc import abstractmethod
from typing import Union

import numpy as np
from sklearn.base import BaseEstimator
from sklearn.utils.validation import check_is_fitted


class BaseSGD(BaseEstimator, ABC):
    @abstractmethod
    def __init__(
        self,
        loss: str,
        alpha: float = 1e-04,
        coef0: float = 0.0,
        degree: int = 3,
        epsilon: float = 0.1,
        eta0: float = 0.01,
        fit_intercept: bool = True,
        gamma: float = None,
        kernel: str = 'linear',
        learning_rate: str = 'invscaling',
        max_iter: int = 5,
        max_support: int = 100,
        power_t: float = 0.25,
        random_state: Union[int, np.random.RandomState] = None,
        shuffle: bool = True,
        verbose: int = 0,
        warm_start: bool = False
    ) -> None:

        self.alpha = alpha
        self.coef0 = coef0
        self.degree = degree
        self.epsilon = epsilon
        self.eta0 = eta0
        self.fit_intercept = fit_intercept
        self.gamma = gamma
        self.learning_rate = learning_rate
        self.loss = loss
        self.kernel = kernel
        self.max_iter = max_iter
        self.max_support = max_support
        self.power_t = power_t
        self.random_state = random_state
        self.shuffle = shuffle
        self.verbose = verbose
        self.warm_start = warm_start

    @abstractmethod
    def _partial_fit(
        self,
        X: np.ndarray,
        y: np.ndarray,
        sample_weight: np.ndarray,
        max_iter: int
    ) -> BaseEstimator:

        pass

    def _check_is_fitted(self) -> None:
        check_is_fitted(
            self,
            ['dual_coef_', 'intercept_', 'support_vectors_', 't_']
        )

    def _check_params(self) -> None:
        if self.alpha < 0.0:
            raise ValueError(f'alpha must be >= 0, got {self.alpha}')

        if self.eta0 <= 0.0:
            raise ValueError(f'eta0 must be > 0, got {self.eta0}')

        if self.eta0 * self.alpha >= 1.0:
            raise ValueError(f'eta0 must be < 1 / alpha, got {self.eta0}')

        if type(self.fit_intercept) is not bool:
            raise ValueError('fit_intercept must be either True or False')

        if self.learning_rate not in ('constant', 'invscaling'):
            raise ValueError(
                f'learning rate {self.learning_rate} is not supported'
            )

        if self.loss not in self._losses:
            raise ValueError(f'loss {self.loss} is not supported')

        if self.max_iter <= 0:
            raise ValueError(f'max_iter must be > 0, got {self.max_iter}')

        if self.max_support <= 0:
            raise ValueError(
                f'max_support must be > 0, got {self.max_support}'
            )

        if self.power_t < 0.0:
            raise ValueError(f'power_t must be >= 0, got {self.power_t}')

        if type(self.shuffle) is not bool:
            raise ValueError('shuffle must be either True or False')

        if type(self.warm_start) is not bool:
            raise ValueError('warm_start must be either True or False')

    def _check_sample_weight(
        self,
        sample_weight: np.ndarray,
        n_samples: int
    ) -> np.ndarray:
        if sample_weight is None:
            sample_weight = np.ones(n_samples)
        else:
            sample_weight = np.asarray(sample_weight)

        if sample_weight.ndim != 1:
            raise ValueError('sample_weight must be a 1D array')

        if sample_weight.size != n_samples:
            raise ValueError(f'the size of sample_weight must be {n_samples}')

        if np.any(sample_weight < 0):
            raise ValueError('individual weights for each sample must be >= 0')

        return sample_weight

    def _reset(self) -> None:
        if not self.warm_start and hasattr(self, 'support_vectors_'):
            del self.support_vectors_
            del self.dual_coef_
            del self.intercept_

        self.t_ = 1

    def fit(
        self,
        X: np.ndarray,
        y: np.ndarray,
        sample_weight: np.ndarray = None
    ) -> BaseEstimator:
        """Fit the model according to the given training data.

        Parameters
        ----------
        X
            Training data.

        y
            Target values.

        sample_weight
            Weights applied to individual samples.

        Returns
        -------
        self
            Return self.
        """

        self._reset()

        return self._partial_fit(X, y, sample_weight, self.max_iter)

    def partial_fit(
        self,
        X: np.ndarray,
        y: np.ndarray,
        sample_weight: np.ndarray = None
    ) -> BaseEstimator:
        """Fit the model according to the given training data.

        Parameters
        ----------
        X
            Training data.

        y
            Target values.

        sample_weight
            Weights applied to individual samples.

        Returns
        -------
        self
            Return self.
        """

        return self._partial_fit(X, y, sample_weight, 1)

    @abstractmethod
    def predict(
        self,
        X: np.ndarray
    ) -> np.ndarray:
        """Predict using the Fitted model.

        Parameters
        ----------
        X
            Data.

        Returns
        -------
        y_pred
            Predicted values.
        """

        pass

Finally, we implement the regressor BSGDRegressor, which completes the implementation.

BSGDRegressor
kernel_model.py
from typing import Union

import numpy as np
from sklearn.base import BaseEstimator
from sklearn.base import RegressorMixin
from sklearn.utils import check_array
from sklearn.utils import check_random_state
from sklearn.utils import check_X_y


class BSGDRegressor(BaseSGD, RegressorMixin):
    """BSGD Regressor.

    Parameters
    ----------
    alpha
        Regularization parameter.

    coef0
        Independent term in the polynomial (or sigmoid) kernel function.

    degree
        Degree of the polynomial kernel function.

    epsilon
        Epsilon in the epsilon-insensitive (or Huber) loss functions.

    eta0
        Initial learning rate.

    fit_intercept
        If True, fit the intercept.

    gamma
        Kernel coefficient for 'rbf', 'poly' and 'sigmoid'.

    kernel
        Kernel function to use.

    learning_rate
        Learning rate schedule.

    loss
        Loss function to use.

    max_iter
        Maximum number of epochs.

    max_support
        Maximum number of support vectors (a.k.a. budget).

    power_t
        Exponent for inverse scaling learning rate.

    random_state
        Seed of the pseudo random number generator.

    shuffle
        If True, shuffle the data.

    verbose
        Verbosity level.

    warm_start
        If True, reuse the solution of the previous call to fit as
        initialization.

    Attributes
    ----------
    support_vectors_
        Support vectors.

    dual_coef_
        Coefficients of support vectors.

    intercept_
        Intercept term.

    t_
        Round.

    Examples
    --------
    >>> from kernel_model import BSGDRegressor
    >>> from sklearn.datasets import load_boston
    >>> reg = BSGDRegressor(kernel='rbf')
    >>> X, y = load_boston(return_X_y=True)
    >>> reg.fit(X, y) # doctest: +ELLIPSIS
    BSGDRegressor(...)
    >>> y_pred = reg.predict(X)

    References
    ----------
    Wang, Z., Crammer, K. and Vucetic, S.
    Breaking the curse of kernelization: Budgeted stochastic gradient descent
    for large-scale svm training.
    Journal of Machine Learning Research (JMLR), Vol. 13, pp. 3103-3131, 2012.
    """

    _losses = ['epsilon_insensitive', 'huber', 'squared_loss']

    def __init__(
        self,
        alpha: float = 1e-04,
        coef0: float = 0.0,
        degree: int = 3,
        epsilon: float = 0.1,
        eta0: float = 0.01,
        fit_intercept: bool = True,
        gamma: float = None,
        kernel: str = 'linear',
        learning_rate: str = 'invscaling',
        loss: str = 'squared_loss',
        max_iter: int = 5,
        max_support: int = 100,
        power_t: float = 0.25,
        random_state: Union[int, np.random.RandomState] = None,
        shuffle: bool = True,
        verbose: int = 0,
        warm_start: bool = False
    ) -> None:

        super().__init__(
            alpha=alpha,
            coef0=coef0,
            degree=degree,
            epsilon=epsilon,
            eta0=eta0,
            fit_intercept=fit_intercept,
            gamma=gamma,
            kernel=kernel,
            learning_rate=learning_rate,
            loss=loss,
            max_iter=max_iter,
            max_support=max_support,
            power_t=power_t,
            random_state=random_state,
            shuffle=shuffle,
            verbose=verbose,
            warm_start=warm_start
        )

    def _partial_fit(
        self,
        X: np.ndarray,
        y: np.ndarray,
        sample_weight: np.ndarray,
        max_iter: int
    ) -> BaseEstimator:

        self._check_params()

        X, y = check_X_y(X, y, estimator=self)
        n_samples, n_features = X.shape
        sample_weight = self._check_sample_weight(sample_weight, n_samples)
        random_state = check_random_state(self.random_state)
        seed = random_state.randint(0, np.iinfo(np.int32).max)

        if not hasattr(self, 'support_vectors_'):
            self.support_vectors_ = np.empty((0, n_features))
            self.dual_coef_ = np.empty(0)
            self.intercept_ = 0.0

        if not hasattr(self, 't_'):
            self.t_ = 1

        self.support_vectors_, self.dual_coef_, self.intercept_, self.t_ = \
            bsgd(
                X,
                y,
                sample_weight,
                seed,
                self.support_vectors_,
                self.dual_coef_,
                self.intercept_,
                self.t_,
                alpha=self.alpha,
                coef0=self.coef0,
                degree=self.degree,
                epsilon=self.epsilon,
                eta0=self.eta0,
                fit_intercept=self.fit_intercept,
                gamma=self.gamma,
                kernel=self.kernel,
                learning_rate=self.learning_rate,
                loss=self.loss,
                max_iter=max_iter,
                max_support=self.max_support,
                power_t=self.power_t,
                shuffle=self.shuffle,
                verbose=self.verbose
            )

        return self

    def predict(
        self,
        X: np.ndarray
    ) -> np.ndarray:
        """Predict using the Fitted model.

        Parameters
        ----------
        X
            Data.

        Returns
        -------
        y_pred
            Predicted values.
        """

        self._check_is_fitted()

        X = check_array(X, estimator=self)

        return _decision_function(
            X,
            self.support_vectors_,
            self.dual_coef_,
            self.intercept_,
            coef0=self.coef0,
            degree=self.degree,
            gamma=self.gamma,
            kernel=self.kernel
        )

Similarly, a BSGDClassifier can be implemented by inheriting from BaseSGD and sklearn.base.ClassifierMixin, as sketched below.
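
Such a classifier is not implemented in this article, but a minimal sketch might look like the following. It assumes binary labels encoded as -1/+1 and that a 'hinge' entry (sklearn.linear_model.sgd_fast.Hinge) has been added to LOSS_CLASSES; for brevity the constructor forwards keyword arguments, which a real scikit-learn estimator should spell out explicitly, as BSGDRegressor does.

import numpy as np
from sklearn.base import ClassifierMixin
from sklearn.utils import check_array


class BSGDClassifier(BaseSGD, ClassifierMixin):
    """BSGD Classifier (sketch; assumes -1/+1 labels and a 'hinge' loss
    registered in LOSS_CLASSES)."""

    _losses = ['hinge']

    # The training path is the same as the regressor's; only the
    # prediction step differs.
    _partial_fit = BSGDRegressor._partial_fit

    def __init__(self, loss: str = 'hinge', **kwargs) -> None:
        super().__init__(loss=loss, **kwargs)

    def predict(self, X: np.ndarray) -> np.ndarray:
        self._check_is_fitted()

        X = check_array(X, estimator=self)
        y_score = _decision_function(
            X,
            self.support_vectors_,
            self.dual_coef_,
            self.intercept_,
            coef0=self.coef0,
            degree=self.degree,
            gamma=self.gamma,
            kernel=self.kernel
        )

        # Predict the sign of the decision value.
        return np.where(y_score >= 0.0, 1, -1)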

Verification

We use sklearn.utils.estimator_checks.check_estimator to verify that the estimator behaves correctly.

kernel_model.py
if __name__ == '__main__':
    from sklearn.utils.estimator_checks import check_estimator

    check_estimator(BSGDRegressor)
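
Beyond check_estimator, here is a minimal usage sketch of online learning with partial_fit; the data are synthetic and the hyperparameter values are illustrative:

import numpy as np
from sklearn.datasets import make_regression
from sklearn.utils import gen_batches

X, y = make_regression(n_samples=1000, n_features=10, random_state=0)
reg = BSGDRegressor(kernel='rbf', gamma=0.1, max_support=100)

# Feed the data as a stream of mini-batches.
for batch in gen_batches(len(X), 100):
    reg.partial_fit(X[batch], y[batch])

# The number of support vectors stays within the budget.
print(len(reg.support_vectors_))  # <= 100
print(reg.score(X, y))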

Conclusion

In the next article, Online Learning for Nonparametric Models (Experiments), we will evaluate the performance of BSGD.
