はじめに
実務で機械学習をやっていると、クロスバリデーション(CV)の手法選択が思っている以上に重要だということに気づいた。
単純にK-Foldを使えばいいというものではなく、データの性質や業務要件に応じて適切な手法を選ばないと、本番環境で全く性能が出ないという痛い目に遭う。
今回、様々なプロジェクトで遭遇した課題を整理しながら、実務で本当に使えるCV手法を体系的に調べてまとめてみた。失敗事例も含めて共有する。
基本手法の実装と落とし穴
K-Fold:意外と奥が深い
まず基本のK-Foldから。シンプルに見えて、実はパラメータの選択次第で結果が大きく変わる。
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold, cross_val_score, validation_curve
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
# データ生成
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2,
n_informative=15, n_redundant=5, random_state=42)
def analyze_kfold_variations():
"""K-Foldの各種設定が結果に与える影響を調べてみた"""
model = RandomForestClassifier(n_estimators=100, random_state=42)
# shuffle有無の影響
kfold_no_shuffle = KFold(n_splits=5, shuffle=False)
kfold_shuffle = KFold(n_splits=5, shuffle=True, random_state=42)
scores_no_shuffle = cross_val_score(model, X, y, cv=kfold_no_shuffle)
scores_shuffle = cross_val_score(model, X, y, cv=kfold_shuffle)
print("Shuffle効果の検証:")
print(f"Shuffle無し: {scores_no_shuffle.mean():.4f} ± {scores_no_shuffle.std():.4f}")
print(f"Shuffle有り: {scores_shuffle.mean():.4f} ± {scores_shuffle.std():.4f}")
# K値の選択
k_values = [3, 5, 10, 20]
k_results = []
for k in k_values:
if k <= len(X): # K > サンプル数を防ぐ
kfold = KFold(n_splits=k, shuffle=True, random_state=42)
scores = cross_val_score(model, X, y, cv=kfold)
k_results.append({
'k': k,
'mean_score': scores.mean(),
'std_score': scores.std(),
'scores': scores
})
print("\nK値の影響:")
for result in k_results:
print(f"K={result['k']}: {result['mean_score']:.4f} ± {result['std_score']:.4f}")
return k_results
k_analysis = analyze_kfold_variations()
実務での教訓: データがソートされている場合、shuffle=Falseだと偏ったCV結果になる。時系列以外では必ずshuffle=Trueにしている。
Stratified K-Fold:不均衡データでの必須ツール
ECサイトのコンバージョン予測(CV率1-3%)や機械の故障予測など、実務では不均衡データが圧倒的に多い。
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix
def imbalanced_data_experiment():
"""極度に不均衡なデータでCV手法を比較してみた"""
# 実際のECデータに近い不均衡データを生成
X_imb, y_imb = make_classification(
n_samples=10000, n_features=20, n_classes=2,
weights=[0.97, 0.03], # 3%のコンバージョン率
n_informative=15, random_state=42
)
print(f"クラス分布: {np.bincount(y_imb)} (比率: {np.bincount(y_imb)[1]/len(y_imb)*100:.1f}%)")
model = RandomForestClassifier(n_estimators=100, random_state=42,
class_weight='balanced') # 重要な設定
# 通常のK-Fold
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
# Stratified K-Fold
stratified = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
# 詳細比較
def detailed_cv_analysis(cv_method, method_name):
scores = []
fold_distributions = []
for fold, (train_idx, val_idx) in enumerate(cv_method.split(X_imb, y_imb)):
# 各フォールドのクラス分布
val_dist = np.bincount(y_imb[val_idx])
fold_distributions.append(val_dist[1] / len(val_idx) * 100)
# モデル学習・評価
model.fit(X_imb[train_idx], y_imb[train_idx])
score = model.score(X_imb[val_idx], y_imb[val_idx])
scores.append(score)
print(f"{method_name} Fold {fold+1}: Positive率={fold_distributions[-1]:.1f}%, Score={score:.4f}")
print(f"{method_name} 平均: {np.mean(scores):.4f} ± {np.std(scores):.4f}")
print(f"Positive率の分散: {np.std(fold_distributions):.2f}")
print()
return scores, fold_distributions
kfold_scores, kfold_dist = detailed_cv_analysis(kfold, "K-Fold")
stratified_scores, stratified_dist = detailed_cv_analysis(stratified, "Stratified")
return {
'kfold_scores': kfold_scores,
'stratified_scores': stratified_scores,
'kfold_dist': kfold_dist,
'stratified_dist': stratified_dist
}
imbalance_results = imbalanced_data_experiment()
実務での失敗談: 初期の頃、不均衡データで通常のK-Foldを使って「精度95%!」と喜んでいたが、実際は全部Negativeクラスを予測していただけだった。今は必ずStratifiedを使い、precision/recall/F1も見るようにしている。
時系列データのCV:最も難しい領域
金融データや需要予測など、時系列データのCVは本当に難しい。未来の情報を使ってしまうdata leakageが起きやすい。
Time Series Split:基本だが制約が多い
from sklearn.model_selection import TimeSeriesSplit
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
import pandas as pd
def time_series_cv_comparison():
"""時系列CVの各種手法を比較してみた"""
# 株価っぽい時系列データを生成
np.random.seed(42)
n_days = 1000
dates = pd.date_range('2020-01-01', periods=n_days, freq='D')
# トレンド + 季節性 + ノイズ
trend = np.linspace(100, 200, n_days)
seasonal = 10 * np.sin(2 * np.pi * np.arange(n_days) / 365.25)
noise = np.random.normal(0, 5, n_days)
price = trend + seasonal + noise
# 特徴量作成(過去のラグ特徴量)
df = pd.DataFrame({'date': dates, 'price': price})
# ラグ特徴量
for lag in [1, 2, 3, 5, 10]:
df[f'price_lag_{lag}'] = df['price'].shift(lag)
# 移動平均
for window in [5, 10, 20]:
df[f'ma_{window}'] = df['price'].rolling(window=window).mean()
# 欠損値除去
df = df.dropna().reset_index(drop=True)
feature_cols = [col for col in df.columns if col not in ['date', 'price']]
X_ts = df[feature_cols].values
y_ts = df['price'].values
print(f"時系列データ形状: {X_ts.shape}")
# 1. 通常のK-Fold(間違った方法)
normal_scores = cross_val_score(LinearRegression(), X_ts, y_ts, cv=5,
scoring='neg_mean_squared_error')
# 2. Time Series Split
tscv = TimeSeriesSplit(n_splits=5)
ts_scores = cross_val_score(LinearRegression(), X_ts, y_ts, cv=tscv,
scoring='neg_mean_squared_error')
print("時系列CV比較(MSE):")
print(f"通常のK-Fold: {-normal_scores.mean():.2f} ± {normal_scores.std():.2f}")
print(f"Time Series Split: {-ts_scores.mean():.2f} ± {ts_scores.std():.2f}")
# 3. カスタムWalk Forward
def walk_forward_validation(X, y, test_size=100, min_train_size=200):
scores = []
for i in range(min_train_size, len(X) - test_size, test_size):
X_train, y_train = X[:i], y[:i]
X_test, y_test = X[i:i+test_size], y[i:i+test_size]
model = LinearRegression()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
mse = np.mean((y_test - y_pred) ** 2)
scores.append(mse)
return np.array(scores)
wf_scores = walk_forward_validation(X_ts, y_ts)
print(f"Walk Forward: {wf_scores.mean():.2f} ± {wf_scores.std():.2f}")
return df, normal_scores, ts_scores, wf_scores
ts_results = time_series_cv_comparison()
Purged Group Time Series Split:実務で重宝する手法
金融業界でよく使われる、より実践的な時系列CV手法を実装してみた。
class PurgedGroupTimeSeriesSplit:
"""
金融データ用の高度な時系列分割
- グループ(銘柄、顧客など)を考慮
- Purge期間でdata leakageを防止
"""
def __init__(self, n_splits=5, max_train_group_size=None,
group_gap=1, max_test_group_size=1):
self.n_splits = n_splits
self.max_train_group_size = max_train_group_size
self.group_gap = group_gap
self.max_test_group_size = max_test_group_size
def split(self, X, y=None, groups=None):
if groups is None:
raise ValueError("Groups must be provided")
unique_groups = np.unique(groups)
n_groups = len(unique_groups)
# テストグループのサイズ
test_size = max(1, n_groups // self.n_splits)
for i in range(self.n_splits):
# テスト期間
test_group_start = i * test_size
test_group_end = min(test_group_start + self.max_test_group_size, n_groups)
test_groups = unique_groups[test_group_start:test_group_end]
# 訓練期間(purge期間を考慮)
train_group_end = max(0, test_group_start - self.group_gap)
if self.max_train_group_size:
train_group_start = max(0, train_group_end - self.max_train_group_size)
else:
train_group_start = 0
train_groups = unique_groups[train_group_start:train_group_end]
# インデックス取得
train_idx = np.where(np.isin(groups, train_groups))[0]
test_idx = np.where(np.isin(groups, test_groups))[0]
if len(train_idx) > 0 and len(test_idx) > 0:
yield train_idx, test_idx
# 使用例:株式データ
def stock_prediction_cv():
"""株式予測でPurged Time Series CVを試してみた"""
# 模擬株式データ
n_days = 500
n_stocks = 50
# 日付ベースのグループ
dates = pd.date_range('2022-01-01', periods=n_days, freq='D')
date_groups = np.arange(n_days)
# 特徴量とターゲット
X_stock = np.random.randn(n_days, 10) # 技術指標など
y_stock = np.random.randn(n_days) # リターン
# 通常のTime Series Splitと比較
tscv = TimeSeriesSplit(n_splits=5)
ptscv = PurgedGroupTimeSeriesSplit(n_splits=5, group_gap=5) # 5日のpurge期間
ts_scores = cross_val_score(LinearRegression(), X_stock, y_stock, cv=tscv,
scoring='neg_mean_squared_error')
# Purged版(手動実装)
purged_scores = []
for train_idx, test_idx in ptscv.split(X_stock, y_stock, groups=date_groups):
model = LinearRegression()
model.fit(X_stock[train_idx], y_stock[train_idx])
score = model.score(X_stock[test_idx], y_stock[test_idx])
purged_scores.append(score)
print("金融データCV比較:")
print(f"通常Time Series: {-ts_scores.mean():.4f} ± {ts_scores.std():.4f}")
print(f"Purged Time Series: {np.mean(purged_scores):.4f} ± {np.std(purged_scores):.4f}")
stock_prediction_cv()
実務での教訓: 株式データでは、決算発表やニュース発表後の価格変動が数日続くことがある。単純なTime Series Splitだと、この影響がテストデータに漏れてしまう。Purge期間を設けることで、より現実的な評価ができるようになった。
Group K-Fold:データリークの救世主
画像認識や医療データなど、同一対象から複数サンプルが得られるデータでは必須。
from sklearn.model_selection import GroupKFold, LeaveOneGroupOut
def group_cv_medical_example():
"""医療データでのGroup CV実装例"""
# 医療データの模擬(患者ごとに複数の検査結果)
n_patients = 100
n_tests_per_patient = 15
# 患者ID(グループ)
patient_ids = np.repeat(np.arange(n_patients), n_tests_per_patient)
# 特徴量(血液検査値など)
X_medical = np.random.randn(len(patient_ids), 20)
# 患者レベルの疾患有無(同一患者は同じラベル)
patient_labels = np.random.choice([0, 1], n_patients, p=[0.7, 0.3])
y_medical = np.repeat(patient_labels, n_tests_per_patient)
print(f"患者数: {n_patients}")
print(f"総サンプル数: {len(X_medical)}")
print(f"疾患有病率: {patient_labels.mean():.1%}")
# 1. 通常のStratified K-Fold(間違った方法)
stratified = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
normal_scores = cross_val_score(RandomForestClassifier(random_state=42),
X_medical, y_medical, cv=stratified)
# 2. Group K-Fold(正しい方法)
group_kfold = GroupKFold(n_splits=5)
group_scores = cross_val_score(RandomForestClassifier(random_state=42),
X_medical, y_medical, cv=group_kfold,
groups=patient_ids)
# 3. Leave One Group Out(患者一人ずつ)
logo = LeaveOneGroupOut()
# LOGOは時間がかかるので、サンプル版
sample_patients = np.random.choice(np.unique(patient_ids), 10, replace=False)
sample_mask = np.isin(patient_ids, sample_patients)
sample_scores = cross_val_score(RandomForestClassifier(random_state=42),
X_medical[sample_mask], y_medical[sample_mask],
cv=logo, groups=patient_ids[sample_mask])
print("\n医療データCV比較:")
print(f"Stratified K-Fold: {normal_scores.mean():.4f} ± {normal_scores.std():.4f}")
print(f"Group K-Fold: {group_scores.mean():.4f} ± {group_scores.std():.4f}")
print(f"Leave One Group Out (sample): {sample_scores.mean():.4f} ± {sample_scores.std():.4f}")
# データリークの検証
print("\nデータリーク検証:")
for fold, (train_idx, val_idx) in enumerate(stratified.split(X_medical, y_medical)):
train_patients = set(patient_ids[train_idx])
val_patients = set(patient_ids[val_idx])
overlap = len(train_patients & val_patients)
print(f"Stratified Fold {fold+1}: 患者重複数 = {overlap}")
if fold == 2: # 最初の3フォールドだけ表示
break
print()
for fold, (train_idx, val_idx) in enumerate(group_kfold.split(X_medical, y_medical, patient_ids)):
train_patients = set(patient_ids[train_idx])
val_patients = set(patient_ids[val_idx])
overlap = len(train_patients & val_patients)
print(f"Group K-Fold {fold+1}: 患者重複数 = {overlap}")
if fold == 2:
break
group_cv_medical_example()
実務での痛い経験: 顔認識プロジェクトで同一人物の複数写真が訓練・テストに分かれてしまい、異常に高い精度が出た。本番では全く使えなかった。今はGroup CVを徹底している。
高度なCV手法:実務で差がつくテクニック
Nested Cross Validation:ハイパーパラメータ調整の正しい評価
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC
def nested_cv_proper_implementation():
"""Nested CVの正しい実装を検証してみた"""
# データ準備
X, y = make_classification(n_samples=500, n_features=20, n_classes=2, random_state=42)
# パラメータグリッド
param_grid = {
'C': [0.01, 0.1, 1, 10, 100],
'gamma': ['scale', 'auto', 0.001, 0.01, 0.1],
'kernel': ['rbf', 'linear']
}
# 間違った方法:同じデータでグリッドサーチと評価
grid_search = GridSearchCV(SVC(), param_grid, cv=5, scoring='accuracy')
grid_search.fit(X, y)
wrong_score = grid_search.best_score_
# 正しい方法:Nested CV
outer_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
inner_cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
nested_scores = []
best_params_list = []
for fold, (train_outer, test_outer) in enumerate(outer_cv.split(X, y)):
X_train_outer, X_test_outer = X[train_outer], X[test_outer]
y_train_outer, y_test_outer = y[train_outer], y[test_outer]
# 内側でハイパーパラメータ調整
inner_grid = GridSearchCV(SVC(), param_grid, cv=inner_cv, scoring='accuracy')
inner_grid.fit(X_train_outer, y_train_outer)
# 外側でテスト
test_score = inner_grid.score(X_test_outer, y_test_outer)
nested_scores.append(test_score)
best_params_list.append(inner_grid.best_params_)
print(f"Outer Fold {fold+1}: {test_score:.4f}, Best params: {inner_grid.best_params_}")
print(f"\n間違った評価: {wrong_score:.4f}")
print(f"正しいNested CV評価: {np.mean(nested_scores):.4f} ± {np.std(nested_scores):.4f}")
# パラメータの安定性確認
param_stability = {}
for param in param_grid.keys():
values = [params[param] for params in best_params_list]
param_stability[param] = len(set(values))
print("\nパラメータ選択の安定性:")
for param, unique_count in param_stability.items():
print(f"{param}: {unique_count}/{len(best_params_list)} 種類の値が選択された")
nested_cv_proper_implementation()
Adversarial Validation:ドメイン適応の確認
def adversarial_validation_check():
"""本番データとの分布差をAdversarial Validationで検証してみた"""
# 訓練データ
X_train, _ = make_classification(n_samples=1000, n_features=20,
n_informative=15, random_state=42)
# 本番データ(わずかに分布が異なる)
X_prod = make_classification(n_samples=500, n_features=20,
n_informative=15, random_state=123)[0]
X_prod = X_prod * 1.2 + 0.1 # 分布をわずかにシフト
# Adversarial Validationデータセット作成
X_adv = np.vstack([X_train, X_prod])
y_adv = np.hstack([np.zeros(len(X_train)), np.ones(len(X_prod))])
# 分布差の検出
adv_scores = cross_val_score(RandomForestClassifier(random_state=42),
X_adv, y_adv, cv=5)
print("Adversarial Validation結果:")
print(f"AUC: {adv_scores.mean():.4f} ± {adv_scores.std():.4f}")
if adv_scores.mean() > 0.7:
print("⚠️ 訓練データと本番データの分布に大きな差があります")
print("CVの結果は本番性能を正しく反映しない可能性があります")
else:
print("✅ 訓練データと本番データの分布は似ています")
# 特徴量レベルでの分析
from sklearn.feature_selection import mutual_info_classif
feature_importance = mutual_info_classif(X_adv, y_adv, random_state=42)
important_features = np.argsort(feature_importance)[-5:]
print(f"\n分布差の主要因子: 特徴量 {important_features}")
adversarial_validation_check()
Stratified Group K-Fold:究極のバランス型CV
from sklearn.model_selection import StratifiedGroupKFold
def stratified_group_kfold_example():
"""クラス比率とグループの両方を考慮したCVを試してみた"""
# 複雑なデータ設定:店舗(グループ)ごとの顧客データ(不均衡)
n_stores = 20
customers_per_store = np.random.randint(30, 100, n_stores)
store_ids = []
X_store = []
y_store = []
for store_id in range(n_stores):
n_customers = customers_per_store[store_id]
# 店舗ごとに異なるコンバージョン率
store_conversion_rate = np.random.uniform(0.02, 0.15)
# 顧客データ
X_customers = np.random.randn(n_customers, 15)
y_customers = np.random.choice([0, 1], n_customers,
p=[1-store_conversion_rate, store_conversion_rate])
store_ids.extend([store_id] * n_customers)
X_store.append(X_customers)
y_store.extend(y_customers)
X_store = np.vstack(X_store)
y_store = np.array(y_store)
store_groups = np.array(store_ids)
print(f"総サンプル数: {len(X_store)}")
print(f"店舗数: {len(np.unique(store_groups))}")
print(f"全体コンバージョン率: {y_store.mean():.3f}")
# 各CV手法の比較
cv_methods = {
'StratifiedKFold': StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
'GroupKFold': GroupKFold(n_splits=5),
'StratifiedGroupKFold': StratifiedGroupKFold(n_splits=5, shuffle=True, random_state=42)
}
for name, cv_method in cv_methods.items():
if name == 'StratifiedGroupKFold':
scores = cross_val_score(RandomForestClassifier(random_state=42),
X_store, y_store, cv=cv_method, groups=store_groups)
elif name == 'GroupKFold':
scores = cross_val_score(RandomForestClassifier(random_state=42),
X_store, y_store, cv=cv_method, groups=store_groups)
else:
scores = cross_val_score(RandomForestClassifier(random_state=42),
X_store, y_store, cv=cv_method)
print(f"{name}: {scores.mean():.4f} ± {scores.std():.4f}")
# フォールドごとの詳細分析
fold_details = []
if name == 'StratifiedGroupKFold':
splits = cv_method.split(X_store, y_store, groups=store_groups)
elif name == 'GroupKFold':
splits = cv_method.split(X_store, y_store, groups=store_groups)
else:
splits = cv_method.split(X_store, y_store)
for fold, (train_idx, val_idx) in enumerate(splits):
val_conversion = y_store[val_idx].mean()
if name in ['GroupKFold', 'StratifiedGroupKFold']:
val_stores = len(np.unique(store_groups[val_idx]))
fold_details.append(f" Fold {fold+1}: CV率={val_conversion:.3f}, 店舗数={val_stores}")
else:
fold_details.append(f" Fold {fold+1}: CV率={val_conversion:.3f}")
for detail in fold_details[:3]: # 最初の3フォールドのみ表示
print(detail)
print()
stratified_group_kfold_example()
特殊ドメインでのCV戦略
地理データ:空間相関を考慮したCV
def geographical_cv_implementation():
"""地理データでの空間相関を考慮したCVを実装してみた"""
# 不動産価格予測を想定した地理データ
n_properties = 2000
# 緯度・経度(東京都内を想定)
lat_range = (35.6, 35.8)
lon_range = (139.6, 139.8)
np.random.seed(42)
latitudes = np.random.uniform(lat_range[0], lat_range[1], n_properties)
longitudes = np.random.uniform(lon_range[0], lon_range[1], n_properties)
coordinates = np.column_stack([latitudes, longitudes])
# 特徴量(面積、築年数など)
X_geo = np.random.randn(n_properties, 10)
# 価格(地理的にクラスタ化された傾向を持つ)
from sklearn.cluster import KMeans
geo_clusters = KMeans(n_clusters=5, random_state=42).fit_predict(coordinates)
cluster_effects = np.random.uniform(0.8, 1.2, 5) # クラスタごとの価格倍率
base_price = 100 + X_geo[:, 0] * 20 + X_geo[:, 1] * 15 # 基本価格
y_geo = base_price * cluster_effects[geo_clusters] + np.random.normal(0, 10, n_properties)
print(f"不動産データ: {n_properties}件")
print(f"地理クラスタ数: {len(np.unique(geo_clusters))}")
# 1. 通常のK-Fold(空間相関を無視)
normal_scores = cross_val_score(RandomForestRegressor(random_state=42),
X_geo, y_geo, cv=5, scoring='r2')
# 2. 地理クラスタベースのGroup CV
cluster_cv = GroupKFold(n_splits=5)
cluster_scores = cross_val_score(RandomForestRegressor(random_state=42),
X_geo, y_geo, cv=cluster_cv,
groups=geo_clusters, scoring='r2')
# 3. 距離ベースの分割
def distance_based_split(coordinates, n_splits=5):
"""距離ベースでデータを分割"""
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=n_splits, random_state=42)
spatial_groups = kmeans.fit_predict(coordinates)
for test_group in range(n_splits):
train_idx = np.where(spatial_groups != test_group)[0]
test_idx = np.where(spatial_groups == test_group)[0]
yield train_idx, test_idx
distance_scores = []
for train_idx, test_idx in distance_based_split(coordinates):
model = RandomForestRegressor(random_state=42)
model.fit(X_geo[train_idx], y_geo[train_idx])
score = model.score(X_geo[test_idx], y_geo[test_idx])
distance_scores.append(score)
print("\n地理データCV比較(R²):")
print(f"通常K-Fold: {normal_scores.mean():.4f} ± {normal_scores.std():.4f}")
print(f"クラスタGroup CV: {cluster_scores.mean():.4f} ± {cluster_scores.std():.4f}")
print(f"距離ベース分割: {np.mean(distance_scores):.4f} ± {np.std(distance_scores):.4f}")
# 空間相関の可視化
def plot_spatial_correlation():
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 4))
plt.subplot(131)
plt.scatter(longitudes, latitudes, c=y_geo, cmap='viridis', alpha=0.6)
plt.title('Price Distribution')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.colorbar()
plt.subplot(132)
plt.scatter(longitudes, latitudes, c=geo_clusters, cmap='tab10', alpha=0.6)
plt.title('Geographic Clusters')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.subplot(133)
# CV分割の例(1つのフォールド)
train_idx, test_idx = next(distance_based_split(coordinates))
plt.scatter(longitudes[train_idx], latitudes[train_idx],
c='blue', alpha=0.6, label='Train')
plt.scatter(longitudes[test_idx], latitudes[test_idx],
c='red', alpha=0.6, label='Test')
plt.title('CV Split Example')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.legend()
plt.tight_layout()
plt.show()
# plot_spatial_correlation() # 実際の環境では有効化
geographical_cv_implementation()
製造業:センサーデータの時空間CV
def manufacturing_sensor_cv():
"""製造業のセンサーデータでの複合的CVを実装してみた"""
# 工場の複数ラインからのセンサーデータ
n_lines = 8
n_days = 200
readings_per_day = 24 # 1時間ごと
line_ids = []
timestamps = []
X_sensor = []
y_defect = []
for line_id in range(n_lines):
# ライン固有の特性
line_baseline_temp = 200 + np.random.normal(0, 10)
line_defect_rate = np.random.uniform(0.01, 0.05)
for day in range(n_days):
for hour in range(readings_per_day):
timestamp = day * 24 + hour
# センサー値(温度、圧力、振動など)
temp = line_baseline_temp + np.random.normal(0, 5)
pressure = 50 + np.random.normal(0, 2)
vibration = 0.5 + np.random.normal(0, 0.1)
# 時間的なドリフト
temp += timestamp * 0.001 # 徐々に上昇
sensors = [temp, pressure, vibration] + list(np.random.randn(7))
# 不良品発生(センサー値に依存)
defect_prob = line_defect_rate
if temp > line_baseline_temp + 10:
defect_prob *= 3
if vibration > 0.7:
defect_prob *= 2
is_defect = np.random.random() < defect_prob
line_ids.append(line_id)
timestamps.append(timestamp)
X_sensor.append(sensors)
y_defect.append(int(is_defect))
X_sensor = np.array(X_sensor)
y_defect = np.array(y_defect)
line_groups = np.array(line_ids)
time_groups = np.array(timestamps) // 24 # 日単位でグループ化
print(f"製造データ: {len(X_sensor)}レコード")
print(f"ライン数: {len(np.unique(line_groups))}")
print(f"日数: {len(np.unique(time_groups))}")
print(f"不良率: {np.mean(y_defect):.3f}")
# 複数のCV戦略を比較
cv_strategies = {
'Random': StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
'ByLine': GroupKFold(n_splits=5), # ライン単位
'ByTime': GroupKFold(n_splits=5), # 時間単位
'TimeSeries': TimeSeriesSplit(n_splits=5)
}
print("\n製造データCV比較:")
for strategy_name, cv in cv_strategies.items():
if strategy_name == 'ByLine':
scores = cross_val_score(RandomForestClassifier(random_state=42),
X_sensor, y_defect, cv=cv, groups=line_groups)
elif strategy_name == 'ByTime':
scores = cross_val_score(RandomForestClassifier(random_state=42),
X_sensor, y_defect, cv=cv, groups=time_groups)
else:
scores = cross_val_score(RandomForestClassifier(random_state=42),
X_sensor, y_defect, cv=cv)
print(f"{strategy_name}: {scores.mean():.4f} ± {scores.std():.4f}")
manufacturing_sensor_cv()
計算コスト最適化:大規模データでの実践的テクニック
実務では数百万〜数千万レコードのデータを扱うことも多く、CVの計算コストが問題になる。
import time
from sklearn.model_selection import cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
import psutil
import os
def computational_cost_analysis():
"""大規模データでのCV計算コスト最適化を検証してみた"""
# データサイズを変えて計算時間を測定
data_sizes = [1000, 5000, 10000, 50000]
models = {
'LogisticRegression': LogisticRegression(max_iter=1000),
'RandomForest': RandomForestClassifier(n_estimators=50, n_jobs=1)
}
results = []
for size in data_sizes:
print(f"\nデータサイズ: {size:,}")
X, y = make_classification(n_samples=size, n_features=20,
n_classes=2, random_state=42)
for model_name, model in models.items():
# メモリ使用量測定開始
process = psutil.Process(os.getpid())
mem_before = process.memory_info().rss / 1024 / 1024 # MB
# CV実行
start_time = time.time()
scores = cross_val_score(model, X, y, cv=5)
end_time = time.time()
# メモリ使用量測定終了
mem_after = process.memory_info().rss / 1024 / 1024 # MB
results.append({
'size': size,
'model': model_name,
'time': end_time - start_time,
'memory_used': mem_after - mem_before,
'score': scores.mean(),
'score_std': scores.std()
})
print(f" {model_name}: {end_time - start_time:.2f}秒, "
f"メモリ: {mem_after - mem_before:.1f}MB, "
f"精度: {scores.mean():.4f}")
return results
# 実行(時間がかかるのでコメントアウト)
# cost_results = computational_cost_analysis()
def cv_optimization_techniques():
"""CV計算の最適化テクニックを試してみた"""
# 中規模データでテスト
X, y = make_classification(n_samples=20000, n_features=50, random_state=42)
print("CV最適化テクニック比較:")
# 1. 標準的なCV
start_time = time.time()
standard_scores = cross_val_score(RandomForestClassifier(n_estimators=100, random_state=42),
X, y, cv=5)
standard_time = time.time() - start_time
print(f"標準CV: {standard_time:.2f}秒, 精度: {standard_scores.mean():.4f}")
# 2. 並列化
start_time = time.time()
parallel_scores = cross_val_score(RandomForestClassifier(n_estimators=100, n_jobs=-1, random_state=42),
X, y, cv=5)
parallel_time = time.time() - start_time
print(f"並列化CV: {parallel_time:.2f}秒, 精度: {parallel_scores.mean():.4f}")
# 3. 少ないフォールド数
start_time = time.time()
fewer_folds_scores = cross_val_score(RandomForestClassifier(n_estimators=100, n_jobs=-1, random_state=42),
X, y, cv=3)
fewer_folds_time = time.time() - start_time
print(f"3-Fold CV: {fewer_folds_time:.2f}秒, 精度: {fewer_folds_scores.mean():.4f}")
# 4. サンプリング + CV
sample_size = 10000
sample_idx = np.random.choice(len(X), sample_size, replace=False)
X_sample, y_sample = X[sample_idx], y[sample_idx]
start_time = time.time()
sample_scores = cross_val_score(RandomForestClassifier(n_estimators=100, n_jobs=-1, random_state=42),
X_sample, y_sample, cv=5)
sample_time = time.time() - start_time
print(f"サンプリングCV: {sample_time:.2f}秒, 精度: {sample_scores.mean():.4f}")
# 5. 軽量モデル + CV
start_time = time.time()
light_scores = cross_val_score(LogisticRegression(max_iter=1000),
X, y, cv=5)
light_time = time.time() - start_time
print(f"軽量モデルCV: {light_time:.2f}秒, 精度: {light_scores.mean():.4f}")
cv_optimization_techniques()
分散処理でのCV実装
def distributed_cv_simulation():
"""分散処理でのCV実装パターンを検証してみた"""
# 大規模データをシミュレート
n_samples = 100000
X_large, y_large = make_classification(n_samples=n_samples, n_features=30,
n_classes=2, random_state=42)
print(f"大規模データ: {n_samples:,} サンプル")
# 手動でのCV分散処理シミュレーション
def manual_distributed_cv(X, y, n_splits=5):
"""手動実装による分散CV(概念実証)"""
from sklearn.model_selection import KFold
kfold = KFold(n_splits=n_splits, shuffle=True, random_state=42)
fold_results = []
for fold, (train_idx, val_idx) in enumerate(kfold.split(X)):
print(f" Fold {fold+1} 処理中...")
# 実際の分散環境では、ここで各ワーカーに処理を分散
X_train, X_val = X[train_idx], X[val_idx]
y_train, y_val = y[train_idx], y[val_idx]
# 軽量モデルで高速化
model = LogisticRegression(max_iter=500)
start_time = time.time()
model.fit(X_train, y_train)
score = model.score(X_val, y_val)
fold_time = time.time() - start_time
fold_results.append({
'fold': fold + 1,
'score': score,
'time': fold_time,
'train_size': len(X_train),
'val_size': len(X_val)
})
print(f" 完了: {fold_time:.2f}秒, 精度: {score:.4f}")
return fold_results
print("分散CV処理シミュレーション:")
distributed_results = manual_distributed_cv(X_large, y_large)
total_time = sum(r['time'] for r in distributed_results)
avg_score = np.mean([r['score'] for r in distributed_results])
print(f"\n総処理時間: {total_time:.2f}秒")
print(f"平均精度: {avg_score:.4f}")
print(f"並列化で理論的には {total_time/5:.2f}秒まで短縮可能")
distributed_cv_simulation()
失敗事例から学ぶ:時系列データでの大失敗
実際に経験した痛い失敗事例を共有する。
def time_series_failure_case():
"""時系列データでの大失敗事例を再現してみた"""
print("=== 実際にやらかした失敗事例 ===")
print("プロジェクト: ECサイトの需要予測システム")
print("データ: 2年分の日次売上データ")
print("目標: 来月の売上を予測するモデル構築")
print()
# 実際のデータに近い時系列データを生成
np.random.seed(42)
n_days = 730 # 2年分
dates = pd.date_range('2022-01-01', periods=n_days, freq='D')
# 売上の生成(トレンド + 季節性 + イベント効果)
base_trend = np.linspace(1000, 1500, n_days) # 成長トレンド
seasonal = 200 * np.sin(2 * np.pi * np.arange(n_days) / 365.25) # 年次季節性
weekly = 100 * np.sin(2 * np.pi * np.arange(n_days) / 7) # 週次パターン
# 特殊イベント(セール期間など)
event_days = np.random.choice(n_days, 20, replace=False)
event_effect = np.zeros(n_days)
event_effect[event_days] = np.random.uniform(500, 1000, 20)
# ノイズ
noise = np.random.normal(0, 50, n_days)
sales = base_trend + seasonal + weekly + event_effect + noise
sales = np.maximum(sales, 0) # 負の売上は除外
# 特徴量作成
df = pd.DataFrame({'date': dates, 'sales': sales})
df['day_of_week'] = df['date'].dt.dayofweek
df['month'] = df['date'].dt.month
df['day_of_month'] = df['date'].dt.day
# ラグ特徴量
for lag in [1, 7, 14, 30]:
df[f'sales_lag_{lag}'] = df['sales'].shift(lag)
# 移動平均
for window in [7, 14, 30]:
df[f'sales_ma_{window}'] = df['sales'].rolling(window=window).mean()
df = df.dropna().reset_index(drop=True)
feature_cols = [col for col in df.columns if col not in ['date', 'sales']]
X_ts = df[feature_cols].values
y_ts = df['sales'].values
print(f"データ形状: {X_ts.shape}")
print(f"期間: {df['date'].min()} - {df['date'].max()}")
print()
# === 失敗パターン1: 通常のK-FoldでCV ===
print("【失敗パターン1】通常のK-FoldでCV評価")
print("当時の考え: 「CVで高い精度が出てるから大丈夫だろう」")
wrong_cv = KFold(n_splits=5, shuffle=True, random_state=42)
wrong_scores = cross_val_score(RandomForestRegressor(n_estimators=100, random_state=42),
X_ts, y_ts, cv=wrong_cv, scoring='neg_mean_absolute_error')
print(f"K-Fold CV MAE: {-wrong_scores.mean():.2f} ± {wrong_scores.std():.2f}")
print("結果: とても良い精度が出た!")
print()
# === 現実的な評価: Time Series Split ===
print("【正しい評価】Time Series Splitで再評価")
tscv = TimeSeriesSplit(n_splits=5)
correct_scores = cross_val_score(RandomForestRegressor(n_estimators=100, random_state=42),
X_ts, y_ts, cv=tscv, scoring='neg_mean_absolute_error')
print(f"Time Series CV MAE: {-correct_scores.mean():.2f} ± {correct_scores.std():.2f}")
print()
# === 失敗の原因分析 ===
print("【失敗の原因分析】")
print("1. 未来の情報を使ったdata leakage")
print("2. 時系列の構造を無視した評価")
print("3. 季節性やトレンドの過学習")
print()
# data leakageの実証
print("【Data Leakageの実証】")
fold_analysis = []
for fold, (train_idx, val_idx) in enumerate(wrong_cv.split(X_ts)):
train_dates = df.iloc[train_idx]['date']
val_dates = df.iloc[val_idx]['date']
# 時系列の混在をチェック
future_in_train = (train_dates.max() > val_dates.min())
past_in_val = (val_dates.min() < train_dates.max())
fold_analysis.append({
'fold': fold + 1,
'train_period': f"{train_dates.min().strftime('%Y-%m')} - {train_dates.max().strftime('%Y-%m')}",
'val_period': f"{val_dates.min().strftime('%Y-%m')} - {val_dates.max().strftime('%Y-%m')}",
'leakage': future_in_train and past_in_val
})
for analysis in fold_analysis:
print(f"Fold {analysis['fold']}: Train={analysis['train_period']}, Val={analysis['val_period']}, Leakage={analysis['leakage']}")
print()
print("【本番での結果】")
print("実装後の実際の性能: 予測MAEが3-4倍悪化")
print("原因: CVで見た性能は未来の情報を使った虚偽の高精度")
print("対策: Time Series CVによる正しい評価を導入")
# === 改善後のワークフロー ===
print("\n【改善後のワークフロー】")
# より現実的なWalk Forward Validation
def walk_forward_validation_detailed(X, y, dates, initial_train_size=300,
step_size=30, max_test_size=30):
scores = []
predictions = []
for i in range(initial_train_size, len(X) - max_test_size, step_size):
# 訓練データ:過去のデータのみ
X_train, y_train = X[:i], y[:i]
# テストデータ:次の期間
test_end = min(i + max_test_size, len(X))
X_test, y_test = X[i:test_end], y[i:test_end]
test_dates = dates.iloc[i:test_end]
# モデル訓練
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# 予測
y_pred = model.predict(X_test)
# 評価
mae = np.mean(np.abs(y_test - y_pred))
scores.append(mae)
predictions.extend(list(zip(test_dates, y_test, y_pred)))
print(f"期間 {test_dates.min().strftime('%Y-%m-%d')} - {test_dates.max().strftime('%Y-%m-%d')}: MAE = {mae:.2f}")
return scores, predictions
wf_scores, wf_predictions = walk_forward_validation_detailed(X_ts, y_ts, df['date'])
print(f"\nWalk Forward Validation MAE: {np.mean(wf_scores):.2f} ± {np.std(wf_scores):.2f}")
print("これが実際の本番性能に近い値")
time_series_failure_case()
業界別CV戦略まとめ
def industry_specific_cv_guide():
"""業界別CV戦略ガイドを整理してみた"""
cv_strategies = {
"金融・投資": {
"データ特性": "時系列、高次元、ノイズ多、レジーム変化",
"推奨CV": "Purged Group Time Series Split",
"注意点": "Look-ahead bias、Market regime変化",
"実装例": "PurgedGroupTimeSeriesSplit(group_gap=5, max_train_group_size=252)"
},
"Eコマース・小売": {
"データ特性": "季節性、プロモーション効果、顧客セグメント",
"推奨CV": "Time Series Split + Stratified Group",
"注意点": "季節性の学習、プロモーション期間の扱い",
"実装例": "TimeSeriesSplit + 顧客グループでの検証"
},
"医療・ヘルスケア": {
"データ特性": "患者グループ、不均衡、倫理的制約",
"推奨CV": "Stratified Group K-Fold",
"注意点": "患者間リーク、病院間の違い",
"実装例": "StratifiedGroupKFold(groups=patient_id)"
},
"製造業・IoT": {
"データ特性": "センサー時系列、設備グループ、ドリフト",
"推奨CV": "Group Time Series Split",
"注意点": "設備間の違い、時間的ドリフト、メンテナンス期間",
"実装例": "GroupKFold(groups=equipment_id) + 時系列考慮"
},
"不動産・地理": {
"データ特性": "空間相関、地域特性、時系列トレンド",
"推奨CV": "Spatial Group K-Fold",
"注意点": "近隣物件の相関、地域経済の影響",
"実装例": "地理的クラスタリング + GroupKFold"
},
"画像・コンピュータビジョン": {
"データ特性": "同一対象の複数画像、データ拡張",
"推奨CV": "Group K-Fold (対象別)",
"注意点": "同一人物・物体の画像リーク、データ拡張の扱い",
"実装例": "GroupKFold(groups=subject_id)"
},
"自然言語処理": {
"データ特性": "文書グループ、ドメイン差、時間的変化",
"推奨CV": "Stratified Group K-Fold",
"注意点": "同一著者・ソースからのリーク、言語進化",
"実装例": "StratifiedGroupKFold(groups=document_source)"
}
}
print("業界別CV戦略ガイド:")
print("=" * 50)
for industry, strategy in cv_strategies.items():
print(f"\n【{industry}】")
print(f"データ特性: {strategy['データ特性']}")
print(f"推奨CV: {strategy['推奨CV']}")
print(f"注意点: {strategy['注意点']}")
print(f"実装例: {strategy['実装例']}")
industry_specific_cv_guide()
実装チェックリスト:本番運用前の確認項目
def cv_implementation_checklist():
"""本番運用前のCV実装チェックリストを作成してみた"""
checklist = {
"データ理解": [
"データの時間的構造を理解しているか",
"グループ構造(顧客、患者、機器など)を特定したか",
"クラス不均衡の程度を確認したか",
"地理的・空間的相関があるか確認したか",
"外れ値やデータ品質の問題を把握したか"
],
"CV手法選択": [
"時系列データで未来情報リークを防いでいるか",
"グループ構造を適切に考慮しているか",
"不均衡データでStratifiedを使用しているか",
"空間相関を考慮したか(該当する場合)",
"業界特有のドメイン知識を反映したか"
],
"実装検証": [
"各フォールドでデータリークが発生していないか",
"フォールド間でクラス分布が適切に保たれているか",
"グループの重複がないか確認したか",
"時系列順序が保たれているか(該当する場合)",
"計算時間が許容範囲内か"
],
"結果解釈": [
"CV結果と本番性能のギャップを理解しているか",
"フォールド間の性能バラツキを分析したか",
"複数の評価指標で検証したか",
"統計的有意性を確認したか",
"ベースラインモデルと比較したか"
],
"本番対応": [
"本番データでAdversarial Validationを実施したか",
"モデル更新時のCV戦略を定義したか",
"性能劣化の監視方法を設計したか",
"A/Bテストでの検証計画があるか",
"障害時のフォールバック戦略があるか"
]
}
print("CV実装チェックリスト:")
print("=" * 40)
for category, items in checklist.items():
print(f"\n【{category}】")
for i, item in enumerate(items, 1):
print(f" {i}. □ {item}")
print("\n" + "=" * 40)
print("全項目をチェックしてから本番運用を開始してください")
cv_implementation_checklist()
性能監視と継続的改善
def cv_monitoring_framework():
"""CV性能の継続的監視フレームワークを構築してみた"""
class CVPerformanceMonitor:
def __init__(self):
self.cv_history = []
self.production_history = []
def log_cv_performance(self, cv_scores, cv_method, model_version):
"""CV性能をログ"""
self.cv_history.append({
'timestamp': pd.Timestamp.now(),
'cv_mean': np.mean(cv_scores),
'cv_std': np.std(cv_scores),
'cv_method': cv_method,
'model_version': model_version,
'individual_scores': cv_scores
})
def log_production_performance(self, actual_score, prediction_period):
"""本番性能をログ"""
self.production_history.append({
'timestamp': pd.Timestamp.now(),
'actual_score': actual_score,
'period': prediction_period
})
def calculate_cv_production_gap(self, lookback_days=30):
"""CV性能と本番性能のギャップを分析"""
if not self.cv_history or not self.production_history:
return None
# 最近のCV性能
recent_cv = [h for h in self.cv_history
if (pd.Timestamp.now() - h['timestamp']).days <= lookback_days]
# 最近の本番性能
recent_prod = [h for h in self.production_history
if (pd.Timestamp.now() - h['timestamp']).days <= lookback_days]
if not recent_cv or not recent_prod:
return None
avg_cv_score = np.mean([h['cv_mean'] for h in recent_cv])
avg_prod_score = np.mean([h['actual_score'] for h in recent_prod])
gap = abs(avg_cv_score - avg_prod_score)
gap_percentage = gap / avg_cv_score * 100
return {
'cv_score': avg_cv_score,
'production_score': avg_prod_score,
'absolute_gap': gap,
'percentage_gap': gap_percentage,
'samples': len(recent_cv) + len(recent_prod)
}
def detect_performance_drift(self, threshold=0.05):
"""性能劣化の検出"""
if len(self.production_history) < 10:
return False
# 最近の性能 vs 過去の性能
recent_scores = [h['actual_score'] for h in self.production_history[-5:]]
past_scores = [h['actual_score'] for h in self.production_history[-15:-5]]
if len(past_scores) < 5:
return False
recent_mean = np.mean(recent_scores)
past_mean = np.mean(past_scores)
drift = abs(recent_mean - past_mean) / past_mean
return drift > threshold
# 使用例
monitor = CVPerformanceMonitor()
# CV性能をログ(模擬)
cv_scores = [0.85, 0.87, 0.84, 0.86, 0.85]
monitor.log_cv_performance(cv_scores, "StratifiedKFold", "v1.2")
# 本番性能をログ(模擬)
for i in range(10):
# 時間経過とともに性能が劣化する例
degraded_score = 0.85 - (i * 0.01) + np.random.normal(0, 0.02)
monitor.log_production_performance(degraded_score, f"week_{i+1}")
# ギャップ分析
gap_analysis = monitor.calculate_cv_production_gap()
if gap_analysis:
print("CV-本番性能ギャップ分析:")
print(f"CV性能: {gap_analysis['cv_score']:.4f}")
print(f"本番性能: {gap_analysis['production_score']:.4f}")
print(f"ギャップ: {gap_analysis['percentage_gap']:.1f}%")
# ドリフト検出
drift_detected = monitor.detect_performance_drift()
print(f"\n性能劣化検出: {'要注意' if drift_detected else '正常'}")
cv_monitoring_framework()
まとめ:実務で使えるCV戦略
def final_cv_decision_framework():
"""最終的なCV手法選択フレームワーク"""
def recommend_cv_strategy(data_characteristics):
"""データ特性に基づいてCV戦略を推奨"""
recommendations = []
# 時系列チェック
if data_characteristics.get('is_time_series', False):
if data_characteristics.get('has_groups', False):
recommendations.append("Purged Group Time Series Split")
else:
recommendations.append("Time Series Split または Walk Forward Validation")
# グループ構造チェック
elif data_characteristics.get('has_groups', False):
if data_characteristics.get('is_imbalanced', False):
recommendations.append("Stratified Group K-Fold")
else:
recommendations.append("Group K-Fold")
# 不均衡データチェック
elif data_characteristics.get('is_imbalanced', False):
recommendations.append("Stratified K-Fold")
# 地理データチェック
elif data_characteristics.get('has_spatial_correlation', False):
recommendations.append("Spatial Group K-Fold (カスタム実装)")
# デフォルト
else:
if data_characteristics.get('problem_type') == 'classification':
recommendations.append("Stratified K-Fold")
else:
recommendations.append("K-Fold")
# サンプルサイズ考慮
if data_characteristics.get('sample_size', 1000) < 100:
recommendations.append("LOOCV または 少ないフォールド数")
return recommendations
# 実例での推奨
examples = [
{
'name': '株価予測',
'characteristics': {
'is_time_series': True,
'has_groups': True, # 銘柄グループ
'sample_size': 10000,
'problem_type': 'regression'
}
},
{
'name': '医療診断',
'characteristics': {
'is_time_series': False,
'has_groups': True, # 患者グループ
'is_imbalanced': True,
'sample_size': 5000,
'problem_type': 'classification'
}
},
{
'name': 'ECサイト需要予測',
'characteristics': {
'is_time_series': True,
'has_groups': False,
'sample_size': 100000,
'problem_type': 'regression'
}
},
{
'name': '不動産価格予測',
'characteristics': {
'is_time_series': False,
'has_groups': False,
'has_spatial_correlation': True,
'sample_size': 20000,
'problem_type': 'regression'
}
}
]
print("実務での CV 戦略推奨:")
print("=" * 50)
for example in examples:
recommendations = recommend_cv_strategy(example['characteristics'])
print(f"\n【{example['name']}】")
for rec in recommendations:
print(f" 推奨: {rec}")
final_cv_decision_framework()
print("\n" + "=" * 60)
print("実務でのCV選択:重要なポイント")
print("=" * 60)
print("""
1. データの構造を最優先で理解する
- 時系列性、グループ構造、空間相関
2. ドメイン知識を活かす
- 業界特有のデータ特性を考慮
- ビジネス上の制約を反映
3. 計算コストと精度のバランス
- 大規模データでは並列化・サンプリング
- プロトタイプ段階では軽量CV
4. 継続的な監視と改善
- CV性能と本番性能のギャップ監視
- データドリフトへの対応
5. 失敗から学ぶ
- データリークは必ず発生すると想定
- 保守的な評価を心がける
""")
実務でクロスバリデーションを使いこなすには、データの性質を深く理解し、適切な手法を選択することが最も重要だと痛感している。特に時系列データでの失敗は本当に痛い教訓だった。
この記事で紹介した手法と考慮点を参考に、ぜひ自分のプロジェクトに適したCV戦略を構築してほしい。最初は保守的に始めて、徐々に高度な手法を取り入れていくのがおすすめだ。