この記事は「【リアルタイム電力予測】需要・価格・電源最適化ダッシュボード構築記」シリーズの21日目です
Day1はこちら | 全体構成を見る
今回は前回までで自動で取得できるようにした以下の特徴量を用いて、電力需要予測をしていきます。
- 気象実績と予報(一週間前~一週間後)
- 取得方法の詳細
hugging face Models の設定
今回のダッシュボードはGitHub Actionsで毎日cacheを更新 ⇒ Huging Face Spaces上で公開します。そのため、GitHubを読み込めばモデルは動きますが、scalerやmodelの重みを置いておくと重くなる可能性がる他、SpacesとActionsで同じ場所から取得できるほうが好ましいためHugging Face Modelsを単一の配布先に設定します。
1. scalerとmodelの保存
まずはscalerとmodelをローカルに保存します。
- scaler
joblib.dump(scaler_y, "../model/scaler_y.pkl")
- model
torch.save(model.state_dict(), '../model/model_demand.pth')
2. hugging face modelsの登録
modelsにアップするには新たに登録が必要です。
自分のアイコンから「New Model」をクリックします。

Create a new model repositoryというページに遷移しますので、こちらからrepositoryという形で登録します。

3. アップロード
先ほど保存したscalerとmodelをアップロードします。

⇒「Update files」を選択
これでmodelsへの登録が完了です!
予測モデル
【電力需要予測】GRUを使ってみるで記載した以下のGRUモデルです。
- 月
- 月(sin,cos)
- 時間
- 時間(sin,cos)
- 祝日フラグ
- 湿度
- 気温
- 気温絶対値差分
- 特徴量は生のまま、目的変数のみに正規化をかける
事前設定
CACHE_DIR = Path("data/cache")
WEATHER_PATH = CACHE_DIR / "weather_bf1w_af1w.parquet"
DEMAND_REALIZED_PATH = CACHE_DIR / "demand_bf1w_ytd.parquet"
DEMAND_OUT_PATH = CACHE_DIR / "demand_forecast.parquet"
HF_REPO_ID = "nakamichan/power-forecast-models"
DEMAND_MODEL = "model_demand.pth"
SCALER_Y_PATH = "scaler_y.pkl"
-
HF_REPO_IDからモデルをロードしてくれるようにします
モデルのロード部分
def load_demand_model(input_size) -> Any:
"""
需要予測モデルを読み込んで返す
"""
# モデルファイルを Hub からダウンロード
model_path = hf_hub_download(
repo_id=HF_REPO_ID,
filename=DEMAND_MODEL,
)
model = GRUModel(input_size=input_size)
model.load_state_dict(torch.load(model_path, map_location="cpu", weights_only=True))
return model
-
hf_hub_download: Hugging Face Modelsからローカルにキャッシュしそのパスを返す→同じファイルは次回以降ダウンロードされない -
torch.load: ファイルの読み込み関数
.pthや.ptに保存されたstate_dict(重みの辞書)をpythonに読み込む -
model.load_state_dict(): モデルのパラメータに重みを流し込む関数
モデルの構造が一致していないとエラーになる
特徴量作成部分
month,hour,is_holidayの追加
def make_date(df):
"""
- timestamp列からmonth, hour,is_holiday列を追加する
"""
df = df.copy()
df["month"] = df["timestamp"].dt.month
df["hour"] = df["timestamp"].dt.hour
df["is_holiday"] = df["timestamp"].dt.date.apply(jpholiday.is_holiday)
def is_year_end(date):
return (date.month == 12 and date.day >= 29) or (date.month == 1 and date.day <= 3)
df["is_year_end"] = df["timestamp"].dt.date.apply(is_year_end)
def is_obon(date):
return date.month == 8 and date.day in [13, 14, 15]
df["is_obon"] = df["timestamp"].dt.date.apply(is_obon)
df["is_holiday"] = df["is_holiday"] | df["is_year_end"] | df["is_obon"]
df["is_holiday"] = df["is_holiday"].astype(int)
df.drop(columns=["is_year_end", "is_obon"], inplace=True)
return df
sin.cos変換
def make_sincos(df, add_month, add_hour):
"""
- 指定したcolにsin,cos変換を施す
"""
df = df.copy()
if add_month:
df["month_sin"] = np.sin(2 * np.pi * df["month"] / 12)
df["month_cos"] = np.cos(2 * np.pi * df["month"] / 12)
if add_hour:
df["hour_sin"] = np.sin(2 * np.pi * df["hour"] / 24)
df["hour_cos"] = np.cos(2 * np.pi * df["hour"] / 24)
return df
気温の絶対値差分の作成
def make_temperature_abs(df, threshold=18.1):
"""
- temperature列から気温の絶対値差分の特徴量を作成する
"""
df = df.copy()
df["temperature_abs"] = (df['temperature'] - threshold).abs()
return df
def build_demand_features(
weather: pd.DataFrame,
sequence_length: int,
weather_col = ["timestamp", "temperature", "humidity", "is_forecast"],
) -> pd.DataFrame:
"""
気象データから需要予測用の特徴量を作る
入力:
weather: weather_1w.parquet を読んだ DataFrame
特徴量:use_colにリストで指定
出力:
features: モデルに入力する特徴量 DataFrame
- index または 'timestamp' で元の時間軸と対応すること
"""
df = weather.copy()
weather_df = df[weather_col]
today = _today_jst()
cut = pd.Timestamp(f"{today} 00:00:00")
start_time = cut - pd.Timedelta(minutes=30 * sequence_length) # 前日の24ステップを含める
X_test = weather_df[weather_df["timestamp"] >= start_time]
# 予測期間を抜き出す
pred_time_df = X_test[X_test["timestamp"] >= cut]
pred_time = pred_time_df["timestamp"]
print("prediction time: \n", pred_time)
X_test = make_date(X_test)
X_test = make_sincos(X_test, add_month=True, add_hour=True)
X_test = make_temperature_abs(X_test)
X_test.set_index("timestamp", inplace=True)
X_test.drop(columns="is_forecast", inplace=True)
print("X_test shape:", X_test.shape)
print("X_test: \n", X_test)
# テンソルに変換
features_tensor = torch.tensor(X_test.values, dtype=torch.float32)
return features_tensor, pred_time
モデルが求める形状に変換
def create_sequences(data, sequence_length):
sequences = []
for i in range(len(data) - sequence_length):
seq = data[i:i+sequence_length] # モデルの入力
sequences.append(seq)
return torch.stack(sequences)
推論
def evaluate(model, dataloader, scaler_y, device):
model.eval()
all_predictions = []
with torch.no_grad():
for batch_X in dataloader:
batch_X = batch_X[0].to(device)
outputs = model(batch_X) # モデルの予測値を計算
all_predictions.append(outputs)
predictions = scaler_y.inverse_transform(torch.cat(all_predictions).cpu().numpy()) # すべてのバッチの予測値を1つのテンソルにしてからスケールを元に戻sす
return predictions
実行
def predict_demand():
weather = pd.read_parquet(WEATHER_PATH)
print("weather\n", weather)
# timestamp を必ず datetime にしておく
weather["timestamp"] = pd.to_datetime(weather["timestamp"])
# 過去の一定期間(24時間分)のデータを1つの入力シーケンとしてまとめる
sequence_length = 48
scaler_y_path= hf_hub_download(
repo_id=HF_REPO_ID,
filename=SCALER_Y_PATH,
)
scaler_y = joblib.load(scaler_y_path)
features_tensor, pred_time = build_demand_features(weather, sequence_length)
print("features_tensor.shape\n", features_tensor.shape)
# シーケンスデータ作成
test_sequences = create_sequences(features_tensor, sequence_length)
print("test_sequences.shape", test_sequences.shape) # (n_samples, sequence_length, num_features)
model = load_demand_model(input_size=features_tensor.shape[1])
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device=device)
test_loader = DataLoader(TensorDataset(test_sequences), batch_size=128, shuffle=False)
predictions = evaluate(model, test_loader, scaler_y, device)
print("predictions: \n", predictions[:5])
# 予測結果をparquetで保存
pred_df = pd.DataFrame({
"timestamp": pred_time,
"predicted_demand": predictions.flatten()
}).reset_index(drop=True)
print("pred_df: \n", pred_df)
demand = pd.read_parquet(DEMAND_REALIZED_PATH)
print("demand\n", demand["timestamp"])
# 需給実績をくっつけて可視化で使用する(timestamp, predicted_demand, price_realized)
out = pred_df.merge(
demand,
on="timestamp",
how="outer",
)
out["demand"] = out["predicted_demand"].fillna(out["realized_demand"])
print("final_out:\n", out)
out.to_parquet(DEMAND_OUT_PATH, index=False)
print(f"[SAVE] demand forecast to {DEMAND_OUT_PATH}")
if __name__ == "__main__":
predict_demand()
【ダッシュボード】データ自動取得(電力需要・天気)で記載したfetch_weather関数→今回のpredict_demand関数を行えばpredictまでできるという流れになります。
決まった時間に行う設定はGithub Actionsで行いますが、それはDay24に記載します。
明日
次回は電力価格予測を自動で行っていきます!
