# Combine the per-feature importances (gain) obtained from each LightGBM CV fold
# into a single table: normalize within each fold, then average across all folds.
from __future__ import annotations
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold
import lightgbm as lgb
def run_cv_collect_importance(
    X: pd.DataFrame,
    y: pd.Series | np.ndarray,
    model_params: dict[str, Any],
    n_splits: int,
    importance_type: str,
    early_stopping_rounds: int,
    eval_metric: str | None,
    categorical_feature: list[str] | None = None,
    splitter: Any | None = None,
) -> dict[str, Any]:
    """
    Loop 1: Train one LightGBM model per CV fold, produce OOF predictions,
    and collect per-fold feature importance.

    Parameters
    ----------
    X : pd.DataFrame
        Feature matrix; column order defines the importance index.
    y : pd.Series | np.ndarray
        Target values, aligned positionally with ``X``.
    model_params : dict[str, Any]
        Passed verbatim to the LightGBM estimator constructor. If
        ``objective`` contains ``"binary"`` an ``LGBMClassifier`` is used,
        otherwise an ``LGBMRegressor``.
    n_splits : int
        Number of folds for the default ``KFold`` (ignored when ``splitter``
        is given).
    importance_type : str
        Forwarded to ``Booster.feature_importance`` (e.g. ``"gain"``).
    early_stopping_rounds : int
        Rounds without eval improvement before stopping each fold's fit.
    eval_metric : str | None
        Metric for the validation set; ``None`` uses LightGBM's default.
    categorical_feature : list[str] | None
        Columns to cast to ``category`` dtype and flag for LightGBM.
    splitter : Any | None
        Custom CV splitter with a ``split(X, y)`` method; defaults to a
        shuffled ``KFold`` with a fixed seed for reproducibility.

    Returns
    -------
    dict with keys ``oof`` (ndarray of OOF predictions), ``models`` (one
    fitted estimator per fold), ``importance_fold_raw`` and
    ``importance_fold_norm`` (DataFrames, index=feature, columns=folds;
    the latter is L1-normalized per fold).

    Raises
    ------
    ValueError
        If any fold's total importance is non-positive (cannot normalize).
    """
    # Work on a copy so dtype casts below never mutate the caller's frame.
    X = X.copy()
    features = list(X.columns)
    if categorical_feature:
        for col in categorical_feature:
            # `is_categorical_dtype` is deprecated in pandas >= 2.1;
            # the isinstance check on the dtype is the documented replacement.
            if col in X.columns and not isinstance(X[col].dtype, pd.CategoricalDtype):
                X[col] = X[col].astype("category")
    if splitter is None:
        splitter = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    objective = model_params.get("objective", None)
    if objective and "binary" in objective:
        Estimator = lgb.LGBMClassifier
    else:
        Estimator = lgb.LGBMRegressor
    y_arr = y.values if isinstance(y, (pd.Series, pd.DataFrame)) else np.asarray(y)
    oof = np.zeros(len(X), dtype=float)
    models: list[Any] = []
    importance_raw_list: list[pd.DataFrame] = []
    importance_norm_list: list[pd.DataFrame] = []
    for fold, (train_idx, val_idx) in enumerate(splitter.split(X, y_arr), start=1):
        X_train, y_train = X.iloc[train_idx], y_arr[train_idx]
        X_val, y_val = X.iloc[val_idx], y_arr[val_idx]
        model = Estimator(**model_params)
        model.fit(
            X_train,
            y_train,
            eval_set=[(X_val, y_val)],
            eval_metric=eval_metric,
            callbacks=[lgb.early_stopping(stopping_rounds=early_stopping_rounds, verbose=False)],
            # LightGBM's documented default is the string "auto", not None;
            # only forward an explicit column list when the caller gave one.
            categorical_feature=categorical_feature if categorical_feature is not None else "auto",
        )
        models.append(model)
        # OOF prediction: classifiers store P(class=1), regressors the raw output.
        if hasattr(model, "predict_proba"):
            proba = model.predict_proba(X_val, num_iteration=model.best_iteration_)
            oof[val_idx] = proba[:, 1]
        else:
            oof[val_idx] = model.predict(X_val, num_iteration=model.best_iteration_)
        # Per-fold feature importance, taken at the early-stopped iteration.
        booster = model.booster_
        imp = booster.feature_importance(
            importance_type=importance_type,
            iteration=model.best_iteration_,
        )
        df_raw = pd.DataFrame(
            {f"fold{fold}": imp}, index=features
        )
        total = df_raw[f"fold{fold}"].sum()
        if total <= 0:
            raise ValueError(
                f"Fold {fold}: total importance is {total}. "
                "Cannot normalize feature importance because denominator is non-positive."
            )
        # L1 normalization: each fold's importances sum to 1, so folds with
        # different absolute gain scales contribute equally to the average.
        df_norm = df_raw / total
        importance_raw_list.append(df_raw)
        importance_norm_list.append(df_norm)
    # Concat aligns by feature index; fill 0 for any feature missing in a fold.
    importance_fold_raw = pd.concat(importance_raw_list, axis=1).fillna(0.0)
    importance_fold_norm = pd.concat(importance_norm_list, axis=1).fillna(0.0)
    return {
        "oof": oof,
        "models": models,
        "importance_fold_raw": importance_fold_raw,  # index=feature, columns=folds
        "importance_fold_norm": importance_fold_norm,  # index=feature, columns=folds
    }
def aggregate_importance_summary(
    importance_fold_raw: pd.DataFrame,
    importance_fold_norm: pd.DataFrame,
) -> pd.DataFrame:
    """
    Loop 2: Aggregate per-fold importance into a single summary table.

    Both inputs are DataFrames indexed by feature with one column per fold
    (raw and L1-normalized importance, respectively). Returns a DataFrame
    with mean/std of both, sorted by mean normalized importance in
    descending order, plus a 1-based ``rank_norm`` column.
    """
    raw_mean = importance_fold_raw.mean(axis=1)
    raw_std = importance_fold_raw.std(axis=1)
    norm_mean = importance_fold_norm.mean(axis=1)
    norm_std = importance_fold_norm.std(axis=1)
    # Positional (.to_numpy) pairing against the raw index, matching the
    # convention that both inputs share the same feature order.
    summary = pd.DataFrame(
        {
            "feature": list(importance_fold_raw.index),
            "raw_mean": raw_mean.to_numpy(),
            "raw_std": raw_std.to_numpy(),
            "norm_mean": norm_mean.to_numpy(),
            "norm_std": norm_std.to_numpy(),
        }
    )
    summary = summary.sort_values("norm_mean", ascending=False, ignore_index=True)
    summary["rank_norm"] = np.arange(1, len(summary) + 1)
    return summary