import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.graphics.tsaplots import plot_acf
# ステップ1: データ生成
# 平均0、標準偏差1のランダムな時系列データを生成(データポイント数は100)
np.random.seed(42) # 再現性のためのシード値設定
data = np.random.normal(loc=0, scale=1, size=100)
# ステップ2: データの自己相関を確認
# データフレームとして扱う
df = pd.DataFrame({'Value': data})
# ステップ3: 自己相関のプロット
plt.figure(figsize=(10, 6))
plot_acf(df['Value'], lags=20) # ラグ(遅れ)を20までプロット
plt.title('Autocorrelation of Random Time Series Data')
plt.xlabel('Lags')
plt.ylabel('Autocorrelation')
plt.grid(True)
plt.show()
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# ステップ1: 日付を含むデータの生成
# 2023年1月1日から2023年12月31日までの日付を生成
date_range = pd.date_range(start='2023-01-01', end='2023-12-31')
# 架空の売上データを生成(ランダムなノイズを加えたデータ)
np.random.seed(42)
sales_data = 100 + np.random.normal(loc=0, scale=10, size=len(date_range))
# データフレームとしてまとめる
df = pd.DataFrame({'Date': date_range, 'Sales': sales_data})
# ステップ2: 日付処理
# 日付データをインデックスに設定
df.set_index('Date', inplace=True)
# 日、月、曜日などの情報を新しい列として追加
df['Year'] = df.index.year
df['Month'] = df.index.month
df['Day'] = df.index.day
df['Weekday'] = df.index.weekday
# 月ごとの売上を集計(集約)
monthly_sales = df['Sales'].resample('M').sum()
# 移動平均(7日間)を計算
df['7-Day Moving Average'] = df['Sales'].rolling(window=7).mean()
# 特定期間のデータを抽出(例: 2023年6月から8月のデータ)
summer_data = df['2023-06':'2023-08']
# ステップ3: データの可視化
# 月ごとの売上をプロット
plt.figure(figsize=(12, 6))
monthly_sales.plot(marker='o', linestyle='-')
plt.title('Monthly Sales in 2023')
plt.xlabel('Month')
plt.ylabel('Total Sales')
plt.grid(True)
plt.show()
# 売上と移動平均のプロット
plt.figure(figsize=(12, 6))
df['Sales'].plot(label='Daily Sales', color='blue')
df['7-Day Moving Average'].plot(label='7-Day Moving Average', color='orange')
plt.title('Daily Sales and 7-Day Moving Average')
plt.xlabel('Date')
plt.ylabel('Sales')
plt.legend()
plt.grid(True)
plt.show()
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
# ステップ1: ランダムウォークデータの生成
np.random.seed(42)
n = 100
random_walk = np.cumsum(np.random.normal(loc=0, scale=1, size=n)) # 累積和を取ることでランダムウォークを生成
# ステップ2: 非定常データの確認
# データフレームとしてまとめる
df = pd.DataFrame({'Y': random_walk, 'X': np.arange(n)})
# 定常性を確認するためのADF検定
adf_test = adfuller(df['Y'])
print(f'ADF Test Statistic: {adf_test[0]:.4f}')
print(f'p-value: {adf_test[1]:.4f}')
# ステップ3: 回帰分析の適用
# Y ~ X の単回帰モデルを作成
X = sm.add_constant(df['X']) # 定数項を追加
model = sm.OLS(df['Y'], X).fit() # 回帰モデルの適用
# 結果を表示
print(model.summary())
# ステップ4: プロットで確認
plt.figure(figsize=(12, 6))
plt.plot(df['X'], df['Y'], label='Random Walk (Y)')
plt.plot(df['X'], model.predict(X), label='OLS Regression Line', linestyle='--', color='orange')
plt.title('Random Walk and Linear Regression Line')
plt.xlabel('Time')
plt.ylabel('Y Value')
plt.legend()
plt.grid(True)
plt.show()
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# ステップ1: サンプルデータの生成(ランダムな時系列データを生成)
np.random.seed(42) # 再現性のためのシード設定
n = 50 # データポイント数
dates = pd.date_range(start='2023-01-01', periods=n, freq='D')
data = np.random.normal(loc=10, scale=2, size=n) # 平均10、標準偏差2のデータを生成
# データフレームを作成
df = pd.DataFrame({'Date': dates, 'Value': data})
df.set_index('Date', inplace=True)
# ステップ2: ナイーブ予測の実装
# Naive予測では、翌日の予測値は当日の値と同じとする
df['Naive_Forecast'] = df['Value'].shift(1)
# ステップ3: 実データとナイーブ予測の可視化
plt.figure(figsize=(12, 6))
plt.plot(df.index, df['Value'], label='Actual Value', marker='o')
plt.plot(df.index, df['Naive_Forecast'], label='Naive Forecast', linestyle='--', marker='x', color='orange')
plt.title('Naive Forecast vs Actual Values')
plt.xlabel('Date')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.show()
# ステップ4: ナイーブ予測の評価(Mean Absolute Error, MAEを計算)
# 最初の1つ目は予測不可なので除外
mae = np.mean(np.abs(df['Value'][1:] - df['Naive_Forecast'][1:]))
print(f'Mean Absolute Error (MAE) of Naive Forecast: {mae:.2f}')
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# ステップ1: サンプルデータの生成(季節性とトレンドを持つ時系列データ)
np.random.seed(42)
n = 60 # データポイント数(例: 5年分の月次データ)
date_range = pd.date_range(start='2019-01-01', periods=n, freq='M')
# 季節性のあるデータを生成(例: Sine波 + ノイズ + 緩やかなトレンド)
seasonal_pattern = 10 + 5 * np.sin(2 * np.pi * np.arange(n) / 12) # 1年周期の季節性
trend = np.linspace(0, 10, n) # 緩やかなトレンド
noise = np.random.normal(0, 2, n) # ランダムノイズ
data = seasonal_pattern + trend + noise
# データフレームを作成
df = pd.DataFrame({'Date': date_range, 'Value': data})
df.set_index('Date', inplace=True)
# ステップ2: 移動平均によるトレンドの抽出(12ヶ月移動平均を計算)
df['12-Month MA'] = df['Value'].rolling(window=12, center=True).mean()
# ステップ3: 季節指数の計算
# 季節成分を計算するために、元データから移動平均を引く
df['Seasonal + Noise'] = df['Value'] - df['12-Month MA']
# 各月ごとの平均を求める(季節指数を計算)
monthly_seasonal_indices = df.groupby(df.index.month)['Seasonal + Noise'].mean()
print("Monthly Seasonal Indices:\n", monthly_seasonal_indices)
# ステップ4: 季節調整済みデータの作成
# 元のデータから各月の季節指数を引くことで季節調整を行う
df['Seasonally Adjusted'] = df['Value'] - df.index.month.map(monthly_seasonal_indices)
# ステップ5: 元のデータと季節調整済みデータのプロット
plt.figure(figsize=(12, 6))
plt.plot(df.index, df['Value'], label='Original Data')
plt.plot(df.index, df['Seasonally Adjusted'], label='Seasonally Adjusted Data', linestyle='--')
plt.title('Original Data vs Seasonally Adjusted Data')
plt.xlabel('Date')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.show()
# ステップ6: 季節成分の可視化
plt.figure(figsize=(12, 6))
plt.bar(monthly_seasonal_indices.index, monthly_seasonal_indices, color='skyblue')
plt.title('Seasonal Indices for Each Month')
plt.xlabel('Month')
plt.ylabel('Seasonal Index')
plt.grid(True)
plt.show()
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.holtwinters import SimpleExpSmoothing, ExponentialSmoothing
# ステップ1: サンプルデータの生成(ランダムなトレンド・季節性を持つデータ)
np.random.seed(42)
n = 100 # データポイント数
date_range = pd.date_range(start='2020-01-01', periods=n, freq='D')
# 季節性とトレンドを持つデータを生成
seasonal_pattern = 10 + 5 * np.sin(2 * np.pi * np.arange(n) / 30) # 30日周期の季節性
trend = np.linspace(0, 2, n) # 緩やかなトレンド
noise = np.random.normal(0, 1, n) # ランダムノイズ
data = seasonal_pattern + trend + noise
# データフレームを作成
df = pd.DataFrame({'Date': date_range, 'Value': data})
df.set_index('Date', inplace=True)
# ステップ2: 単純指数平滑化法(Simple Exponential Smoothing, SES)の適用
alpha = 0.2 # スムージングパラメータ(0 < alpha ≤ 1)
ses_model = SimpleExpSmoothing(df['Value']).fit(smoothing_level=alpha, optimized=False)
df['SES'] = ses_model.fittedvalues
# ステップ3: ホルトの線形指数平滑化法(Holt's Linear)を適用
holt_model = ExponentialSmoothing(df['Value'], trend='add', seasonal=None, seasonal_periods=None).fit()
df['Holt'] = holt_model.fittedvalues
# ステップ4: ホルト・ウィンター法(Holt-Winters)を適用
# 季節性周期を30日とし、トレンドと季節性の両方を考慮
hw_model = ExponentialSmoothing(df['Value'], trend='add', seasonal='add', seasonal_periods=30).fit()
df['Holt-Winters'] = hw_model.fittedvalues
# ステップ5: データの可視化
plt.figure(figsize=(12, 6))
plt.plot(df['Value'], label='Original Data', color='blue')
plt.plot(df['SES'], label='Simple Exponential Smoothing (SES)', linestyle='--', color='orange')
plt.plot(df['Holt'], label="Holt's Linear Trend", linestyle='--', color='green')
plt.plot(df['Holt-Winters'], label='Holt-Winters (Seasonal)', linestyle='--', color='red')
plt.title('Exponential Smoothing Methods')
plt.xlabel('Date')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.show()
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.ar_model import AutoReg
# ステップ1: サンプル時系列データの生成
np.random.seed(42)
n = 200 # データポイント数
# データ生成: 0平均、1標準偏差のホワイトノイズに、自己回帰項を加える(AR(2)モデル)
ar_coefficients = [0.7, -0.3] # ARモデルの係数(AR(2))
noise = np.random.normal(0, 1, n)
data = [0, 0] # 初期値として2つの0を設定
# 自己回帰データの生成(2次のARモデル)
for i in range(2, n):
new_value = ar_coefficients[0] * data[-1] + ar_coefficients[1] * data[-2] + noise[i]
data.append(new_value)
# データをDataFrameに格納
df = pd.DataFrame({'Value': data})
df.index = pd.date_range(start='2020-01-01', periods=n, freq='D')
# ステップ2: ARモデルの適用
# 過去40期間のデータを使用した自己回帰モデルの適用
ar_lags = 40 # ラグ数を指定
ar_model = AutoReg(df['Value'], lags=ar_lags).fit()
# ステップ3: モデルのパラメータを表示
print("AR Model Coefficients:", ar_model.params)
# ステップ4: 予測の実施
# 最後のデータポイントから30日分の予測を行う
forecast_steps = 30
forecast = ar_model.predict(start=len(df), end=len(df) + forecast_steps - 1)
# ステップ5: 元データと予測データのプロット
plt.figure(figsize=(12, 6))
plt.plot(df['Value'], label='Original Data')
plt.plot(forecast.index, forecast, label='AR Model Forecast', color='orange')
plt.title(f'AR({ar_lags}) Model Forecast')
plt.xlabel('Date')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.show()
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA
# ステップ1: サンプル時系列データの生成
np.random.seed(42)
n = 200 # データポイント数
# ランダムなホワイトノイズデータを生成
noise = np.random.normal(loc=0, scale=1, size=n)
# 移動平均モデルを用いてデータを生成(MA(2))
# 係数を指定して生成 (MAモデルのラグ=2)
ma_coefficients = [0.8, 0.2] # MA(2)の係数
data = []
# 初期化
for i in range(len(ma_coefficients)):
data.append(noise[i]) # 最初の数値はノイズのみ
# MA(2)モデルの生成
for i in range(len(ma_coefficients), n):
new_value = noise[i] + ma_coefficients[0] * noise[i-1] + ma_coefficients[1] * noise[i-2]
data.append(new_value)
# データフレームを作成
df = pd.DataFrame({'Value': data})
df.index = pd.date_range(start='2020-01-01', periods=n, freq='D')
# ステップ2: MAモデルの適用
# MA(2)モデルを適用
ma_model = ARIMA(df['Value'], order=(0, 0, 2)).fit() # order=(p, d, q) -> (0, 0, 2)でMAモデルを指定
# ステップ3: モデルのパラメータを表示
print("MA Model Coefficients:")
print(ma_model.params)
# ステップ4: 予測の実施
# 最後のデータポイントから30日分の予測を行う
forecast_steps = 30
forecast = ma_model.predict(start=len(df), end=len(df) + forecast_steps - 1)
# ステップ5: 元データと予測データのプロット
plt.figure(figsize=(12, 6))
plt.plot(df['Value'], label='Original Data')
plt.plot(forecast.index, forecast, label='MA Model Forecast', color='orange')
plt.title(f'MA(2) Model Forecast')
plt.xlabel('Date')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.show()
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
# ステップ1: サンプル時系列データの生成
np.random.seed(42)
n = 300 # データポイント数
# データ生成: トレンド + 季節性 + ランダムノイズ
trend = np.linspace(0, 20, n) # 緩やかな上昇トレンド
seasonal = 10 + 5 * np.sin(2 * np.pi * np.arange(n) / 30) # 30日周期の季節性
noise = np.random.normal(0, 1, n) # ランダムノイズ
data = trend + seasonal + noise
# データフレームを作成し、日次データのインデックスを設定
df = pd.DataFrame({'Value': data})
df.index = pd.date_range(start='2020-01-01', periods=n, freq='D')
# ステップ2: ARMAモデルの適用(定常データに対して)
# データが非定常のため、1階差分を取る(定常化を確認することを推奨)
df['Diff'] = df['Value'].diff().dropna()
# ARMAモデルの適用(差分を取ったデータに対して)
arma_model = ARIMA(df['Diff'].dropna(), order=(2, 0, 2)).fit() # p=2, q=2のARMAモデル
# ステップ3: ARIMAモデルの適用
# 元の非定常データに対してARIMAモデルを適用(1階差分)
arima_model = ARIMA(df['Value'], order=(2, 1, 2)).fit() # d=1で1階差分
# ステップ4: SARIMAモデルの適用(季節性を考慮)
# 季節性周期を30日としてSARIMAモデルを適用
sarima_model = SARIMAX(df['Value'], order=(2, 1, 2), seasonal_order=(1, 1, 1, 30)).fit()
# ステップ5: 予測の実施
# それぞれのモデルで最後のデータポイントから30日間の予測を実施
forecast_steps = 30
arma_forecast = arma_model.predict(start=len(df), end=len(df) + forecast_steps - 1)
arima_forecast = arima_model.predict(start=len(df), end=len(df) + forecast_steps - 1)
sarima_forecast = sarima_model.predict(start=len(df), end=len(df) + forecast_steps - 1)
# ステップ6: モデルの結果をデータフレームに格納
forecast_index = pd.date_range(start=df.index[-1] + pd.Timedelta(days=1), periods=forecast_steps, freq='D')
forecast_df = pd.DataFrame({
'ARMA_Forecast': arma_forecast,
'ARIMA_Forecast': arima_forecast,
'SARIMA_Forecast': sarima_forecast
}, index=forecast_index)
# ステップ7: 元データと予測データの可視化
plt.figure(figsize=(14, 8))
plt.plot(df['Value'], label='Original Data', color='blue')
plt.plot(forecast_df['ARMA_Forecast'], label='ARMA Forecast', linestyle='--', color='orange')
plt.plot(forecast_df['ARIMA_Forecast'], label='ARIMA Forecast', linestyle='--', color='green')
plt.plot(forecast_df['SARIMA_Forecast'], label='SARIMA Forecast', linestyle='--', color='red')
plt.title('ARMA, ARIMA, SARIMA Model Forecasts')
plt.xlabel('Date')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.show()
# ステップ8: モデルのパラメータ表示
print("ARMA Model Parameters:")
print(arma_model.params)
print("\nARIMA Model Parameters:")
print(arima_model.params)
print("\nSARIMA Model Parameters:")
print(sarima_model.params)
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings
# 警告メッセージを無視する
warnings.filterwarnings("ignore")
# 季節ランダムウォーク過程のシミュレーション
def simulate_seasonal_random_walk(n_periods, season_length, random_seed=42):
np.random.seed(random_seed)
seasonal_component = np.sin(2 * np.pi * np.arange(n_periods) / season_length)
errors = np.random.normal(loc=0, scale=0.5, size=n_periods)
y = np.zeros(n_periods)
for t in range(1, n_periods):
y[t] = y[t-1] + seasonal_component[t] + errors[t]
return y
# データの生成
n_periods = 200 # データの期間
season_length = 12 # 季節性の長さ
seasonal_rw_data = simulate_seasonal_random_walk(n_periods, season_length)
# データプロット
plt.figure(figsize=(12, 6))
plt.plot(seasonal_rw_data, label='Seasonal Random Walk')
plt.title('Seasonal Random Walk Process')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
# データを Pandas DataFrame に変換
data = pd.DataFrame(seasonal_rw_data, columns=['Value'])
# SARIMA モデルのフィッティング
# SARIMA(p, d, q)(P, D, Q, s)の各次数を設定
order = (1, 1, 1) # p, d, q
seasonal_order = (1, 1, 1, season_length) # P, D, Q, s
# SARIMA モデルの適合
model = SARIMAX(data['Value'], order=order, seasonal_order=seasonal_order)
sarima_result = model.fit()
# モデルの要約表示
print(sarima_result.summary())
# 実データとモデルによる予測値のプロット
data['Fitted'] = sarima_result.fittedvalues
plt.figure(figsize=(12, 6))
plt.plot(data['Value'], label='Observed')
plt.plot(data['Fitted'], color='red', linestyle='--', label='Fitted')
plt.title('Observed vs Fitted (SARIMA Model)')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
# 将来の予測を行う
forecast_steps = 24
forecast = sarima_result.get_forecast(steps=forecast_steps)
# 予測値と信頼区間の取得
forecast_values = forecast.predicted_mean
confidence_intervals = forecast.conf_int()
# 実データと予測値のプロット
plt.figure(figsize=(12, 6))
plt.plot(data['Value'], label='Observed')
plt.plot(np.arange(n_periods, n_periods + forecast_steps), forecast_values, color='green', linestyle='--', label='Forecast')
plt.fill_between(np.arange(n_periods, n_periods + forecast_steps),
confidence_intervals.iloc[:, 0], confidence_intervals.iloc[:, 1],
color='green', alpha=0.3)
plt.title('Observed vs Forecast (SARIMA Model)')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
import numpy as np
import matplotlib.pyplot as plt
# 1. パラメータ設定
# システムの初期状態
x = np.array([[0], [1]]) # 初期状態 (位置0, 速度1)
P = np.eye(2) # 状態共分散行列の初期値
# 状態遷移行列
F = np.array([[1, 1], # 位置 = 位置 + 速度 * 時間
[0, 1]]) # 速度は変わらない
# 観測行列
H = np.array([[1, 0]]) # 観測できるのは位置のみ
# システムノイズと観測ノイズの共分散行列
Q = np.array([[0.001, 0], # システムノイズの共分散行列
[0, 0.001]])
R = np.array([[0.1]]) # 観測ノイズの共分散行列
# 2. データ生成
np.random.seed(42)
true_positions = []
observations = []
for _ in range(50):
x = F @ x + np.random.multivariate_normal([0, 0], Q).reshape(-1, 1)
observed = H @ x + np.random.normal(0, R).reshape(-1, 1)
true_positions.append(x[0, 0])
observations.append(observed[0, 0])
# 3. カルマンフィルターの実装
x_est = np.array([[0], [0]]) # 初期推定状態 (位置0, 速度0)
P_est = np.eye(2) # 初期推定共分散行列
estimated_positions = []
for z in observations:
# (1) 状態予測
x_pred = F @ x_est
P_pred = F @ P_est @ F.T + Q
# (2) カルマンゲインの計算
K = P_pred @ H.T @ np.linalg.inv(H @ P_pred @ H.T + R)
# (3) 観測の更新
z = np.array([[z]])
x_est = x_pred + K @ (z - H @ x_pred)
P_est = (np.eye(2) - K @ H) @ P_pred
# 推定値を格納
estimated_positions.append(x_est[0, 0])
# 4. 結果のプロット
plt.figure(figsize=(10, 5))
plt.plot(true_positions, label='True Position', linestyle='--', marker='o')
plt.plot(observations, label='Observations', linestyle=':', marker='x')
plt.plot(estimated_positions, label='Kalman Filter Estimate', linestyle='-', marker='s')
plt.xlabel('Time')
plt.ylabel('Position')
plt.legend()
plt.title('Kalman Filter: Position Estimation')
plt.grid(True)
plt.show()
import numpy as np
import matplotlib.pyplot as plt
# 1. データ生成
np.random.seed(42)
n = 100 # データの数
true_level = np.zeros(n)
observed_data = np.zeros(n)
# ローカルレベルモデルのパラメータ
sigma_level = 0.5 # レベルの変動(システムノイズの標準偏差)
sigma_obs = 1.0 # 観測ノイズの標準偏差
# 初期値
true_level[0] = 10 # レベルの初期値
observed_data[0] = true_level[0] + np.random.normal(0, sigma_obs)
# 状態方程式と観測方程式を用いてデータ生成
for t in range(1, n):
true_level[t] = true_level[t-1] + np.random.normal(0, sigma_level)
observed_data[t] = true_level[t] + np.random.normal(0, sigma_obs)
# 2. カルマンフィルタによるレベル推定
# 初期化
mu_est = np.zeros(n) # 推定されたレベルの保存用
P_est = np.zeros(n) # 推定された共分散の保存用
mu_est[0] = observed_data[0] # 初期レベルを観測値の初期値に設定
P_est[0] = 1.0 # 初期共分散(適当な値)
# カルマンフィルタのパラメータ設定
Q = sigma_level ** 2 # システムノイズの分散
R = sigma_obs ** 2 # 観測ノイズの分散
# カルマンフィルタの実装
for t in range(1, n):
# 予測ステップ
mu_pred = mu_est[t-1] # レベルの予測値
P_pred = P_est[t-1] + Q # 共分散の予測
# 更新ステップ
K = P_pred / (P_pred + R) # カルマンゲインの計算
mu_est[t] = mu_pred + K * (observed_data[t] - mu_pred) # レベルの更新
P_est[t] = (1 - K) * P_pred # 共分散の更新
# 3. 結果のプロット
plt.figure(figsize=(10, 6))
plt.plot(true_level, label='True Level', linestyle='--', marker='o')
plt.plot(observed_data, label='Observed Data', linestyle=':', marker='x')
plt.plot(mu_est, label='Kalman Filter Estimate', linestyle='-', marker='s')
plt.xlabel('Time')
plt.ylabel('Level')
plt.legend()
plt.title('Local Level Model with Kalman Filter')
plt.grid(True)
plt.show()
import numpy as np
import pandas as pd
import json
import statsmodels.api as sm
import matplotlib.pyplot as plt
# 1. データの準備
# サンプル時系列データの作成(正弦波データにノイズを追加)
np.random.seed(42)
n_samples = 100
time_index = pd.date_range('2023-01-01', periods=n_samples, freq='D')
data = np.sin(np.linspace(0, 20, n_samples)) + 0.5 * np.random.normal(size=n_samples)
data_series = pd.Series(data, index=time_index)
# 2. 状態空間モデルの定義とパラメータ推定
# 単純なローカルレベルモデルを定義(時系列のトレンドを推定)
model = sm.tsa.UnobservedComponents(data_series, level='local level')
result = model.fit(disp=False)
# 3. モデルパラメータの保存
# 推定されたパラメータを取得し、辞書型に変換
params = result.params.to_dict()
# パラメータをJSONファイルとして保存
params_filename = 'state_space_params.json'
with open(params_filename, 'w') as f:
json.dump(params, f)
print(f"Saved parameters to {params_filename}")
# 4. 保存したパラメータを読み込む
with open(params_filename, 'r') as f:
loaded_params = json.load(f)
# 5. 読み込んだパラメータをリスト型に変換し、smoothに適用
# パラメータをリスト型に変換(statsmodelsのパラメータの順序は辞書からリストに変換時に順序を確認)
params_list = [loaded_params[key] for key in result.param_names]
# 新しいモデルを定義し、パラメータをセット
new_model = sm.tsa.UnobservedComponents(data_series, level='local level')
new_result = new_model.smooth(params_list)
# 6. 過去のデータに対するフィッティング結果をプロット
plt.figure(figsize=(12, 6))
plt.plot(data_series, label='Original Data')
plt.plot(result.fittedvalues, label='Fitted Values', linestyle='--')
plt.legend()
plt.title("State Space Model Fit to Data")
plt.show()
# 7. 将来のデータを予測(10ステップ先まで)
future_steps = 10
pred = new_result.get_forecast(steps=future_steps)
# 予測結果を表示
future_index = pd.date_range(start=data_series.index[-1] + pd.Timedelta(days=1), periods=future_steps, freq='D')
predicted_mean = pred.predicted_mean
confidence_intervals = pred.conf_int()
# 8. 予測結果をプロット
plt.figure(figsize=(12, 6))
plt.plot(data_series, label='Original Data')
plt.plot(result.fittedvalues, label='Fitted Values', linestyle='--')
plt.plot(future_index, predicted_mean, label='Forecast', color='orange')
plt.fill_between(future_index, confidence_intervals.iloc[:, 0], confidence_intervals.iloc[:, 1], color='orange', alpha=0.3)
plt.legend()
plt.title("State Space Model Forecast")
plt.show()
# 9. 将来予測の結果を表示
print("Future Predicted Values:")
print(predicted_mean)
print("\nConfidence Intervals:")
print(confidence_intervals)
import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
# 1. データの準備
# サンプル時系列データ(正弦波にノイズを加えたデータ)
np.random.seed(42)
n_samples = 100
time_index = pd.date_range('2023-01-01', periods=n_samples, freq='D')
data = np.sin(np.linspace(0, 20, n_samples)) + 0.5 * np.random.normal(size=n_samples)
data_series = pd.Series(data, index=time_index)
# 2. 状態空間モデルの定義(線形ガウス型)
# ローカルレベルモデル(トレンド+ホワイトノイズ)
# 観測方程式: y_t = μ_t + ε_t, ε_t ~ N(0, σ_ε^2)
# 状態方程式: μ_t = μ_(t-1) + η_t, η_t ~ N(0, σ_η^2)
model = sm.tsa.UnobservedComponents(data_series, level='local level')
# 3. モデルのフィッティング(パラメータ推定)
result = model.fit(disp=False)
# 4. フィッティング結果の表示
print("Model Parameters:")
print(result.params)
# 5. フィッティング結果のプロット(観測値とフィルタリングされた値の比較)
plt.figure(figsize=(12, 6))
plt.plot(data_series, label='Original Data')
plt.plot(result.fittedvalues, label='Fitted Values (Filtered State)', linestyle='--')
plt.title("Linear Gaussian State Space Model Fit")
plt.legend()
plt.show()
# 6. 状態推定結果の表示(フィルタリングと平滑化)
filtered_state = result.filtered_state[0]
smoothed_state = result.smoothed_state[0]
# 状態推定結果をプロット
plt.figure(figsize=(12, 6))
plt.plot(data_series.index, filtered_state, label='Filtered State')
plt.plot(data_series.index, smoothed_state, label='Smoothed State', linestyle='--')
plt.title("Filtered and Smoothed States")
plt.legend()
plt.show()
# 7. 将来のデータを予測(10ステップ先まで)
future_steps = 10
forecast_result = result.get_forecast(steps=future_steps)
forecast_mean = forecast_result.predicted_mean
forecast_conf_int = forecast_result.conf_int()
# 予測結果を表示
future_index = pd.date_range(start=data_series.index[-1] + pd.Timedelta(days=1), periods=future_steps, freq='D')
# 8. 予測結果のプロット
plt.figure(figsize=(12, 6))
plt.plot(data_series, label='Original Data')
plt.plot(result.fittedvalues, label='Fitted Values', linestyle='--')
plt.plot(future_index, forecast_mean, label='Forecast', color='orange')
plt.fill_between(future_index, forecast_conf_int.iloc[:, 0], forecast_conf_int.iloc[:, 1], color='orange', alpha=0.3)
plt.xlabel("Date")
plt.ylabel("Value")
plt.title("Forecast with Linear Gaussian State Space Model")
plt.legend()
plt.show()
# 9. 将来予測の結果を表示
print("Future Predicted Values:")
print(forecast_mean)
print("\nConfidence Intervals:")
print(forecast_conf_int)
# 10. フィルタリングされた状態と観測値のプロット
# 状態空間モデルのフィルタリングと平滑化の状態を比較
plt.figure(figsize=(12, 6))
plt.plot(data_series, label='Original Data')
plt.plot(data_series.index, filtered_state, label='Filtered State', linestyle='--')
plt.plot(data_series.index, smoothed_state, label='Smoothed State', linestyle='-.')
plt.xlabel("Date")
plt.ylabel("Value")
plt.title("Filtered and Smoothed States with Original Data")
plt.legend()
plt.show()
import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras import layers
# 1. データの準備
# サンプル時系列データ(正弦波データにノイズを加えたデータ)
np.random.seed(42)
n_samples = 200
data = np.sin(np.linspace(0, 20, n_samples)) + 0.5 * np.random.normal(size=n_samples)
# データをDataFrameに変換し、特徴量とターゲットに分割
df = pd.DataFrame({'feature': np.arange(n_samples), 'target': data})
# 訓練データとテストデータの分割(80%: 訓練データ、20%: テストデータ)
X = df[['feature']].values
y = df['target'].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
# 2. LightGBMによる時系列予測
# LightGBM用データセットの作成
lgb_train = lgb.Dataset(X_train, y_train)
lgb_test = lgb.Dataset(X_test, y_test, reference=lgb_train)
# モデルのパラメータ設定
params = {
'objective': 'regression',
'metric': 'mse',
'verbosity': -1
}
# モデルの訓練(コールバック関数を使って early stopping を設定)
callbacks = [lgb.early_stopping(stopping_rounds=10, verbose=False)]
lgb_model = lgb.train(params, lgb_train, valid_sets=[lgb_test], callbacks=callbacks)
# 予測と評価
y_pred_lgb = lgb_model.predict(X_test)
lgb_mse = mean_squared_error(y_test, y_pred_lgb)
print(f"LightGBM MSE: {lgb_mse:.4f}")
# 3. 深層学習(LSTM)による時系列予測
# データを0-1の範囲にスケーリング
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(df['target'].values.reshape(-1, 1))
# LSTM用データの準備(時系列データを特徴量とターゲットに分割)
def create_lstm_dataset(data, time_steps=5):
X, y = [], []
for i in range(len(data)-time_steps):
X.append(data[i:i+time_steps, 0])
y.append(data[i+time_steps, 0])
return np.array(X), np.array(y)
# LSTM用データセットの作成
time_steps = 10 # 過去10ステップを使って次の1ステップを予測
X_lstm, y_lstm = create_lstm_dataset(data_scaled, time_steps)
# データを訓練データとテストデータに分割
train_size = int(len(X_lstm) * 0.8)
X_train_lstm, X_test_lstm = X_lstm[:train_size], X_lstm[train_size:]
y_train_lstm, y_test_lstm = y_lstm[:train_size], y_lstm[train_size:]
# LSTM用にデータを3次元に変形 (サンプル数, 時間ステップ, 特徴量数)
X_train_lstm = X_train_lstm.reshape(X_train_lstm.shape[0], X_train_lstm.shape[1], 1)
X_test_lstm = X_test_lstm.reshape(X_test_lstm.shape[0], X_test_lstm.shape[1], 1)
# LSTMモデルの構築
lstm_model = tf.keras.Sequential([
layers.LSTM(50, activation='relu', input_shape=(X_train_lstm.shape[1], 1)),
layers.Dense(1)
])
# モデルのコンパイル
lstm_model.compile(optimizer='adam', loss='mse')
# モデルの訓練
lstm_model.fit(X_train_lstm, y_train_lstm, epochs=50, batch_size=16, validation_split=0.2, verbose=0)
# 予測と評価
y_pred_lstm = lstm_model.predict(X_test_lstm)
y_pred_lstm_rescaled = scaler.inverse_transform(y_pred_lstm)
y_test_lstm_rescaled = scaler.inverse_transform(y_test_lstm.reshape(-1, 1))
lstm_mse = mean_squared_error(y_test_lstm_rescaled, y_pred_lstm_rescaled)
print(f"LSTM MSE: {lstm_mse:.4f}")
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import statsmodels.api as sm
import statsmodels.tsa.api as tsa
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller
# 1. データの準備
# サンプルの時系列データ(売上データなどの模擬データを作成)
np.random.seed(42)
n_samples = 200
date_range = pd.date_range(start='2020-01-01', periods=n_samples, freq='M') # 月次データ
data = np.cumsum(np.random.randn(n_samples)) + 100 # ランダムウォークデータ
data_series = pd.Series(data, index=date_range)
# データのプロット
plt.figure(figsize=(12, 6))
plt.plot(data_series, label='Original Data')
plt.title("Time Series Data")
plt.legend()
plt.show()
# 2. 定常性の確認(ADF検定)
result = adfuller(data_series)
print("ADF Statistic: {:.4f}".format(result[0]))
print("p-value: {:.4f}".format(result[1]))
print("Critical Values:", result[4])
# p-valueが0.05より大きければ非定常と判断し、差分化を行う
if result[1] > 0.05:
data_diff = data_series.diff().dropna() # 1階差分を取る
else:
data_diff = data_series
# 差分データのプロット
plt.figure(figsize=(12, 6))
plt.plot(data_diff, label='Differenced Data')
plt.title("Differenced Time Series Data")
plt.legend()
plt.show()
# 3. ACF と PACF のプロット(自己相関と偏自己相関)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
plot_acf(data_diff, ax=ax1)
plot_pacf(data_diff, ax=ax2)
plt.show()
# 4. ARIMA モデルの識別と推定
# (p, d, q) を試しに設定し、ARIMA モデルをフィッティング
p, d, q = 2, 1, 2 # ACF、PACFプロットから推定(仮の値)
arima_model = sm.tsa.ARIMA(data_series, order=(p, d, q))
arima_result = arima_model.fit()
print("ARIMA Model Summary:\n", arima_result.summary())
# 5. 残差の診断
# 残差プロットと自己相関の確認
residuals = arima_result.resid
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
residuals.plot(title="Residuals", ax=ax1)
plot_acf(residuals, ax=ax2)
plt.show()
# 6. SARIMA モデルの設定と推定
# (p, d, q) と (P, D, Q, s) の設定
p, d, q = 2, 1, 2 # ARIMA部分の次数
P, D, Q, s = 1, 1, 1, 12 # SARIMA部分の次数(季節性12を仮定)
sarima_model = sm.tsa.statespace.SARIMAX(data_series, order=(p, d, q), seasonal_order=(P, D, Q, s))
sarima_result = sarima_model.fit()
print("SARIMA Model Summary:\n", sarima_result.summary())
# 7. モデル診断
# 残差プロットと自己相関の確認(SARIMAモデル)
residuals_sarima = sarima_result.resid
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
residuals_sarima.plot(title="SARIMA Residuals", ax=ax1)
plot_acf(residuals_sarima, ax=ax2)
plt.show()
# 8. 予測(SARIMAX モデルを用いた将来予測)
# 将来12ヶ月の予測を行う
forecast = sarima_result.get_forecast(steps=12)
forecast_mean = forecast.predicted_mean
forecast_conf_int = forecast.conf_int()
# 予測結果をプロット
plt.figure(figsize=(12, 6))
plt.plot(data_series, label='Original Data')
plt.plot(forecast_mean, label='Forecast', color='orange')
plt.fill_between(forecast_conf_int.index, forecast_conf_int.iloc[:, 0], forecast_conf_int.iloc[:, 1], color='orange', alpha=0.3)
plt.title("Forecast with SARIMAX Model")
plt.legend()
plt.show()
# 9. 予測結果の表示
print("Future Forecasted Values:\n", forecast_mean)
print("Confidence Intervals:\n", forecast_conf_int)
from sklearn.model_selection import cross_val_score, KFold
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression
# 1. データの準備(サンプルデータを作成)
X, y = make_regression(n_samples=200, n_features=1, noise=0.1, random_state=42)
# 2. 線形回帰モデルの定義
lr_model = LinearRegression()
# 3. クロスバリデーションの設定(KFoldを用いた5分割交差検証)
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
# 4. クロスバリデーションの実行
cv_scores = cross_val_score(lr_model, X, y, cv=kfold, scoring='r2')
# 5. 各フォールドのスコアと平均スコアを表示
print(f"Cross-Validation Scores: {cv_scores}")
print(f"Average Cross-Validation Score: {cv_scores.mean():.4f}")
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import STL
# 1. データの準備
np.random.seed(42)
n_samples = 365 * 2 # 2年分の日次データ
date_range = pd.date_range(start='2020-01-01', periods=n_samples, freq='D')
# 日次データを生成し、2つの季節成分(週次と年次)を加えたデータを作成
weekly_seasonality = 10 * np.sin(2 * np.pi * np.arange(n_samples) / 7)
yearly_seasonality = 20 * np.sin(2 * np.pi * np.arange(n_samples) / 365)
data = 50 + weekly_seasonality + yearly_seasonality + np.random.normal(scale=5, size=n_samples)
# データをDataFrameに変換
data_series = pd.DataFrame({'ds': date_range, 'y': data})
# 2. STL分解 (週次成分)
stl_week = STL(data_series['y'], period=7, robust=True)
result_week = stl_week.fit()
# 3. STL分解 (年次成分)
stl_year = STL(data_series['y'] - result_week.seasonal, period=365, robust=True)
result_year = stl_year.fit()
# 4. 各成分のプロット
plt.figure(figsize=(12, 8))
plt.subplot(3, 1, 1)
plt.plot(data_series['ds'], data_series['y'], label='Original Data')
plt.title('Original Data')
plt.legend()
plt.subplot(3, 1, 2)
plt.plot(data_series['ds'], result_week.seasonal, label='Weekly Seasonal Component')
plt.title('Weekly Seasonal Component')
plt.legend()
plt.subplot(3, 1, 3)
plt.plot(data_series['ds'], result_year.seasonal, label='Yearly Seasonal Component')
plt.title('Yearly Seasonal Component')
plt.legend()
plt.tight_layout()
plt.show()
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.api import VAR
# 1. データの準備(2つの時系列データを用いたサンプル)
np.random.seed(42)
n_samples = 200
time_index = pd.date_range('2020-01-01', periods=n_samples, freq='D')
# 2つの時系列データ(互いに依存するデータを生成)
ts1 = np.cumsum(np.random.randn(n_samples)) # ランダムウォーク
ts2 = 0.8 * ts1 + np.cumsum(np.random.randn(n_samples)) # ts1 に依存する ts2
# データフレームに変換
data = pd.DataFrame({'ts1': ts1, 'ts2': ts2}, index=time_index)
# データのプロット
plt.figure(figsize=(12, 6))
plt.plot(data['ts1'], label='Time Series 1 (ts1)')
plt.plot(data['ts2'], label='Time Series 2 (ts2)')
plt.title('Time Series Data')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
# 2. VAR モデルの適用
model = VAR(data)
model_fitted = model.fit(ic='aic') # 情報基準(AIC)に基づいて最適なラグ次数を選択
# 結果の表示
print(model_fitted.summary())
# 3. インパルス応答関数のプロット
irf = model_fitted.irf(10) # 10期間分のインパルス応答を計算
irf.plot(orth=False)
plt.show()
# 4. 予測(10期間先まで)
forecast = model_fitted.forecast(data.values[-model_fitted.k_ar:], steps=10)
forecast_index = pd.date_range(data.index[-1] + pd.Timedelta(days=1), periods=10, freq='D')
forecast_df = pd.DataFrame(forecast, index=forecast_index, columns=['ts1_forecast', 'ts2_forecast'])
# 予測結果のプロット
plt.figure(figsize=(12, 6))
plt.plot(data['ts1'], label='Original ts1')
plt.plot(data['ts2'], label='Original ts2')
plt.plot(forecast_df['ts1_forecast'], label='Forecast ts1', linestyle='--')
plt.plot(forecast_df['ts2_forecast'], label='Forecast ts2', linestyle='--')
plt.title('VAR Model Forecast')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.statespace.kalman_filter import KalmanFilter
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.statespace.structural import UnobservedComponents
from statsmodels.tsa.api import VAR
from sklearn.metrics import mean_squared_error
# 1. サンプルデータの生成
np.random.seed(42)
n_samples = 200
time_index = pd.date_range('2020-01-01', periods=n_samples, freq='D')
# ランダムな状態遷移モデルを持つサンプル時系列データを生成
state_transition = np.cumsum(np.random.randn(n_samples)) # ランダムな状態
measurement = state_transition + np.random.normal(scale=0.5, size=n_samples) # 測定値(ノイズ付き)
# データフレームに変換
data = pd.DataFrame({'state': state_transition, 'measurement': measurement}, index=time_index)
# 2. 時系列の状態空間モデルの構築(ローカルレベルモデル)
# 状態空間モデル: 状態方程式と観測方程式で構成される
# ローカルレベルモデル: 状態方程式: x_t = x_(t-1) + η_t, 観測方程式: y_t = x_t + ε_t
model = UnobservedComponents(data['measurement'], level='local level')
# 3. モデルの適合と結果の表示
result = model.fit(disp=False)
print("State Space Model Summary:\n", result.summary())
# 4. 状態推定結果(フィルタリングと平滑化)の取得
# フィルタリング: 現在までの観測値を用いたリアルタイムの状態推定
# 平滑化: 全観測値を用いた状態推定(事後処理)
filtered_state = result.filtered_state[0]
smoothed_state = result.smoothed_state[0]
# 5. 状態推定の結果をプロット
plt.figure(figsize=(12, 6))
plt.plot(data['measurement'], label='Measurement (Observed Data)', color='blue')
plt.plot(data.index, filtered_state, label='Filtered State', linestyle='--', color='orange')
plt.plot(data.index, smoothed_state, label='Smoothed State', linestyle='-', color='red')
plt.title('State Space Model - Filtering and Smoothing')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
# 6. 状態空間モデルを用いた予測(10日先まで予測)
forecast_steps = 10
forecast_result = result.get_forecast(steps=forecast_steps)
forecast_mean = forecast_result.predicted_mean
forecast_ci = forecast_result.conf_int()
# 予測結果を表示
forecast_index = pd.date_range(start=data.index[-1] + pd.Timedelta(days=1), periods=forecast_steps, freq='D')
plt.figure(figsize=(12, 6))
plt.plot(data['measurement'], label='Measurement (Observed Data)', color='blue')
plt.plot(data.index, smoothed_state, label='Smoothed State', linestyle='-', color='red')
plt.plot(forecast_index, forecast_mean, label='Forecast', linestyle='--', color='green')
plt.fill_between(forecast_index, forecast_ci.iloc[:, 0], forecast_ci.iloc[:, 1], color='green', alpha=0.3)
plt.title('State Space Model - Forecasting')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
# 7. 測定データに基づく時系列モデリング(VARモデルによる相互依存の確認)
# 2変数のVARモデルを構築して、データの相互依存関係を調査
data['measurement2'] = 0.5 * data['measurement'] + np.random.normal(scale=0.5, size=n_samples)
model_var = VAR(data[['measurement', 'measurement2']])
model_var_fitted = model_var.fit(ic='aic')
# VARモデルの要約
print("VAR Model Summary:\n", model_var_fitted.summary())
# インパルス応答関数のプロット(10期間)
irf = model_var_fitted.irf(10)
irf.plot(orth=False)
plt.show()
# 8. モデルの予測(10期間)
var_forecast = model_var_fitted.forecast(data[['measurement', 'measurement2']].values[-model_var_fitted.k_ar:], steps=forecast_steps)
var_forecast_df = pd.DataFrame(var_forecast, index=forecast_index, columns=['measurement_forecast', 'measurement2_forecast'])
# VARモデルの予測結果をプロット
plt.figure(figsize=(12, 6))
plt.plot(data['measurement'], label='Original Measurement 1')
plt.plot(data['measurement2'], label='Original Measurement 2')
plt.plot(var_forecast_df['measurement_forecast'], linestyle='--', label='Forecast Measurement 1', color='green')
plt.plot(var_forecast_df['measurement2_forecast'], linestyle='--', label='Forecast Measurement 2', color='orange')
plt.title('VAR Model Forecast')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
# 9. 予測精度の評価
mse = mean_squared_error(data['measurement'][-forecast_steps:], var_forecast_df['measurement_forecast'])
print(f"Mean Squared Error of VAR Forecast: {mse:.4f}")
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
import statsmodels.api as sm
from sklearn.metrics import mean_squared_error, r2_score
# 1. サンプル時系列データの生成
np.random.seed(42)
n_samples = 100
time_index = pd.date_range(start='2020-01-01', periods=n_samples, freq='D')
# スカラ(単変数)データ: 時間に対して線形に増加するデータ + ノイズ
x_scalar = np.arange(n_samples) # 説明変数(時間)
y_scalar = 0.5 * x_scalar + np.random.normal(scale=5, size=n_samples) # 目的変数(線形関係 + ノイズ)
# 多変数データ: 複数の説明変数(線形関係 + ノイズ)
x_multivariable = np.column_stack([x_scalar, np.sin(0.1 * x_scalar)]) # 説明変数(線形 + 正弦波)
y_multivariable = 2 * x_scalar + 3 * np.sin(0.1 * x_scalar) + np.random.normal(scale=5, size=n_samples) # 目的変数
# データフレームに変換
data_scalar = pd.DataFrame({'Time': x_scalar, 'Value': y_scalar}, index=time_index)
data_multivariable = pd.DataFrame({'Time': x_scalar, 'Sin_Time': np.sin(0.1 * x_scalar), 'Value': y_multivariable}, index=time_index)
# 2. 最小二乗推定法(スカラの場合)
# 線形回帰モデルを定義
model_scalar = LinearRegression()
x_scalar_reshaped = x_scalar.reshape(-1, 1) # 説明変数を2次元配列に変換
model_scalar.fit(x_scalar_reshaped, y_scalar)
# 予測値の計算
y_pred_scalar = model_scalar.predict(x_scalar_reshaped)
# 結果のプロット
plt.figure(figsize=(12, 6))
plt.scatter(x_scalar, y_scalar, color='blue', label='Observed Data')
plt.plot(x_scalar, y_pred_scalar, color='red', label='Fitted Line')
plt.title('Scalar Least Squares Estimation (Linear Regression)')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
# モデルのパラメータと評価指標の表示
print("Scalar Model Coefficients:", model_scalar.coef_)
print("Scalar Model Intercept:", model_scalar.intercept_)
print("Scalar Model Mean Squared Error (MSE):", mean_squared_error(y_scalar, y_pred_scalar))
print("Scalar Model R-squared:", r2_score(y_scalar, y_pred_scalar))
# 3. 最小二乗推定法(多変数の場合)
# 説明変数と目的変数を設定
x_multivariable_reshaped = data_multivariable[['Time', 'Sin_Time']].values # 説明変数
y_multivariable_values = data_multivariable['Value'].values # 目的変数
# 多変数線形回帰モデルの適用
model_multivariable = LinearRegression()
model_multivariable.fit(x_multivariable_reshaped, y_multivariable_values)
# 予測値の計算
y_pred_multivariable = model_multivariable.predict(x_multivariable_reshaped)
# 結果のプロット
plt.figure(figsize=(12, 6))
plt.scatter(x_scalar, y_multivariable_values, color='blue', label='Observed Data')
plt.plot(x_scalar, y_pred_multivariable, color='red', label='Fitted Line')
plt.title('Multivariable Least Squares Estimation (Multiple Linear Regression)')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
# モデルのパラメータと評価指標の表示
print("Multivariable Model Coefficients:", model_multivariable.coef_)
print("Multivariable Model Intercept:", model_multivariable.intercept_)
print("Multivariable Model Mean Squared Error (MSE):", mean_squared_error(y_multivariable_values, y_pred_multivariable))
print("Multivariable Model R-squared:", r2_score(y_multivariable_values, y_pred_multivariable))
# 4. statsmodels を用いた回帰分析(結果の詳細表示)
# スカラの場合の回帰分析
X_scalar_sm = sm.add_constant(x_scalar_reshaped) # 切片項を追加
model_scalar_sm = sm.OLS(y_scalar, X_scalar_sm).fit()
print("\nScalar Least Squares Estimation using statsmodels:\n", model_scalar_sm.summary())
# 多変数の場合の回帰分析
X_multivariable_sm = sm.add_constant(x_multivariable_reshaped) # 切片項を追加
model_multivariable_sm = sm.OLS(y_multivariable_values, X_multivariable_sm).fit()
print("\nMultivariable Least Squares Estimation using statsmodels:\n", model_multivariable_sm.summary())
# 5. 時系列データを用いた回帰分析
# 時系列データに基づいたデータフレームの作成(1日先の予測)
data_multivariable['Value_Lag1'] = data_multivariable['Value'].shift(1) # ラグ項(1日先の値)を作成
data_multivariable.dropna(inplace=True) # 欠損値を削除
# 説明変数と目的変数を設定
X_time_series = data_multivariable[['Time', 'Sin_Time', 'Value_Lag1']].values
y_time_series = data_multivariable['Value'].values
# モデルの適用
model_time_series = LinearRegression()
model_time_series.fit(X_time_series, y_time_series)
# 予測値の計算
y_pred_time_series = model_time_series.predict(X_time_series)
# 結果のプロット
plt.figure(figsize=(12, 6))
plt.plot(data_multivariable.index, y_time_series, label='Observed Data')
plt.plot(data_multivariable.index, y_pred_time_series, linestyle='--', label='Fitted Line (Time Series)')
plt.title('Time Series Least Squares Estimation (With Lag Variables)')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()
# モデルのパラメータと評価指標の表示
print("Time Series Model Coefficients:", model_time_series.coef_)
print("Time Series Model Intercept:", model_time_series.intercept_)
print("Time Series Model Mean Squared Error (MSE):", mean_squared_error(y_time_series, y_pred_time_series))
print("Time Series Model R-squared:", r2_score(y_time_series, y_pred_time_series))
import numpy as np
import matplotlib.pyplot as plt
# パラメータ設定
dt = 1 # 時間の間隔
process_variance = 1 # プロセスノイズの分散
measurement_variance = 2 # 観測ノイズの分散
# 初期状態 (位置と速度)
x = np.array([[0], [1]]) # 位置=0, 速度=1
# 状態遷移行列 (2x2)
A = np.array([[1, dt],
[0, 1]])
# 観測行列 (1x2)
H = np.array([[1, 0]])
# プロセスノイズ共分散行列
Q = process_variance * np.array([[dt**4/4, dt**3/2],
[dt**3/2, dt**2]])
# 観測ノイズ共分散行列
R = np.array([[measurement_variance]])
# 初期の推定誤差共分散行列
P = np.eye(2)
# 予測と観測値を格納するリスト
predicted_positions = []
true_positions = []
measured_positions = []
# シミュレーションデータを生成
np.random.seed(0)
true_position = 0
true_velocity = 1
for _ in range(50):
# 1. 真の位置を更新
true_position += true_velocity * dt
true_positions.append(true_position)
# 2. 観測値 (真の位置 + ノイズ)
measured_position = true_position + np.random.normal(0, np.sqrt(measurement_variance))
measured_positions.append(measured_position)
# 3. 予測ステップ
x = np.dot(A, x)
P = np.dot(A, np.dot(P, A.T)) + Q
# 4. 更新ステップ
y = measured_position - np.dot(H, x) # 観測残差
S = np.dot(H, np.dot(P, H.T)) + R # 観測残差の共分散
K = np.dot(P, np.dot(H.T, np.linalg.inv(S))) # カルマンゲイン
x = x + np.dot(K, y) # 状態の更新
P = P - np.dot(K, np.dot(H, P)) # 誤差共分散の更新
# 推定された位置を記録
predicted_positions.append(x[0, 0])
# 結果をプロット
plt.plot(true_positions, label='True Position')
plt.plot(measured_positions, label='Measured Position', linestyle='dotted')
plt.plot(predicted_positions, label='Kalman Filter Estimate', linestyle='dashed')
plt.legend()
plt.xlabel('Time')
plt.ylabel('Position')
plt.title('Kalman Filter for Position Estimation')
plt.show()
import numpy as np
import matplotlib.pyplot as plt
# 状態空間モデルの設定
# 状態ベクトル: [位置, 速度]
dt = 1 # 時間間隔
A = np.array([[1, dt], # 状態遷移行列
[0, 1]])
# 観測行列 (位置のみを観測)
H = np.array([[1, 0]])
# プロセスノイズの共分散行列
process_variance = 1
Q = process_variance * np.array([[dt**4/4, dt**3/2],
[dt**3/2, dt**2]])
# 観測ノイズの共分散行列
measurement_variance = 2
R = np.array([[measurement_variance]])
# 初期の推定誤差共分散行列
P = np.eye(2)
# 初期状態の設定 [位置, 速度]
x = np.array([[0], # 位置
[1]]) # 速度
# シミュレーション用の時系列データを生成
np.random.seed(0)
n_timesteps = 50
true_positions = []
measured_positions = []
# 真の位置データと観測データを生成
true_position = 0
true_velocity = 0.5 # 速度は一定と仮定
for _ in range(n_timesteps):
# 真の位置データを更新
true_position += true_velocity * dt
true_positions.append(true_position)
# ノイズを含む観測データを生成
measured_position = true_position + np.random.normal(0, np.sqrt(measurement_variance))
measured_positions.append(measured_position)
# カルマンフィルタによる時系列データの平滑化
filtered_positions = []
for measurement in measured_positions:
# 1. 予測ステップ
x = np.dot(A, x)
P = np.dot(A, np.dot(P, A.T)) + Q
# 2. 更新ステップ
y = measurement - np.dot(H, x) # 観測残差
S = np.dot(H, np.dot(P, H.T)) + R # 観測残差の共分散
K = np.dot(P, np.dot(H.T, np.linalg.inv(S))) # カルマンゲイン
# 状態の更新
x = x + np.dot(K, y)
P = P - np.dot(K, np.dot(H, P))
# 推定された位置を記録
filtered_positions.append(x[0, 0])
# 結果のプロット
plt.figure(figsize=(10, 6))
plt.plot(true_positions, label='True Position', color='blue')
plt.plot(measured_positions, label='Measured Position (Noisy)', linestyle='dotted', color='orange')
plt.plot(filtered_positions, label='Kalman Filtered Position', linestyle='dashed', color='green')
plt.xlabel('Time')
plt.ylabel('Position')
plt.legend()
plt.title('Kalman Filter for Time Series Smoothing')
plt.show()
import numpy as np
import matplotlib.pyplot as plt
# システムの設定
dt = 0.1 # 時間の間隔
A = np.array([[1, dt], # 状態遷移行列
[0, 1]]) # 位置と速度の関係
B = np.array([[0], # 制御入力行列
[dt]])
H = np.array([[1, 0]]) # 観測行列(位置のみを観測)
# システムノイズと観測ノイズの分散
process_variance = 1e-5
measurement_variance = 0.1
# プロセスノイズ共分散行列
Q = process_variance * np.eye(2)
# 観測ノイズ共分散行列
R = measurement_variance * np.eye(1)
# 初期推定誤差共分散行列
P = np.eye(2)
# 初期状態(位置と速度)
x = np.array([[0], # 初期位置
[0]]) # 初期速度
# 制御目標(ターゲット位置)
target_position = 10
# LQRの設定
Q_lqr = np.eye(2) # 状態に対する重み
R_lqr = np.array([[0.01]]) # 制御入力に対する重み
# LQRゲインの計算
P_lqr = np.linalg.solve(np.dot(A.T, np.dot(P, A)) - P + Q_lqr - np.dot(np.dot(A.T, np.dot(P, B)), np.linalg.inv(R_lqr + np.dot(np.dot(B.T, P), B))) @ np.dot(B.T, P) @ A.T, np.eye(2))
K_lqr = np.linalg.inv(R_lqr + B.T @ P_lqr @ B) @ B.T @ P_lqr @ A
# シミュレーションデータ
n_steps = 100
true_states = []
estimated_states = []
measurements = []
control_inputs = []
# 真の初期状態
true_state = np.array([[0], # 初期位置
[0]]) # 初期速度
# 制御ループ
for _ in range(n_steps):
# 1. 制御入力の計算(LQR制御を使用)
control_input = -K_lqr @ (x - np.array([[target_position], [0]]))
control_inputs.append(control_input[0, 0])
# 2. システムの真の状態を更新
true_state = A @ true_state + B @ control_input + np.random.multivariate_normal([0, 0], Q).reshape(-1, 1)
true_states.append(true_state.flatten())
# 3. ノイズのある観測値を生成
measurement = H @ true_state + np.random.normal(0, np.sqrt(measurement_variance))
measurements.append(measurement.flatten())
# 4. カルマンフィルタの予測ステップ
x = A @ x + B @ control_input
P = A @ P @ A.T + Q
# 5. カルマンフィルタの更新ステップ
y = measurement - H @ x # 観測残差
S = H @ P @ H.T + R # 観測残差の共分散
K = P @ H.T @ np.linalg.inv(S) # カルマンゲイン
x = x + K @ y # 状態の更新
P = P - K @ H @ P # 誤差共分散行列の更新
estimated_states.append(x.flatten())
# 結果の可視化
true_positions = [state[0] for state in true_states]
estimated_positions = [state[0] for state in estimated_states]
measured_positions = [measurement[0] for measurement in measurements]
plt.figure(figsize=(12, 6))
plt.plot(true_positions, label='True Position', color='blue')
plt.plot(estimated_positions, label='Estimated Position', linestyle='dashed', color='red')
plt.plot(measured_positions, label='Measured Position', linestyle='dotted', color='green')
plt.xlabel('Time Step')
plt.ylabel('Position')
plt.legend()
plt.title('Kalman Filter and LQR Control for System Position')
plt.show()
plt.figure(figsize=(12, 6))
plt.plot(control_inputs, label='Control Input (LQR)', color='purple')
plt.xlabel('Time Step')
plt.ylabel('Control Input')
plt.legend()
plt.title('LQR Control Inputs over Time')
plt.show()
import numpy as np
import matplotlib.pyplot as plt
from scipy.linalg import solve_discrete_are
# システムパラメータ設定
dt = 1.0 # 時間の間隔
A = np.array([[1, dt], # 状態遷移行列
[0, 1]])
B = np.array([[0], # 制御行列(制御入力なし)
[0]])
H = np.array([[1, 0]]) # 観測行列(位置のみを観測)
# ノイズ共分散行列の設定
process_variance = 0.01 # プロセスノイズの分散
measurement_variance = 1.0 # 観測ノイズの分散
Q = process_variance * np.eye(2) # プロセスノイズ共分散行列
R = np.array([[measurement_variance]]) # 観測ノイズ共分散行列
# 定常共分散行列の計算(リカッチ方程式を解く)
P_steady_state = solve_discrete_are(A.T, H.T, Q, R)
# 定常カルマンゲインの計算
K = np.dot(P_steady_state, np.dot(H.T, np.linalg.inv(H @ P_steady_state @ H.T + R)))
# シミュレーション用の時系列データを生成
n_timesteps = 50
true_positions = []
measured_positions = []
estimated_positions = []
# 初期状態の設定 [位置, 速度]
x = np.array([[0], # 位置
[1]]) # 速度
# 真の状態の設定
true_position = 0
true_velocity = 1 # 速度は一定と仮定
np.random.seed(42)
for _ in range(n_timesteps):
# 1. 真の位置を更新
true_position += true_velocity * dt
true_positions.append(true_position)
# 2. ノイズを含む観測データを生成
measurement = true_position + np.random.normal(0, np.sqrt(measurement_variance))
measured_positions.append(measurement)
# 3. 定常カルマンフィルタによる更新ステップ
y = measurement - H @ x # 観測残差
x = x + K @ y # 状態の更新
# 推定された位置を記録
estimated_positions.append(x[0, 0])
# 結果のプロット
plt.figure(figsize=(12, 6))
plt.plot(true_positions, label='True Position', color='blue')
plt.plot(measured_positions, label='Measured Position (Noisy)', linestyle='dotted', color='orange')
plt.plot(estimated_positions, label='Steady-State Kalman Filtered Position', linestyle='dashed', color='green')
plt.xlabel('Time Step')
plt.ylabel('Position')
plt.legend()
plt.title('Steady-State Kalman Filter for Position Estimation')
plt.show()
import numpy as np
import matplotlib.pyplot as plt
# システムの設定
dt = 1.0 # 時間の間隔
A = np.array([[1, dt], # 状態遷移行列
[0, 1]])
H = np.array([[1, 0]]) # 観測行列(位置のみを観測)
# 実際のシステムのプロセスノイズと観測ノイズ
true_process_variance = 0.01
true_measurement_variance = 0.1
true_Q = true_process_variance * np.eye(2)
true_R = np.array([[true_measurement_variance]])
# シミュレーションデータを生成
n_timesteps = 50
true_states = []
observations = []
# 初期状態(位置と速度)
true_state = np.array([[0], [1]])
np.random.seed(42)
for _ in range(n_timesteps):
# システムの真の状態の更新(プロセスノイズ付き)
true_state = A @ true_state + np.random.multivariate_normal([0, 0], true_Q).reshape(-1, 1)
true_states.append(true_state.flatten())
# 観測値の生成(観測ノイズ付き)
observation = H @ true_state + np.random.normal(0, np.sqrt(true_measurement_variance))
observations.append(observation.flatten())
# 初期パラメータの設定(推定値)
Q_est = np.eye(2) # プロセスノイズ共分散行列の初期値
R_est = np.eye(1) # 観測ノイズ共分散行列の初期値
# 初期推定誤差共分散行列
P = np.eye(2)
# EMアルゴリズムのパラメータ設定
max_iter = 50 # 最大反復回数
tol = 1e-4 # 収束判定の閾値
# 初期状態の設定
x_est = np.array([[0], [0]]) # 状態推定の初期値
# EMアルゴリズムを用いたパラメータ推定
for iteration in range(max_iter):
# Eステップ: カルマンフィルタを使用して状態と共分散を推定
x_estimates = [] # 状態の推定値を保存
P_estimates = [] # 共分散行列を保存
for observation in observations:
# 予測ステップ
x_est = A @ x_est
P = A @ P @ A.T + Q_est
# 更新ステップ
y = observation - H @ x_est
S = H @ P @ H.T + R_est
K = P @ H.T @ np.linalg.inv(S)
x_est = x_est + K @ y
P = P - K @ H @ P
x_estimates.append(x_est)
P_estimates.append(P)
# 状態推定のリストを配列に変換
x_estimates = np.array(x_estimates)
P_estimates = np.array(P_estimates)
# Mステップ: 新しいQとRを推定
# プロセスノイズ共分散行列Qの推定
Q_new = np.zeros((2, 2))
for t in range(1, n_timesteps):
delta_x = x_estimates[t] - A @ x_estimates[t - 1]
Q_new += delta_x @ delta_x.T + P_estimates[t] - A @ P_estimates[t - 1] @ A.T
Q_new /= (n_timesteps - 1)
# 観測ノイズ共分散行列Rの推定
R_new = np.zeros((1, 1))
for t in range(n_timesteps):
delta_y = observations[t] - H @ x_estimates[t]
R_new += delta_y @ delta_y.T
R_new /= n_timesteps
# パラメータの変化量を確認
Q_diff = np.linalg.norm(Q_new - Q_est)
R_diff = np.linalg.norm(R_new - R_est)
# パラメータを更新
Q_est = Q_new
R_est = R_new
print(f"Iteration {iteration + 1}: Q_diff = {Q_diff:.6f}, R_diff = {R_diff:.6f}")
# 収束判定
if Q_diff < tol and R_diff < tol:
print("Converged.")
break
# 推定されたパラメータの表示
print("Estimated Q:\n", Q_est)
print("Estimated R:\n", R_est)
# 推定された状態をプロット
true_positions = [state[0] for state in true_states]
estimated_positions = [state[0][0] for state in x_estimates]
plt.figure(figsize=(12, 6))
plt.plot(true_positions, label='True Position', color='blue')
plt.plot([obs[0] for obs in observations], label='Measured Position', linestyle='dotted', color='orange')
plt.plot(estimated_positions, label='Estimated Position (After Parameter Estimation)', linestyle='dashed', color='green')
plt.xlabel('Time Step')
plt.ylabel('Position')
plt.legend()
plt.title('Kalman Filter Parameter Estimation with EM Algorithm')
plt.show()
import numpy as np
import matplotlib.pyplot as plt
# 状態遷移関数と観測関数の定義
def f(x):
"""
状態遷移関数
x = [位置, 速度]
"""
dt = 1.0 # 時間の間隔
F = np.array([[1, dt],
[0, 1]]) # 状態遷移行列
return F @ x
def h(x):
"""
観測関数(非線形)
"""
return np.array([np.sqrt(x[0]**2 + x[1]**2)]) # 位置と速度のベクトル長を観測
# ヤコビ行列の計算
def jacobian_f(x):
"""
状態遷移関数のヤコビ行列
"""
dt = 1.0
return np.array([[1, dt],
[0, 1]])
def jacobian_h(x):
"""
観測関数のヤコビ行列
"""
dist = np.sqrt(x[0]**2 + x[1]**2)
return np.array([[x[0] / dist, x[1] / dist]])
# 初期化
x = np.array([1, 0.5]) # 初期状態(位置と速度)
P = np.eye(2) # 初期誤差共分散行列
Q = 0.1 * np.eye(2) # プロセスノイズ共分散行列
R = np.array([[0.5]]) # 観測ノイズ共分散行列
# 真の初期状態
x_true = np.array([1, 0.5])
n_timesteps = 30
# シミュレーションデータの生成
np.random.seed(0)
observations = []
true_states = []
for _ in range(n_timesteps):
# 真の状態の更新(プロセスノイズ付き)
x_true = f(x_true) + np.random.multivariate_normal([0, 0], Q)
true_states.append(x_true)
# 観測値の生成(観測ノイズ付き)
observation = h(x_true) + np.random.normal(0, np.sqrt(R[0, 0]))
observations.append(observation)
# 拡張カルマンフィルタの適用
estimated_states = []
for z in observations:
# 予測ステップ
x_pred = f(x)
F_jac = jacobian_f(x) # 状態遷移関数のヤコビ行列
P_pred = F_jac @ P @ F_jac.T + Q
# 更新ステップ
H_jac = jacobian_h(x_pred) # 観測関数のヤコビ行列
y = z - h(x_pred) # 観測残差
S = H_jac @ P_pred @ H_jac.T + R # 観測残差の共分散行列
K = P_pred @ H_jac.T @ np.linalg.inv(S) # カルマンゲインの計算
# 状態と共分散行列の更新
x = x_pred + K @ y
P = P_pred - K @ H_jac @ P_pred
estimated_states.append(x)
# 結果のプロット
true_positions = [state[0] for state in true_states]
estimated_positions = [state[0] for state in estimated_states]
measured_positions = [obs[0] for obs in observations]
plt.figure(figsize=(12, 6))
plt.plot(true_positions, label='True Position', color='blue')
plt.plot(measured_positions, label='Measured Position (Noisy)', linestyle='dotted', color='orange')
plt.plot(estimated_positions, label='EKF Estimated Position', linestyle='dashed', color='green')
plt.xlabel('Time Step')
plt.ylabel('Position')
plt.legend()
plt.title('Simple Extended Kalman Filter for Non-linear System')
plt.show()
import numpy as np
import matplotlib.pyplot as plt
# システムの設定
def f(x):
"""
状態遷移関数(非線形)
x = [位置, 速度]
"""
dt = 0.1 # 時間の間隔
return np.array([x[0] + x[1] * dt, x[1]]) # 次の位置と速度を計算
def h(x):
"""
観測関数(非線形)
観測は位置のみ
"""
return np.array([np.sin(x[0])]) # 状態の位置のsinを観測
# 初期設定
n_ensembles = 100 # エンセmblesの数
x_dim = 2 # 状態の次元
z_dim = 1 # 観測の次元
# 初期状態(真の状態とエンセmbles)
x_true = np.array([1.0, 0.5]) # 真の状態(位置と速度)
ensembles = np.random.multivariate_normal([1.0, 0.5], np.eye(2) * 0.1, size=n_ensembles)
# 共分散行列
Q = np.array([[1e-3, 0], # プロセスノイズ共分散行列
[0, 1e-3]])
R = np.array([[0.1]]) # 観測ノイズ共分散行列
# シミュレーションデータの生成
np.random.seed(0)
n_timesteps = 50
true_states = []
observations = []
for _ in range(n_timesteps):
# 真の状態の更新(プロセスノイズ付き)
x_true = f(x_true) + np.random.multivariate_normal([0, 0], Q)
true_states.append(x_true)
# 観測値の生成(観測ノイズ付き)
observation = h(x_true) + np.random.normal(0, np.sqrt(R[0, 0]), size=(z_dim,))
observations.append(observation)
# 拡散カルマンフィルタ(EnKF)による状態推定
estimated_states = []
for z in observations:
# 1. 予測ステップ
for i in range(n_ensembles):
ensembles[i] = f(ensembles[i]) + np.random.multivariate_normal([0, 0], Q) # 各エンセmblesを予測
# 2. 観測値を用いた更新ステップ
# エンセmblesの平均と共分散を計算
x_mean = np.mean(ensembles, axis=0) # エンセmblesの平均(予測された平均状態)
P_ensemble = np.cov(ensembles, rowvar=False) # エンセmblesの共分散行列
# 観測のエンセmblesを作成
ensemble_observations = np.array([h(ens) for ens in ensembles])
# 観測エンセmblesの平均と共分散を計算
z_mean = np.mean(ensemble_observations, axis=0) # 観測エンセmblesの平均
P_zz = np.cov(ensemble_observations, rowvar=False) + R # 観測エンセmblesの共分散 + 観測ノイズ共分散
P_xz = np.cov(ensembles.T, ensemble_observations.T, bias=True)[0:x_dim, x_dim:] # 状態と観測の共分散
# カルマンゲインを計算
K = P_xz @ np.linalg.inv(P_zz)
# 各エンセmblesを更新
for i in range(n_ensembles):
ensembles[i] = ensembles[i] + K @ (z - ensemble_observations[i])
# 推定された状態の保存(エンセmblesの平均を推定値とする)
estimated_states.append(np.mean(ensembles, axis=0))
# 結果のプロット
true_positions = [state[0] for state in true_states]
estimated_positions = [state[0] for state in estimated_states]
measured_positions = [obs[0] for obs in observations]
plt.figure(figsize=(12, 6))
plt.plot(true_positions, label='True Position', color='blue')
plt.plot(measured_positions, label='Measured Position (Noisy)', linestyle='dotted', color='orange')
plt.plot(estimated_positions, label='EnKF Estimated Position', linestyle='dashed', color='green')
plt.xlabel('Time Step')
plt.ylabel('Position')
plt.legend()
plt.title('Ensemble Kalman Filter for Non-linear System')
plt.show()
import numpy as np
import matplotlib.pyplot as plt
# バッテリの等価回路モデルパラメータ設定
R0 = 0.015 # 内部抵抗(オーム)
R1 = 0.01 # 可変抵抗(オーム)
C1 = 2000 # キャパシタンス(ファラッド)
# サンプリング時間
dt = 1.0 # 時間の間隔(秒)
# 初期状態
SOC = 0.8 # 初期の充電状態 (0.0 ~ 1.0)
V_OCV = 3.7 # 初期の開回路電圧 (V)
# 状態遷移行列と観測行列の設定
A = np.array([[1, -dt / (R1 * C1)],
[0, 1]])
B = np.array([[dt / C1], [0]])
H = np.array([[1, 0]])
# プロセスノイズと観測ノイズの共分散行列
Q = np.array([[1e-5, 0], [0, 1e-5]]) # プロセスノイズ共分散行列
R = np.array([[1e-3]]) # 観測ノイズ共分散行列
# 初期の誤差共分散行列
P = np.eye(2)
# 初期状態(SOCとキャパシタ電圧)
x = np.array([[SOC], [0.0]]) # 状態ベクトル [SOC, キャパシタ電圧]
# シミュレーション用のデータ生成
n_timesteps = 50
I = 1.0 # 放電電流(A, 定電流)
true_states = []
observations = []
# シミュレーション用の真の状態と観測値を生成
np.random.seed(0)
for _ in range(n_timesteps):
# 状態の更新(状態遷移)
x = A @ x + B * I
x[0, 0] = max(0, min(1, x[0, 0])) # SOCを0〜1の範囲に制限
# 真の状態を保存
true_states.append(x.flatten())
# 出力電圧の計算 (V = OCV - R0 * I - V_C1)
V_C1 = x[1, 0]
V_OCV = 3.7 + 0.1 * (x[0, 0] - 0.5) # OCVをSOCの線形関数と仮定
V_out = V_OCV - R0 * I - V_C1
# 観測値にノイズを加える
observation = V_out + np.random.normal(0, np.sqrt(R[0, 0]))
observations.append(observation)
# 拡張カルマンフィルタの適用
estimated_states = []
x_est = np.array([[0.8], [0.0]]) # 初期の推定状態
P_est = np.eye(2)
for z in observations:
# 1. 予測ステップ
x_pred = A @ x_est + B * I # 状態の予測
P_pred = A @ P_est @ A.T + Q # 誤差共分散行列の予測
# 2. 更新ステップ
# 観測残差の計算
V_C1_pred = x_pred[1, 0]
V_OCV_pred = 3.7 + 0.1 * (x_pred[0, 0] - 0.5) # OCVの線形モデル
V_out_pred = V_OCV_pred - R0 * I - V_C1_pred
y = z - V_out_pred # 観測残差
# 観測残差を行列形式に変換
y = np.array([[y]])
# 観測行列 H のヤコビ行列(線形化)
H_jacobian = np.array([[1, -1]])
# 観測残差共分散 S の計算
S = H_jacobian @ P_pred @ H_jacobian.T + R
# カルマンゲイン K の計算
K = P_pred @ H_jacobian.T @ np.linalg.inv(S)
# 状態と誤差共分散行列の更新
x_est = x_pred + K @ y
P_est = P_pred - K @ H_jacobian @ P_pred
# 推定結果を保存
estimated_states.append(x_est.flatten())
# 結果のプロット
true_SOC = [state[0] for state in true_states]
estimated_SOC = [state[0] for state in estimated_states]
plt.figure(figsize=(12, 6))
plt.plot(true_SOC, label='True SOC', color='blue')
plt.plot(estimated_SOC, label='Estimated SOC (EKF)', linestyle='dashed', color='green')
plt.xlabel('Time Step')
plt.ylabel('State of Charge (SOC)')
plt.legend()
plt.title('State of Charge Estimation using Extended Kalman Filter')
plt.show()
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose
from sktime.datasets import load_airline
from sktime.forecasting.trend import PolynomialTrendForecaster
# サンプルデータの読み込み (sktimeのairlineデータセットを使用)
y = load_airline()
# データの確認
y.plot(title="Airline Passengers Data", ylabel="Number of Passengers", xlabel="Time")
plt.show()
# Statsmodelsを用いたトレンド・季節性分解
result = seasonal_decompose(y, model='multiplicative')
result.plot()
plt.show()
# 季節調整後のデータ(季節性を除去)
seasonally_adjusted = y / result.seasonal
# 季節調整後のデータプロット
seasonally_adjusted.plot(title="Seasonally Adjusted Data", ylabel="Number of Passengers", xlabel="Time")
plt.show()
# sktimeを用いたトレンド除去
forecaster = PolynomialTrendForecaster(degree=1) # 1次の多項式(線形トレンド)をフィット
forecaster.fit(y)
y_trend = forecaster.predict(fh=[i for i in range(1, len(y) + 1)]) # トレンド予測
# トレンド除去後の残差を算出
detrended = y - y_trend
# トレンド除去後のデータプロット
detrended.plot(title="Detrended Data", ylabel="Number of Passengers", xlabel="Time")
plt.show()
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.holtwinters import SimpleExpSmoothing, ExponentialSmoothing
# サンプルデータの作成(ここでは、毎月の売上データを例とします)
data = {
'Month': pd.date_range(start='2020-01-01', periods=24, freq='M'),
'Sales': [200, 210, 190, 220, 230, 240, 220, 210, 250, 260, 270, 280,
300, 310, 320, 330, 340, 330, 350, 360, 370, 380, 390, 400]
}
df = pd.DataFrame(data)
df.set_index('Month', inplace=True)
# データのプロット
df['Sales'].plot(title="Monthly Sales Data", ylabel="Sales", xlabel="Month")
plt.show()
# 1. 単純指数平滑化法(Simple Exponential Smoothing)
ses_model = SimpleExpSmoothing(df['Sales']).fit(smoothing_level=0.2, optimized=False)
df['SES'] = ses_model.fittedvalues
# 2. ホルトの二重指数平滑化法(Double Exponential Smoothing)
holt_model = ExponentialSmoothing(df['Sales'], trend='add').fit()
df['Holt'] = holt_model.fittedvalues
# 3. ホルト・ウィンターの三重指数平滑化法(Holt-Winters Exponential Smoothing)
holt_winters_model = ExponentialSmoothing(df['Sales'], trend='add', seasonal='add', seasonal_periods=12).fit()
df['Holt-Winters'] = holt_winters_model.fittedvalues
# 各モデルの結果をプロット
df[['Sales', 'SES', 'Holt', 'Holt-Winters']].plot(title="Exponential Smoothing Methods", ylabel="Sales", xlabel="Month")
plt.show()
# 各モデルの予測値を表示
print("Simple Exponential Smoothing Predicted Values:\n", ses_model.forecast(steps=12))
print("Holt's Method Predicted Values:\n", holt_model.forecast(steps=12))
print("Holt-Winters Method Predicted Values:\n", holt_winters_model.forecast(steps=12))
import numpy as np
import matplotlib.pyplot as plt
# ローカルレベルモデルのカルマンフィルタクラスを定義
class KalmanFilterLocalLevel:
def __init__(self, initial_state, initial_covariance, process_var, observation_var):
"""
カルマンフィルタの初期設定
:param initial_state: 初期状態推定値
:param initial_covariance: 初期誤差共分散
:param process_var: プロセスノイズの分散 σ_w^2
:param observation_var: 観測ノイズの分散 σ_v^2
"""
self.state_estimate = initial_state # 初期状態
self.covariance_estimate = initial_covariance # 初期共分散
self.process_var = process_var # プロセスノイズの分散
self.observation_var = observation_var # 観測ノイズの分散
def predict(self):
"""予測ステップ"""
# 状態予測 (状態方程式: x_t = x_{t-1} + w_t)
self.state_predict = self.state_estimate # 状態の予測
# 誤差共分散の予測
self.covariance_predict = self.covariance_estimate + self.process_var
def update(self, observation):
"""更新ステップ"""
# カルマンゲインの計算
kalman_gain = self.covariance_predict / (self.covariance_predict + self.observation_var)
# 状態推定値の更新
self.state_estimate = self.state_predict + kalman_gain * (observation - self.state_predict)
# 誤差共分散の更新
self.covariance_estimate = (1 - kalman_gain) * self.covariance_predict
return self.state_estimate
# データの生成(真の状態と観測値)
np.random.seed(42)
n_timesteps = 50
true_states = np.cumsum(np.random.normal(0, 1, n_timesteps)) # 真の状態(ランダムウォーク)
observations = true_states + np.random.normal(0, 2, n_timesteps) # 観測値(真の状態 + 観測ノイズ)
# カルマンフィルタの初期設定
initial_state = 0 # 初期状態推定値
initial_covariance = 1 # 初期誤差共分散
process_var = 1 # プロセスノイズの分散
observation_var = 4 # 観測ノイズの分散
# カルマンフィルタのインスタンス化
kf = KalmanFilterLocalLevel(initial_state, initial_covariance, process_var, observation_var)
# フィルタリングと予測
filtered_states = []
for obs in observations:
kf.predict() # 予測ステップ
filtered_state = kf.update(obs) # 更新ステップ
filtered_states.append(filtered_state)
# 結果の可視化
plt.figure(figsize=(10, 6))
plt.plot(true_states, label='True State', linestyle='--', color='black')
plt.plot(observations, label='Observations', color='red', alpha=0.5)
plt.plot(filtered_states, label='Kalman Filter Estimate', color='blue')
plt.xlabel('Time Step')
plt.ylabel('State Value')
plt.legend()
plt.title('Kalman Filter for Local Level Model')
plt.show()
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pymc3 as pm
import statsmodels.api as sm
# 1. データの生成
np.random.seed(42)
n = 100
time = np.arange(n)
# 介入前のデータ
pre_intervention = 0.5 * time + 5 + np.random.normal(scale=5, size=n)
# 介入効果を追加(例えば20番目以降に介入があったとする)
intervention_point = 20
post_intervention = pre_intervention.copy()
post_intervention[intervention_point:] += 20 # 介入による効果
# データフレームにまとめる
data = pd.DataFrame({
'time': time,
'y': post_intervention,
'intervention': (time >= intervention_point).astype(int)
})
# 2. 因果推論:介入前後の効果の測定
# 単純な介入前後の平均の差分を計算
pre_mean = data.loc[data['intervention'] == 0, 'y'].mean()
post_mean = data.loc[data['intervention'] == 1, 'y'].mean()
effect_size = post_mean - pre_mean
print(f"Estimated Effect Size (Mean Difference): {effect_size:.2f}")
# 3. 時系列分析:介入効果を含む時系列回帰
# ダミー変数を用いた時系列回帰
X = sm.add_constant(data[['time', 'intervention']])
model = sm.OLS(data['y'], X).fit()
print(model.summary())
# 4. ベイズ推定:pymc3を用いたモデルの作成
with pm.Model() as model:
# 事前分布の定義
beta_0 = pm.Normal("beta_0", mu=0, sigma=10) # 切片
beta_1 = pm.Normal("beta_1", mu=0, sigma=10) # 時間に対する傾き
beta_2 = pm.Normal("beta_2", mu=0, sigma=10) # 介入効果の係数
# 標準偏差
sigma = pm.HalfNormal("sigma", sigma=10)
# 線形モデル
mu = beta_0 + beta_1 * data['time'] + beta_2 * data['intervention']
# 尤度関数
y_obs = pm.Normal("y_obs", mu=mu, sigma=sigma, observed=data['y'])
# サンプリング
trace = pm.sample(1000, return_inferencedata=False)
# 5. ベイズ推定の結果をプロット
pm.plot_trace(trace)
plt.show()
# 6. 介入効果の推定値を表示
pm.summary(trace, var_names=["beta_2"])
# 必要なライブラリをインポート
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_squared_error
# ダミーの時系列データを作成
np.random.seed(0)
data = np.random.randn(100).cumsum() # ランダムな時系列データを生成
df = pd.DataFrame(data, columns=["Value"])
df["Time"] = pd.date_range(start="2020-01-01", periods=len(df), freq="D")
# 第1章:時系列予測 (ARIMA)
def time_series_forecast(df):
# ARIMAモデルの定義
model = ARIMA(df["Value"], order=(5, 1, 0))
model_fit = model.fit()
# 予測
forecast = model_fit.forecast(steps=10)
df_forecast = pd.DataFrame(forecast, columns=["Forecast"])
df_forecast["Time"] = pd.date_range(start=df["Time"].iloc[-1] + pd.Timedelta(days=1), periods=10, freq="D")
# プロット
plt.figure(figsize=(12, 6))
plt.plot(df["Time"], df["Value"], label="Actual")
plt.plot(df_forecast["Time"], df_forecast["Forecast"], label="Forecast", linestyle="--")
plt.title("Time Series Forecast (ARIMA)")
plt.xlabel("Time")
plt.ylabel("Value")
plt.legend()
plt.show()
return df_forecast
# 第2章:単純な未来予測 (移動平均)
def simple_moving_average_forecast(df, window=5):
df["SMA"] = df["Value"].rolling(window=window).mean()
df["SMA_Forecast"] = df["SMA"].shift(-window)
# プロット
plt.figure(figsize=(12, 6))
plt.plot(df["Time"], df["Value"], label="Actual")
plt.plot(df["Time"], df["SMA"], label="Simple Moving Average", linestyle="--")
plt.title("Simple Moving Average Forecast")
plt.xlabel("Time")
plt.ylabel("Value")
plt.legend()
plt.show()
return df
# 第3章:ランダムウォーク
def random_walk_forecast(df, steps=10):
last_value = df["Value"].iloc[-1]
random_steps = np.random.randn(steps).cumsum()
forecast_values = last_value + random_steps
df_random_walk = pd.DataFrame(forecast_values, columns=["Random_Walk"])
df_random_walk["Time"] = pd.date_range(start=df["Time"].iloc[-1] + pd.Timedelta(days=1), periods=steps, freq="D")
# プロット
plt.figure(figsize=(12, 6))
plt.plot(df["Time"], df["Value"], label="Actual")
plt.plot(df_random_walk["Time"], df_random_walk["Random_Walk"], label="Random Walk Forecast", linestyle="--")
plt.title("Random Walk Forecast")
plt.xlabel("Time")
plt.ylabel("Value")
plt.legend()
plt.show()
return df_random_walk
# 各章の関数を実行
print("Chapter 1: Time Series Forecast")
forecast_result = time_series_forecast(df)
print("\nChapter 2: Simple Moving Average Forecast")
sma_result = simple_moving_average_forecast(df)
print("\nChapter 3: Random Walk Forecast")
random_walk_result = random_walk_forecast(df)
# 必要なライブラリのインポート
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
# ダミーの時系列データを作成
np.random.seed(42)
data = np.random.randn(200).cumsum() # ランダムな時系列データを生成
date_index = pd.date_range(start="2020-01-01", periods=len(data), freq="D")
df = pd.DataFrame(data, index=date_index, columns=["Value"])
# 第4章:移動平均プロセスのモデル化
def moving_average_model(df, order=1):
model = ARIMA(df["Value"], order=(0, 0, order))
model_fit = model.fit()
print(model_fit.summary())
return model_fit
# 第5章:自己回帰プロセスのモデル化
def autoregressive_model(df, order=1):
model = ARIMA(df["Value"], order=(order, 0, 0))
model_fit = model.fit()
print(model_fit.summary())
return model_fit
# 第6章:複雑な時系列のモデル化(ARIMAを使用)
def complex_time_series_model(df, p=2, d=1, q=2):
model = ARIMA(df["Value"], order=(p, d, q))
model_fit = model.fit()
print(model_fit.summary())
return model_fit
# 第7章:非定常時系列の予測(ADF検定と差分)
def non_stationary_series_forecast(df):
result = adfuller(df["Value"])
print("ADF Statistic:", result[0])
print("p-value:", result[1])
if result[1] > 0.05:
# 非定常時系列なので差分を取る
df["Diff"] = df["Value"].diff().dropna()
model = ARIMA(df["Diff"].dropna(), order=(1, 0, 1))
model_fit = model.fit()
print(model_fit.summary())
return model_fit
else:
print("The series is stationary.")
return None
# 第8章:季節性の考慮(SARIMAモデル)
def seasonal_model(df, seasonal_order=(1, 1, 1, 12)):
model = SARIMAX(df["Value"], order=(1, 1, 1), seasonal_order=seasonal_order)
model_fit = model.fit()
print(model_fit.summary())
return model_fit
# 第9章:モデルへの外部変数の追加(回帰モデルを使用)
def model_with_exogenous_variables(df):
# ダミーの外部変数データを生成
exogenous_data = np.random.randn(len(df), 1)
df["Exogenous"] = exogenous_data
model = SARIMAX(df["Value"], exog=df["Exogenous"], order=(1, 1, 1))
model_fit = model.fit()
print(model_fit.summary())
return model_fit
# 第10章:複数の時系列の予測(VARモデルを使用)
from statsmodels.tsa.api import VAR
def multiple_time_series_forecast(df):
df["Other_Series"] = np.random.randn(len(df)).cumsum()
model = VAR(df[["Value", "Other_Series"]])
model_fit = model.fit()
print(model_fit.summary())
return model_fit
# 第11章:キャップストーンプロジェクト(オーストラリアの抗糖尿病薬処方数の予測)
def capstone_forecast(df):
# データを標準化して予測
scaler = StandardScaler()
df["Value_Scaled"] = scaler.fit_transform(df[["Value"]])
# ARIMAモデルで予測
model = ARIMA(df["Value_Scaled"], order=(5, 1, 0))
model_fit = model.fit()
print(model_fit.summary())
# 予測結果を元のスケールに戻す
forecast = model_fit.forecast(steps=10)
forecast_original_scale = scaler.inverse_transform(forecast.reshape(-1, 1))
# プロット
plt.figure(figsize=(12, 6))
plt.plot(df.index, df["Value"], label="Actual")
plt.plot(pd.date_range(start=df.index[-1] + pd.Timedelta(days=1), periods=10, freq="D"),
forecast_original_scale, label="Forecast", linestyle="--")
plt.title("Capstone: Forecast of Antidiabetic Drug Prescription in Australia")
plt.xlabel("Time")
plt.ylabel("Value")
plt.legend()
plt.show()
return model_fit
# 各章の関数を実行
print("Chapter 4: Moving Average Process Model")
moving_average_model(df)
print("\nChapter 5: Autoregressive Process Model")
autoregressive_model(df)
print("\nChapter 6: Complex Time Series Model")
complex_time_series_model(df)
print("\nChapter 7: Non-Stationary Series Forecast")
non_stationary_series_forecast(df)
print("\nChapter 8: Seasonal Model")
seasonal_model(df)
print("\nChapter 9: Model with Exogenous Variables")
model_with_exogenous_variables(df)
print("\nChapter 10: Multiple Time Series Forecast")
multiple_time_series_forecast(df)
print("\nChapter 11: Capstone Forecast: Antidiabetic Drug Prescription in Australia")
capstone_forecast(df)
# 必要なライブラリのインポート
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Conv1D, Flatten
from sklearn.preprocessing import MinMaxScaler
# ダミーの時系列データを作成(家庭の電力消費量を模擬)
np.random.seed(42)
data = np.sin(np.linspace(0, 100, 200)) + np.random.randn(200) * 0.5
date_index = pd.date_range(start="2020-01-01", periods=len(data), freq="D")
df = pd.DataFrame(data, index=date_index, columns=["Electricity_Consumption"])
# データのスケーリング
scaler = MinMaxScaler()
df_scaled = pd.DataFrame(scaler.fit_transform(df), columns=df.columns, index=df.index)
# トレーニングとテストデータの分割
train_size = int(len(df_scaled) * 0.8)
train, test = df_scaled.iloc[:train_size], df_scaled.iloc[train_size:]
# データウィンドウの作成
def create_data_window(data, window_size=5):
X, y = [], []
for i in range(len(data) - window_size):
X.append(data[i:i+window_size])
y.append(data[i+window_size])
return np.array(X), np.array(y)
# 第13章:データウィンドウの作成
window_size = 10
X_train, y_train = create_data_window(train["Electricity_Consumption"].values, window_size)
X_test, y_test = create_data_window(test["Electricity_Consumption"].values, window_size)
# ディープラーニング用にデータの形を整形
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
# 第15章:LSTMによる予測モデル
def lstm_model(X_train, y_train, X_test, y_test, epochs=50):
model = Sequential()
model.add(LSTM(50, activation='relu', input_shape=(X_train.shape[1], 1)))
model.add(Dense(1))
model.compile(optimizer='adam', loss='mse')
model.fit(X_train, y_train, epochs=epochs, verbose=0)
return model
# LSTMモデルをトレーニング
lstm = lstm_model(X_train, y_train, X_test, y_test)
# LSTMによる予測
lstm_predictions = lstm.predict(X_test)
# 第16章:CNNによる時系列のフィルタリングモデル
def cnn_model(X_train, y_train, X_test, y_test, epochs=50):
model = Sequential()
model.add(Conv1D(64, kernel_size=2, activation='relu', input_shape=(X_train.shape[1], 1)))
model.add(Flatten())
model.add(Dense(50, activation='relu'))
model.add(Dense(1))
model.compile(optimizer='adam', loss='mse')
model.fit(X_train, y_train, epochs=epochs, verbose=0)
return model
# CNNモデルをトレーニング
cnn = cnn_model(X_train, y_train, X_test, y_test)
# CNNによる予測
cnn_predictions = cnn.predict(X_test)
# 第17章:予測を使ってさらに予測を行う
# LSTMモデルを用いて、予測を再帰的に行い、未来の値を予測
def recursive_forecast(model, initial_input, n_steps=10):
forecast = []
current_input = initial_input.reshape(1, initial_input.shape[0], 1)
for _ in range(n_steps):
pred = model.predict(current_input)[0]
forecast.append(pred)
current_input = np.append(current_input[:, 1:], [[pred]], axis=1)
return np.array(forecast)
# 初期のテストデータから再帰的に予測を行う
recursive_predictions = recursive_forecast(lstm, X_test[0], n_steps=10)
# 第18章:キャップストーンプロジェクト - 家庭の電力消費量の予測
# 訓練データとテストデータのプロット
plt.figure(figsize=(12, 6))
plt.plot(df.index[:train_size], df["Electricity_Consumption"][:train_size], label="Train Data")
plt.plot(df.index[train_size:], df["Electricity_Consumption"][train_size:], label="Test Data")
plt.plot(df.index[train_size+window_size:train_size+window_size+len(lstm_predictions)],
scaler.inverse_transform(lstm_predictions), label="LSTM Predictions", linestyle="--")
plt.plot(df.index[train_size+window_size:train_size+window_size+len(cnn_predictions)],
scaler.inverse_transform(cnn_predictions), label="CNN Predictions", linestyle="--")
plt.title("Capstone: Electricity Consumption Forecast using LSTM and CNN")
plt.xlabel("Time")
plt.ylabel("Electricity Consumption")
plt.legend()
plt.show()
# 必要なライブラリのインポート
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from fbprophet import Prophet
from sklearn.metrics import mean_squared_error
# ダミーの時系列データを作成(カナダのステーキ肉の月間平均小売価格を模擬)
np.random.seed(42)
months = pd.date_range(start="2015-01-01", periods=60, freq="M")
steak_prices = 20 + np.sin(np.linspace(0, 12, 60)) + np.random.randn(60) * 2 # サイン波+ノイズ
df = pd.DataFrame({"ds": months, "y": steak_prices})
# 第19章:Prophetを使った時系列予測の自動化
def prophet_time_series_forecast(df, periods=12):
# Prophetモデルの作成とフィッティング
model = Prophet()
model.fit(df)
# 未来のデータフレームの作成
future = model.make_future_dataframe(periods=periods, freq='M')
# 予測を行う
forecast = model.predict(future)
# 結果をプロット
model.plot(forecast)
plt.title("Prophet Forecast of Steak Prices")
plt.xlabel("Date")
plt.ylabel("Steak Price")
plt.show()
return forecast
# 第20章:キャップストーンプロジェクト - カナダでのステーキ肉の月間平均小売価格の予測
def capstone_project_prophet(df, periods=12):
# データを訓練とテストに分割
train_size = int(len(df) * 0.8)
train, test = df.iloc[:train_size], df.iloc[train_size:]
# Prophetによる予測
forecast = prophet_time_series_forecast(train, periods=periods)
# テストデータとの比較
test_forecast = forecast.iloc[-len(test):][["ds", "yhat"]]
test_forecast = test_forecast.set_index("ds")
test = test.set_index("ds")
# 平均二乗誤差の計算
mse = mean_squared_error(test["y"], test_forecast["yhat"])
print(f"Mean Squared Error on Test Data: {mse:.4f}")
# テストデータと予測のプロット
plt.figure(figsize=(12, 6))
plt.plot(train["ds"], train["y"], label="Training Data")
plt.plot(test.index, test["y"], label="Test Data")
plt.plot(test_forecast.index, test_forecast["yhat"], label="Prophet Predictions", linestyle="--")
plt.title("Capstone: Forecast of Monthly Average Retail Price of Steak in Canada")
plt.xlabel("Date")
plt.ylabel("Steak Price")
plt.legend()
plt.show()
return forecast
# 各章の関数を実行
print("Chapter 19: Time Series Forecast with Prophet")
prophet_forecast = prophet_time_series_forecast(df)
print("\nChapter 20: Capstone Project - Forecasting Monthly Average Retail Price of Steak in Canada")
capstone_forecast = capstone_project_prophet(df)