3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

実務で使いがちなクロスバリデーション手法を調べたメモ

Posted at

はじめに

実務で機械学習をやっていると、クロスバリデーション(CV)の手法選択が思っている以上に重要だということに気づいた。
単純にK-Foldを使えばいいというものではなく、データの性質や業務要件に応じて適切な手法を選ばないと、本番環境で全く性能が出ないという痛い目に遭う。

今回、様々なプロジェクトで遭遇した課題を整理しながら、実務で本当に使えるCV手法を体系的に調べてまとめてみた。失敗事例も含めて共有する。

基本手法の実装と落とし穴

K-Fold:意外と奥が深い

まず基本のK-Foldから。シンプルに見えて、実はパラメータの選択次第で結果が大きく変わる。

import numpy as np
import pandas as pd
from sklearn.model_selection import KFold, cross_val_score, validation_curve
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

# データ生成
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, 
                         n_informative=15, n_redundant=5, random_state=42)

def analyze_kfold_variations():
    """K-Foldの各種設定が結果に与える影響を調べてみた"""
    
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    
    # shuffle有無の影響
    kfold_no_shuffle = KFold(n_splits=5, shuffle=False)
    kfold_shuffle = KFold(n_splits=5, shuffle=True, random_state=42)
    
    scores_no_shuffle = cross_val_score(model, X, y, cv=kfold_no_shuffle)
    scores_shuffle = cross_val_score(model, X, y, cv=kfold_shuffle)
    
    print("Shuffle効果の検証:")
    print(f"Shuffle無し: {scores_no_shuffle.mean():.4f} ± {scores_no_shuffle.std():.4f}")
    print(f"Shuffle有り: {scores_shuffle.mean():.4f} ± {scores_shuffle.std():.4f}")
    
    # K値の選択
    k_values = [3, 5, 10, 20]
    k_results = []
    
    for k in k_values:
        if k <= len(X):  # K > サンプル数を防ぐ
            kfold = KFold(n_splits=k, shuffle=True, random_state=42)
            scores = cross_val_score(model, X, y, cv=kfold)
            k_results.append({
                'k': k,
                'mean_score': scores.mean(),
                'std_score': scores.std(),
                'scores': scores
            })
    
    print("\nK値の影響:")
    for result in k_results:
        print(f"K={result['k']}: {result['mean_score']:.4f} ± {result['std_score']:.4f}")
    
    return k_results

k_analysis = analyze_kfold_variations()

実務での教訓: データがソートされている場合、shuffle=Falseだと偏ったCV結果になる。時系列以外では必ずshuffle=Trueにしている。

Stratified K-Fold:不均衡データでの必須ツール

ECサイトのコンバージョン予測(CV率1-3%)や機械の故障予測など、実務では不均衡データが圧倒的に多い。

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix

def imbalanced_data_experiment():
    """極度に不均衡なデータでCV手法を比較してみた"""
    
    # 実際のECデータに近い不均衡データを生成
    X_imb, y_imb = make_classification(
        n_samples=10000, n_features=20, n_classes=2,
        weights=[0.97, 0.03],  # 3%のコンバージョン率
        n_informative=15, random_state=42
    )
    
    print(f"クラス分布: {np.bincount(y_imb)} (比率: {np.bincount(y_imb)[1]/len(y_imb)*100:.1f}%)")
    
    model = RandomForestClassifier(n_estimators=100, random_state=42, 
                                 class_weight='balanced')  # 重要な設定
    
    # 通常のK-Fold
    kfold = KFold(n_splits=5, shuffle=True, random_state=42)
    
    # Stratified K-Fold
    stratified = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    
    # 詳細比較
    def detailed_cv_analysis(cv_method, method_name):
        scores = []
        fold_distributions = []
        
        for fold, (train_idx, val_idx) in enumerate(cv_method.split(X_imb, y_imb)):
            # 各フォールドのクラス分布
            val_dist = np.bincount(y_imb[val_idx])
            fold_distributions.append(val_dist[1] / len(val_idx) * 100)
            
            # モデル学習・評価
            model.fit(X_imb[train_idx], y_imb[train_idx])
            score = model.score(X_imb[val_idx], y_imb[val_idx])
            scores.append(score)
            
            print(f"{method_name} Fold {fold+1}: Positive率={fold_distributions[-1]:.1f}%, Score={score:.4f}")
        
        print(f"{method_name} 平均: {np.mean(scores):.4f} ± {np.std(scores):.4f}")
        print(f"Positive率の分散: {np.std(fold_distributions):.2f}")
        print()
        
        return scores, fold_distributions
    
    kfold_scores, kfold_dist = detailed_cv_analysis(kfold, "K-Fold")
    stratified_scores, stratified_dist = detailed_cv_analysis(stratified, "Stratified")
    
    return {
        'kfold_scores': kfold_scores,
        'stratified_scores': stratified_scores,
        'kfold_dist': kfold_dist,
        'stratified_dist': stratified_dist
    }

imbalance_results = imbalanced_data_experiment()

実務での失敗談: 初期の頃、不均衡データで通常のK-Foldを使って「精度95%!」と喜んでいたが、実際は全部Negativeクラスを予測していただけだった。今は必ずStratifiedを使い、precision/recall/F1も見るようにしている。

時系列データのCV:最も難しい領域

金融データや需要予測など、時系列データのCVは本当に難しい。未来の情報を使ってしまうdata leakageが起きやすい。

Time Series Split:基本だが制約が多い

from sklearn.model_selection import TimeSeriesSplit
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
import pandas as pd

def time_series_cv_comparison():
    """時系列CVの各種手法を比較してみた"""
    
    # 株価っぽい時系列データを生成
    np.random.seed(42)
    n_days = 1000
    dates = pd.date_range('2020-01-01', periods=n_days, freq='D')
    
    # トレンド + 季節性 + ノイズ
    trend = np.linspace(100, 200, n_days)
    seasonal = 10 * np.sin(2 * np.pi * np.arange(n_days) / 365.25)
    noise = np.random.normal(0, 5, n_days)
    price = trend + seasonal + noise
    
    # 特徴量作成(過去のラグ特徴量)
    df = pd.DataFrame({'date': dates, 'price': price})
    
    # ラグ特徴量
    for lag in [1, 2, 3, 5, 10]:
        df[f'price_lag_{lag}'] = df['price'].shift(lag)
    
    # 移動平均
    for window in [5, 10, 20]:
        df[f'ma_{window}'] = df['price'].rolling(window=window).mean()
    
    # 欠損値除去
    df = df.dropna().reset_index(drop=True)
    
    feature_cols = [col for col in df.columns if col not in ['date', 'price']]
    X_ts = df[feature_cols].values
    y_ts = df['price'].values
    
    print(f"時系列データ形状: {X_ts.shape}")
    
    # 1. 通常のK-Fold(間違った方法)
    normal_scores = cross_val_score(LinearRegression(), X_ts, y_ts, cv=5, 
                                  scoring='neg_mean_squared_error')
    
    # 2. Time Series Split
    tscv = TimeSeriesSplit(n_splits=5)
    ts_scores = cross_val_score(LinearRegression(), X_ts, y_ts, cv=tscv,
                               scoring='neg_mean_squared_error')
    
    print("時系列CV比較(MSE):")
    print(f"通常のK-Fold: {-normal_scores.mean():.2f} ± {normal_scores.std():.2f}")
    print(f"Time Series Split: {-ts_scores.mean():.2f} ± {ts_scores.std():.2f}")
    
    # 3. カスタムWalk Forward
    def walk_forward_validation(X, y, test_size=100, min_train_size=200):
        scores = []
        for i in range(min_train_size, len(X) - test_size, test_size):
            X_train, y_train = X[:i], y[:i]
            X_test, y_test = X[i:i+test_size], y[i:i+test_size]
            
            model = LinearRegression()
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            
            mse = np.mean((y_test - y_pred) ** 2)
            scores.append(mse)
        
        return np.array(scores)
    
    wf_scores = walk_forward_validation(X_ts, y_ts)
    print(f"Walk Forward: {wf_scores.mean():.2f} ± {wf_scores.std():.2f}")
    
    return df, normal_scores, ts_scores, wf_scores

ts_results = time_series_cv_comparison()

Purged Group Time Series Split:実務で重宝する手法

金融業界でよく使われる、より実践的な時系列CV手法を実装してみた。

class PurgedGroupTimeSeriesSplit:
    """
    金融データ用の高度な時系列分割
    - グループ(銘柄、顧客など)を考慮
    - Purge期間でdata leakageを防止
    """
    
    def __init__(self, n_splits=5, max_train_group_size=None, 
                 group_gap=1, max_test_group_size=1):
        self.n_splits = n_splits
        self.max_train_group_size = max_train_group_size
        self.group_gap = group_gap
        self.max_test_group_size = max_test_group_size
    
    def split(self, X, y=None, groups=None):
        if groups is None:
            raise ValueError("Groups must be provided")
        
        unique_groups = np.unique(groups)
        n_groups = len(unique_groups)
        
        # テストグループのサイズ
        test_size = max(1, n_groups // self.n_splits)
        
        for i in range(self.n_splits):
            # テスト期間
            test_group_start = i * test_size
            test_group_end = min(test_group_start + self.max_test_group_size, n_groups)
            test_groups = unique_groups[test_group_start:test_group_end]
            
            # 訓練期間(purge期間を考慮)
            train_group_end = max(0, test_group_start - self.group_gap)
            if self.max_train_group_size:
                train_group_start = max(0, train_group_end - self.max_train_group_size)
            else:
                train_group_start = 0
            
            train_groups = unique_groups[train_group_start:train_group_end]
            
            # インデックス取得
            train_idx = np.where(np.isin(groups, train_groups))[0]
            test_idx = np.where(np.isin(groups, test_groups))[0]
            
            if len(train_idx) > 0 and len(test_idx) > 0:
                yield train_idx, test_idx

# 使用例:株式データ
def stock_prediction_cv():
    """株式予測でPurged Time Series CVを試してみた"""
    
    # 模擬株式データ
    n_days = 500
    n_stocks = 50
    
    # 日付ベースのグループ
    dates = pd.date_range('2022-01-01', periods=n_days, freq='D')
    date_groups = np.arange(n_days)
    
    # 特徴量とターゲット
    X_stock = np.random.randn(n_days, 10)  # 技術指標など
    y_stock = np.random.randn(n_days)      # リターン
    
    # 通常のTime Series Splitと比較
    tscv = TimeSeriesSplit(n_splits=5)
    ptscv = PurgedGroupTimeSeriesSplit(n_splits=5, group_gap=5)  # 5日のpurge期間
    
    ts_scores = cross_val_score(LinearRegression(), X_stock, y_stock, cv=tscv,
                               scoring='neg_mean_squared_error')
    
    # Purged版(手動実装)
    purged_scores = []
    for train_idx, test_idx in ptscv.split(X_stock, y_stock, groups=date_groups):
        model = LinearRegression()
        model.fit(X_stock[train_idx], y_stock[train_idx])
        score = model.score(X_stock[test_idx], y_stock[test_idx])
        purged_scores.append(score)
    
    print("金融データCV比較:")
    print(f"通常Time Series: {-ts_scores.mean():.4f} ± {ts_scores.std():.4f}")
    print(f"Purged Time Series: {np.mean(purged_scores):.4f} ± {np.std(purged_scores):.4f}")

stock_prediction_cv()

実務での教訓: 株式データでは、決算発表やニュース発表後の価格変動が数日続くことがある。単純なTime Series Splitだと、この影響がテストデータに漏れてしまう。Purge期間を設けることで、より現実的な評価ができるようになった。

Group K-Fold:データリークの救世主

画像認識や医療データなど、同一対象から複数サンプルが得られるデータでは必須。

from sklearn.model_selection import GroupKFold, LeaveOneGroupOut

def group_cv_medical_example():
    """医療データでのGroup CV実装例"""
    
    # 医療データの模擬(患者ごとに複数の検査結果)
    n_patients = 100
    n_tests_per_patient = 15
    
    # 患者ID(グループ)
    patient_ids = np.repeat(np.arange(n_patients), n_tests_per_patient)
    
    # 特徴量(血液検査値など)
    X_medical = np.random.randn(len(patient_ids), 20)
    
    # 患者レベルの疾患有無(同一患者は同じラベル)
    patient_labels = np.random.choice([0, 1], n_patients, p=[0.7, 0.3])
    y_medical = np.repeat(patient_labels, n_tests_per_patient)
    
    print(f"患者数: {n_patients}")
    print(f"総サンプル数: {len(X_medical)}")
    print(f"疾患有病率: {patient_labels.mean():.1%}")
    
    # 1. 通常のStratified K-Fold(間違った方法)
    stratified = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    normal_scores = cross_val_score(RandomForestClassifier(random_state=42), 
                                  X_medical, y_medical, cv=stratified)
    
    # 2. Group K-Fold(正しい方法)
    group_kfold = GroupKFold(n_splits=5)
    group_scores = cross_val_score(RandomForestClassifier(random_state=42),
                                 X_medical, y_medical, cv=group_kfold, 
                                 groups=patient_ids)
    
    # 3. Leave One Group Out(患者一人ずつ)
    logo = LeaveOneGroupOut()
    # LOGOは時間がかかるので、サンプル版
    sample_patients = np.random.choice(np.unique(patient_ids), 10, replace=False)
    sample_mask = np.isin(patient_ids, sample_patients)
    
    sample_scores = cross_val_score(RandomForestClassifier(random_state=42),
                                  X_medical[sample_mask], y_medical[sample_mask],
                                  cv=logo, groups=patient_ids[sample_mask])
    
    print("\n医療データCV比較:")
    print(f"Stratified K-Fold: {normal_scores.mean():.4f} ± {normal_scores.std():.4f}")
    print(f"Group K-Fold: {group_scores.mean():.4f} ± {group_scores.std():.4f}")
    print(f"Leave One Group Out (sample): {sample_scores.mean():.4f} ± {sample_scores.std():.4f}")
    
    # データリークの検証
    print("\nデータリーク検証:")
    for fold, (train_idx, val_idx) in enumerate(stratified.split(X_medical, y_medical)):
        train_patients = set(patient_ids[train_idx])
        val_patients = set(patient_ids[val_idx])
        overlap = len(train_patients & val_patients)
        print(f"Stratified Fold {fold+1}: 患者重複数 = {overlap}")
        if fold == 2:  # 最初の3フォールドだけ表示
            break
    
    print()
    for fold, (train_idx, val_idx) in enumerate(group_kfold.split(X_medical, y_medical, patient_ids)):
        train_patients = set(patient_ids[train_idx])
        val_patients = set(patient_ids[val_idx])
        overlap = len(train_patients & val_patients)
        print(f"Group K-Fold {fold+1}: 患者重複数 = {overlap}")
        if fold == 2:
            break

group_cv_medical_example()

実務での痛い経験: 顔認識プロジェクトで同一人物の複数写真が訓練・テストに分かれてしまい、異常に高い精度が出た。本番では全く使えなかった。今はGroup CVを徹底している。

高度なCV手法:実務で差がつくテクニック

Nested Cross Validation:ハイパーパラメータ調整の正しい評価

from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

def nested_cv_proper_implementation():
    """Nested CVの正しい実装を検証してみた"""
    
    # データ準備
    X, y = make_classification(n_samples=500, n_features=20, n_classes=2, random_state=42)
    
    # パラメータグリッド
    param_grid = {
        'C': [0.01, 0.1, 1, 10, 100],
        'gamma': ['scale', 'auto', 0.001, 0.01, 0.1],
        'kernel': ['rbf', 'linear']
    }
    
    # 間違った方法:同じデータでグリッドサーチと評価
    grid_search = GridSearchCV(SVC(), param_grid, cv=5, scoring='accuracy')
    grid_search.fit(X, y)
    wrong_score = grid_search.best_score_
    
    # 正しい方法:Nested CV
    outer_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    inner_cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
    
    nested_scores = []
    best_params_list = []
    
    for fold, (train_outer, test_outer) in enumerate(outer_cv.split(X, y)):
        X_train_outer, X_test_outer = X[train_outer], X[test_outer]
        y_train_outer, y_test_outer = y[train_outer], y[test_outer]
        
        # 内側でハイパーパラメータ調整
        inner_grid = GridSearchCV(SVC(), param_grid, cv=inner_cv, scoring='accuracy')
        inner_grid.fit(X_train_outer, y_train_outer)
        
        # 外側でテスト
        test_score = inner_grid.score(X_test_outer, y_test_outer)
        nested_scores.append(test_score)
        best_params_list.append(inner_grid.best_params_)
        
        print(f"Outer Fold {fold+1}: {test_score:.4f}, Best params: {inner_grid.best_params_}")
    
    print(f"\n間違った評価: {wrong_score:.4f}")
    print(f"正しいNested CV評価: {np.mean(nested_scores):.4f} ± {np.std(nested_scores):.4f}")
    
    # パラメータの安定性確認
    param_stability = {}
    for param in param_grid.keys():
        values = [params[param] for params in best_params_list]
        param_stability[param] = len(set(values))
    
    print("\nパラメータ選択の安定性:")
    for param, unique_count in param_stability.items():
        print(f"{param}: {unique_count}/{len(best_params_list)} 種類の値が選択された")

nested_cv_proper_implementation()

Adversarial Validation:ドメイン適応の確認

def adversarial_validation_check():
    """本番データとの分布差をAdversarial Validationで検証してみた"""
    
    # 訓練データ
    X_train, _ = make_classification(n_samples=1000, n_features=20, 
                                   n_informative=15, random_state=42)
    
    # 本番データ(わずかに分布が異なる)
    X_prod = make_classification(n_samples=500, n_features=20,
                               n_informative=15, random_state=123)[0]
    X_prod = X_prod * 1.2 + 0.1  # 分布をわずかにシフト
    
    # Adversarial Validationデータセット作成
    X_adv = np.vstack([X_train, X_prod])
    y_adv = np.hstack([np.zeros(len(X_train)), np.ones(len(X_prod))])
    
    # 分布差の検出
    adv_scores = cross_val_score(RandomForestClassifier(random_state=42),
                               X_adv, y_adv, cv=5)
    
    print("Adversarial Validation結果:")
    print(f"AUC: {adv_scores.mean():.4f} ± {adv_scores.std():.4f}")
    
    if adv_scores.mean() > 0.7:
        print("⚠️  訓練データと本番データの分布に大きな差があります")
        print("CVの結果は本番性能を正しく反映しない可能性があります")
    else:
        print("✅ 訓練データと本番データの分布は似ています")
    
    # 特徴量レベルでの分析
    from sklearn.feature_selection import mutual_info_classif
    
    feature_importance = mutual_info_classif(X_adv, y_adv, random_state=42)
    important_features = np.argsort(feature_importance)[-5:]
    
    print(f"\n分布差の主要因子: 特徴量 {important_features}")

adversarial_validation_check()

Stratified Group K-Fold:究極のバランス型CV

from sklearn.model_selection import StratifiedGroupKFold

def stratified_group_kfold_example():
    """クラス比率とグループの両方を考慮したCVを試してみた"""
    
    # 複雑なデータ設定:店舗(グループ)ごとの顧客データ(不均衡)
    n_stores = 20
    customers_per_store = np.random.randint(30, 100, n_stores)
    
    store_ids = []
    X_store = []
    y_store = []
    
    for store_id in range(n_stores):
        n_customers = customers_per_store[store_id]
        
        # 店舗ごとに異なるコンバージョン率
        store_conversion_rate = np.random.uniform(0.02, 0.15)
        
        # 顧客データ
        X_customers = np.random.randn(n_customers, 15)
        y_customers = np.random.choice([0, 1], n_customers, 
                                     p=[1-store_conversion_rate, store_conversion_rate])
        
        store_ids.extend([store_id] * n_customers)
        X_store.append(X_customers)
        y_store.extend(y_customers)
    
    X_store = np.vstack(X_store)
    y_store = np.array(y_store)
    store_groups = np.array(store_ids)
    
    print(f"総サンプル数: {len(X_store)}")
    print(f"店舗数: {len(np.unique(store_groups))}")
    print(f"全体コンバージョン率: {y_store.mean():.3f}")
    
    # 各CV手法の比較
    cv_methods = {
        'StratifiedKFold': StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
        'GroupKFold': GroupKFold(n_splits=5),
        'StratifiedGroupKFold': StratifiedGroupKFold(n_splits=5, shuffle=True, random_state=42)
    }
    
    for name, cv_method in cv_methods.items():
        if name == 'StratifiedGroupKFold':
            scores = cross_val_score(RandomForestClassifier(random_state=42),
                                   X_store, y_store, cv=cv_method, groups=store_groups)
        elif name == 'GroupKFold':
            scores = cross_val_score(RandomForestClassifier(random_state=42),
                                   X_store, y_store, cv=cv_method, groups=store_groups)
        else:
            scores = cross_val_score(RandomForestClassifier(random_state=42),
                                   X_store, y_store, cv=cv_method)
        
        print(f"{name}: {scores.mean():.4f} ± {scores.std():.4f}")
        
        # フォールドごとの詳細分析
        fold_details = []
        if name == 'StratifiedGroupKFold':
            splits = cv_method.split(X_store, y_store, groups=store_groups)
        elif name == 'GroupKFold':
            splits = cv_method.split(X_store, y_store, groups=store_groups)
        else:
            splits = cv_method.split(X_store, y_store)
            
        for fold, (train_idx, val_idx) in enumerate(splits):
            val_conversion = y_store[val_idx].mean()
            if name in ['GroupKFold', 'StratifiedGroupKFold']:
                val_stores = len(np.unique(store_groups[val_idx]))
                fold_details.append(f"  Fold {fold+1}: CV率={val_conversion:.3f}, 店舗数={val_stores}")
            else:
                fold_details.append(f"  Fold {fold+1}: CV率={val_conversion:.3f}")
        
        for detail in fold_details[:3]:  # 最初の3フォールドのみ表示
            print(detail)
        print()

stratified_group_kfold_example()

特殊ドメインでのCV戦略

地理データ:空間相関を考慮したCV

def geographical_cv_implementation():
    """地理データでの空間相関を考慮したCVを実装してみた"""
    
    # 不動産価格予測を想定した地理データ
    n_properties = 2000
    
    # 緯度・経度(東京都内を想定)
    lat_range = (35.6, 35.8)
    lon_range = (139.6, 139.8)
    
    np.random.seed(42)
    latitudes = np.random.uniform(lat_range[0], lat_range[1], n_properties)
    longitudes = np.random.uniform(lon_range[0], lon_range[1], n_properties)
    coordinates = np.column_stack([latitudes, longitudes])
    
    # 特徴量(面積、築年数など)
    X_geo = np.random.randn(n_properties, 10)
    
    # 価格(地理的にクラスタ化された傾向を持つ)
    from sklearn.cluster import KMeans
    geo_clusters = KMeans(n_clusters=5, random_state=42).fit_predict(coordinates)
    cluster_effects = np.random.uniform(0.8, 1.2, 5)  # クラスタごとの価格倍率
    
    base_price = 100 + X_geo[:, 0] * 20 + X_geo[:, 1] * 15  # 基本価格
    y_geo = base_price * cluster_effects[geo_clusters] + np.random.normal(0, 10, n_properties)
    
    print(f"不動産データ: {n_properties}")
    print(f"地理クラスタ数: {len(np.unique(geo_clusters))}")
    
    # 1. 通常のK-Fold(空間相関を無視)
    normal_scores = cross_val_score(RandomForestRegressor(random_state=42),
                                  X_geo, y_geo, cv=5, scoring='r2')
    
    # 2. 地理クラスタベースのGroup CV
    cluster_cv = GroupKFold(n_splits=5)
    cluster_scores = cross_val_score(RandomForestRegressor(random_state=42),
                                   X_geo, y_geo, cv=cluster_cv, 
                                   groups=geo_clusters, scoring='r2')
    
    # 3. 距離ベースの分割
    def distance_based_split(coordinates, n_splits=5):
        """距離ベースでデータを分割"""
        from sklearn.cluster import KMeans
        
        kmeans = KMeans(n_clusters=n_splits, random_state=42)
        spatial_groups = kmeans.fit_predict(coordinates)
        
        for test_group in range(n_splits):
            train_idx = np.where(spatial_groups != test_group)[0]
            test_idx = np.where(spatial_groups == test_group)[0]
            yield train_idx, test_idx
    
    distance_scores = []
    for train_idx, test_idx in distance_based_split(coordinates):
        model = RandomForestRegressor(random_state=42)
        model.fit(X_geo[train_idx], y_geo[train_idx])
        score = model.score(X_geo[test_idx], y_geo[test_idx])
        distance_scores.append(score)
    
    print("\n地理データCV比較(R²):")
    print(f"通常K-Fold: {normal_scores.mean():.4f} ± {normal_scores.std():.4f}")
    print(f"クラスタGroup CV: {cluster_scores.mean():.4f} ± {cluster_scores.std():.4f}")
    print(f"距離ベース分割: {np.mean(distance_scores):.4f} ± {np.std(distance_scores):.4f}")
    
    # 空間相関の可視化
    def plot_spatial_correlation():
        import matplotlib.pyplot as plt
        
        plt.figure(figsize=(12, 4))
        
        plt.subplot(131)
        plt.scatter(longitudes, latitudes, c=y_geo, cmap='viridis', alpha=0.6)
        plt.title('Price Distribution')
        plt.xlabel('Longitude')
        plt.ylabel('Latitude')
        plt.colorbar()
        
        plt.subplot(132)
        plt.scatter(longitudes, latitudes, c=geo_clusters, cmap='tab10', alpha=0.6)
        plt.title('Geographic Clusters')
        plt.xlabel('Longitude')
        plt.ylabel('Latitude')
        
        plt.subplot(133)
        # CV分割の例(1つのフォールド)
        train_idx, test_idx = next(distance_based_split(coordinates))
        plt.scatter(longitudes[train_idx], latitudes[train_idx], 
                   c='blue', alpha=0.6, label='Train')
        plt.scatter(longitudes[test_idx], latitudes[test_idx], 
                   c='red', alpha=0.6, label='Test')
        plt.title('CV Split Example')
        plt.xlabel('Longitude')
        plt.ylabel('Latitude')
        plt.legend()
        
        plt.tight_layout()
        plt.show()
    
    # plot_spatial_correlation()  # 実際の環境では有効化

geographical_cv_implementation()

製造業:センサーデータの時空間CV

def manufacturing_sensor_cv():
    """製造業のセンサーデータでの複合的CVを実装してみた"""
    
    # 工場の複数ラインからのセンサーデータ
    n_lines = 8
    n_days = 200
    readings_per_day = 24  # 1時間ごと
    
    line_ids = []
    timestamps = []
    X_sensor = []
    y_defect = []
    
    for line_id in range(n_lines):
        # ライン固有の特性
        line_baseline_temp = 200 + np.random.normal(0, 10)
        line_defect_rate = np.random.uniform(0.01, 0.05)
        
        for day in range(n_days):
            for hour in range(readings_per_day):
                timestamp = day * 24 + hour
                
                # センサー値(温度、圧力、振動など)
                temp = line_baseline_temp + np.random.normal(0, 5)
                pressure = 50 + np.random.normal(0, 2)
                vibration = 0.5 + np.random.normal(0, 0.1)
                
                # 時間的なドリフト
                temp += timestamp * 0.001  # 徐々に上昇
                
                sensors = [temp, pressure, vibration] + list(np.random.randn(7))
                
                # 不良品発生(センサー値に依存)
                defect_prob = line_defect_rate
                if temp > line_baseline_temp + 10:
                    defect_prob *= 3
                if vibration > 0.7:
                    defect_prob *= 2
                
                is_defect = np.random.random() < defect_prob
                
                line_ids.append(line_id)
                timestamps.append(timestamp)
                X_sensor.append(sensors)
                y_defect.append(int(is_defect))
    
    X_sensor = np.array(X_sensor)
    y_defect = np.array(y_defect)
    line_groups = np.array(line_ids)
    time_groups = np.array(timestamps) // 24  # 日単位でグループ化
    
    print(f"製造データ: {len(X_sensor)}レコード")
    print(f"ライン数: {len(np.unique(line_groups))}")
    print(f"日数: {len(np.unique(time_groups))}")
    print(f"不良率: {np.mean(y_defect):.3f}")
    
    # 複数のCV戦略を比較
    cv_strategies = {
        'Random': StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
        'ByLine': GroupKFold(n_splits=5),  # ライン単位
        'ByTime': GroupKFold(n_splits=5),  # 時間単位
        'TimeSeries': TimeSeriesSplit(n_splits=5)
    }
    
    print("\n製造データCV比較:")
    for strategy_name, cv in cv_strategies.items():
        if strategy_name == 'ByLine':
            scores = cross_val_score(RandomForestClassifier(random_state=42),
                                   X_sensor, y_defect, cv=cv, groups=line_groups)
        elif strategy_name == 'ByTime':
            scores = cross_val_score(RandomForestClassifier(random_state=42),
                                   X_sensor, y_defect, cv=cv, groups=time_groups)
        else:
            scores = cross_val_score(RandomForestClassifier(random_state=42),
                                   X_sensor, y_defect, cv=cv)
        
        print(f"{strategy_name}: {scores.mean():.4f} ± {scores.std():.4f}")

manufacturing_sensor_cv()

計算コスト最適化:大規模データでの実践的テクニック

実務では数百万〜数千万レコードのデータを扱うことも多く、CVの計算コストが問題になる。

import time
from sklearn.model_selection import cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
import psutil
import os

def computational_cost_analysis():
    """大規模データでのCV計算コスト最適化を検証してみた"""
    
    # データサイズを変えて計算時間を測定
    data_sizes = [1000, 5000, 10000, 50000]
    models = {
        'LogisticRegression': LogisticRegression(max_iter=1000),
        'RandomForest': RandomForestClassifier(n_estimators=50, n_jobs=1)
    }
    
    results = []
    
    for size in data_sizes:
        print(f"\nデータサイズ: {size:,}")
        X, y = make_classification(n_samples=size, n_features=20, 
                                 n_classes=2, random_state=42)
        
        for model_name, model in models.items():
            # メモリ使用量測定開始
            process = psutil.Process(os.getpid())
            mem_before = process.memory_info().rss / 1024 / 1024  # MB
            
            # CV実行
            start_time = time.time()
            scores = cross_val_score(model, X, y, cv=5)
            end_time = time.time()
            
            # メモリ使用量測定終了
            mem_after = process.memory_info().rss / 1024 / 1024  # MB
            
            results.append({
                'size': size,
                'model': model_name,
                'time': end_time - start_time,
                'memory_used': mem_after - mem_before,
                'score': scores.mean(),
                'score_std': scores.std()
            })
            
            print(f"  {model_name}: {end_time - start_time:.2f}秒, "
                  f"メモリ: {mem_after - mem_before:.1f}MB, "
                  f"精度: {scores.mean():.4f}")
    
    return results

# 実行(時間がかかるのでコメントアウト)
# cost_results = computational_cost_analysis()

def cv_optimization_techniques():
    """CV計算の最適化テクニックを試してみた"""
    
    # 中規模データでテスト
    X, y = make_classification(n_samples=20000, n_features=50, random_state=42)
    
    print("CV最適化テクニック比較:")
    
    # 1. 標準的なCV
    start_time = time.time()
    standard_scores = cross_val_score(RandomForestClassifier(n_estimators=100, random_state=42),
                                    X, y, cv=5)
    standard_time = time.time() - start_time
    print(f"標準CV: {standard_time:.2f}秒, 精度: {standard_scores.mean():.4f}")
    
    # 2. 並列化
    start_time = time.time()
    parallel_scores = cross_val_score(RandomForestClassifier(n_estimators=100, n_jobs=-1, random_state=42),
                                    X, y, cv=5)
    parallel_time = time.time() - start_time
    print(f"並列化CV: {parallel_time:.2f}秒, 精度: {parallel_scores.mean():.4f}")
    
    # 3. 少ないフォールド数
    start_time = time.time()
    fewer_folds_scores = cross_val_score(RandomForestClassifier(n_estimators=100, n_jobs=-1, random_state=42),
                                       X, y, cv=3)
    fewer_folds_time = time.time() - start_time
    print(f"3-Fold CV: {fewer_folds_time:.2f}秒, 精度: {fewer_folds_scores.mean():.4f}")
    
    # 4. サンプリング + CV
    sample_size = 10000
    sample_idx = np.random.choice(len(X), sample_size, replace=False)
    X_sample, y_sample = X[sample_idx], y[sample_idx]
    
    start_time = time.time()
    sample_scores = cross_val_score(RandomForestClassifier(n_estimators=100, n_jobs=-1, random_state=42),
                                  X_sample, y_sample, cv=5)
    sample_time = time.time() - start_time
    print(f"サンプリングCV: {sample_time:.2f}秒, 精度: {sample_scores.mean():.4f}")
    
    # 5. 軽量モデル + CV
    start_time = time.time()
    light_scores = cross_val_score(LogisticRegression(max_iter=1000),
                                 X, y, cv=5)
    light_time = time.time() - start_time
    print(f"軽量モデルCV: {light_time:.2f}秒, 精度: {light_scores.mean():.4f}")

cv_optimization_techniques()

分散処理でのCV実装

def distributed_cv_simulation():
    """分散処理でのCV実装パターンを検証してみた"""
    
    # 大規模データをシミュレート
    n_samples = 100000
    X_large, y_large = make_classification(n_samples=n_samples, n_features=30,
                                         n_classes=2, random_state=42)
    
    print(f"大規模データ: {n_samples:,} サンプル")
    
    # 手動でのCV分散処理シミュレーション
    def manual_distributed_cv(X, y, n_splits=5):
        """手動実装による分散CV(概念実証)"""
        
        from sklearn.model_selection import KFold
        kfold = KFold(n_splits=n_splits, shuffle=True, random_state=42)
        
        fold_results = []
        
        for fold, (train_idx, val_idx) in enumerate(kfold.split(X)):
            print(f"  Fold {fold+1} 処理中...")
            
            # 実際の分散環境では、ここで各ワーカーに処理を分散
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]
            
            # 軽量モデルで高速化
            model = LogisticRegression(max_iter=500)
            
            start_time = time.time()
            model.fit(X_train, y_train)
            score = model.score(X_val, y_val)
            fold_time = time.time() - start_time
            
            fold_results.append({
                'fold': fold + 1,
                'score': score,
                'time': fold_time,
                'train_size': len(X_train),
                'val_size': len(X_val)
            })
            
            print(f"    完了: {fold_time:.2f}秒, 精度: {score:.4f}")
        
        return fold_results
    
    print("分散CV処理シミュレーション:")
    distributed_results = manual_distributed_cv(X_large, y_large)
    
    total_time = sum(r['time'] for r in distributed_results)
    avg_score = np.mean([r['score'] for r in distributed_results])
    
    print(f"\n総処理時間: {total_time:.2f}")
    print(f"平均精度: {avg_score:.4f}")
    print(f"並列化で理論的には {total_time/5:.2f}秒まで短縮可能")

distributed_cv_simulation()

失敗事例から学ぶ:時系列データでの大失敗

実際に経験した痛い失敗事例を共有する。

def time_series_failure_case():
    """時系列データでの大失敗事例を再現してみた"""
    
    print("=== 実際にやらかした失敗事例 ===")
    print("プロジェクト: ECサイトの需要予測システム")
    print("データ: 2年分の日次売上データ")
    print("目標: 来月の売上を予測するモデル構築")
    print()
    
    # 実際のデータに近い時系列データを生成
    np.random.seed(42)
    n_days = 730  # 2年分
    dates = pd.date_range('2022-01-01', periods=n_days, freq='D')
    
    # 売上の生成(トレンド + 季節性 + イベント効果)
    base_trend = np.linspace(1000, 1500, n_days)  # 成長トレンド
    seasonal = 200 * np.sin(2 * np.pi * np.arange(n_days) / 365.25)  # 年次季節性
    weekly = 100 * np.sin(2 * np.pi * np.arange(n_days) / 7)  # 週次パターン
    
    # 特殊イベント(セール期間など)
    event_days = np.random.choice(n_days, 20, replace=False)
    event_effect = np.zeros(n_days)
    event_effect[event_days] = np.random.uniform(500, 1000, 20)
    
    # ノイズ
    noise = np.random.normal(0, 50, n_days)
    
    sales = base_trend + seasonal + weekly + event_effect + noise
    sales = np.maximum(sales, 0)  # 負の売上は除外
    
    # 特徴量作成
    df = pd.DataFrame({'date': dates, 'sales': sales})
    df['day_of_week'] = df['date'].dt.dayofweek
    df['month'] = df['date'].dt.month
    df['day_of_month'] = df['date'].dt.day
    
    # ラグ特徴量
    for lag in [1, 7, 14, 30]:
        df[f'sales_lag_{lag}'] = df['sales'].shift(lag)
    
    # 移動平均
    for window in [7, 14, 30]:
        df[f'sales_ma_{window}'] = df['sales'].rolling(window=window).mean()
    
    df = df.dropna().reset_index(drop=True)
    
    feature_cols = [col for col in df.columns if col not in ['date', 'sales']]
    X_ts = df[feature_cols].values
    y_ts = df['sales'].values
    
    print(f"データ形状: {X_ts.shape}")
    print(f"期間: {df['date'].min()} - {df['date'].max()}")
    print()
    
    # === 失敗パターン1: 通常のK-FoldでCV ===
    print("【失敗パターン1】通常のK-FoldでCV評価")
    print("当時の考え: 「CVで高い精度が出てるから大丈夫だろう」")
    
    wrong_cv = KFold(n_splits=5, shuffle=True, random_state=42)
    wrong_scores = cross_val_score(RandomForestRegressor(n_estimators=100, random_state=42),
                                 X_ts, y_ts, cv=wrong_cv, scoring='neg_mean_absolute_error')
    
    print(f"K-Fold CV MAE: {-wrong_scores.mean():.2f} ± {wrong_scores.std():.2f}")
    print("結果: とても良い精度が出た!")
    print()
    
    # === 現実的な評価: Time Series Split ===
    print("【正しい評価】Time Series Splitで再評価")
    
    tscv = TimeSeriesSplit(n_splits=5)
    correct_scores = cross_val_score(RandomForestRegressor(n_estimators=100, random_state=42),
                                   X_ts, y_ts, cv=tscv, scoring='neg_mean_absolute_error')
    
    print(f"Time Series CV MAE: {-correct_scores.mean():.2f} ± {correct_scores.std():.2f}")
    print()
    
    # === 失敗の原因分析 ===
    print("【失敗の原因分析】")
    print("1. 未来の情報を使ったdata leakage")
    print("2. 時系列の構造を無視した評価")
    print("3. 季節性やトレンドの過学習")
    print()
    
    # data leakageの実証
    print("【Data Leakageの実証】")
    fold_analysis = []
    
    for fold, (train_idx, val_idx) in enumerate(wrong_cv.split(X_ts)):
        train_dates = df.iloc[train_idx]['date']
        val_dates = df.iloc[val_idx]['date']
        
        # 時系列の混在をチェック
        future_in_train = (train_dates.max() > val_dates.min())
        past_in_val = (val_dates.min() < train_dates.max())
        
        fold_analysis.append({
            'fold': fold + 1,
            'train_period': f"{train_dates.min().strftime('%Y-%m')} - {train_dates.max().strftime('%Y-%m')}",
            'val_period': f"{val_dates.min().strftime('%Y-%m')} - {val_dates.max().strftime('%Y-%m')}",
            'leakage': future_in_train and past_in_val
        })
    
    for analysis in fold_analysis:
        print(f"Fold {analysis['fold']}: Train={analysis['train_period']}, Val={analysis['val_period']}, Leakage={analysis['leakage']}")
    
    print()
    print("【本番での結果】")
    print("実装後の実際の性能: 予測MAEが3-4倍悪化")
    print("原因: CVで見た性能は未来の情報を使った虚偽の高精度")
    print("対策: Time Series CVによる正しい評価を導入")
    
    # === 改善後のワークフロー ===
    print("\n【改善後のワークフロー】")
    
    # より現実的なWalk Forward Validation
    def walk_forward_validation_detailed(X, y, dates, initial_train_size=300, 
                                       step_size=30, max_test_size=30):
        scores = []
        predictions = []
        
        for i in range(initial_train_size, len(X) - max_test_size, step_size):
            # 訓練データ:過去のデータのみ
            X_train, y_train = X[:i], y[:i]
            
            # テストデータ:次の期間
            test_end = min(i + max_test_size, len(X))
            X_test, y_test = X[i:test_end], y[i:test_end]
            test_dates = dates.iloc[i:test_end]
            
            # モデル訓練
            model = RandomForestRegressor(n_estimators=100, random_state=42)
            model.fit(X_train, y_train)
            
            # 予測
            y_pred = model.predict(X_test)
            
            # 評価
            mae = np.mean(np.abs(y_test - y_pred))
            scores.append(mae)
            
            predictions.extend(list(zip(test_dates, y_test, y_pred)))
            
            print(f"期間 {test_dates.min().strftime('%Y-%m-%d')} - {test_dates.max().strftime('%Y-%m-%d')}: MAE = {mae:.2f}")
        
        return scores, predictions
    
    wf_scores, wf_predictions = walk_forward_validation_detailed(X_ts, y_ts, df['date'])
    
    print(f"\nWalk Forward Validation MAE: {np.mean(wf_scores):.2f} ± {np.std(wf_scores):.2f}")
    print("これが実際の本番性能に近い値")

time_series_failure_case()

業界別CV戦略まとめ

def industry_specific_cv_guide():
    """業界別CV戦略ガイドを整理してみた"""
    
    cv_strategies = {
        "金融・投資": {
            "データ特性": "時系列、高次元、ノイズ多、レジーム変化",
            "推奨CV": "Purged Group Time Series Split",
            "注意点": "Look-ahead bias、Market regime変化",
            "実装例": "PurgedGroupTimeSeriesSplit(group_gap=5, max_train_group_size=252)"
        },
        
        "Eコマース・小売": {
            "データ特性": "季節性、プロモーション効果、顧客セグメント",
            "推奨CV": "Time Series Split + Stratified Group",
            "注意点": "季節性の学習、プロモーション期間の扱い",
            "実装例": "TimeSeriesSplit + 顧客グループでの検証"
        },
        
        "医療・ヘルスケア": {
            "データ特性": "患者グループ、不均衡、倫理的制約",
            "推奨CV": "Stratified Group K-Fold",
            "注意点": "患者間リーク、病院間の違い",
            "実装例": "StratifiedGroupKFold(groups=patient_id)"
        },
        
        "製造業・IoT": {
            "データ特性": "センサー時系列、設備グループ、ドリフト",
            "推奨CV": "Group Time Series Split",
            "注意点": "設備間の違い、時間的ドリフト、メンテナンス期間",
            "実装例": "GroupKFold(groups=equipment_id) + 時系列考慮"
        },
        
        "不動産・地理": {
            "データ特性": "空間相関、地域特性、時系列トレンド",
            "推奨CV": "Spatial Group K-Fold",
            "注意点": "近隣物件の相関、地域経済の影響",
            "実装例": "地理的クラスタリング + GroupKFold"
        },
        
        "画像・コンピュータビジョン": {
            "データ特性": "同一対象の複数画像、データ拡張",
            "推奨CV": "Group K-Fold (対象別)",
            "注意点": "同一人物・物体の画像リーク、データ拡張の扱い",
            "実装例": "GroupKFold(groups=subject_id)"
        },
        
        "自然言語処理": {
            "データ特性": "文書グループ、ドメイン差、時間的変化",
            "推奨CV": "Stratified Group K-Fold",
            "注意点": "同一著者・ソースからのリーク、言語進化",
            "実装例": "StratifiedGroupKFold(groups=document_source)"
        }
    }
    
    print("業界別CV戦略ガイド:")
    print("=" * 50)
    
    for industry, strategy in cv_strategies.items():
        print(f"\n{industry}")
        print(f"データ特性: {strategy['データ特性']}")
        print(f"推奨CV: {strategy['推奨CV']}")
        print(f"注意点: {strategy['注意点']}")
        print(f"実装例: {strategy['実装例']}")

industry_specific_cv_guide()

実装チェックリスト:本番運用前の確認項目

def cv_implementation_checklist():
    """本番運用前のCV実装チェックリストを作成してみた"""
    
    checklist = {
        "データ理解": [
            "データの時間的構造を理解しているか",
            "グループ構造(顧客、患者、機器など)を特定したか",
            "クラス不均衡の程度を確認したか",
            "地理的・空間的相関があるか確認したか",
            "外れ値やデータ品質の問題を把握したか"
        ],
        
        "CV手法選択": [
            "時系列データで未来情報リークを防いでいるか",
            "グループ構造を適切に考慮しているか",
            "不均衡データでStratifiedを使用しているか",
            "空間相関を考慮したか(該当する場合)",
            "業界特有のドメイン知識を反映したか"
        ],
        
        "実装検証": [
            "各フォールドでデータリークが発生していないか",
            "フォールド間でクラス分布が適切に保たれているか",
            "グループの重複がないか確認したか",
            "時系列順序が保たれているか(該当する場合)",
            "計算時間が許容範囲内か"
        ],
        
        "結果解釈": [
            "CV結果と本番性能のギャップを理解しているか",
            "フォールド間の性能バラツキを分析したか",
            "複数の評価指標で検証したか",
            "統計的有意性を確認したか",
            "ベースラインモデルと比較したか"
        ],
        
        "本番対応": [
            "本番データでAdversarial Validationを実施したか",
            "モデル更新時のCV戦略を定義したか",
            "性能劣化の監視方法を設計したか",
            "A/Bテストでの検証計画があるか",
            "障害時のフォールバック戦略があるか"
        ]
    }
    
    print("CV実装チェックリスト:")
    print("=" * 40)
    
    for category, items in checklist.items():
        print(f"\n{category}")
        for i, item in enumerate(items, 1):
            print(f"  {i}. □ {item}")
    
    print("\n" + "=" * 40)
    print("全項目をチェックしてから本番運用を開始してください")

cv_implementation_checklist()

性能監視と継続的改善

def cv_monitoring_framework():
    """CV性能の継続的監視フレームワークを構築してみた"""
    
    class CVPerformanceMonitor:
        def __init__(self):
            self.cv_history = []
            self.production_history = []
        
        def log_cv_performance(self, cv_scores, cv_method, model_version):
            """CV性能をログ"""
            self.cv_history.append({
                'timestamp': pd.Timestamp.now(),
                'cv_mean': np.mean(cv_scores),
                'cv_std': np.std(cv_scores),
                'cv_method': cv_method,
                'model_version': model_version,
                'individual_scores': cv_scores
            })
        
        def log_production_performance(self, actual_score, prediction_period):
            """本番性能をログ"""
            self.production_history.append({
                'timestamp': pd.Timestamp.now(),
                'actual_score': actual_score,
                'period': prediction_period
            })
        
        def calculate_cv_production_gap(self, lookback_days=30):
            """CV性能と本番性能のギャップを分析"""
            if not self.cv_history or not self.production_history:
                return None
            
            # 最近のCV性能
            recent_cv = [h for h in self.cv_history 
                        if (pd.Timestamp.now() - h['timestamp']).days <= lookback_days]
            
            # 最近の本番性能
            recent_prod = [h for h in self.production_history 
                          if (pd.Timestamp.now() - h['timestamp']).days <= lookback_days]
            
            if not recent_cv or not recent_prod:
                return None
            
            avg_cv_score = np.mean([h['cv_mean'] for h in recent_cv])
            avg_prod_score = np.mean([h['actual_score'] for h in recent_prod])
            
            gap = abs(avg_cv_score - avg_prod_score)
            gap_percentage = gap / avg_cv_score * 100
            
            return {
                'cv_score': avg_cv_score,
                'production_score': avg_prod_score,
                'absolute_gap': gap,
                'percentage_gap': gap_percentage,
                'samples': len(recent_cv) + len(recent_prod)
            }
        
        def detect_performance_drift(self, threshold=0.05):
            """性能劣化の検出"""
            if len(self.production_history) < 10:
                return False
            
            # 最近の性能 vs 過去の性能
            recent_scores = [h['actual_score'] for h in self.production_history[-5:]]
            past_scores = [h['actual_score'] for h in self.production_history[-15:-5]]
            
            if len(past_scores) < 5:
                return False
            
            recent_mean = np.mean(recent_scores)
            past_mean = np.mean(past_scores)
            
            drift = abs(recent_mean - past_mean) / past_mean
            
            return drift > threshold
    
    # 使用例
    monitor = CVPerformanceMonitor()
    
    # CV性能をログ(模擬)
    cv_scores = [0.85, 0.87, 0.84, 0.86, 0.85]
    monitor.log_cv_performance(cv_scores, "StratifiedKFold", "v1.2")
    
    # 本番性能をログ(模擬)
    for i in range(10):
        # 時間経過とともに性能が劣化する例
        degraded_score = 0.85 - (i * 0.01) + np.random.normal(0, 0.02)
        monitor.log_production_performance(degraded_score, f"week_{i+1}")
    
    # ギャップ分析
    gap_analysis = monitor.calculate_cv_production_gap()
    if gap_analysis:
        print("CV-本番性能ギャップ分析:")
        print(f"CV性能: {gap_analysis['cv_score']:.4f}")
        print(f"本番性能: {gap_analysis['production_score']:.4f}")
        print(f"ギャップ: {gap_analysis['percentage_gap']:.1f}%")
    
    # ドリフト検出
    drift_detected = monitor.detect_performance_drift()
    print(f"\n性能劣化検出: {'要注意' if drift_detected else '正常'}")

cv_monitoring_framework()

まとめ:実務で使えるCV戦略

def final_cv_decision_framework():
    """最終的なCV手法選択フレームワーク"""
    
    def recommend_cv_strategy(data_characteristics):
        """データ特性に基づいてCV戦略を推奨"""
        
        recommendations = []
        
        # 時系列チェック
        if data_characteristics.get('is_time_series', False):
            if data_characteristics.get('has_groups', False):
                recommendations.append("Purged Group Time Series Split")
            else:
                recommendations.append("Time Series Split または Walk Forward Validation")
        
        # グループ構造チェック
        elif data_characteristics.get('has_groups', False):
            if data_characteristics.get('is_imbalanced', False):
                recommendations.append("Stratified Group K-Fold")
            else:
                recommendations.append("Group K-Fold")
        
        # 不均衡データチェック
        elif data_characteristics.get('is_imbalanced', False):
            recommendations.append("Stratified K-Fold")
        
        # 地理データチェック
        elif data_characteristics.get('has_spatial_correlation', False):
            recommendations.append("Spatial Group K-Fold (カスタム実装)")
        
        # デフォルト
        else:
            if data_characteristics.get('problem_type') == 'classification':
                recommendations.append("Stratified K-Fold")
            else:
                recommendations.append("K-Fold")
        
        # サンプルサイズ考慮
        if data_characteristics.get('sample_size', 1000) < 100:
            recommendations.append("LOOCV または 少ないフォールド数")
        
        return recommendations
    
    # 実例での推奨
    examples = [
        {
            'name': '株価予測',
            'characteristics': {
                'is_time_series': True,
                'has_groups': True,  # 銘柄グループ
                'sample_size': 10000,
                'problem_type': 'regression'
            }
        },
        {
            'name': '医療診断',
            'characteristics': {
                'is_time_series': False,
                'has_groups': True,  # 患者グループ
                'is_imbalanced': True,
                'sample_size': 5000,
                'problem_type': 'classification'
            }
        },
        {
            'name': 'ECサイト需要予測',
            'characteristics': {
                'is_time_series': True,
                'has_groups': False,
                'sample_size': 100000,
                'problem_type': 'regression'
            }
        },
        {
            'name': '不動産価格予測',
            'characteristics': {
                'is_time_series': False,
                'has_groups': False,
                'has_spatial_correlation': True,
                'sample_size': 20000,
                'problem_type': 'regression'
            }
        }
    ]
    
    print("実務での CV 戦略推奨:")
    print("=" * 50)
    
    for example in examples:
        recommendations = recommend_cv_strategy(example['characteristics'])
        print(f"\n{example['name']}")
        for rec in recommendations:
            print(f"  推奨: {rec}")

final_cv_decision_framework()

print("\n" + "=" * 60)
print("実務でのCV選択:重要なポイント")
print("=" * 60)
print("""
1. データの構造を最優先で理解する
   - 時系列性、グループ構造、空間相関

2. ドメイン知識を活かす
   - 業界特有のデータ特性を考慮
   - ビジネス上の制約を反映

3. 計算コストと精度のバランス
   - 大規模データでは並列化・サンプリング
   - プロトタイプ段階では軽量CV

4. 継続的な監視と改善
   - CV性能と本番性能のギャップ監視
   - データドリフトへの対応

5. 失敗から学ぶ
   - データリークは必ず発生すると想定
   - 保守的な評価を心がける
""")

実務でクロスバリデーションを使いこなすには、データの性質を深く理解し、適切な手法を選択することが最も重要だと痛感している。特に時系列データでの失敗は本当に痛い教訓だった。

この記事で紹介した手法と考慮点を参考に、ぜひ自分のプロジェクトに適したCV戦略を構築してほしい。最初は保守的に始めて、徐々に高度な手法を取り入れていくのがおすすめだ。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?