複数の最適化シナリオを実行しデータフレームを作成する python code
import pandas as pd
import numpy as np
from meridian.analysis.optimizer import BudgetOptimizer, FixedBudgetScenario
def run_multiple_optimization_scenarios(meridian_model, base_budget=None, channels=None):
"""
複数の最適化シナリオを実行し、Power BI用のデータフレームを作成
Args:
meridian_model: 学習済みのMeridianモデル
base_budget: ベース予算(Noneの場合は履歴予算を使用)
channels: チャネル名のリスト(Noneの場合はモデルから取得)
Returns:
pandas.DataFrame: Power BI用の統合データフレーム
"""
# BudgetOptimizerの初期化
budget_optimizer = BudgetOptimizer(meridian_model)
# ベース予算の取得(履歴予算を使用)
if base_budget is None:
# 履歴データから予算を取得
historical_results = budget_optimizer.optimize(
fixed_budget=True,
spend_constraint_lower=0.3,
spend_constraint_upper=0.3
)
base_budget = historical_results.nonoptimized_data.budget
# チャネル情報の取得
if channels is None:
channels = meridian_model.input_data.get_all_paid_channels()
# シナリオ設定
budget_scenarios = [0.8, 0.9, 1.0, 1.1, 1.2] # 80%, 90%, 100%, 110%, 120%
scenario_names = ["80%", "90%", "100%", "110%", "120%"]
# 予算制約の設定(0.7-1.3倍の範囲)
spend_constraint_lower = 0.3 # 1-0.7 = 0.3
spend_constraint_upper = 0.3 # 1.3-1 = 0.3
# 結果を格納するリスト
all_results = []
print("最適化シナリオを実行中...")
for i, budget_multiplier in enumerate(budget_scenarios):
scenario_budget = base_budget * budget_multiplier
scenario_name = scenario_names[i]
print(f"シナリオ {scenario_name} を実行中... (予算: {scenario_budget:,.0f})")
try:
# 最適化の実行
optimization_results = budget_optimizer.optimize(
fixed_budget=True,
budget=scenario_budget,
spend_constraint_lower=spend_constraint_lower,
spend_constraint_upper=spend_constraint_upper,
use_posterior=True
)
# 非最適化データの取得
non_optimized_data = optimization_results.nonoptimized_data
optimized_data = optimization_results.optimized_data
# 各チャネルのデータを抽出
for channel in channels:
# 非最適化データ
non_opt_spend = float(non_optimized_data.spend.sel(channel=channel).values)
non_opt_revenue = float(non_optimized_data.incremental_outcome.sel(channel=channel, metric='mean').values)
non_opt_roi = float(non_optimized_data.roi.sel(channel=channel, metric='mean').values)
non_opt_mroi = float(non_optimized_data.mroi.sel(channel=channel, metric='mean').values)
# 最適化データ
opt_spend = float(optimized_data.spend.sel(channel=channel).values)
opt_revenue = float(optimized_data.incremental_outcome.sel(channel=channel, metric='mean').values)
opt_roi = float(optimized_data.roi.sel(channel=channel, metric='mean').values)
opt_mroi = float(optimized_data.mroi.sel(channel=channel, metric='mean').values)
# 結果をディクショナリに格納
result_row = {
# シナリオ情報
'scenario_name': scenario_name,
'scenario_budget_multiplier': budget_multiplier,
'total_scenario_budget': scenario_budget,
'channel': channel,
# 非最適化データ
'non_optimized_spend': non_opt_spend,
'non_optimized_revenue': non_opt_revenue,
'non_optimized_roi': non_opt_roi,
'non_optimized_mroi': non_opt_mroi,
# 最適化データ
'optimized_spend': opt_spend,
'optimized_revenue': opt_revenue,
'optimized_roi': opt_roi,
'optimized_mroi': opt_mroi,
# 改善指標
'spend_change': opt_spend - non_opt_spend,
'spend_change_pct': (opt_spend - non_opt_spend) / non_opt_spend * 100 if non_opt_spend > 0 else 0,
'revenue_change': opt_revenue - non_opt_revenue,
'revenue_change_pct': (opt_revenue - non_opt_revenue) / non_opt_revenue * 100 if non_opt_revenue > 0 else 0,
'roi_improvement': opt_roi - non_opt_roi,
'mroi_improvement': opt_mroi - non_opt_mroi,
# マーケター向け追加指標
'efficiency_score': opt_roi / non_opt_roi if non_opt_roi > 0 else 1,
'spend_allocation_pct_non_opt': non_opt_spend / scenario_budget * 100,
'spend_allocation_pct_opt': opt_spend / scenario_budget * 100,
'revenue_contribution_pct_non_opt': non_opt_revenue / non_optimized_data.total_incremental_outcome * 100,
'revenue_contribution_pct_opt': opt_revenue / optimized_data.total_incremental_outcome * 100,
# 総計情報
'total_non_optimized_budget': float(non_optimized_data.budget),
'total_optimized_budget': float(optimized_data.budget),
'total_non_optimized_revenue': float(non_optimized_data.total_incremental_outcome),
'total_optimized_revenue': float(optimized_data.total_incremental_outcome),
'total_roi_improvement': float(optimized_data.total_roi - non_optimized_data.total_roi)
}
all_results.append(result_row)
except Exception as e:
print(f"シナリオ {scenario_name} でエラーが発生しました: {str(e)}")
continue
# データフレームの作成
df_results = pd.DataFrame(all_results)
# データ型の最適化
df_results = optimize_dataframe_types(df_results)
print(f"最適化完了!{len(df_results)} 行のデータを生成しました。")
return df_results
def optimize_dataframe_types(df):
"""
データフレームのデータ型を最適化してPower BIでの読み込みを高速化
"""
# 数値列の最適化
numeric_columns = df.select_dtypes(include=[np.number]).columns
for col in numeric_columns:
if df[col].dtype == 'float64':
df[col] = pd.to_numeric(df[col], downcast='float')
elif df[col].dtype == 'int64':
df[col] = pd.to_numeric(df[col], downcast='integer')
# カテゴリ変数の最適化
categorical_columns = ['scenario_name', 'channel']
for col in categorical_columns:
if col in df.columns:
df[col] = df[col].astype('category')
return df
def add_marketing_insights(df):
"""
マーケティング分析に役立つ追加指標を計算
"""
# パフォーマンスランキング
df['channel_roi_rank_non_opt'] = df.groupby('scenario_name')['non_optimized_roi'].rank(ascending=False)
df['channel_roi_rank_opt'] = df.groupby('scenario_name')['optimized_roi'].rank(ascending=False)
# 予算効率性指標
df['budget_efficiency'] = df['optimized_revenue'] / df['optimized_spend']
df['budget_efficiency_rank'] = df.groupby('scenario_name')['budget_efficiency'].rank(ascending=False)
# シナリオ間での変動性
df['spend_volatility'] = df.groupby('channel')['optimized_spend'].transform('std')
df['revenue_volatility'] = df.groupby('channel')['optimized_revenue'].transform('std')
# 最適化の安定性指標
df['optimization_consistency'] = 1 / (1 + df['spend_volatility'] / df['optimized_spend'].abs())
return df
def export_to_powerbi_format(df, file_path):
"""
Power BI用にデータフレームをエクスポート
"""
# Power BI用の追加指標を計算
df_enhanced = add_marketing_insights(df)
# CSVファイルとして保存(Power BIで読み込みやすい形式)
df_enhanced.to_csv(file_path, index=False, encoding='utf-8-sig')
# Excelファイルとしても保存(複数シートで詳細分析用)
excel_path = file_path.replace('.csv', '.xlsx')
with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
# メインデータ
df_enhanced.to_excel(writer, sheet_name='Main_Data', index=False)
# サマリーデータ
summary_df = create_summary_data(df_enhanced)
summary_df.to_excel(writer, sheet_name='Summary', index=False)
# チャネル別サマリー
channel_summary = create_channel_summary(df_enhanced)
channel_summary.to_excel(writer, sheet_name='Channel_Summary', index=False)
print(f"データを以下の形式で保存しました:")
print(f"CSV: {file_path}")
print(f"Excel: {excel_path}")
return df_enhanced
def create_summary_data(df):
"""
シナリオ別のサマリーデータを作成
"""
summary = df.groupby('scenario_name').agg({
'total_optimized_budget': 'first',
'total_optimized_revenue': 'first',
'total_roi_improvement': 'first',
'optimized_spend': 'sum',
'optimized_revenue': 'sum',
'revenue_change': 'sum',
'spend_change': 'sum'
}).reset_index()
summary['total_roi_optimized'] = summary['total_optimized_revenue'] / summary['total_optimized_budget']
summary['total_efficiency_gain'] = summary['revenue_change'] / summary['spend_change'].abs()
return summary
def create_channel_summary(df):
"""
チャネル別のサマリーデータを作成
"""
channel_summary = df.groupby('channel').agg({
'optimized_spend': ['mean', 'std', 'min', 'max'],
'optimized_revenue': ['mean', 'std', 'min', 'max'],
'optimized_roi': ['mean', 'std', 'min', 'max'],
'spend_change_pct': ['mean', 'std'],
'revenue_change_pct': ['mean', 'std'],
'optimization_consistency': 'mean'
}).reset_index()
# 列名を平坦化
channel_summary.columns = ['_'.join(col).strip('_') for col in channel_summary.columns]
return channel_summary
# 使用例
def main():
"""
メイン実行関数
"""
# Meridianモデルが既に学習済みであることを前提
# meridian_model = your_trained_meridian_model
# 複数シナリオの最適化実行
# results_df = run_multiple_optimization_scenarios(meridian_model)
# Power BI用にエクスポート
# enhanced_df = export_to_powerbi_format(results_df, 'meridian_optimization_results.csv')
# 基本的な統計情報の表示
# print("\n=== データフレーム情報 ===")
# print(f"形状: {enhanced_df.shape}")
# print(f"列数: {len(enhanced_df.columns)}")
# print(f"シナリオ数: {enhanced_df['scenario_name'].nunique()}")
# print(f"チャネル数: {enhanced_df['channel'].nunique()}")
print("サンプルコードを実行するには、上記のコメントアウトを解除してください。")
if __name__ == "__main__":
main()