# Assumes binary classification (labels 0/1).
# Status: under functional testing.
from __future__ import annotations
from pathlib import Path
from typing import Any, Iterable, Tuple, Dict, List
import numpy as np
import optuna
import lightgbm as lgb
# lgb.register_logger(None) # undisplay print information
from lightgbm import LGBMClassifier
from sklearn.metrics import (
f1_score,
precision_recall_fscore_support,
confusion_matrix,
)
from sklearn.model_selection import StratifiedKFold
def mean_f1_with_global_threshold(
    params: dict[str, Any],
    X: np.ndarray,
    y: np.ndarray,
    cv: Any,
    thresholds: Iterable[float],
    splits: List[Tuple[np.ndarray, np.ndarray]] | None = None,
) -> Tuple[float, float, Dict[str, Any]]:
    """Cross-validated mean F1 using one global decision threshold.

    Trains one LGBMClassifier per fold (with early stopping), caches each
    fold's out-of-fold probabilities, then scans a shared grid of candidate
    thresholds and keeps the single threshold maximizing the mean F1 across
    all folds.

    Args:
        params: Keyword arguments forwarded to ``LGBMClassifier``.
        X: Feature matrix, shape (n_samples, n_features).
        y: Binary labels (0/1), shape (n_samples,).
        cv: Unused; retained for interface compatibility with callers.
        thresholds: Caller-supplied candidate thresholds. BUGFIX: previously
            this argument was accepted but silently ignored; it is now merged
            (union) into the internal linspace grid.
        splits: Optional precomputed (train_idx, valid_idx) pairs. When None,
            a fresh 5-fold StratifiedKFold is built here.

    Returns:
        Tuple of (best_mean_f1, best_threshold, report), where ``report``
        carries per-fold F1/precision/recall/confusion matrices and the
        per-fold best iteration counts.
    """
    callbacks = [
        # Stop on the first metric (binary_logloss) only.
        lgb.early_stopping(stopping_rounds=50, first_metric_only=True),
        lgb.log_evaluation(period=0),  # silence per-iteration logging
    ]
    # Use provided splits, or build fixed stratified folds.
    if splits is None:
        skf = StratifiedKFold(
            n_splits=5, shuffle=True, random_state=params.get("random_state", 42)
        )
        splits = list(skf.split(X, y))
    # Cache per-fold predictions and truths so the threshold scan below
    # never has to retrain a model.
    fold_predictions: list[np.ndarray] = []
    fold_truths: list[np.ndarray] = []
    fold_best_iterations: list[int] = []
    for train_idx, valid_idx in splits:
        X_train, X_valid = X[train_idx], X[valid_idx]
        y_train, y_valid = y[train_idx], y[valid_idx]
        model = LGBMClassifier(**params)
        model.fit(
            X_train,
            y_train,
            eval_set=[(X_valid, y_valid)],
            eval_metric="binary_logloss",
            callbacks=callbacks,
        )
        # BUGFIX: fall back to the constructor attribute ``n_estimators``
        # (always present) instead of ``n_estimators_``, a fitted attribute
        # that does not exist on older lightgbm versions and would raise
        # AttributeError whenever early stopping never triggered.
        best_iteration = int(
            getattr(model, "best_iteration_", None) or model.n_estimators
        )
        prob_valid = model.predict_proba(X_valid, num_iteration=best_iteration)[:, 1]
        fold_predictions.append(prob_valid)
        fold_truths.append(y_valid.astype(int))
        fold_best_iterations.append(best_iteration)
    # ---------- Threshold candidates ----------
    # Fixed linspace grid, merged with any caller-supplied thresholds
    # (BUGFIX: ``thresholds`` used to be ignored entirely).
    base_grid = np.linspace(0.05, 0.95, 181)
    extra = (
        np.asarray(list(thresholds), dtype=float)
        if thresholds is not None
        else np.empty(0)
    )
    candidate_thresholds = np.union1d(base_grid, extra)
    # ------------------------------------------
    # Scan every candidate; keep the one with the best fold-mean F1.
    best_threshold: float = 0.5
    best_mean_f1: float = -1.0
    best_per_fold_f1: list[float] = []
    for threshold in candidate_thresholds:
        f1s = [
            f1_score(y_true, (prob > threshold).astype(int), zero_division=0)
            for prob, y_true in zip(fold_predictions, fold_truths)
        ]
        mean_f1 = float(np.mean(f1s))
        if mean_f1 > best_mean_f1:
            best_mean_f1 = mean_f1
            best_threshold = float(threshold)
            best_per_fold_f1 = f1s
    # Compute the remaining metrics ONCE, at the chosen threshold only.
    per_fold_precision: list[float] = []
    per_fold_recall: list[float] = []
    per_fold_cm: list[np.ndarray] = []
    for prob, y_true in zip(fold_predictions, fold_truths):
        y_hat = (prob > best_threshold).astype(int)
        prec, rec, _, _ = precision_recall_fscore_support(
            y_true, y_hat, average="binary", zero_division=0
        )
        per_fold_precision.append(prec)
        per_fold_recall.append(rec)
        per_fold_cm.append(confusion_matrix(y_true, y_hat, labels=[0, 1]))
    report: Dict[str, Any] = {
        "per_fold_f1": best_per_fold_f1,
        "per_fold_precision": per_fold_precision,
        "per_fold_recall": per_fold_recall,
        "per_fold_confusion_matrix": per_fold_cm,
        "per_fold_best_iterations": fold_best_iterations,
        "threshold_candidates_count": int(len(candidate_thresholds)),
    }
    return best_mean_f1, best_threshold, report
def optuna_lgbm_mean_f1(
    X: np.ndarray,
    y: np.ndarray,
    n_trials: int,
    random_state: int | None,
) -> Tuple[dict[str, Any], float, float, optuna.Study, Dict[str, Any]]:
    """Optuna search maximizing CV-mean F1 with a single global threshold.

    Args:
        X: Feature matrix, shape (n_samples, n_features).
        y: Binary labels (0/1).
        n_trials: Number of Optuna trials to run.
        random_state: Seed for the CV splitter, the models, and (BUGFIX)
            the Optuna sampler, so the whole search is reproducible.

    Returns:
        (best_params, best_cv_mean_f1, best_threshold, study, best_report)
    """
    seed = random_state if random_state is not None else 42
    # Fixed splits shared by every trial so trial scores are comparable.
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state)
    splits = list(skf.split(X, y))
    # Class-imbalance correction: neg/pos ratio (guard against pos == 0).
    pos = int(np.sum(y == 1))
    neg = int(np.sum(y == 0))
    scale_pos_weight = float(neg / max(pos, 1))
    # Declared once; reused by every trial and by the final re-evaluation
    # (the original wrote this dict out twice, inviting drift).
    fixed_params: dict[str, Any] = {
        "objective": "binary",
        "boosting_type": "gbdt",
        "n_estimators": 2000,  # upper bound; early stopping picks the real count
        "n_jobs": -1,
        "random_state": seed,
        "scale_pos_weight": scale_pos_weight,
    }

    def objective(trial: optuna.Trial) -> float:
        params: dict[str, Any] = {
            **fixed_params,
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
            "num_leaves": trial.suggest_int("num_leaves", 16, 255),
            "max_depth": trial.suggest_int("max_depth", -1, 10),
            "min_child_samples": trial.suggest_int("min_child_samples", 10, 300),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),  # feature_fraction
            "subsample": trial.suggest_float("subsample", 0.5, 1.0),  # bagging_fraction
            "subsample_freq": trial.suggest_int("subsample_freq", 0, 6),  # bagging_freq
            "reg_alpha": trial.suggest_float("reg_alpha", 1e-3, 100.0, log=True),  # lambda_l1
            "reg_lambda": trial.suggest_float("reg_lambda", 1e-3, 100.0, log=True),  # lambda_l2
            "min_split_gain": trial.suggest_float("min_split_gain", 0.0, 1.0),  # min_gain_to_split
        }
        mean_f1, best_threshold, _ = mean_f1_with_global_threshold(
            params=params,
            X=X,
            y=y,
            cv=skf,
            thresholds=np.array([0.5]),  # placeholder; grid is built inside
            splits=splits,
        )
        trial.set_user_attr("threshold", best_threshold)
        return mean_f1

    # BUGFIX: seed the sampler so the search itself is reproducible for a
    # given random_state (previously only the splits/models were seeded,
    # and repeated runs explored different hyperparameters).
    sampler = optuna.samplers.TPESampler(seed=random_state)
    study = optuna.create_study(direction="maximize", sampler=sampler)
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
    best_params = {**study.best_params, **fixed_params}
    # Re-evaluate once with the winning params to recover the matching
    # global threshold and the detailed per-fold report.
    best_mean_f1, best_threshold, best_report = mean_f1_with_global_threshold(
        params=best_params,
        X=X,
        y=y,
        cv=skf,
        thresholds=np.array([0.5]),  # placeholder
        splits=splits,
    )
    return best_params, float(best_mean_f1), float(best_threshold), study, best_report
# --- Usage example ---
# Example driver (assumes X, y, X_test are defined by the caller — TODO confirm
# this snippet is meant to stay in this module rather than in a README).
best_params, cv_mean_f1, best_threshold, study, report = optuna_lgbm_mean_f1(
    X, y,
    n_trials=100,
    random_state=42,
)
# Refit on all training data with the tuned hyperparameters.
final_model = LGBMClassifier(**best_params).fit(X, y)
# Apply the CV-selected global threshold to held-out probabilities.
y_hat = (final_model.predict_proba(X_test)[:, 1] > best_threshold).astype(int)