1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【ダッシュボード】データ自動取得(燃料・為替)

Last updated at Posted at 2025-12-18

この記事は「【リアルタイム電力予測】需要・価格・電源最適化ダッシュボード構築記」シリーズの十九日目です
Day1はこちら | 全体構成を見る

今日は、ダッシュボードの土台となる「毎日自動で更新される入力データ」 のうち「外生価格データ」を取得します。※前回は電力需要と天気を取得しました

  • ドル円
  • 石油価格
  • ガス価格
  • 石炭価格(豪州)

これらの価格は電力価格の予測の際にとても大きな影響を及ぼします。詳しくは【電力価格予測】EDA(価格、燃料・為替、日付)で記載しています。

共通関数

4つのデータをまとめて取得するので共通することも多いです。
以下の関数でまとめていきました。

URLからデータを取得する
def _req(url, params=None, timeout=30, max_retry=3):
    """
    - URLからparamsで指定したデータを取得する
    """
    
    last = None
    for i in range(max_retry):
        try:
            r = requests.get(url, params=params, timeout=timeout)
            if r.status_code == 200:
                return r
            last = f"HTTP {r.status_code}: {r.text[:200]}"
        except Exception as e:
            last = str(e)
        time.sleep(1.5 * (i + 1))
    raise RuntimeError(f"Request failed: {url} ({last})")
日付の範囲を指定する
def _clip(df, date_col, start, end):
    """dfの日次範囲を指定する"""
    m = (df[date_col] >= pd.to_datetime(start)) & (df[date_col] <= pd.to_datetime(end))
    return df.loc[m].copy()
csvを保存する
def _save_cache(df, path):
    """キャッシュとして指定したpathにcsvを保存する"""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    df.to_csv(path, index=False)
    print(f"[SAVE] df at {path}")
csvを読み込む
def _load_cache(path, date_col, start, end, source_label="cache"):
    """キャッシュとして保存したcsvを読み込む"""
    if not os.path.exists(path):
        raise RuntimeError("no cache")
    df = pd.read_csv(path)
    df[date_col] = pd.to_datetime(df[date_col])
    df = _clip(df, date_col, start, end)
    if df.empty:
        raise RuntimeError("empty cache after clip")
    df["source"] = source_label
    
    print(f"[LOAD] df from {path}")
    return df
日本時間の今日の日付を指定する
def _today_jst():
    JST = timezone(timedelta(hours=9))
    return datetime.now(JST).date()
取得期間を指定
def _range_to_today(days_back: int=10):
    """
    取得期間として、今日を基準に直近days_back日分を取る
    """
    t = _today_jst()
    start = t - pd.Timedelta(days=days_back)
    end   = t  # APIには「今日まで」で投げる(実際には最新営業日まで返ってくればOK)
    return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")

1. ドル円(USD/JPY)

データ取得元

取得コード

def fetch_fx_usdjpy(start, end):
    """
    - 以下の優先度で為替価格を取得する
        - Frankfurter(APIキー不要)から為替価格を取得する
        - yfinanceから取得する
        - キャッシュを使用する
    """
    try:
        url = f"https://api.frankfurter.app/{start}..{end}"
        j = _req(url, params={"from": "USD", "to": "JPY"}).json()
        df = (pd.DataFrame(j["rates"]).T
                .rename_axis("date").reset_index())
        df["date"] = pd.to_datetime(df["date"])
        df["USDJPY"] = df["JPY"].astype(float)
        out = df[["date", "USDJPY"]].sort_values("date").reset_index(drop=True)
        if start:
            out = out[out["date"] >= pd.to_datetime(start)]
        out["source"] = "frankfurter"
        _save_cache(out, FX_CACHE)
        return out
    except Exception as e_fr:
        last_err = f"frankfurter failed: {e_fr}"

    # yfinance の通貨ペア(USD/JPY は "JPY=X")
    try:
        import yfinance as yf
        tkr = yf.Ticker("JPY=X")
        hist = tkr.history(start=start, end=end, interval="1d", actions=False, auto_adjust=False)
        if hist is not None and not hist.empty:
            out = (hist.reset_index()
                    .rename(columns={"Date":"date","Close":"USDJPY"})
                    [["date","USDJPY"]].dropna().sort_values("date"))
            out["source"] = "yfinance"
            _save_cache(out, FX_CACHE)
            return out
        last_err = "yfinance empty"
    except Exception as e_yf:
        last_err = f"yfinance failed: {e_yf}"

    # cache
    return _load_cache(FX_CACHE, "date", start, end, "cache")
  • URL構造
https://api.frankfurter.app/{start}..{end}?from=USD&to=JPY
  • 返却JSONの形
{
  "rates": {
    "2025-01-01": { "JPY": 145.2 },
    "2025-01-02": { "JPY": 146.1 }
  }
}

⇒ JSONを.json()でdictに変換

  • キャッシュは最終手段
df["source"] = source_label

⇒ あとでAPI由来か、yfinance由来か、cacheかをデバッグする

2. 石油価格

データ取得元

APIの設定

EIAのAPIを使う際にはAPI keyを使用します。
https://www.eia.gov/opendata/register.php

ローカルで行う場合には環境変数の設定が必要です。

echo 'export EIA_API_KEY=<取得したkeyを記載>' >> ~/.bashrc # 永続化
source ~/.bashrc
  • EIA APIの基本構造
    • PET : Petroleum
    • RBRTE : Europe Brent
    • D : Daily
https://api.eia.gov/v2/seriesid/PET.RBRTE.D
  • 返却JSON
{
  "response": {
    "data": [
      { "date": "2025-01-01", "value": 78.5 },
      ...
    ]
  }
}
data = j.get("response", {}).get("data", [])

JSONの構造をgetで取得していきます。

  • yfinance
    原油の 直近限月先物 で、スポットとは厳密には違いますが、トレンド把握には十分とみなして取得します。
tkr = yf.Ticker("BZ=F")

取得コード

def fetch_brent_daily(start, end, api_key=None):
    """
    - 以下の優先度で原油価格を取得する
        - EIA Europe Brent Spot Price FOB (Daily)(APIキー必要)から為替価格を取得する
        - yfinanceから取得する
        - キャッシュを使用する
    """

    try:
        if api_key is None:
            api_key = os.getenv("EIA_API_KEY", "PUT_YOUR_API_KEY")

        if not api_key or api_key == "PUT_YOUR_API_KEY":
            raise RuntimeError("EIA_API_KEY not set")

        # v2 の seriesid 互換ルート
        url = "https://api.eia.gov/v2/seriesid/PET.RBRTE.D"
        params = {"api_key": api_key}
        if start:
            params["start"] = pd.to_datetime(start).strftime("%Y-%m-%d")
        if end:
            params["end"]   = pd.to_datetime(end).strftime("%Y-%m-%d")

        j = _req(url, params=params).json()
        
        if "error" in j:
            raise RuntimeError(j["error"])
        data = j.get("response", {}).get("data", [])
        if not data:
            raise RuntimeError("EIA empty data")
        df = pd.DataFrame(data)
        if "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])
        elif "period" in df.columns:
            df["date"] = pd.to_datetime(df["period"])
        else:
            raise RuntimeError("date/period missing")
        val_col = "value" if "value" in df.columns else df.columns.difference(["date","period"]).tolist()[0]
        out = (df.rename(columns={val_col: "brent"})
                [["date","brent"]].sort_values("date").reset_index(drop=True))
        if start:
            out = out[out["date"] >= pd.to_datetime(start)]
            out.reset_index(drop=True, inplace=True)

        out["source"] = "EIA"
        _save_cache(out, BRENT_CACHE)
        return out
    except Exception as e_eia:
        last_err = f"EIA failed: {e_eia}"
        print(last_err)

    # yfinance:ICE Brent front-month futures
    try:
        import yfinance as yf
        tkr = yf.Ticker("BZ=F")  # ICE Brent front-month
        hist = tkr.history(start=start, end=end, interval="1d", actions=False, auto_adjust=False)
        if hist is not None and not hist.empty:
            out = (hist.reset_index()
                    .rename(columns={"Date": "date", "Close": "brent"})
                    [["date", "brent"]]
                    .dropna()
                    .sort_values("date")
                    .reset_index(drop=True))
            out["source"] = "yfinance"
            _save_cache(out, BRENT_CACHE)
            return out
        last_err = "yfinance empty"
    except Exception as e_yf:
        last_err = f"yfinance failed: {e_yf}"

    # cache
    return _load_cache(BRENT_CACHE, "date", start, end, "cache")

3. ガス価格

データ取得元

取得コード

def fetch_gas_henryhub_daily(start, end, api_key=None):
    """
    - 以下の優先度で天然ガス価格を取得する
        - EIA Henry Hub spot daily(APIキー必要)から為替価格を取得する
        - yfinanceから取得する
        - キャッシュを使用する
    """
    try:
        if api_key is None:
            api_key = os.getenv("EIA_API_KEY", "PUT_YOUR_API_KEY")

        if not api_key or api_key == "PUT_YOUR_API_KEY":
            raise RuntimeError("EIA_API_KEY not set")

        url = "https://api.eia.gov/v2/seriesid/NG.RNGWHHD.D"

        params = {"api_key": api_key}
        if start:
            params["start"] = pd.to_datetime(start).strftime("%Y-%m-%d")
        if end:
            params["end"]   = pd.to_datetime(end).strftime("%Y-%m-%d")
        j = _req(url, params=params).json()
        
        if "error" in j:
            raise RuntimeError(j["error"])

        data = j.get("response", {}).get("data", [])
        if not data:
            raise RuntimeError("EIA empty data")
        
        df = pd.DataFrame(data)
        if "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])
        elif "period" in df.columns:
            df["date"] = pd.to_datetime(df["period"])
        else:
            raise RuntimeError("date/period missing")
        
        val_col = "value" if "value" in df.columns else df.columns.difference(["date","period"]).tolist()[0]
        out = (df.rename(columns={val_col: "henry_hub"})
                [["date","henry_hub"]].sort_values("date").reset_index(drop=True))
        if start:
            out = out[out["date"] >= pd.to_datetime(start)]
            out.reset_index(drop=True, inplace=True)
            
        out["source"] = "EIA"
        _save_cache(out, GAS_CACHE)
        return out
    except Exception as e_eia:
        last_err = f"EIA failed: {e_eia}"
        print(last_err)

    # yfinance:NYMEX Natural Gas futures
    try:
        import yfinance as yf

        tkr = yf.Ticker("NG=F")  # NYMEX NatGas front-month
        hist = tkr.history(start=start, end=end, interval="1d", actions=False, auto_adjust=False)
        if hist is not None and not hist.empty:
            out = (hist.reset_index()
                    .rename(columns={"Date": "date", "Close": "henry_hub"})
                    [["date", "henry_hub"]]
                    .dropna()
                    .sort_values("date")
                    .reset_index(drop=True))
            out["source"] = "yfinance"
            _save_cache(out, GAS_CACHE)
            return out
        last_err = "yfinance empty"
    except Exception as e_yf:
        last_err = f"yfinance failed: {e_yf}"

    # cache
    return _load_cache(GAS_CACHE, "date", start, end, "cache")

4. 石炭価格

データ取得元

  • 本命: World Bank
    こちらの Pink Sheet Data 列の Monthly prices をダウンロードしました。
    image.png

以下のようなエクセルがダウンロードされます。
1960年からのデータとなっており他国の炭の記載もありますが、ここから今回は豪州炭のみを使用します。
image.png

  • サブ2: キャッシュを使用

取得コード

def fetch_coal_worldbank_monthly(start, end):
    """
    - 以下の優先度で石炭価格を取得する
        - Pink Sheet: Monthly Prices(APIキー不要)から為替価格を取得する
        - キャッシュを使用する
    """
    try: 
        url = ("https://thedocs.worldbank.org/en/doc/18675f1d1639c7a34d463f59263ba0a2-0050012025/related/CMO-Historical-Data-Monthly.xlsx")
        r = _req(url)
        wb = pd.read_excel(io.BytesIO(r.content), sheet_name="Monthly Prices", header=None)
        code_row, coal_col = None, None
        for r in range(0, 20):  # 先頭20行程だけ読み込む
            row_vals = wb.iloc[r].astype(str).tolist()
            if "Coal, Australian" in row_vals:
                code_row = r
                coal_col = row_vals.index("Coal, Australian")
                break
        if coal_col is None:
            raise RuntimeError("Coal, Australian column is not found.")

        start_row = code_row + 2 # データの始まり行

        # 月列(先頭列)+ 豪州炭列を抽出
        coal = wb.iloc[start_row:, [0, coal_col]].copy()
        coal.columns = ["month_raw", "coal_aus"]
        
        # '1960M01' → Timestamp(1960-01-01) へ
        def parse_mon(s):
            if pd.isna(s):
                return pd.NaT
            s = str(s).strip()
            m = re.match(r"^\s*(\d{4})[Mm](\d{1,2})(?:\D.*)?$", s)  # 後ろに注記が付いてもOK
            if m:
                y = int(m.group(1)); mm = int(m.group(2))
                if 1 <= mm <= 12:
                    return pd.Timestamp(y, mm, 1)
            return pd.to_datetime(s, errors="coerce")

        coal["month"] = coal["month_raw"].map(parse_mon)
        coal = coal.dropna(subset=["month"]).drop(columns=["month_raw"])

        # 数値化&整形
        coal["coal_aus"] = pd.to_numeric(coal["coal_aus"], errors="coerce")
        coal = coal.dropna(subset=["coal_aus"]).sort_values("month").reset_index(drop=True)

        today = _today_jst()
        today_ts = pd.to_datetime(today)
        today_period = today_ts.to_period("M")
        prev_month_start = (today_period - 1).to_timestamp()
        print("last_month: ", prev_month_start)
        coal_filtered = coal[coal["month"] >= prev_month_start]
        
        if coal_filtered.empty:
            # monthでソートして末尾1行
            coal = coal.sort_values("month")
            coal_filtered = coal.tail(1)

        coal_filtered["source"] = "worldbank"
        coal_filtered = coal_filtered[["month", "coal_aus", "source"]]
        _save_cache(coal_filtered, COAL_M_CACHE)
        return coal_filtered

    except Exception as e_wb:
        print(f"World Bank failed: {e_wb}")
        
    return _load_cache(COAL_M_CACHE, "month", start, end, "cache")

月次⇒日次へ変換

取得したエクセルは月ごとのデータになっているので、日ごとのデータに変換します。
やり方はreindexを使えば簡単にできます。

def coal_monthly_to_daily_ffill(coal_m_df, start, end):
    """
    - 月次価格を日次価格にする
    """
    daily_idx = pd.date_range(start, end, freq="D")
    coal_d = (coal_m_df.set_index("month")
            .reindex(daily_idx, method="ffill")
            .rename_axis("date").reset_index())
    coal_d = coal_d[["date","coal_aus"]]
    coal_d["source"] = coal_m_df["source"].iloc[-1] if not coal_m_df.empty else "cache"
    return coal_d

結合

最後に取得したデータを結合して完成です。

def expand_daily_to_30min(df_daily, value_cols):
    """
    - 30分間隔のデータに変換する
    """
    expanded = []
    for _, row in df_daily.iterrows():
        for i in range(48):
            dt = pd.to_datetime(row["timestamp"]) + pd.Timedelta(minutes=30 * i)
            entry = {"timestamp": dt}
            for col in value_cols:
                entry[col] = row[col]
            expanded.append(entry)
    return pd.DataFrame(expanded)
def build_feature_table(start, end):
    """
    - すべてdfを統合する
    """
    fx    = fetch_fx_usdjpy(start, end)                     # USDJPY
    print("fx: \n", fx.head())
    brent = fetch_brent_daily(start, end, None)      # brent(スポットor先物)
    print("brent: \n", brent.head())
    gas   = fetch_gas_henryhub_daily(start, end, None)
    print("gas: \n", gas.head())
    coalm = fetch_coal_worldbank_monthly(start, end)
    print("coalm: \n", coalm.head())
    coal  = coal_monthly_to_daily_ffill(coalm, start, end)
    print("coal: \n", coal.head())

    for d in (fx, brent, gas, coal):
        d["date"] = pd.to_datetime(d["date"]).dt.date

    base = pd.DataFrame({
    "date": pd.date_range(start, end, freq="D")})
    base["date"] = pd.to_datetime(base["date"]).dt.date
    
    df = (base.merge(fx[["date","USDJPY"]], on="date", how="left")
               .merge(brent[["date","brent"]], on="date", how="left")
               .merge(gas[["date","henry_hub"]], on="date", how="left")
               .merge(coal[["date","coal_aus"]], on="date", how="left"))

    # 補完(前方→後方)
    df = df.sort_values("date")
    df.rename(columns={"date": "timestamp"}, inplace=True)
    for col in ["USDJPY","brent","henry_hub","coal_aus"]:
        if col in df.columns:
            df[col] = df[col].ffill().bfill()
            
    print("df(commodities combined): \n", df)
    
    _save_cache(df, ALL_CACHE)
    latest_df = df.tail(8).reset_index(drop=True)
    latest_df["timestamp"] = pd.to_datetime(latest_df["timestamp"], errors="coerce")
    print("latent_df: \n", latest_df)
    
    latest_df.to_parquet(COM_PATH_DAY)
    print(f"[OK] market unified: {COM_PATH_DAY}")
    

    extended_market = expand_daily_to_30min(latest_df, ["USDJPY", "brent", "henry_hub", "coal_aus"])
    last_row = extended_market.iloc[-1].copy()
    # 開始時刻(最後の行の日時)
    start_time = last_row["timestamp"]
    # 新しい行を格納するリスト
    new_rows = []
    # 30分刻みで6日分(48 × 6 = 336回)
    for i in range(1, 48 * 6 + 1):
        new_row = last_row.copy()
        new_row["timestamp"] = start_time + pd.Timedelta(minutes=30 * i)
        new_rows.append(new_row)

    # DataFrame化して結合
    df_future = pd.DataFrame(new_rows)
    extended_market = pd.concat([extended_market, df_future], ignore_index=True)
    print("market_extended: \n", extended_market)
    
    extended_market.to_parquet(COM_PATH_AF1W)
    print(f"[OK] extended_market unified: {COM_PATH_AF1W}")

def fetch_market():
    
        # 今日までのデータを取得する
        start, end = _range_to_today(days_back=10)
        build_feature_table(start, end)

if __name__ == "__main__":
    fetch_market()

上記の関数を実行すると以下のデータが取得できます

  • 燃料と為替のcache(csv)
  • 上記のcacheを結合したデータ(csv)
  • 当日だけに絞ったデータ(parquet)
  • 今日から7日後までの日付と結合したデータ(parquet)

まとめ

今日は以下のデータをすべて自動で取得しました。

  • 為替(USD/JPY)
  • 原油(Brent)
  • 天然ガス(Henry Hub)
  • 石炭(豪州炭)

API → 代替 → キャッシュ の三段構えを採用し、ダッシュボードが止まらない設計にしています。

明日

明日は電力価格を取得してきます!:fist_tone1:

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?