0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

この記事は「【リアルタイム電力予測】需要・価格・電源最適化ダッシュボード構築記」シリーズの21日目です
Day1はこちら | 全体構成を見る

今回は前回までで自動で取得できるようにした以下の特徴量を用いて、電力需要予測をしていきます。

hugging face Models の設定

今回のダッシュボードはGitHub Actionsで毎日cacheを更新 ⇒ Huging Face Spaces上で公開します。そのため、GitHubを読み込めばモデルは動きますが、scalerやmodelの重みを置いておくと重くなる可能性がる他、SpacesとActionsで同じ場所から取得できるほうが好ましいためHugging Face Modelsを単一の配布先に設定します。

1. scalerとmodelの保存

まずはscalerとmodelをローカルに保存します。

  • scaler
joblib.dump(scaler_y, "../model/scaler_y.pkl")
  • model
torch.save(model.state_dict(), '../model/model_demand.pth')

2. hugging face modelsの登録

modelsにアップするには新たに登録が必要です。
自分のアイコンから「New Model」をクリックします。
スクリーンショット 2025-12-20 122632.png

Create a new model repositoryというページに遷移しますので、こちらからrepositoryという形で登録します。
スクリーンショット 2025-12-20 122722.png

3. アップロード

先ほど保存したscalerとmodelをアップロードします。
image.png
⇒「Update files」を選択

スクリーンショット 2025-12-20 123503.png

これでmodelsへの登録が完了です!

予測モデル

【電力需要予測】GRUを使ってみるで記載した以下のGRUモデルです。

  • 月(sin,cos)
  • 時間
  • 時間(sin,cos)
  • 祝日フラグ
  • 湿度
  • 気温
  • 気温絶対値差分
  • 特徴量は生のまま、目的変数のみに正規化をかける

事前設定

CACHE_DIR = Path("data/cache")

WEATHER_PATH = CACHE_DIR / "weather_bf1w_af1w.parquet"
DEMAND_REALIZED_PATH = CACHE_DIR / "demand_bf1w_ytd.parquet"
DEMAND_OUT_PATH = CACHE_DIR / "demand_forecast.parquet"

HF_REPO_ID = "nakamichan/power-forecast-models"
DEMAND_MODEL = "model_demand.pth"
SCALER_Y_PATH = "scaler_y.pkl"
  • HF_REPO_IDからモデルをロードしてくれるようにします

モデルのロード部分

def load_demand_model(input_size) -> Any:
    """
    需要予測モデルを読み込んで返す
    """
    
    # モデルファイルを Hub からダウンロード
    model_path = hf_hub_download(
        repo_id=HF_REPO_ID,
        filename=DEMAND_MODEL,
    )
    
    model = GRUModel(input_size=input_size)      
    model.load_state_dict(torch.load(model_path, map_location="cpu", weights_only=True))
    return model
  • hf_hub_download: Hugging Face Modelsからローカルにキャッシュしそのパスを返す→同じファイルは次回以降ダウンロードされない
  • torch.load: ファイルの読み込み関数
    .pth.ptに保存されたstate_dict(重みの辞書)をpythonに読み込む
  • model.load_state_dict(): モデルのパラメータに重みを流し込む関数
    モデルの構造が一致していないとエラーになる

特徴量作成部分

month,hour,is_holidayの追加

def make_date(df):
    """
    - timestamp列からmonth, hour,is_holiday列を追加する
    """
    df = df.copy()

    df["month"] = df["timestamp"].dt.month
    df["hour"] = df["timestamp"].dt.hour
    
    df["is_holiday"] = df["timestamp"].dt.date.apply(jpholiday.is_holiday)
    
    def is_year_end(date):
        return (date.month == 12 and date.day >= 29) or (date.month == 1 and date.day <= 3)

    df["is_year_end"] = df["timestamp"].dt.date.apply(is_year_end)
    
    def is_obon(date):
        return date.month == 8 and date.day in [13, 14, 15]

    df["is_obon"] = df["timestamp"].dt.date.apply(is_obon)
    df["is_holiday"] = df["is_holiday"] | df["is_year_end"] | df["is_obon"]
    df["is_holiday"] = df["is_holiday"].astype(int)
    
    df.drop(columns=["is_year_end", "is_obon"], inplace=True)
    
    return df

sin.cos変換
def make_sincos(df, add_month, add_hour):
    """
    - 指定したcolにsin,cos変換を施す
    """
    df = df.copy()
    
    if add_month:
        df["month_sin"] = np.sin(2 * np.pi * df["month"] / 12)
        df["month_cos"] = np.cos(2 * np.pi * df["month"] / 12)

    if add_hour:
        df["hour_sin"] = np.sin(2 * np.pi * df["hour"] / 24)
        df["hour_cos"] = np.cos(2 * np.pi * df["hour"] / 24)

    return df
気温の絶対値差分の作成
def make_temperature_abs(df, threshold=18.1):
    """
    - temperature列から気温の絶対値差分の特徴量を作成する
    """
    df = df.copy()
    
    df["temperature_abs"] = (df['temperature'] - threshold).abs()
    
    return df
def build_demand_features(
    weather: pd.DataFrame,
    sequence_length: int,
    weather_col =  ["timestamp", "temperature", "humidity", "is_forecast"],
    ) -> pd.DataFrame:
    """
    気象データから需要予測用の特徴量を作る

    入力:
        weather: weather_1w.parquet を読んだ DataFrame
        特徴量:use_colにリストで指定

    出力:
        features: モデルに入力する特徴量 DataFrame
            - index または 'timestamp' で元の時間軸と対応すること
    """
    df = weather.copy()
    weather_df = df[weather_col]
    
    today = _today_jst()
    cut = pd.Timestamp(f"{today} 00:00:00")
    
    start_time = cut - pd.Timedelta(minutes=30 * sequence_length) # 前日の24ステップを含める
    X_test = weather_df[weather_df["timestamp"] >= start_time]
    
    # 予測期間を抜き出す
    pred_time_df = X_test[X_test["timestamp"] >= cut]
    pred_time = pred_time_df["timestamp"]
    print("prediction time: \n", pred_time)
    
    X_test = make_date(X_test)
    X_test = make_sincos(X_test, add_month=True, add_hour=True)
    X_test = make_temperature_abs(X_test)
    
    X_test.set_index("timestamp", inplace=True) 
    X_test.drop(columns="is_forecast", inplace=True)
    print("X_test shape:", X_test.shape)
    print("X_test: \n", X_test)
    
    # テンソルに変換
    features_tensor = torch.tensor(X_test.values, dtype=torch.float32)
    
    return features_tensor, pred_time

モデルが求める形状に変換

def create_sequences(data, sequence_length):
    sequences = []
    for i in range(len(data) - sequence_length):
        seq = data[i:i+sequence_length] # モデルの入力
        sequences.append(seq)
    return torch.stack(sequences)

推論

def evaluate(model, dataloader, scaler_y, device):
    model.eval()
    all_predictions = []

    with torch.no_grad():
        for batch_X in dataloader:
            batch_X = batch_X[0].to(device)
            outputs = model(batch_X) # モデルの予測値を計算
            all_predictions.append(outputs)

    predictions = scaler_y.inverse_transform(torch.cat(all_predictions).cpu().numpy()) # すべてのバッチの予測値を1つのテンソルにしてからスケールを元に戻sす
    return predictions

実行

def predict_demand():
    
    weather = pd.read_parquet(WEATHER_PATH)
    print("weather\n", weather)
    
    # timestamp を必ず datetime にしておく
    weather["timestamp"] = pd.to_datetime(weather["timestamp"])

    # 過去の一定期間(24時間分)のデータを1つの入力シーケンとしてまとめる
    sequence_length = 48
    
    scaler_y_path= hf_hub_download(
        repo_id=HF_REPO_ID,
        filename=SCALER_Y_PATH,
    )
    
    scaler_y = joblib.load(scaler_y_path)

    features_tensor, pred_time = build_demand_features(weather, sequence_length)
    print("features_tensor.shape\n", features_tensor.shape)

    # シーケンスデータ作成
    test_sequences = create_sequences(features_tensor, sequence_length)
    print("test_sequences.shape", test_sequences.shape)  # (n_samples, sequence_length, num_features)

    model = load_demand_model(input_size=features_tensor.shape[1])
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device=device)
    test_loader = DataLoader(TensorDataset(test_sequences), batch_size=128, shuffle=False)
    
    predictions = evaluate(model, test_loader, scaler_y, device)
    print("predictions: \n", predictions[:5])
    
    # 予測結果をparquetで保存
    pred_df = pd.DataFrame({
        "timestamp": pred_time,
        "predicted_demand": predictions.flatten()
    }).reset_index(drop=True)
    
    print("pred_df: \n", pred_df)
    
    demand = pd.read_parquet(DEMAND_REALIZED_PATH)
    print("demand\n", demand["timestamp"])
    
    # 需給実績をくっつけて可視化で使用する(timestamp, predicted_demand, price_realized)    
    out = pred_df.merge(
        demand,
        on="timestamp",
        how="outer",
    )
    out["demand"] = out["predicted_demand"].fillna(out["realized_demand"])
    print("final_out:\n", out)
    
    out.to_parquet(DEMAND_OUT_PATH, index=False)
    print(f"[SAVE] demand forecast to {DEMAND_OUT_PATH}")

if __name__ == "__main__":
    predict_demand()

【ダッシュボード】データ自動取得(電力需要・天気)で記載したfetch_weather関数→今回のpredict_demand関数を行えばpredictまでできるという流れになります。
決まった時間に行う設定はGithub Actionsで行いますが、それはDay24に記載します。

明日

次回は電力価格予測を自動で行っていきます!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?