0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Optunaお試し

Posted at 2025-09-12

二値分類を想定。
動作確認中

from __future__ import annotations
from pathlib import Path
from typing import Any, Iterable, Tuple, Dict, List
import numpy as np
import optuna
import lightgbm as lgb
# lgb.register_logger(None)   # undisplay print information
from lightgbm import LGBMClassifier
from sklearn.metrics import (
    f1_score,
    precision_recall_fscore_support,
    confusion_matrix,
)
from sklearn.model_selection import StratifiedKFold


def mean_f1_with_global_threshold(
    params: dict[str, Any],
    X: np.ndarray,
    y: np.ndarray,
    cv: Any,
    thresholds: Iterable[float],
    splits: List[Tuple[np.ndarray, np.ndarray]] | None = None,
) -> Tuple[float, float, Dict[str, Any]]:
    """Cross-validated mean F1 with a single global decision threshold.

    Trains an ``LGBMClassifier`` on each fold (early stopping on
    ``binary_logloss``), caches the out-of-fold probabilities, then scans a
    fixed grid of candidate thresholds and applies the *same* threshold to
    every fold, keeping the one that maximizes the fold-mean F1.

    Parameters
    ----------
    params : dict
        Keyword arguments forwarded verbatim to ``LGBMClassifier``.
    X, y : np.ndarray
        Feature matrix and binary (0/1) target vector.
    cv : Any
        Unused; retained for interface compatibility with existing callers.
    thresholds : Iterable[float]
        Currently ignored — the candidate grid is rebuilt internally
        (callers pass a placeholder; see ``optuna_lgbm_mean_f1``).
    splits : list of (train_idx, valid_idx) pairs, optional
        Pre-computed CV splits so every call scores identical folds.
        If None, a 5-fold ``StratifiedKFold`` seeded from
        ``params["random_state"]`` (default 42) is built here.

    Returns
    -------
    (best_mean_f1, best_threshold, report)
        ``report`` contains per-fold F1 / precision / recall / confusion
        matrices at the best threshold, per-fold best iterations, and the
        size of the threshold grid.
    """
    # Early stopping watches only the first metric (binary_logloss);
    # log_evaluation(period=0) silences per-iteration printing.
    callbacks = [
        lgb.early_stopping(
            stopping_rounds=50,
            first_metric_only=True,
        ),
        lgb.log_evaluation(period=0),
    ]

    # Use caller-provided splits when given so repeated calls (e.g. every
    # Optuna trial) are evaluated on identical folds.
    if splits is None:
        skf = StratifiedKFold(
            n_splits=5, shuffle=True, random_state=params.get("random_state", 42)
        )
        splits = list(skf.split(X, y))

    # Cache per-fold predictions and truths so the threshold scan below
    # never has to retrain a model.
    fold_predictions: list[np.ndarray] = []
    fold_truths: list[np.ndarray] = []
    fold_best_iterations: list[int] = []

    for train_idx, valid_idx in splits:
        X_train, X_valid = X[train_idx], X[valid_idx]
        y_train, y_valid = y[train_idx], y[valid_idx]

        model = LGBMClassifier(**params)
        model.fit(
            X_train, y_train,
            eval_set=[(X_valid, y_valid)],
            eval_metric="binary_logloss",
            callbacks=callbacks,
        )
        # best_iteration_ is None/0 when early stopping never fired; fall
        # back to the configured round count. n_estimators_ only exists on
        # lightgbm >= 4, so get_params() is the safe final fallback.
        best_iteration = getattr(model, "best_iteration_", None)
        if not best_iteration:
            best_iteration = (
                getattr(model, "n_estimators_", None)
                or model.get_params()["n_estimators"]
            )
        best_iteration = int(best_iteration)

        prob_valid = model.predict_proba(X_valid, num_iteration=best_iteration)[:, 1]

        fold_predictions.append(prob_valid)
        fold_truths.append(y_valid.astype(int))
        fold_best_iterations.append(best_iteration)

    # ---------- Threshold candidates (evenly spaced grid) ----------
    candidate_thresholds = np.linspace(0.05, 0.95, 181)
    # Alternative: quantiles of the pooled OOF predictions, e.g.
    # np.quantile(np.concatenate(fold_predictions), np.linspace(0.05, 0.95, 121))
    # ---------------------------------------------------------------

    # Pick the single threshold maximizing the fold-mean F1.
    best_threshold: float = 0.5
    best_mean_f1: float = -1.0
    best_per_fold_f1: list[float] = []

    for threshold in candidate_thresholds:
        f1s = [
            f1_score(y_true, (prob > threshold).astype(int), zero_division=0)
            for prob, y_true in zip(fold_predictions, fold_truths)
        ]
        mean_f1 = float(np.mean(f1s))

        if mean_f1 > best_mean_f1:
            best_mean_f1 = mean_f1
            best_threshold = float(threshold)
            best_per_fold_f1 = f1s

    # Secondary metrics are computed ONCE, at the winning threshold only.
    per_fold_precision: list[float] = []
    per_fold_recall: list[float] = []
    per_fold_cm: list[np.ndarray] = []

    for prob, y_true in zip(fold_predictions, fold_truths):
        y_hat = (prob > best_threshold).astype(int)
        prec, rec, _, _ = precision_recall_fscore_support(
            y_true, y_hat, average="binary", zero_division=0
        )
        per_fold_precision.append(prec)
        per_fold_recall.append(rec)
        per_fold_cm.append(confusion_matrix(y_true, y_hat, labels=[0, 1]))

    report: Dict[str, Any] = {
        "per_fold_f1": best_per_fold_f1,
        "per_fold_precision": per_fold_precision,
        "per_fold_recall": per_fold_recall,
        "per_fold_confusion_matrix": per_fold_cm,
        "per_fold_best_iterations": fold_best_iterations,
        "threshold_candidates_count": int(len(candidate_thresholds)),
    }

    return best_mean_f1, best_threshold, report


def optuna_lgbm_mean_f1(
    X: np.ndarray,
    y: np.ndarray,
    n_trials: int,
    random_state: int | None,
) -> Tuple[dict[str, Any], float, float, optuna.Study, Dict[str, Any]]:
    """
    Optuna search maximizing CV-mean F1 with a single global threshold across folds.

    Parameters
    ----------
    X, y : np.ndarray
        Feature matrix and binary (0/1) target vector.
    n_trials : int
        Number of Optuna trials to run.
    random_state : int | None
        Seed for the CV splitter, the LightGBM models, and the TPE sampler
        (None leaves the sampler unseeded).

    Returns
    -------
    (best_params, best_cv_mean_f1, best_threshold, study, best_report)
    """
    # Fix the CV splits up front so every trial is scored on identical folds.
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state)
    splits = list(skf.split(X, y))

    # Class-imbalance correction: neg/pos ratio (guarded against pos == 0).
    pos = int(np.sum(y == 1))
    neg = int(np.sum(y == 0))
    scale_pos_weight = float(neg / max(pos, 1))

    seed = random_state if random_state is not None else 42

    def objective(trial: optuna.Trial) -> float:
        # The params dict must be built inside objective so every trial
        # draws fresh hyperparameter suggestions.
        params: dict[str, Any] = {
            "objective": "binary",
            "boosting_type": "gbdt",
            "n_estimators": 2000,
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
            "num_leaves": trial.suggest_int("num_leaves", 16, 255),
            "max_depth": trial.suggest_int("max_depth", -1, 10),
            "min_child_samples": trial.suggest_int("min_child_samples", 10, 300),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),  # feature_fraction
            "subsample": trial.suggest_float("subsample", 0.5, 1.0),                # bagging_fraction
            "subsample_freq": trial.suggest_int("subsample_freq", 0, 6),            # bagging_freq
            "reg_alpha": trial.suggest_float("reg_alpha", 1e-3, 100.0, log=True),   # lambda_l1
            "reg_lambda": trial.suggest_float("reg_lambda", 1e-3, 100.0, log=True), # lambda_l2
            "min_split_gain": trial.suggest_float("min_split_gain", 0.0, 1.0),      # min_gain_to_split
            "n_jobs": -1,
            "random_state": seed,
            "scale_pos_weight": scale_pos_weight,
        }
        mean_f1, best_threshold, _ = mean_f1_with_global_threshold(
            params=params,
            X=X,
            y=y,
            cv=skf,
            thresholds=np.array([0.5]),  # placeholder; grid is rebuilt inside
            splits=splits,
        )
        # Stash the per-trial threshold so it can be inspected on the study.
        trial.set_user_attr("threshold", best_threshold)
        return mean_f1

    # Seed the sampler so the search itself is reproducible.
    study = optuna.create_study(
        direction="maximize",
        sampler=optuna.samplers.TPESampler(seed=random_state),
    )
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

    # Re-attach the fixed (non-searched) params before the final evaluation.
    best_params = {
        **study.best_params,
        "objective": "binary",
        "boosting_type": "gbdt",
        "n_estimators": 2000,
        "n_jobs": -1,
        "random_state": seed,
        "scale_pos_weight": scale_pos_weight,
    }

    # Re-score the winner on the same fixed splits to recover its threshold
    # and the full per-fold report.
    best_mean_f1, best_threshold, best_report = mean_f1_with_global_threshold(
        params=best_params,
        X=X,
        y=y,
        cv=skf,
        thresholds=np.array([0.5]),  # placeholder
        splits=splits,
    )

    return best_params, float(best_mean_f1), float(best_threshold), study, best_report

使い方

# Usage example — assumes X, y (training data) and X_test are already
# defined as numpy arrays; TODO confirm in the surrounding notebook/script.
# Run the Optuna search: returns tuned params, CV-mean F1, the global
# decision threshold, the Optuna study, and the per-fold report.
best_params, cv_mean_f1, best_threshold, study, report = optuna_lgbm_mean_f1(
    X, y,
    n_trials=100,
    random_state=42,
)
# Refit a final model on ALL training data with the tuned params.
final_model = LGBMClassifier(**best_params).fit(X, y)
# Apply the globally tuned threshold to the positive-class probabilities.
y_hat = (final_model.predict_proba(X_test)[:, 1] > best_threshold).astype(int)
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?