この記事は「【リアルタイム電力予測】需要・価格・電源最適化ダッシュボード構築記」シリーズの22日目です
Day1はこちら | 全体構成を見る
今回は前回までで自動で取得できるようにした以下の特徴量を用いて、電力価格予測をしていきます。
hugging face Models の設定(※需要予測と同様)
今回のダッシュボードはGitHub Actionsで毎日cacheを更新 ⇒ Hugging Face Spaces上で公開します。そのため、GitHubを読み込めばモデルは動きますが、scalerやmodelの重みを置いておくと重くなる可能性がある他、SpacesとActionsで同じ場所から取得できるほうが好ましいためHugging Face Modelsを単一の配布先に設定します。
1. modelの保存
まずはmodelをローカルに保存します。
- model
# GAMモデル
joblib.dump(gam, "model/gam_price.pkl")
# LightGBM
model_p10.save_model("model/lgb_price_p10.txt")
model_p50.save_model("model/lgb_price_p50.txt")
model_p90.save_model("model/lgb_price_p90.txt")
2. hugging face modelsの登録(※登録済みの方はスキップ)
modelsにアップするには新たに登録が必要です。
自分のアイコンから「New Model」をクリックします。

Create a new model repositoryというページに遷移しますので、こちらからrepositoryという形で登録します。

3. アップロード
先ほど保存したmodelをアップロードします。

⇒「Update files」を選択

これでmodelsへの登録が完了です!
事前の設定
CACHE_DIR = Path("data/cache")
HF_REPO_ID = "nakamichan/power-forecast-models"
# 実績と予報
DEMAND_PATH = CACHE_DIR / "demand_forecast.parquet" # 実績も入っている
WEATHER_PATH = CACHE_DIR / "weather_bf1w_af1w.parquet"
MARKET_PATH = CACHE_DIR / "fx_commodity_30min_af1w.parquet"
PRICE_PATH = CACHE_DIR / "spot_tokyo_bf1w_tdy.parquet"
# モデル
GAM_PATH = "gam_price.pkl"
LGB10_PATH = "lgb_price_p10.txt"
LGB50_PATH = "lgb_price_p50.txt"
LGB90_PATH = "lgb_price_p90.txt"
# このモデルの結果
PRICE_OUT_PATH = CACHE_DIR / "price_forecast.parquet"
-
HF_REPO_IDからモデルをロードしてくれるようにします
モデルの特徴量
GAM_FEATURES = [
# 需要・気象
"demand", "demand_7d_ma",
"temperature", "temp_7d_ma",
"shortwave_radiation",
"wind_speed",
# 時間・カレンダー
"hour", "day_of_year",
"is_holiday"
]
LGB_FEATURES = [
"demand", "demand_7d_ma",
"temperature", "temp_7d_ma", "temperature_abs",
"wind_speed", "sunshine_duration", "shortwave_radiation",
"hour", "day_of_year", "is_holiday",
"season","daypart",
"USDJPY", "brent", "henry_hub", "coal_aus",
"USDJPY_ma30", "USDJPY_chg30",
"brent_ma30", "brent_chg30",
"henry_hub_ma30",
"coal_aus_ma90", "coal_aus_chg90",
"price_lag_24h", "price_lag_48h", "price_lag_72h", "price_lag_1w",
"y_gam"
]
予測モデル
【電力価格予測】GAM + LightGBMを使ってみるで記載したモデルです。
- GAMで予測
- GAMの出力を特徴量の一つに加えてLightGBMで予測
モデルのロード
def load_price_model() -> Any:
"""
価格予測モデルを読み込む
"""
# モデルファイルを Hub からダウンロード
gam_path = hf_hub_download(
repo_id=HF_REPO_ID,
filename=GAM_PATH,
)
lgb10_path = hf_hub_download(
repo_id=HF_REPO_ID,
filename=LGB10_PATH,
)
lgb50_path = hf_hub_download(
repo_id=HF_REPO_ID,
filename=LGB50_PATH,
)
lgb90_path = hf_hub_download(
repo_id=HF_REPO_ID,
filename=LGB90_PATH,
)
gam = joblib.load(gam_path)
lgb_p10 = lgb.Booster(model_file=lgb10_path)
lgb_p50 = lgb.Booster(model_file=lgb50_path)
lgb_p90 = lgb.Booster(model_file=lgb90_path)
return gam, lgb_p10, lgb_p50, lgb_p90
特徴量作成
month,hour,is_holidayの追加
def make_date(df):
"""
- timestamp列からmonth, hour,is_holiday列を追加する
"""
df = df.copy()
df["month"] = df["timestamp"].dt.month
df["hour"] = df["timestamp"].dt.hour
df["is_holiday"] = df["timestamp"].dt.date.apply(jpholiday.is_holiday)
def is_year_end(date):
return (date.month == 12 and date.day >= 29) or (date.month == 1 and date.day <= 3)
df["is_year_end"] = df["timestamp"].dt.date.apply(is_year_end)
def is_obon(date):
return date.month == 8 and date.day in [13, 14, 15]
df["is_obon"] = df["timestamp"].dt.date.apply(is_obon)
df["is_holiday"] = df["is_holiday"] | df["is_year_end"] | df["is_obon"]
df["is_holiday"] = df["is_holiday"].astype(int)
df.drop(columns=["is_year_end", "is_obon"], inplace=True)
return df
季節と時間カテゴリを作成する
def make_daypart(hour: int) -> str:
"""時間を6つに分ける"""
if 5 <= hour <= 7: return "dawn"
if 8 <= hour <= 11: return "morning"
if 12 <= hour <= 13:return "noon"
if 14 <= hour <= 16:return "afternoon"
if 17 <= hour <= 19:return "evening"
return "night"
def make_season(month: int) -> str:
"""月を季節ごとに4つに分ける"""
if month in (3,4,5): return "spring"
if month in (6,7,8): return "summer"
if month in (9,10,11): return "autumn"
return "winter"
気温の絶対値差分の作成
def make_temperature_abs(df, threshold=18.1):
"""
- temperature列から気温の絶対値差分の特徴量を作成する
"""
df = df.copy()
df["temperature_abs"] = (df['temperature'] - threshold).abs()
return df
def build_price_features(
demand: pd.DataFrame,
weather: pd.DataFrame,
market: pd.DataFrame,
price: pd.DataFrame,
) -> pd.DataFrame:
# timestamp を必ず datetime にしておく
demand["timestamp"] = pd.to_datetime(demand["timestamp"])
weather["timestamp"] = pd.to_datetime(weather["timestamp"])
market["timestamp"] = pd.to_datetime(market["timestamp"])
price["timestamp"] = pd.to_datetime(price["timestamp"])
# 結合
merged_df = demand.merge(weather, on="timestamp", how="left") \
.merge(market, on="timestamp", how="left") \
.merge(price, on="timestamp", how="left")
print("merged_df:\n", merged_df)
merged_df = make_date(merged_df)
merged_df['daypart'] = merged_df['hour'].apply(make_daypart)
merged_df['season'] = merged_df['month'].apply(make_season)
daypart_order = ["dawn","morning","noon","afternoon","evening","night"]
season_order = ["spring","summer","autumn","winter"]
merged_df["season"] = pd.Categorical(
merged_df["season"],
categories=season_order,
ordered=False
)
merged_df["daypart"] = pd.Categorical(
merged_df["daypart"],
categories=daypart_order,
ordered=False
)
merged_df["is_holiday"] = pd.Categorical(
merged_df["is_holiday"],
ordered=False
)
merged_df["day_of_year"] = merged_df["timestamp"].dt.dayofyear
# 過去7日移動平均の需要
merged_df["demand_7d_ma"] = merged_df["demand"].rolling(48*7, min_periods=1).mean()
# 過去7日移動平均の気温
merged_df["temp_7d_ma"] = merged_df["temperature"].rolling(48*7, min_periods=1).mean()
merged_df = make_temperature_abs(merged_df)
# 価格ラグ
merged_df["price_lag_24h"] = merged_df["tokyo_price_jpy_per_kwh"].shift(48)
merged_df["price_lag_48h"] = merged_df["tokyo_price_jpy_per_kwh"].shift(96)
merged_df["price_lag_72h"] = merged_df["tokyo_price_jpy_per_kwh"].shift(144)
merged_df["price_lag_1w"] = merged_df["tokyo_price_jpy_per_kwh"].shift(336)
# 為替・石油・ガス(日次レベル)
for col in ["USDJPY", "brent", "henry_hub"]:
# 30日移動平均
merged_df[f"{col}_ma30"] = merged_df[col].rolling(48*30, min_periods=1).mean()
# 30日前との差分
merged_df[f"{col}_chg30"] = merged_df[col] - merged_df[col].shift(48*30)
# 石炭(月次レベル)
# 90日移動平均
merged_df["coal_aus_ma90"] = merged_df["coal_aus"].rolling(48*90, min_periods=1).mean()
# 90日前との差分
merged_df["coal_aus_chg90"] = merged_df["coal_aus"] - merged_df["coal_aus"].shift(48*90)
return merged_df
- LightGBMはカテゴリカル特徴量をそのまま処理してくれるので、
pandas.Categoricalで変換しています
GAMで推論
def predict_gam(
gam_model: Optional[Any],
test_df
) -> pd.DataFrame:
"""
GAMモデルで推論し、y_gamをtest_dfに代入してLightGBMでも使用
"""
X_gam_test = test_df.copy()
X_gam_test["is_holiday"] = X_gam_test["is_holiday"].cat.codes.astype("int8")
X_gam_test = X_gam_test[GAM_FEATURES].to_numpy()
print("X_gam_test: \n", X_gam_test)
test_df["y_gam"] = gam_model.predict(X_gam_test)
return test_df
- GAM実装は 入力を数値行列として受け取るので、
Categoricalのままだとエラーになります(Categorical変換はLightGBM用)→cat.codesで数値化して渡しています -
test_dfにy_gamが加わった状態です
LightGBMで推論
def predict_lgb(
lgb_p10, lgb_p50, lgb_p90,test_df
) -> pd.DataFrame:
"""
"""
print(test_df.columns)
X_lgb_test = test_df[LGB_FEATURES]
r10_test = lgb_p10.predict(X_lgb_test)
r50_test = lgb_p50.predict(X_lgb_test)
r90_test = lgb_p90.predict(X_lgb_test)
return r10_test, r50_test, r90_test
実行
def predict_price():
# 特徴量の読み込み
demand = pd.read_parquet(DEMAND_PATH)
print("demand\n", demand)
weather = pd.read_parquet(WEATHER_PATH)
print("weather\n", weather)
print(weather.columns)
market = pd.read_parquet(MARKET_PATH)
print("market\n", market)
price = pd.read_parquet(PRICE_PATH)
print("price\n", price)
JST = timezone(timedelta(hours=9))
today = pd.Timestamp(datetime.now(JST).date())
tomorrow = today + timedelta(days=1)
features_df, pred_time = build_price_features(demand, weather, market, price)
test_df = features_df[features_df["timestamp"] >= tomorrow].reset_index(drop=True)
pred_time = test_df["timestamp"].copy()
gam, lgb_p10, lgb_p50, lgb_p90 = load_price_model()
test_df = predict_gam(gam, test_df)
r10_test, r50_test, r90_test = predict_lgb(lgb_p10, lgb_p50, lgb_p90, test_df)
# 予測結果をparquetで保存
pred_df = pd.DataFrame({
"timestamp": pred_time,
"predicted_price(10%)": r10_test.flatten(),
"predicted_price(50%)": r50_test.flatten(),
"predicted_price(90%)": r90_test.flatten(),
}).reset_index(drop=True)
print("pred_df: \n", pred_df)
# 需給実績をくっつけて可視化で使用する(timestamp, predicted_demand, price_realized)
out = pred_df.merge(
price[["timestamp", "tokyo_price_jpy_per_kwh"]],
on="timestamp",
how="outer",
)
out["price"] = out["predicted_price(50%)"].fillna(out["tokyo_price_jpy_per_kwh"])
print("final_out:\n", out)
out.to_parquet(PRICE_OUT_PATH, index=False)
print(f"[SAVE] price forecast to {PRICE_OUT_PATH}")
if __name__ == "__main__":
predict_price()
- この関数を決まった時間に実行する設定はGithub Actionsで行いますが、それについてはDay24に記載します
明日
Streamlitによる可視化設計をしていきます!![]()