0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【ダッシュボード】データ自動取得(電力需要・天気)

Last updated at Posted at 2025-12-17

この記事は「【リアルタイム電力予測】需要・価格・電源最適化ダッシュボード構築記」シリーズの十八日目です
Day1はこちら | 全体構成を見る

長かったシリーズも最後の章になりました。
今日からは ダッシュボード構築 を中心に記載していきます。
このダッシュボードでは、以下の3要素が入っています。

  1. 電力需要予測
  2. 電力価格予測
  3. 電源構成の数理最適化

これらの可視化グラフは、毎日一回更新され最新の天気予報や価格を反映していきます。
今日は、ダッシュボードの土台となる「毎日自動で更新される入力データ」 のうち2種類を取得します。

  1. 東京電力パワーグリッドが公開している 電力需給実績(30分値)
  2. 将来の予測に必要な 天気予報データ

これらを requests を使って自動取得 → 整形 → parquet に保存 するところまでを実装します。

1. 電力需要

データ取得元

取得元:東京電力パワーグリッド でんき予報 CSVダウンロード
エリア需給実績データ

ここからcsvを自動ダウンロードしてくるようにします。
このcsvは30分毎に更新されております。
※基本的に土日にも更新され続けていますが11月3連休(土日月)の日に同じように行ったところ次の平日(火曜)まで更新されていませんでした(確か)。年末年始もそのパターンがあるかもしれません

必要な設定

JST = timezone(timedelta(hours=9))

PAGE_URL = "https://www.tepco.co.jp/forecast/html/area_jukyu-j.html"
CACHE_DIR = Path("data/cache")
DATA_DIR = Path("data")

ACTUAL_PATH = DATA_DIR / "actual.parquet"
DEMAND_PATH = CACHE_DIR / "demand_bf1w_ytd.parquet"

columns = [
    "date", "time", "demand", "nuclear",
    "lng", "coal", "oil", "th_other",
    "hydro", "geothermal", "biomass",
    "pv", "pv_curtailed", "wind", "wind_curtailed",
    "pstorage", "battery", "tie", "misc", "total"
]

csvを取得する

このページでは「当月分のCSV」が1ファイルにまとまって公開されています。そのため 「ページ → CSVリンクを取得 → CSVをDL」 という流れになります。
今回はcsvがリンクになってくれているのでページを操作する必要がなく簡単です。

def _get_month_csv_url(session: requests.Session, target_ym) -> str:
    """ページから当月CSVのURLを拾う(a要素のhrefに .csv が含まれるもの)"""
    
    html = session.get(PAGE_URL, timeout=30).text
    # CSVリンクをすべて抽出
    csv_links = re.findall(r'href="([^"]*eria_jukyu_\d{6}_\d{2}\.csv)"', html, re.I)

    # 昨日の月に一致するリンクだけを抽出
    filtered = [link for link in csv_links if target_ym in link]
    print(filtered)

    # 完全なURLに変換
    def complete_url(link):
        if link.startswith("//"):
            return "https:" + link
        elif link.startswith("/"):
            return "https://www.tepco.co.jp" + link
        else:
            return link

    return [complete_url(link) for link in filtered]

この関数では

  • でんき予報ページの HTML を取得
  • <a href="...csv"> を正規表現で抽出
  • 対象月(YYYYMM)に一致する CSV だけを返す

という処理を行っています。

データ整形

このcsvを手動でダウンロードする際に、いつも一行目がカラムの横にきてずれています。
そのため、1行目をカラムの数より多ければ混在しているcsvだと判断して処理します。
どこかのタイミングで修正される可能性もあるので、通常通りのパターンも残しておきます。

def read_csv(path: str) -> pd.DataFrame:
    # 1行目を読み込んで、混在しているか確認
    with open(path, encoding="MacRoman") as f:
        first_line = f.readline()

    # 1行目を分割して、カラム数より多ければ混在していると判断
    first_split = first_line.strip().split(",")
    if len(first_split) > len(columns):
        # 最初のデータ行を抽出(カラム数分以降)
        first_data = first_split[len(columns):]
        # 2行目以降をDataFrameとして読み込む
        df = pd.read_csv(path, encoding="MacRoman", skiprows=1, header=None)
        df.columns = columns
        # 最初のデータ行を先頭に挿入
        first_row = pd.DataFrame([first_data], columns=columns)
        df = pd.concat([first_row, df], ignore_index=True)
    else:
        # 通常通り読み込む
        df = pd.read_csv(path, encoding="MacRoman", skiprows=1, header=None)
        df.columns = columns

    return df

長期変動用のcsvを作成(数理最適化用)

数値最適化では、動的に発電上限を推測できるように、長期的な上限と短期的な上限を決めてどちらか小さいほうを採用していました。
その長期的な上限は、2024年2月以降から前月までの完全なcsvから推測するようにしています。

2024年2月以降からのデータを使用するようにしたのは、それ以前のcsvは詳細な電源別の発電実績の記載がされていなかったからです。
この処理は毎月一回できればよいので、安全のために8日にしています(年末年始やゴールデンウィーク等で更新が止まっても対応できる日にした)。

def update_actual_last_month(df, today, actuion_day):
    # 毎月8日以外は何もしない
    if today.day != actuion_day:
        print(f"[update_actual] today.day != {actuion_day} → skip")
        return
    
    # 先月の 1日〜末日を計算
    first_this_month = today.date().replace(day=1)
    last_month_end = first_this_month - timedelta(days=1)
    last_month_start = last_month_end.replace(day=1)
    
    print(f"[update_actual] last month = {last_month_start}{last_month_end}")

    # 先月分だけ切り出し
    mask = (
        (df["timestamp"].dt.date >= last_month_start)
        & (df["timestamp"].dt.date <= last_month_end)
    )
    last_month_df = df.loc[mask].copy()
    
    if last_month_df.empty:
        print("[update_actual] last_month_df is empty → skip")
        return
    
    # timestamp を index にして、長期用フォーマットに揃える
    last_month_df = last_month_df.set_index("timestamp").sort_index()

    # 既存 actual を読み込んで結合
    if ACTUAL_PATH.exists():
        actual = pd.read_parquet(ACTUAL_PATH)

        combined = pd.concat([actual, last_month_df])
        # 同じ timestamp があれば新しい方を残す
        combined = combined[~combined.index.duplicated(keep="last")]
    else:
        combined = last_month_df
    
    print("combined: \n", combined)

    # 書き出し
    combined.to_parquet(ACTUAL_PATH)
    print(f"[update_actual] updated {ACTUAL_PATH} (rows={len(combined)})")

上記の関数を実行して自動で取得する

requests ライブラリ

今回のデータ取得では、Python の requests というライブラリを使っています。requests は「Python から Web サイトや API にアクセスするための定番ライブラリ」です。
使用方法を簡単に記載します。

①ページを取得する

r = requests.get("https://example.com")
html = r.text
  • requests.get(): URL に HTTP GET リクエストを送る
  • r.text: 文字列としてHTMLを取得
  • r.content:バイト列(CSV・画像など)

② クエリパラメータをつける

params = {
    "latitude": 35.68,
    "longitude": 139.76,
}
r = requests.get(url, params=params)

URL の ?latitude=...&longitude=... を自動生成してくれます。

③ Session を使う理由

with requests.Session() as s:
    r = s.get(url)
  • 同じ接続を再利用(高速・安定)
  • Cookie / Header を共通管理できる
  • 複数リクエストをまとめて扱いやすい
  • ページを取得 → そこから CSV を取得という 連続アクセス では Session が適している

④ エラー処理

r = requests.get(url)
r.raise_for_status()
  • ステータスコードが 200 以外なら例外を投げる
  • サイレントに失敗するのを防げる
  • Open-Meteo 取得部分でこれを使っています

コード

def fetch_demand():
    """当月CSVをDLして前日分とバンド(同月同時刻のmin/max)を用意する"""

    # 昨日の日付起算で当月を取得
    today = datetime.now()
    before_1w = (today - timedelta(days=7)).date()
    yesterday = (today - timedelta(days=1)).date()
    target_ym = before_1w.strftime("%Y%m")
    print(f"Target date: {before_1w} (YM={target_ym})")

    with requests.Session() as s:
        # 当月を探す
        csv_url = _get_month_csv_url(session=s, target_ym=target_ym)
        # csv_url = None
        # 前月を探す
        if not csv_url:
            prev_month_ym = (before_1w - timedelta(days=31)).strftime("%Y%m")
            print(f"{target_ym} is not found.Let's search {prev_month_ym}")
            csv_url = _get_month_csv_url(session=s, target_ym=prev_month_ym)
        if not csv_url:
            print("We could not find prev_month_ym csv")
        
        print("csv_url: ", csv_url)
        response = s.get(csv_url[0], timeout=30)
        response.encoding = "MacRoman"
        content = response.content.decode("MacRoman")  # bytes → str

        # CSV文字列をファイルのように扱う
        df = pd.read_csv(StringIO(content), skiprows=2, header=None)
        df.columns = columns
    
    if df is None:
        print("We could not get csv itself, so update the demand.")
        return 
    
    df["timestamp"] = pd.to_datetime(df["date"] + " " + df["time"])
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df["realized_demand"] = df["demand"].astype("float64")
    
    df.drop(columns=["date", "time", "demand"], inplace=True)
    update_actual_last_month(df, today, 8)
    
    # 1週間前から昨日までを抽出
    prev = df[(df["timestamp"].dt.date >= before_1w)&(df["timestamp"].dt.date <= yesterday)].copy().reset_index(drop=True)

    if prev.empty:
        print("[fetch_demand] prev is empty – fallback: use last available week pattern")
        
        if df.empty:
            print("[fetch_demand] df itself is empty – cannot build fallback")
            return
    
        last_ts = df["timestamp"].max()
        last_date = last_ts.date()
        hist_start = last_date - timedelta(days=6)
    
        hist = df[
        (df["timestamp"].dt.date >= hist_start) &
        (df["timestamp"].dt.date <= last_date)
        ].copy()
        
        if hist.empty:
            print("[fetch_demand] hist (last week) is empty – cannot build fallback")
            return
        src_start = hist["timestamp"].dt.date.min()
        
        offset_days = (before_1w - src_start).days
        print(f"[fetch_demand] shifting last week by {offset_days} days | start: {src_start} end: {last_date}")
        
        hist["timestamp"] = hist["timestamp"] + pd.Timedelta(days=offset_days) # 各datetimeにoffset_daysぶんを足す処理

        # 目的の期間だけに絞り直す
        prev = hist[
            (hist["timestamp"].dt.date >= before_1w) &
            (hist["timestamp"].dt.date <= yesterday)
        ].copy()
        prev = hist.reset_index(drop=True)

    print("final_out: \n", prev)

    prev.to_parquet(DEMAND_PATH, index=False)
    print(f"[SAVE] before_1w demand: {DEMAND_PATH}")
    
if __name__ == "__main__":
    fetch_demand()

上記の関数を実行すると以下のoutputが得られます。

  1. 毎月8日:actual.parquet
    ⇒ 2024年2月~前月までの各電源の発電実績
  2. 毎日demand_bf1w_ytd.parquet
    ⇒ 7日前から~昨日までの各電源の発電実績

2. 天気

データ取得元

実績の場合は気象庁 過去の気象データ・ダウンロードから取得しましたが、今回取得したいのは未来の天気予報です。
そのため、気象庁のサイトではなく、別の手段を探さなくてはなりません。
今回は、Open-Meteoというサイトから取得していきます。
Open-Meteo を選んだ理由は以下です。

  • 無料・APIキー不要
  • 過去実績と将来予報を 1回のAPIで取得可能
  • 気温・風・日射量など、電力予測に必要な変数が揃っている

今回はJMA APIから変数名等を確認していきました。
image.png

必要な設定

CACHE_DIR = Path("data/cache")
ACTUAL_PATH = CACHE_DIR / "weather_bf1w_af1w.parquet"

# Open-Meteoから取得する変数
OM_HOURLY_VARS = ",".join([
    "temperature_2m",
    "wind_speed_10m",
    "sunshine_duration",   # 秒/時
    "shortwave_radiation", # W/m^2(時間平均)
    "relative_humidity_2m" # 湿度
])

一時間単位から30分単位に変換

Open-Meteoのデータは1時間単位のデータなので、今回の予測に合わせて30分単位に変換していきます。
日照時間は「1時間あたり何秒日が照っていたか」という量なので、30分に分割する際は単純に 半分ずつ配分 しています。

def _hourly_to_30min(df_hourly: pd.DataFrame) -> pd.DataFrame:
    """
    - 一時間単位→30分単位に変換
    - 温度/風/放射は線形、日照は半分配分で変換
    """
    df = df_hourly.set_index("timestamp").sort_index()

    # ダミー行を追加して resample 範囲を広げる
    last_time = df.index.max()
    dummy_time = last_time + pd.Timedelta(hours=1)
    dummy_row = pd.DataFrame({col: np.nan for col in df.columns}, index=[dummy_time])
    df = pd.concat([df, dummy_row])

    # 線形内挿
    cols_lin = [c for c in ["temperature_2m","wind_speed_10m","shortwave_radiation", "relative_humidity_2m"] if c in df.columns]
    out = df[cols_lin].resample("30min").interpolate("time")

    # 日照(秒/時)→ 30分に変換(半分にする)
    if "sunshine_duration" in df.columns:
        out["sunshine_duration"] = df["sunshine_duration"].resample("30min").interpolate("time") * 0.5

    out.reset_index(drop=False, inplace=True)
    out.rename(columns={"index": "timestamp"}, inplace=True)
    return out

  • 結果
hourly_30:
               timestamp  temperature_2m  wind_speed_10m  shortwave_radiation  relative_humidity_2m  sunshine_duration
0   2025-12-10 00:00:00            6.20            5.00                  0.0                  59.0                0.0
1   2025-12-10 00:30:00            6.00            4.75                  0.0                  60.0                0.0
2   2025-12-10 01:00:00            5.80            4.50                  0.0                  61.0                0.0
3   2025-12-10 01:30:00            5.55            4.50                  0.0                  62.0                0.0
4   2025-12-10 02:00:00            5.30            4.50                  0.0                  63.0                0.0
..                  ...             ...             ...                  ...                   ...                ...
668 2025-12-23 22:00:00            6.40            4.90                  0.0                  56.0                0.0
669 2025-12-23 22:30:00            6.30            4.80                  0.0                  57.0                0.0
670 2025-12-23 23:00:00            6.20            4.70                  0.0                  58.0                0.0
671 2025-12-23 23:30:00            6.20            4.70                  0.0                  58.0                0.0
672 2025-12-24 00:00:00            6.20            4.70                  0.0                  58.0                0.0

データ取得

def _fetch_openmeteo_hourly(lat: float, lon: float, tz: str,
                            past_days: int, forecast_days: int) -> pd.DataFrame:
    """
    Open-MeteoのForecast APIを1回叩いて、直近の実績+先の予報をまとめて取得(一時間単位)
    """
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": lat,
        "longitude": lon,
        "timezone": tz,           # 返却時刻をJSTにする
        "hourly": OM_HOURLY_VARS,
        "past_days": past_days,   # 直近の実績
        "forecast_days": forecast_days,  # 先の予報(日数)
    }
    r = requests.get(url, params=params, timeout=45)
    r.raise_for_status()
    j = r.json()
    h = j.get("hourly", {})
    if not h or "time" not in h:
        raise RuntimeError("Open-Meteo hourly is empty")
    df = pd.DataFrame(h)
    df.rename(columns={"time": "timestamp"}, inplace=True)
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    for c in ["temperature_2m","wind_speed_10m","sunshine_duration","shortwave_radiation", "relative_humidity_2m"]:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    return df.sort_values("timestamp").reset_index(drop=True)

API 通信では

  • ネットワークエラー
  • サーバーエラー

が起こり得るため、raise_for_status()で失敗を即検知します。

データ整形

データを取得した後は、実績部分と予測部分がどこなのかを分かりやすくするために、フラグをつけていきます。

def build_openmeteo_daily(lat: float, lon: float, tz: str = "Asia/Tokyo",
                          past_days: int = 1, forecast_days: int = 7):
    """
    - 今日00:00を境に「実績相当」と「予報」に分割
    - 各々30分化して保存
    """
    today = _today_jst()
    cut = pd.Timestamp(f"{today} 00:00:00")  # JST基準(tzで返ってくる)

    hourly = _fetch_openmeteo_hourly(lat, lon, tz, past_days, forecast_days)
    print("hourly: \n", hourly)
    # 30分化
    hourly_30 = _hourly_to_30min(hourly)
    print("hourly_30: \n", hourly_30)
    
    # 実績(過去)と予報に分割
    actual_30 = hourly_30[hourly_30["timestamp"] < cut].copy()
    fcst_30   = hourly_30[hourly_30["timestamp"] >= cut].copy()

    # ラベル付け
    actual_30["source"] = "Open-Meteo"
    actual_30["is_forecast"] = 0
    fcst_30["source"] = "Open-Meteo"
    fcst_30["is_forecast"] = 1

    # 欠損がある場合は前方補完
    for df in (actual_30, fcst_30):
        for c in ["temperature_2m","wind_speed_10m","shortwave_radiation","sunshine_duration", "relative_humidity_2m"]:
            if c in df.columns:
                df[c] = df[c].fillna(method="ffill", limit=2)

    df = pd.concat([actual_30, fcst_30], ignore_index=True).sort_values("timestamp")
    df = df.sort_values(["timestamp","is_forecast"])
    df = df.drop_duplicates(subset=["timestamp"], keep="last").reset_index(drop=True)
    df.rename(columns={"temperature_2m": "temperature", "wind_speed_10m": "wind_speed", "relative_humidity_2m":"humidity"}, inplace=True)
    print(df.columns)
    df = df.iloc[:-1, :]
    print("final_df: \n", df)
    print(df["timestamp"].diff().value_counts())
    df.to_parquet(ACTUAL_PATH)
    print(f"[OK] weather unified: {ACTUAL_PATH}")

lat, lon = 35.6812, 139.7671は東京の緯度・経度です。
また、実績と予報を同一テーブルに入れておくことで、

  • 学習時:実績のみ使用
  • 本番:未来のみ使用

同じ処理フローを保ったまま切り替え できます。

def fetch_weather():

    # Open-Meteo:1日1回で“実績相当+7日予報”を取得 → 30分化 → 保存
    lat, lon = 35.6812, 139.7671
    build_openmeteo_daily(
        lat=lat, lon=lon, tz="Asia/Tokyo",
        past_days=7, forecast_days=7
    )
        
if __name__ == "__main__":
    fetch_weather()

上記の関数を実行すると以下のoutputが得られます。

  1. weather_bf1w_af1w.parquet
    ⇒ 7日前から7日後までの天気予報

まとめ

今日はダッシュボードの 入力データ部分 を整備しました。

  • 電力需要

    • TEPCO の公開 CSV を自動取得
    • フォーマット揺れにも耐える実装
    • 最適化用の長期履歴も同時に更新
  • 天気予報

    • Open-Meteo API を利用
    • 実績+7日予報を一括取得
    • 30分粒度に変換して保存

これで「毎朝、最新データが自動で揃う状態」 が完成しました!

明日

明日は燃料と為替データを自動で取得する方法を記載していきます!:fist_tone1:

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?