はじめに
Model Context Protocol (MCP)は、AIアプリケーションとデータソース間の標準化されたインターフェースを提供し、詳細なメタデータを生成します。本記事では、MCPから得られる豊富なメタデータを活用した時系列分析により、最先端のコンテンツ需要予測モデルを構築する手法を解説します。
MCPメタデータの豊富さと時系列分析の可能性
MCPが生成する多次元メタデータ
MCPは従来のアクセスログを超えた、構造化された豊富なメタデータを提供します:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
from pathlib import Path
from typing import Dict, List, Tuple, Optional
import matplotlib.pyplot as plt
import seaborn as sns
class MCPMetadataExtractor:
    """Extract structured, analysis-ready metadata records from MCP JSONL logs.

    Reads one JSON object per line from the given log file and flattens each
    entry into a flat record with context, resource, tool-usage, temporal and
    performance features.
    """

    def __init__(self, log_file_path: str):
        self.log_file = Path(log_file_path)
        self.raw_logs = self.load_mcp_logs()

    def load_mcp_logs(self) -> List[Dict]:
        """Load MCP log entries (one JSON object per line).

        Malformed lines are skipped; a missing file yields an empty list.
        """
        logs = []
        if self.log_file.exists():
            with open(self.log_file, 'r') as f:
                for line in f:
                    try:
                        log_entry = json.loads(line.strip())
                        logs.append(log_entry)
                    except json.JSONDecodeError:
                        continue
        return logs

    def extract_rich_metadata(self) -> pd.DataFrame:
        """Flatten raw MCP logs into a DataFrame sorted by timestamp.

        Returns an empty DataFrame when no logs were loaded.
        """
        metadata_records = []
        for log in self.raw_logs:
            # Core fields
            record = {
                'timestamp': pd.to_datetime(log['timestamp']),
                'resource_uri': log['resource_uri'],
                'client_id': log['client_id'],
                'action': log['action'],
                'success': log['success'],
                'session_id': log.get('session_id', 'unknown')
            }
            # MCP context metadata
            context = log.get('context', '')
            record['context_length'] = len(context)
            record['context_complexity'] = self.calculate_context_complexity(context)
            record['intent_category'] = self.classify_intent(context)
            # Resource metadata
            record['resource_type'] = self.identify_resource_type(log['resource_uri'])
            record['resource_size'] = log.get('resource_size', 0)
            record['file_extension'] = self.extract_file_extension(log['resource_uri'])
            # Tool-usage metadata
            record['tools_used'] = len(log.get('tools_called', []))
            record['tool_success_rate'] = self.calculate_tool_success_rate(log.get('tools_called', []))
            record['tool_chain_length'] = self.calculate_tool_chain_length(log.get('tools_called', []))
            # Temporal metadata. NOTE: `dayofweek` is a plain int here, so
            # plain membership/comparison is used (a pandas `.isin` call on it
            # would raise AttributeError).
            record['hour'] = record['timestamp'].hour
            record['day_of_week'] = record['timestamp'].dayofweek
            record['is_weekend'] = record['day_of_week'] in (5, 6)
            record['is_business_hour'] = 9 <= record['hour'] <= 17
            record['quarter_of_day'] = record['hour'] // 6  # four 6-hour buckets, 0-3
            # Performance metadata
            record['response_time'] = log.get('response_time_ms', 0)
            record['error_count'] = len(log.get('errors', []))
            record['retry_count'] = log.get('retry_count', 0)
            metadata_records.append(record)
        df = pd.DataFrame(metadata_records)
        if df.empty:
            # Guard: sort_values('timestamp') would raise KeyError on an
            # empty frame that has no columns at all.
            return df
        return df.sort_values('timestamp')

    def calculate_context_complexity(self, context: str) -> float:
        """Score context complexity in [0, 10] from word/sentence/symbol density."""
        if not context:
            return 0.0
        word_count = len(context.split())
        sentence_count = context.count('.') + context.count('!') + context.count('?')
        special_char_ratio = sum(1 for c in context if not c.isalnum() and not c.isspace()) / len(context)
        complexity = (word_count * 0.1 + sentence_count * 0.3 + special_char_ratio * 10)
        return min(complexity, 10.0)  # clamp to the 0-10 range

    def classify_intent(self, context: str) -> str:
        """Classify the request intent from keywords in the context string."""
        context_lower = context.lower()
        if any(word in context_lower for word in ['analyze', 'research', 'study', 'investigate']):
            return 'analysis'
        elif any(word in context_lower for word in ['create', 'generate', 'build', 'develop']):
            return 'creation'
        elif any(word in context_lower for word in ['learn', 'understand', 'explain', 'teach']):
            return 'learning'
        elif any(word in context_lower for word in ['fix', 'debug', 'solve', 'troubleshoot']):
            return 'troubleshooting'
        else:
            return 'general'

    def identify_resource_type(self, uri: str) -> str:
        """Heuristically classify a resource URI by keywords and extensions."""
        if 'document' in uri.lower() or '.md' in uri or '.txt' in uri:
            return 'document'
        elif 'image' in uri.lower() or any(ext in uri for ext in ['.jpg', '.png', '.gif']):
            return 'image'
        elif 'data' in uri.lower() or any(ext in uri for ext in ['.csv', '.json', '.xml']):
            return 'data'
        elif 'code' in uri.lower() or any(ext in uri for ext in ['.py', '.js', '.html']):
            return 'code'
        else:
            return 'other'

    def extract_file_extension(self, uri: str) -> str:
        """Return the lowercase file extension, or 'unknown' when absent."""
        parts = uri.split('.')
        return parts[-1].lower() if len(parts) > 1 else 'unknown'

    def calculate_tool_success_rate(self, tools_called: List[Dict]) -> float:
        """Fraction of called tools that succeeded; 1.0 when none were called."""
        if not tools_called:
            return 1.0
        successful_tools = sum(1 for tool in tools_called if tool.get('success', False))
        return successful_tools / len(tools_called)

    def calculate_tool_chain_length(self, tools_called: List[Dict]) -> int:
        """Number of tool invocations in the entry."""
        return len(tools_called)
# --- Run metadata extraction and print a summary of what was loaded ---
extractor = MCPMetadataExtractor('mcp_detailed_logs.jsonl')
metadata_df = extractor.extract_rich_metadata()
print("MCPメタデータの概要:")
print(f"データ期間: {metadata_df['timestamp'].min()} - {metadata_df['timestamp'].max()}")
print(f"レコード数: {len(metadata_df)}")
print(f"ユニークリソース数: {metadata_df['resource_uri'].nunique()}")
print(f"メタデータ次元数: {metadata_df.shape[1]}")
print("\nメタデータの詳細:")
print(metadata_df.describe())
高次元時系列特徴量の構築
MCPメタデータから多次元の時系列特徴量を生成します:
class AdvancedTimeSeriesFeatureBuilder:
    """Build high-dimensional time-series features from MCP metadata records.

    Expects the DataFrame produced by MCPMetadataExtractor.extract_rich_metadata.
    """

    def __init__(self, metadata_df: pd.DataFrame):
        self.metadata_df = metadata_df

    def build_multidimensional_timeseries(self, aggregation_period: str = 'H') -> pd.DataFrame:
        """Build per-resource aggregated, cyclical, rolling, lag and anomaly features.

        Args:
            aggregation_period: pandas resample frequency (default hourly).
                Rolling-window features are only generated for hourly ('H')
                aggregation.

        Returns:
            One row per (resource_uri, period) with flattened feature columns,
            or an empty DataFrame when no resource has enough data points.
        """
        df = self.metadata_df.copy()
        df = df.set_index('timestamp')
        timeseries_features = []
        for resource_uri in df['resource_uri'].unique():
            resource_df = df[df['resource_uri'] == resource_uri].copy()
            if len(resource_df) < 10:  # require a minimum number of observations
                continue
            # Basic per-period aggregates.
            basic_agg = resource_df.resample(aggregation_period).agg({
                'client_id': 'nunique',                    # unique clients
                'success': ['sum', 'mean'],                # success count and rate
                'context_length': ['mean', 'std'],         # context length stats
                'context_complexity': ['mean', 'max'],     # context complexity
                'tools_used': ['sum', 'mean'],             # tool usage
                'tool_success_rate': 'mean',               # tool success rate
                'response_time': ['mean', 'median', 'max'],# latency stats
                'error_count': 'sum'                       # error count
            }).fillna(0)
            # Flatten the (column, statistic) MultiIndex into single names.
            basic_agg.columns = [f"{col[0]}_{col[1]}" if col[1] else col[0]
                                 for col in basic_agg.columns]
            # Per-period intent-mix ratios.
            intent_counts = resource_df.groupby(
                resource_df.index.floor(aggregation_period)
            )['intent_category'].value_counts().unstack(fill_value=0)
            intent_ratios = intent_counts.div(intent_counts.sum(axis=1), axis=0)
            intent_ratios.columns = [f"intent_ratio_{col}" for col in intent_ratios.columns]
            # Resource type code (constant within a single resource's frame).
            basic_agg['resource_type_encoded'] = pd.Categorical([
                resource_df['resource_type'].iloc[0]
            ] * len(basic_agg)).codes[0] if not basic_agg.empty else 0
            # Cyclical time-of-day / day-of-week encodings.
            basic_agg['hour_sin'] = np.sin(2 * np.pi * basic_agg.index.hour / 24)
            basic_agg['hour_cos'] = np.cos(2 * np.pi * basic_agg.index.hour / 24)
            basic_agg['day_sin'] = np.sin(2 * np.pi * basic_agg.index.dayofweek / 7)
            basic_agg['day_cos'] = np.cos(2 * np.pi * basic_agg.index.dayofweek / 7)
            basic_agg['is_weekend'] = basic_agg.index.dayofweek.isin([5, 6]).astype(int)
            basic_agg['is_business_hour'] = ((basic_agg.index.hour >= 9) &
                                             (basic_agg.index.hour <= 17)).astype(int)
            # Rolling statistics over several window sizes (hourly data only).
            # The aggregation-period check is loop-invariant, so it is hoisted
            # out of the window loop.
            if aggregation_period == 'H':
                for window in [3, 6, 12, 24]:
                    basic_agg[f'access_ma_{window}h'] = basic_agg['client_id_nunique'].rolling(window).mean()
                    basic_agg[f'success_rate_ma_{window}h'] = basic_agg['success_mean'].rolling(window).mean()
                    basic_agg[f'complexity_trend_{window}h'] = basic_agg['context_complexity_mean'].rolling(window).apply(
                        lambda x: np.polyfit(range(len(x)), x, 1)[0] if len(x) > 1 else 0
                    )
            # Lag features (autoregressive signal).
            for lag in [1, 3, 6, 12, 24]:
                basic_agg[f'access_lag_{lag}'] = basic_agg['client_id_nunique'].shift(lag)
                basic_agg[f'success_lag_{lag}'] = basic_agg['success_mean'].shift(lag)
            # Period-over-period change rates.
            basic_agg['access_change_rate'] = basic_agg['client_id_nunique'].pct_change()
            basic_agg['success_rate_change'] = basic_agg['success_mean'].pct_change()
            basic_agg['complexity_change_rate'] = basic_agg['context_complexity_mean'].pct_change()
            # Anomaly feature: z-score of access counts. Guard the zero-variance
            # case, which would otherwise yield +/-inf values.
            access_std = basic_agg['client_id_nunique'].std()
            if access_std:
                basic_agg['access_zscore'] = (basic_agg['client_id_nunique'] -
                                              basic_agg['client_id_nunique'].mean()) / access_std
            else:
                basic_agg['access_zscore'] = 0.0
            # Attach identifying metadata and merge in the intent ratios.
            basic_agg['resource_uri'] = resource_uri
            basic_agg = basic_agg.reset_index()
            if not intent_ratios.empty:
                basic_agg = basic_agg.merge(intent_ratios.reset_index(), on='timestamp', how='left').fillna(0)
            timeseries_features.append(basic_agg)
        if timeseries_features:
            result_df = pd.concat(timeseries_features, ignore_index=True)
            return result_df.sort_values(['resource_uri', 'timestamp'])
        else:
            return pd.DataFrame()
# --- Build the high-dimensional feature set (hourly aggregation) ---
feature_builder = AdvancedTimeSeriesFeatureBuilder(metadata_df)
advanced_features = feature_builder.build_multidimensional_timeseries('H')
print(f"構築された特徴量の次元: {advanced_features.shape}")
print(f"特徴量一覧 ({advanced_features.shape[1]}次元):")
for i, col in enumerate(advanced_features.columns):
    print(f"{i+1:2d}. {col}")
最先端の需要予測モデル
1. 深層学習による多変量時系列予測
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import LSTM, GRU, Dense, Dropout, Input, Attention
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
class DeepLearningDemandPredictor:
    """Train an LSTM + Transformer ensemble to forecast per-resource demand.

    Expects `feature_data` as produced by AdvancedTimeSeriesFeatureBuilder:
    one row per (resource_uri, timestamp) with numeric feature columns.
    """

    def __init__(self, feature_data: pd.DataFrame):
        self.feature_data = feature_data
        self.scalers = {}  # fitted StandardScalers keyed "<uri>_features" / "<uri>_target"
        self.models = {}   # training results keyed by resource URI

    def prepare_sequences(self, resource_uri: str, sequence_length: int = 24,
                          target_column: str = 'client_id_nunique') -> Tuple[np.ndarray, np.ndarray, List[str]]:
        """Turn one resource's rows into supervised sliding-window sequences.

        Returns:
            X of shape (samples, sequence_length, n_features), y of shape
            (samples, 1), and the list of feature column names used.

        Raises:
            ValueError: if the resource has fewer than sequence_length + 10 rows.
        """
        resource_data = self.feature_data[
            self.feature_data['resource_uri'] == resource_uri
        ].copy().sort_values('timestamp')
        if len(resource_data) < sequence_length + 10:
            raise ValueError(f"Insufficient data for {resource_uri}")
        # Numeric columns only; drop the target so it is not leaked as input.
        numeric_columns = resource_data.select_dtypes(include=[np.number]).columns.tolist()
        if 'timestamp' in numeric_columns:
            numeric_columns.remove('timestamp')
        if target_column in numeric_columns:
            numeric_columns.remove(target_column)
        feature_columns = numeric_columns
        # Standardize features and target; keep scalers for later inverse transforms.
        scaler_features = StandardScaler()
        scaler_target = StandardScaler()
        scaled_features = scaler_features.fit_transform(resource_data[feature_columns])
        scaled_target = scaler_target.fit_transform(resource_data[[target_column]])
        self.scalers[f"{resource_uri}_features"] = scaler_features
        self.scalers[f"{resource_uri}_target"] = scaler_target
        # Sliding windows: predict step i from the preceding sequence_length steps.
        X, y = [], []
        for i in range(sequence_length, len(scaled_features)):
            X.append(scaled_features[i-sequence_length:i])
            y.append(scaled_target[i])
        return np.array(X), np.array(y), feature_columns

    def build_advanced_lstm_model(self, input_shape: Tuple[int, int]) -> Model:
        """Build a stacked-LSTM encoder with self-attention and a dense head."""
        inputs = Input(shape=input_shape)
        # Encoder: two stacked LSTMs (final states are returned but unused here).
        lstm1 = LSTM(128, return_sequences=True, return_state=True)
        lstm1_output, state_h1, state_c1 = lstm1(inputs)
        lstm2 = LSTM(64, return_sequences=True, return_state=True)
        lstm2_output, state_h2, state_c2 = lstm2(lstm1_output)
        # Self-attention over the encoded sequence.
        attention = Attention()
        context_vector = attention([lstm2_output, lstm2_output])
        # Decoder head reads the last attended timestep only.
        decoder_dense = Dense(32, activation='relu')(context_vector[:, -1, :])
        decoder_dropout = Dropout(0.2)(decoder_dense)
        # Single linear output for regression.
        outputs = Dense(1, activation='linear')(decoder_dropout)
        model = Model(inputs=inputs, outputs=outputs)
        model.compile(optimizer=Adam(learning_rate=0.001),
                      loss='mse', metrics=['mae'])
        return model

    def build_transformer_model(self, input_shape: Tuple[int, int]) -> Model:
        """Build a single-block Transformer encoder for sequence regression."""
        from tensorflow.keras.layers import MultiHeadAttention, LayerNormalization
        inputs = Input(shape=input_shape)
        # Multi-head self-attention.
        attention_layer = MultiHeadAttention(num_heads=8, key_dim=64)
        attention_output = attention_layer(inputs, inputs)
        # Residual connection + layer norm.
        add_norm1 = LayerNormalization()(inputs + attention_output)
        # Position-wise feed-forward projected back to the input width so the
        # second residual add is shape-compatible.
        ff = Dense(128, activation='relu')(add_norm1)
        ff = Dense(input_shape[-1])(ff)
        add_norm2 = LayerNormalization()(add_norm1 + ff)
        # Pool across time and regress.
        pooled = tf.keras.layers.GlobalAveragePooling1D()(add_norm2)
        outputs = Dense(1, activation='linear')(pooled)
        model = Model(inputs=inputs, outputs=outputs)
        model.compile(optimizer=Adam(learning_rate=0.001),
                      loss='mse', metrics=['mae'])
        return model

    def train_ensemble_model(self, resource_uri: str, epochs: int = 100) -> Dict:
        """Train both models, ensemble their predictions, and report metrics.

        Returns:
            A dict with models, per-model and ensemble predictions, test data,
            feature names, and MAE/RMSE/R2 metrics — or {'error': ...} when
            the resource lacks sufficient history.
        """
        try:
            X, y, feature_columns = self.prepare_sequences(resource_uri, sequence_length=24)
        except ValueError as e:
            return {'error': str(e)}
        # Chronological 80/20 split (no shuffling for time series).
        split_idx = int(len(X) * 0.8)
        X_train, X_test = X[:split_idx], X[split_idx:]
        y_train, y_test = y[:split_idx], y[split_idx:]
        models = {}
        predictions = {}
        # LSTM model
        lstm_model = self.build_advanced_lstm_model((X.shape[1], X.shape[2]))
        lstm_history = lstm_model.fit(
            X_train, y_train,
            epochs=epochs,
            batch_size=32,
            validation_data=(X_test, y_test),
            verbose=0,
            callbacks=[
                tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
                tf.keras.callbacks.ReduceLROnPlateau(patience=5)
            ]
        )
        models['lstm'] = lstm_model
        predictions['lstm'] = lstm_model.predict(X_test)
        # Transformer model
        transformer_model = self.build_transformer_model((X.shape[1], X.shape[2]))
        transformer_history = transformer_model.fit(
            X_train, y_train,
            epochs=epochs,
            batch_size=32,
            validation_data=(X_test, y_test),
            verbose=0,
            callbacks=[
                tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
                tf.keras.callbacks.ReduceLROnPlateau(patience=5)
            ]
        )
        models['transformer'] = transformer_model
        predictions['transformer'] = transformer_model.predict(X_test)
        # Fixed-weight ensemble (0.6 LSTM / 0.4 Transformer).
        ensemble_pred = 0.6 * predictions['lstm'] + 0.4 * predictions['transformer']
        from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
        results = {
            'models': models,
            'predictions': predictions,
            'ensemble_prediction': ensemble_pred,
            'test_data': {'X': X_test, 'y': y_test},
            'feature_columns': feature_columns,
            'metrics': {
                'lstm': {
                    'mae': mean_absolute_error(y_test, predictions['lstm']),
                    'rmse': np.sqrt(mean_squared_error(y_test, predictions['lstm'])),
                    'r2': r2_score(y_test, predictions['lstm'])
                },
                'transformer': {
                    'mae': mean_absolute_error(y_test, predictions['transformer']),
                    'rmse': np.sqrt(mean_squared_error(y_test, predictions['transformer'])),
                    'r2': r2_score(y_test, predictions['transformer'])
                },
                'ensemble': {
                    'mae': mean_absolute_error(y_test, ensemble_pred),
                    'rmse': np.sqrt(mean_squared_error(y_test, ensemble_pred)),
                    'r2': r2_score(y_test, ensemble_pred)
                }
            },
            'resource_uri': resource_uri
        }
        self.models[resource_uri] = results
        return results
# --- Train the deep-learning ensemble on the best-covered resource ---
if not advanced_features.empty:
    dl_predictor = DeepLearningDemandPredictor(advanced_features)
    # Pick the resource with the most data points.
    resource_counts = advanced_features['resource_uri'].value_counts()
    top_resource = resource_counts.index[0]
    print(f"深層学習モデルを訓練中: {top_resource}")
    print(f"データポイント数: {resource_counts.iloc[0]}")
    dl_results = dl_predictor.train_ensemble_model(top_resource, epochs=50)
    if 'error' not in dl_results:
        print("\n=== 深層学習モデルの性能 ===")
        for model_name, metrics in dl_results['metrics'].items():
            print(f"{model_name.upper()}:")
            print(f"  MAE: {metrics['mae']:.4f}")
            print(f"  RMSE: {metrics['rmse']:.4f}")
            print(f"  R²: {metrics['r2']:.4f}")
2. 高度な外部要因統合モデル
class ExternalFactorIntegration:
    """Join MCP-derived features with (simulated) external market factors."""

    def __init__(self, mcp_features: pd.DataFrame):
        self.mcp_features = mcp_features
        self.external_data = {}

    def simulate_external_factors(self) -> pd.DataFrame:
        """Generate hourly synthetic external-factor series over the MCP span.

        In production these would come from real APIs; here they are random
        (unseeded) series with weekly / daily / yearly periodic components.
        """
        date_range = pd.date_range(
            start=self.mcp_features['timestamp'].min(),
            end=self.mcp_features['timestamp'].max(),
            freq='H'
        )
        external_factors = pd.DataFrame({
            'timestamp': date_range,
            # Weekly cycle plus noise.
            'market_trend': np.sin(2 * np.pi * np.arange(len(date_range)) / (24 * 7)) + \
                            np.random.normal(0, 0.1, len(date_range)),
            'social_media_buzz': np.random.lognormal(0, 0.5, len(date_range)),
            'competitor_activity': np.random.exponential(1, len(date_range)),
            # Yearly seasonality.
            'seasonal_factor': np.sin(2 * np.pi * np.arange(len(date_range)) / (24 * 365)) * 0.3,
            # Random walk.
            'economic_indicator': np.cumsum(np.random.normal(0, 0.01, len(date_range))),
            # Daily cycle with random amplitude.
            'weather_impact': np.sin(2 * np.pi * np.arange(len(date_range)) / 24) * \
                              np.random.uniform(0.5, 1.5, len(date_range))
        })
        return external_factors

    def create_integrated_features(self, resource_uri: str) -> pd.DataFrame:
        """Merge MCP features for one resource with external factors.

        Adds interaction terms and rolling cumulative-impact columns.
        """
        mcp_data = self.mcp_features[
            self.mcp_features['resource_uri'] == resource_uri
        ].copy()
        external_data = self.simulate_external_factors()
        # As-of merge: each MCP row picks up the latest external reading at or
        # before its timestamp.
        integrated_data = pd.merge_asof(
            mcp_data.sort_values('timestamp'),
            external_data.sort_values('timestamp'),
            on='timestamp',
            direction='backward'
        )
        # Interaction features between MCP signals and external factors.
        integrated_data['mcp_market_interaction'] = \
            integrated_data['client_id_nunique'] * integrated_data['market_trend']
        integrated_data['complexity_buzz_interaction'] = \
            integrated_data['context_complexity_mean'] * integrated_data['social_media_buzz']
        integrated_data['success_weather_interaction'] = \
            integrated_data['success_mean'] * integrated_data['weather_impact']
        # Cumulative impact over 6h / 12h / 24h windows.
        for window in [6, 12, 24]:
            integrated_data[f'cumulative_buzz_{window}h'] = \
                integrated_data['social_media_buzz'].rolling(window).sum()
            integrated_data[f'market_momentum_{window}h'] = \
                integrated_data['market_trend'].rolling(window).mean()
        return integrated_data
# --- Create the integrated MCP + external-factor feature set ---
if not advanced_features.empty:
    integrator = ExternalFactorIntegration(advanced_features)
    top_resource = advanced_features['resource_uri'].value_counts().index[0]
    integrated_features = integrator.create_integrated_features(top_resource)
    print("統合特徴量の概要:")
    print(f"特徴量次元: {integrated_features.shape[1]}")
    print(f"データポイント: {len(integrated_features)}")
    # Columns added by the integration step.
    new_features = [col for col in integrated_features.columns
                    if col not in advanced_features.columns]
    print(f"追加された外部要因特徴量: {len(new_features)}個")
    for feature in new_features[:10]:  # show the first ten
        print(f"  - {feature}")
予測精度の可視化と解釈
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
class AdvancedVisualization:
    """Plotly dashboards for forecast results and feature diagnostics."""

    def __init__(self, dl_results: Dict, integrated_features: pd.DataFrame):
        self.dl_results = dl_results
        self.integrated_features = integrated_features

    def create_comprehensive_forecast_dashboard(self):
        """Render a 3x2 dashboard: predictions, metrics, metadata trends,
        external factors, confidence intervals, and feature importance."""
        fig = make_subplots(
            rows=3, cols=2,
            subplot_titles=(
                '実測値 vs 予測値(アンサンブル)',
                '予測精度比較',
                'MCPメタデータの時系列推移',
                '外部要因の影響',
                'モデル信頼度区間',
                '特徴量重要度分析'
            ),
            specs=[[{"secondary_y": False}, {"secondary_y": False}],
                   [{"secondary_y": True}, {"secondary_y": False}],
                   [{"secondary_y": False}, {"secondary_y": False}]]
        )
        # (1,1) Actual vs ensemble prediction
        test_data = self.dl_results['test_data']
        y_test = test_data['y'].flatten()
        ensemble_pred = self.dl_results['ensemble_prediction'].flatten()
        fig.add_trace(
            go.Scatter(
                x=list(range(len(y_test))),
                y=y_test,
                mode='lines+markers',
                name='実測値',
                line=dict(color='blue')
            ),
            row=1, col=1
        )
        fig.add_trace(
            go.Scatter(
                x=list(range(len(ensemble_pred))),
                y=ensemble_pred,
                mode='lines+markers',
                name='アンサンブル予測',
                line=dict(color='red', dash='dash')
            ),
            row=1, col=1
        )
        # (1,2) Per-model accuracy comparison
        metrics_data = []
        for model_name, metrics in self.dl_results['metrics'].items():
            metrics_data.append({
                'Model': model_name.upper(),
                'MAE': metrics['mae'],
                'RMSE': metrics['rmse'],
                'R²': metrics['r2']
            })
        metrics_df = pd.DataFrame(metrics_data)
        fig.add_trace(
            go.Bar(
                x=metrics_df['Model'],
                y=metrics_df['MAE'],
                name='MAE',
                marker_color='lightblue'
            ),
            row=1, col=2
        )
        # (2,1) MCP metadata over time for the forecast resource
        resource_uri = self.dl_results['resource_uri']
        resource_data = self.integrated_features[
            self.integrated_features['resource_uri'] == resource_uri
        ].sort_values('timestamp')
        if not resource_data.empty:
            fig.add_trace(
                go.Scatter(
                    x=resource_data['timestamp'],
                    y=resource_data['client_id_nunique'],
                    mode='lines',
                    name='アクセス数',
                    line=dict(color='green')
                ),
                row=2, col=1
            )
            fig.add_trace(
                go.Scatter(
                    x=resource_data['timestamp'],
                    y=resource_data['context_complexity_mean'],
                    mode='lines',
                    name='コンテキスト複雑度',
                    line=dict(color='orange'),
                    yaxis='y2'
                ),
                row=2, col=1, secondary_y=True
            )
        # (2,2) External-factor influence
        if 'market_trend' in self.integrated_features.columns:
            fig.add_trace(
                go.Scatter(
                    x=resource_data['timestamp'],
                    y=resource_data['market_trend'],
                    mode='lines',
                    name='市場トレンド',
                    line=dict(color='purple')
                ),
                row=2, col=2
            )
            fig.add_trace(
                go.Scatter(
                    x=resource_data['timestamp'],
                    y=resource_data['social_media_buzz'],
                    mode='lines',
                    name='SNSバズ',
                    line=dict(color='pink')
                ),
                row=2, col=2
            )
        # (3,1) 95% confidence band from residual standard deviation
        prediction_std = np.std(ensemble_pred - y_test)
        upper_bound = ensemble_pred + 1.96 * prediction_std
        lower_bound = ensemble_pred - 1.96 * prediction_std
        fig.add_trace(
            go.Scatter(
                x=list(range(len(ensemble_pred))),
                y=upper_bound,
                mode='lines',
                name='上限(95%信頼区間)',
                line=dict(color='red', width=0.5),
                showlegend=False
            ),
            row=3, col=1
        )
        fig.add_trace(
            go.Scatter(
                x=list(range(len(ensemble_pred))),
                y=lower_bound,
                mode='lines',
                name='下限(95%信頼区間)',
                line=dict(color='red', width=0.5),
                fill='tonexty',  # shade between lower and the previous (upper) trace
                fillcolor='rgba(255,0,0,0.2)',
                showlegend=True
            ),
            row=3, col=1
        )
        fig.add_trace(
            go.Scatter(
                x=list(range(len(ensemble_pred))),
                y=ensemble_pred,
                mode='lines',
                name='予測値',
                line=dict(color='red')
            ),
            row=3, col=1
        )
        # (3,2) Feature importance (illustrative mock values, not model-derived)
        feature_importance = {
            'context_complexity_mean': 0.25,
            'client_id_nunique_lag_24': 0.20,
            'market_trend': 0.15,
            'success_mean': 0.12,
            'social_media_buzz': 0.10,
            'tools_used_sum': 0.08,
            'weather_impact': 0.06,
            'seasonal_factor': 0.04
        }
        fig.add_trace(
            go.Bar(
                x=list(feature_importance.values()),
                y=list(feature_importance.keys()),
                orientation='h',
                name='特徴量重要度',
                marker_color='lightgreen'
            ),
            row=3, col=2
        )
        fig.update_layout(
            height=1000,
            title_text="MCPメタデータ時系列分析:総合ダッシュボード",
            showlegend=True
        )
        fig.show()

    def create_feature_correlation_heatmap(self):
        """Show a correlation heatmap over numeric features and return the matrix."""
        numeric_cols = self.integrated_features.select_dtypes(include=[np.number]).columns
        correlation_matrix = self.integrated_features[numeric_cols].corr()
        fig = go.Figure(data=go.Heatmap(
            z=correlation_matrix.values,
            x=correlation_matrix.columns,
            y=correlation_matrix.columns,
            colorscale='RdBu',
            zmid=0
        ))
        fig.update_layout(
            title="MCPメタデータ特徴量相関分析",
            width=800,
            height=800
        )
        fig.show()
        return correlation_matrix
# --- Run the visualizations (only if earlier stages produced results) ---
if 'dl_results' in locals() and 'integrated_features' in locals():
    if 'error' not in dl_results:
        visualizer = AdvancedVisualization(dl_results, integrated_features)
        print("=== 高度な予測結果の可視化 ===")
        visualizer.create_comprehensive_forecast_dashboard()
        print("\n=== 特徴量相関分析 ===")
        correlation_matrix = visualizer.create_feature_correlation_heatmap()
        # Collect strongly correlated feature pairs from the upper triangle.
        high_corr_pairs = []
        for i in range(len(correlation_matrix.columns)):
            for j in range(i+1, len(correlation_matrix.columns)):
                corr_value = correlation_matrix.iloc[i, j]
                if abs(corr_value) > 0.7:
                    high_corr_pairs.append({
                        'feature1': correlation_matrix.columns[i],
                        'feature2': correlation_matrix.columns[j],
                        'correlation': corr_value
                    })
        if high_corr_pairs:
            print("高相関特徴量ペア(|r| > 0.7):")
            for pair in high_corr_pairs[:10]:
                print(f"  {pair['feature1']} ↔ {pair['feature2']}: {pair['correlation']:.3f}")
実務応用とROI最大化戦略
class BusinessIntelligenceEngine:
    """Derive content value scores, recommendations, and ROI scenarios
    from raw MCP metadata records."""

    def __init__(self, prediction_results: Dict, metadata_df: pd.DataFrame):
        self.prediction_results = prediction_results
        self.metadata_df = metadata_df

    def calculate_content_value_score(self) -> pd.DataFrame:
        """Score each resource by access, success, complexity, tools, and trend.

        Resources with fewer than 10 records are skipped. Returns a DataFrame
        sorted by value_score descending (empty if nothing qualifies).
        """
        resource_scores = []
        for resource_uri in self.metadata_df['resource_uri'].unique():
            resource_data = self.metadata_df[
                self.metadata_df['resource_uri'] == resource_uri
            ]
            if len(resource_data) < 10:
                continue
            # Basic metrics
            avg_access = resource_data.groupby(resource_data['timestamp'].dt.date)['client_id'].nunique().mean()
            success_rate = resource_data['success'].mean()
            complexity_score = resource_data['context_complexity'].mean()
            tool_utilization = resource_data['tools_used'].mean()
            # Trend: slope of daily unique-client counts (needs > 7 days).
            daily_access = resource_data.groupby(resource_data['timestamp'].dt.date)['client_id'].nunique()
            if len(daily_access) > 7:
                trend_slope = np.polyfit(range(len(daily_access)), daily_access.values, 1)[0]
            else:
                trend_slope = 0
            # Weighted composite value score.
            value_score = (
                avg_access * 0.3 +                 # access frequency (30%)
                success_rate * 100 * 0.25 +        # success rate (25%)
                complexity_score * 10 * 0.2 +      # context complexity (20%)
                tool_utilization * 20 * 0.15 +     # tool utilization (15%)
                max(0, trend_slope) * 50 * 0.1     # growth trend (10%)
            )
            resource_scores.append({
                'resource_uri': resource_uri,
                'value_score': value_score,
                'avg_daily_access': avg_access,
                'success_rate': success_rate,
                'complexity_score': complexity_score,
                'tool_utilization': tool_utilization,
                'trend_slope': trend_slope,
                'total_records': len(resource_data)
            })
        scores_df = pd.DataFrame(resource_scores)
        if scores_df.empty:
            # Guard: sort_values('value_score') would raise KeyError on an
            # empty frame with no columns.
            return scores_df
        return scores_df.sort_values('value_score', ascending=False)

    def generate_strategic_recommendations(self, value_scores: pd.DataFrame) -> List[Dict]:
        """Build prioritized recommendations from the value-score table."""
        recommendations = []
        # Top-performing content
        top_performers = value_scores.head(5)
        recommendations.append({
            'category': 'high_value_content',
            'priority': 'high',
            'title': 'トップパフォーマーの拡張戦略',
            'description': f'上位5つのコンテンツ(平均価値スコア: {top_performers["value_score"].mean():.1f})の特徴を分析し、類似コンテンツの制作を優先する',
            'action_items': [
                '成功要因の詳細分析',
                '類似コンテンツテンプレートの作成',
                '関連キーワード・トピックの調査'
            ]
        })
        # Growing but below-median content.
        growing_content = value_scores[
            (value_scores['trend_slope'] > 0) &
            (value_scores['value_score'] < value_scores['value_score'].median())
        ]
        if not growing_content.empty:
            recommendations.append({
                'category': 'growth_opportunity',
                'priority': 'medium',
                'title': '成長機会コンテンツの強化',
                'description': f'{len(growing_content)}個のコンテンツが成長トレンドを示している',
                'action_items': [
                    'プロモーション予算の追加投入',
                    'SEO最適化の実施',
                    'SNS連携強化'
                ]
            })
        # Underperforming content: low success rate OR declining trend.
        low_performers = value_scores[
            (value_scores['success_rate'] < 0.6) |
            (value_scores['trend_slope'] < -0.1)
        ]
        if not low_performers.empty:
            recommendations.append({
                'category': 'improvement_needed',
                'priority': 'high',
                'title': 'パフォーマンス改善が必要なコンテンツ',
                'description': f'{len(low_performers)}個のコンテンツが改善を必要としている',
                'action_items': [
                    'コンテンツ品質の再評価',
                    'ユーザビリティテストの実施',
                    'リライト・リファクタリングの検討'
                ]
            })
        return recommendations

    def forecast_roi_scenarios(self, value_scores: pd.DataFrame) -> Dict:
        """Sketch baseline / optimization / expansion ROI scenarios.

        NOTE: investment is modeled as a fixed fraction of expected value, so
        the optimization and expansion ROIs are constant by construction —
        these are illustrative scenarios, not fitted estimates.
        """
        scenarios = {}
        # Status quo
        current_total_value = value_scores['value_score'].sum()
        scenarios['baseline'] = {
            'description': '現状維持',
            'expected_value_increase': 0,
            'investment_required': 0,
            'roi': 0
        }
        # Optimize under-performing content (success rate below 0.8).
        optimization_potential = value_scores[value_scores['success_rate'] < 0.8]['value_score'].sum() * 0.3
        scenarios['optimization'] = {
            'description': '既存コンテンツ最適化',
            'expected_value_increase': optimization_potential,
            'investment_required': optimization_potential * 0.2,
            'roi': (optimization_potential / (optimization_potential * 0.2) - 1) * 100 if optimization_potential > 0 else 0
        }
        # Expand by cloning top performers (five new items at 70% of top-3 average).
        top_performer_avg = value_scores.head(3)['value_score'].mean()
        expansion_value = top_performer_avg * 0.7 * 5
        scenarios['expansion'] = {
            'description': '高価値コンテンツの拡張',
            'expected_value_increase': expansion_value,
            'investment_required': expansion_value * 0.4,
            'roi': (expansion_value / (expansion_value * 0.4) - 1) * 100 if expansion_value > 0 else 0
        }
        return scenarios
# --- Run the business-intelligence analysis ---
if not metadata_df.empty:
    bi_engine = BusinessIntelligenceEngine({}, metadata_df)
    print("=== ビジネスインテリジェンス分析 ===")
    value_scores = bi_engine.calculate_content_value_score()
    print("コンテンツ価値ランキング(上位10):")
    # Use enumerate for the rank: after sort_values the DataFrame index holds
    # the original row labels, so `idx + 1` would print wrong rank numbers.
    for rank, (_, row) in enumerate(value_scores.head(10).iterrows(), start=1):
        resource_name = row['resource_uri'].split('/')[-1][:30]
        print(f"{rank:2d}. {resource_name:<30} スコア: {row['value_score']:6.1f} "
              f"成功率: {row['success_rate']:.1%} トレンド: {row['trend_slope']:+.3f}")
    print("\n=== 戦略的推奨事項 ===")
    recommendations = bi_engine.generate_strategic_recommendations(value_scores)
    for i, rec in enumerate(recommendations, 1):
        print(f"\n{i}. [{rec['priority'].upper()}] {rec['title']}")
        print(f"   {rec['description']}")
        print("   アクション項目:")
        for action in rec['action_items']:
            print(f"   • {action}")
    print("\n=== ROIシナリオ分析 ===")
    roi_scenarios = bi_engine.forecast_roi_scenarios(value_scores)
    for scenario_name, scenario in roi_scenarios.items():
        print(f"\n{scenario_name.upper()}シナリオ: {scenario['description']}")
        print(f"  期待価値増加: {scenario['expected_value_increase']:,.0f}")
        print(f"  必要投資額: {scenario['investment_required']:,.0f}")
        print(f"  ROI: {scenario['roi']:.1f}%")
まとめと今後の展望
MCPメタデータを活用した最先端の時系列分析により、以下の革新的な価値を実現できます:
1. 多次元データの活用
- 豊富なメタデータ: コンテキスト複雑度、意図分類、ツール利用パターンなど
- 外部要因統合: 市場トレンド、SNSバズ、季節性要因との相関分析
- 高次元特徴量: ラグ、移動平均、変化率、相互作用項の生成
2. 最先端の予測モデル
- 深層学習: LSTM、Transformer、アンサンブル学習
- 統計的手法: ARIMA、Prophet、外部要因回帰
- リアルタイム更新: 継続学習による予測精度の向上
3. ビジネス価値の最大化
- データドリブン意思決定: 客観的な価値スコア算出
- 戦略的推奨: 成長機会と改善点の自動特定
- ROI最適化: 投資効果の定量的評価
この包括的なアプローチにより、従来の「勘と経験」に依存したコンテンツ戦略から、「データと科学」に基づく最適化された戦略への転換が可能になります。MCPメタデータの豊富さを最大限に活用することで、コンテンツエコノミーの新時代における競争優位性を確立できるのです。
注意: MCPはAnthropicが開発した比較的新しいプロトコルです。最新の情報については、公式ドキュメントを参照してください。