この記事は「【リアルタイム電力予測】需要・価格・電源最適化ダッシュボード構築記」シリーズの23日目です
Day1はこちら | 全体構成を見る
今日は、これまで作ってきた 需要予測・価格予測・最適化の結果(グラフ)を、Streamlit で可視化していきます。
※ Hugging Face Spaces へのアップ方法は Day25 で扱います
また、これからの3日間については以下の流れで進めます。
| Day | 内容 | 技術 |
|---|---|---|
| Day23 | アプリの中身 | Streamlit / Plotly |
| Day24 | 自動更新の仕組み | GitHub Actions |
| Day25 | 公開と総まとめ | Hugging Face Spaces + 振り返り |
目的は、「予測して終わり」ではなく 日々更新される予測結果を すぐ理解できる形 で表示することです。
- 需要:実績 + 予測を同じ軸で表示して、予測の当たり具合と今後の見通しを確認できる
- 価格:実績 + 予測のリスクレンジ(分位点)で、上振れ下振れを確認できる
- 最適化:電源別の供給量(積み上げ)で、どの電源が主役になっているか見える
Streamlitの設計方針
今回の設計で意識したポイントを先にまとめます。
1) アプリは「計算しない」:読むだけに寄せる
Streamlit はアクセスのたびにスクリプトが走るため、アプリ内で予測や最適化を回すと、重くて時間がかかりやすいです。
そこで今回は、
- 予測・最適化:別のバッチ(Day24のGitHub Actions)で毎日実行
- Streamlit:
data/cache/*.parquetを読むだけ
という 「読むだけアプリ」 にしています。
この設計にすると、UIが軽くなり、更新も安定します。
2) 責務分離:読み込み / 描画 / 画面構成を分ける
アプリが大きくなったときに崩れるのは「何でもmainに書いてしまう」パターンです。そこで、
-
load_*:データ読み込み(I/O) -
plot_*:DataFrame → Plotly Figure(描画) -
main():タブ、見出し、表示順(UI)
の3層に分けています。
3) キャッシュ戦略:metadata.json を 更新検知 にする
st.cache_data は 引数が同じなら前回結果を返す仕組みです。
そこで、metadata.json の last_updated_jst を version として使い、
- version が変わったら → parquetを再読み込み
- 変わらなければ → キャッシュを使う
という設計にします。
これで「毎日更新されるcache」をUI側が自然に追従できます。
モデルを実行して予測値を保存
可視化の前に、まずは data/cache に必要なデータが揃っている必要があります。
以下の関数を順に実行すると、可視化に必要なファイルが生成されます。
また、最後に metadata.json を出力します。
ここに書く last_updated_jst が、Streamlit側で更新を検知するキーになります。
上記をまとめて実行
CACHE_DIR = Path("data/cache")
def main():
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# ========== 1. データ取得 ==========
print("\n===== weather =====\n")
fetch_weather()
print("\n===== market =====\n")
fetch_market()
print("\n===== price =====\n")
fetch_price()
print("\n===== demand =====\n")
fetch_demand()
# ========== 2. 需要予測 ==========
print("\n===== predict demand =====\n")
predict_demand()
# ========== 3. 価格予測 ==========
print("\n===== predict price =====\n")
predict_price()
# ========== 4. 電源構成最適化 ==========
print("\n===== optimize =====\n")
optimize_dispatch()
# ========== 5. メタ情報(更新時刻+出典) ==========
meta = {
"last_updated_jst": datetime.now(timezone(timedelta(hours=9))).isoformat(),
"sources": {
"weather": "Open-Meteo JMA API",
"markets": {
"fx": "Frankfurter (ECB)",
"oil": "EIA / yfinance BZ=F",
"coal": "World Bank Pink Sheet",
"lng": "EIA / yfinance NG=F",
},
"jepx": "JEPX 日前スポット 東京エリア",
"demand_model": "自作需要予測モデル",
"price_model": "自作価格予測モデル",
"optimizer": "自作数理最適化モデル",
},
}
meta_path = CACHE_DIR / "metadata.json"
meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
print("Daily pipeline finished.")
print(f"- meta: {meta_path}")
if __name__ == "__main__":
main()
基本設定
ここから app.py 側の話です。ポイントは「どこからデータを読むか」を環境で切り替えることです。
- ローカル:
data/cache/*.parquetをそのまま読む - 公開環境:GitHub上の
cacheを raw URL で読む
この切り替えを RUN_ENV で行います(環境変数の設定はDay25で扱います)。
パスの設定
CACHE_DIR = Path("data/cache")
STATIC_DIR = Path("data/static")
JST = pytz.timezone("Asia/Tokyo")
RUN_ENV = os.getenv("RUN_ENV", "local")
GITHUB_RAW_BASE = "https://raw.githubusercontent.com/nakamin/portfolio8/main"
HERO_GITHUB_URL = (
"https://raw.githubusercontent.com/nakamin/portfolio8/main/data/static/hero.png"
)
st.set_page_config(
layout="wide",
)
- Hugging Face Spacesではparquetのようなバイナリファイルを置くことができないので、モデルは、GitHub 上の parquet を読むようにします
- GitHub上のファイルの指定方法は以下です
- https://raw.githubusercontent.com/<ユーザ>/<レポジトリ>/<ブランチ>/<パス>
parquetの読み込み
parquetはテキストではなくバイナリなので、GitHubから読むときは requests で bytes を受け取って BytesIO に包んでから pd.read_parquet します。
また、HTTPエラーを握りつぶさないために raise_for_status() を入れています(404を気づける)。
def _read_parquet_from_github(rel_path: str) -> pd.DataFrame:
"""
- GitHub の raw URL から parquet を読み込む
- rel_path はレポジトリ root からの相対パス
"""
url = f"{GITHUB_RAW_BASE}/{rel_path}"
resp = requests.get(url)
resp.raise_for_status()
df = pd.read_parquet(io.BytesIO(resp.content))
if "timestamp" in df.columns:
df["timestamp"] = pd.to_datetime(df["timestamp"])
return df
def _read_parquet_local(path: Path) -> pd.DataFrame:
"""
- ローカル or Actions で、ファイルシステム上の parquet を読む
"""
df = pd.read_parquet(path)
if "timestamp" in df.columns:
df["timestamp"] = pd.to_datetime(df["timestamp"])
return df
トップ画像
Streamlitは再実行が頻繁に起きるため、毎回parquetを読み直すと無駄が増えます。
そこで @st.cache_data を使って読み込み結果をキャッシュします。
@st.cache_data(show_spinner=False)
def show_hero():
_, col, _ = st.columns([1, 20, 1]) # 真ん中のカラムを広めに
with col:
if RUN_ENV == "hf":
# HF では GitHub の画像を直接表示
st.image(HERO_GITHUB_URL)
else:
hero_path = STATIC_DIR / "hero.png"
if hero_path.exists():
st.image(str(hero_path))
- widthを使った自動調整を何度も試しましたが、ずっとカクカクする挙動を修正することができず、
st.columns([1, 20, 1])として3つのカラムの真ん中に置くという方法にしました - トップ画像は横長のものを一枚作成し、GitHubにアップしたものをSpacesが読み込むようにしています ※ローカル上で実験していたので実行環境がHugging Face SpacesかLocalでパスの場所を変えるようにしています
Spaces上でparquetを読み込む
st.cache_data は「引数が同じならキャッシュを返す」ので、version_key に last_updated_jst を渡すと、更新された瞬間に version_key が変わり、再読み込みされます。
@st.cache_data(show_spinner=False)
def load_parquet(name: str, version_key: str | None = None) -> pd.DataFrame | None:
"""
name: 'demand_forecast' のようなベース名
- local: data/cache/{name}.parquet
- hf: GitHub raw の data/cache/{name}.parquet
- version_key: これが変わるとキャッシュが無効化されて再読込される
"""
try:
if RUN_ENV == "hf":
rel_path = f"data/cache/{name}.parquet"
df = _read_parquet_from_github(rel_path)
else:
path = CACHE_DIR / f"{name}.parquet"
df = _read_parquet_local(path)
return df
except Exception as e:
st.error(f"{name}.parquet を読み込めませんでした: {e}")
return None
metadataの読み込み
metadata.json は
- 最終更新時刻(画面上部で表示)
- データソース(「モデル説明」「パイプライン説明」タブで表示)
- キャッシュ更新検知(last_updated_jst)
という3役を担います。
def load_metadata() -> dict:
"""
メタ情報(sources, last_updated など)を取得
- local 環境: data/cache/metadata.json を読む
- hf 環境: GitHub raw の data/cache/metadata.json を読む
"""
if RUN_ENV == "hf":
rel_path = "data/cache/metadata.json"
url = f"{GITHUB_RAW_BASE}/{rel_path}"
resp = requests.get(url)
if resp.status_code != 200:
# 初回など、まだ metadata.json が無いとき
return {}
try:
return resp.json()
except ValueError:
# 念のため、JSONとして読めないときは空にする
return {}
else:
path = CACHE_DIR / "metadata.json"
if not path.exists():
return {}
return json.loads(path.read_text(encoding="utf-8"))
画面設計
今回は「ダッシュボードだけでなく説明も載せたい」ので、タブ構成にしました。
- ダッシュボード:結論(予測・最適化の可視化)
- モデル説明:根拠(どんな特徴量・モデルか)
- パイプライン説明:運用(どの処理で何が作られるか)
- お問い合わせ:改善の導線
タブで役割を分けると、初見ユーザーが迷わず見たい場所へ行けます。
def main():
tab_dashboard, tab_model, tab_pipeline, contact = st.tabs(["ダッシュボード", "モデル説明", "パイプライン説明", "お問い合わせ"])
with tab_dashboard:
"""可視化コードを記載"""
with tab_model:
"""モデル説明コードを記載"""
with tab_pipeline:
"""パイプライン説明コードを記載"""
with contact:
"""お問い合わせに関するコードを記載"""
if __name__ == "__main__":
try:
main()
except Exception as e:
st.error("アプリ実行中にエラーが発生しました")
st.code(traceback.format_exc())
可視化(ダッシュボードタブ)
以降の可視化は、すべて同じパターンで実装します。
-
load_parquet()でデータ取得 -
plot_*()で Figure を作る(描画責務を分離) -
st.plotly_chart()で表示
1. 電力需要(実績 + 予測)
需要は「過去(実績)」と「未来(予測)」が連続して見えることが重要なので、同じ軸で表示します。
plot_style関数(全グラフ共通)
# テーマに合わせた色
PLOT_BG_COLOR = "#0b1633" # グラフ内の背景(今より少し明るい紺)
PAPER_BG_COLOR = "#020922" # グラフ外側(アプリ背景と同じでOK)
GRID_MINOR_COLOR = "#2c3f66" # グリッド
GRID_COLOR = "#4e6691"
TEXT_COLOR = "#f5f5f5" # 文字色
TEXT_MINOR_COLOR = "#9eb3d4"
def style_figure(
fig,
x_title: str | None = None,
y_title: str | None = None,
x_dtick=None,
x_minor_dtick=None,
y_dtick=None,
):
"""
Plotly 図に共通のスタイルを当てる。
- 背景色
- グリッド色
- フォントサイズ
- 軸タイトル
- 目盛り間隔 (dtick)
- 凡例
"""
fig.update_layout(
paper_bgcolor=PAPER_BG_COLOR,
plot_bgcolor=PLOT_BG_COLOR,
font=dict(color=TEXT_COLOR, size=14), # 全体のデフォルト文字サイズ
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0),
hoverlabel=dict(font_size=13),
)
fig.update_xaxes(
showgrid=True,
gridcolor=GRID_MINOR_COLOR,
gridwidth=1,
dtick=x_minor_dtick,
zeroline=False,
tickfont=dict(size=11, color=TEXT_MINOR_COLOR), # x 軸の目盛りの文字
title_font=dict(size=16), # x 軸タイトル
tickangle=0,
tickformat="%m/%d\n%H:%M",
)
fig.update_yaxes(
showgrid=True,
gridcolor=GRID_MINOR_COLOR,
zeroline=False,
tickfont=dict(size=12), # y 軸の目盛りの文字
title_font=dict(size=16), # y 軸タイトル
)
if x_title is not None:
fig.update_xaxes(title_text=x_title)
if y_title is not None:
fig.update_yaxes(title_text=y_title)
# 補助目盛
if x_minor_dtick is not None:
fig.update_xaxes(
minor=dict(
dtick=x_minor_dtick,
gridcolor=GRID_MINOR_COLOR,
gridwidth=1,
showgrid=True
)
)
if x_dtick is not None:
fig.update_xaxes(
dtick=x_dtick, # 24時間おき
gridcolor=GRID_COLOR, # 00:00用の明るい色
gridwidth=2, # ここで線を太くする
showgrid=True
)
if y_dtick is not None:
fig.update_yaxes(dtick=y_dtick)
return fig
plot_demand関数
import plotly.graph_objects as go
from visualize.style_figure import style_figure
def plot_demand(demand, now_floor):
fig_d = go.Figure()
if "realized_demand" in demand.columns:
fig_d.add_trace(
go.Scatter(
x=demand["timestamp"],
y=demand["realized_demand"],
mode="lines",
name="需要実績",
line=dict(width=2.5, color="#4FC3F7"), # 明るい水色
)
)
if "predicted_demand" in demand.columns:
fig_d.add_trace(
go.Scatter(
x=demand["timestamp"],
y=demand["predicted_demand"],
mode="lines",
name="需要予測",
line=dict(width=2, color="#FFCA28", dash="dot"), # 点線
)
)
# 現在時刻の縦線
fig_d.add_shape(
type="line",
x0=now_floor, x1=now_floor,
y0=0, y1=1,
xref="x", yref="paper",
line=dict(color="red", dash="dot") # 赤色+破線
)
# 注釈を追加して「Now」と表示
fig_d.add_annotation(
x=now_floor,
y=1.05, # yref="paper" の上端に近い位置
xref="x", yref="paper",
text="Now",
showarrow=False,
font=dict(color="red", size=12),
align="center"
)
six_hours_ms = 6 * 60 * 60 * 1000 # 6時間ごとに縦グリッド
one_day_ms = 24 * 60 * 60 * 1000 # 1日ごと
fig_d = style_figure(
fig_d,
x_title="日時",
y_title="電力需要 [MW]",
x_dtick=one_day_ms,
x_minor_dtick=six_hours_ms, # グリッドを細かく
y_dtick=2000,
)
fig_d.update_traces(
# ホバーのヘッダー部分(時間)の形式を指定
hovertemplate="<b>%{x|%Y/%m/%d %H:%M}</b><br>" +
"%{fullData.name}: %{y:,.0f} MW<extra></extra>"
)
return fig_d
上記のplot_demand関数を読み込み可視化する
with tab_dashboard:
# ========================
# セクション1: 需要予測
# ========================
st.subheader("1. 電力需要(実績+予測)")
demand = load_parquet("demand_forecast", version_key=version) # predicted_demand / realized_demand / demand など
if demand is None:
st.stop()
print("demand: \n", demand)
fig_d = plot_demand(demand, now_floor)
st.plotly_chart(fig_d)
st.markdown(
f"""
<h4 style="text-align:center; margin-top: 1rem;">
予測対象期間:{demand_from:%Y年%m月%d日} 〜 {demand_to:%Y年%m月%d日}(7日間)
</h4>
""",
unsafe_allow_html=True,
)
st.markdown("---")
2. JEPX 価格(実績 + 分位点予測)
価格は「1本の予測値」よりも 幅(リスクレンジ) が見えるほうが有用なので、分位点(P10/P50/P90)で表示します。
plot_price関数
import pandas as pd
import plotly.graph_objects as go
from visualize.style_figure import style_figure
def add_band(fig, df, name, color, opacity):
# 上側(P90)
fig.add_trace(
go.Scatter(
x=df["timestamp"],
y=df["predicted_price(90%)"],
mode="lines",
line=dict(width=0, color=color),
showlegend=False,
hoverinfo="skip",
)
)
# 下側(P10)を塗りつぶし(P90→P10の順で塗る)
fig.add_trace(
go.Scatter(
x=df["timestamp"],
y=df["predicted_price(10%)"],
mode="lines",
fill="tonexty",
name=name,
line=dict(width=0, color=color),
opacity=opacity,
)
)
return fig
def plot_price(price_fc, today, now_floor):
tomorrow0 = pd.Timestamp(today) + pd.Timedelta(days=1)
cut_band = tomorrow0 + pd.Timedelta(days=1) # 明後日0:00
mask_day1 = (price_fc["timestamp"] >= tomorrow0) & (price_fc["timestamp"] < cut_band)
mask_day2plus = (price_fc["timestamp"] >= cut_band)
df1 = price_fc.loc[mask_day1].copy()
df2 = price_fc.loc[mask_day2plus].copy()
fig_p = go.Figure()
# 実績価格
if "tokyo_price_jpy_per_kwh" in price_fc.columns:
fig_p.add_trace(
go.Scatter(
x=price_fc["timestamp"],
y=price_fc["tokyo_price_jpy_per_kwh"],
mode="lines",
name="スポット価格(実績)",
line=dict(width=2.5, color="#4FC3F7"), # 明るい水色
)
)
# P50(中心予測)
if "predicted_price(50%)" in df1.columns:
fig_p.add_trace(
go.Scatter(
x=df1["timestamp"],
y=df1["predicted_price(50%)"],
mode="lines",
name="予測価格",
line=dict(width=2, color="#FFCA28", dash="dot"), # 点線
)
)
# fig_p = add_band(fig_p, df1, name="P10–P90(+24hまで)", color="#FA9A8D", opacity=0.20)
# fig_p = add_band(fig_p, df2, name="P10–P90(+48h以降は参考)", color="#FA9A8D", opacity=0.10)
# 分位点バンド(10〜90)
if all(c in df1.columns for c in ["predicted_price(10%)", "predicted_price(90%)"]):
fig_p.add_trace(
go.Scatter(
x=df1["timestamp"],
y=df1["predicted_price(90%)"],
mode="lines",
name="P90",
line=dict(width=0, color="#FA9A8D"),
showlegend=False,
)
)
fig_p.add_trace(
go.Scatter(
x=df1["timestamp"],
y=df1["predicted_price(10%)"],
mode="lines",
name="P10-P90",
fill="tonexty", # P90 と P10 の間を塗る
line=dict(width=0, color="#FA9A8D"),
opacity=0.2,
showlegend=True,
)
)
# 現在時刻の縦線
fig_p.add_shape(
type="line",
x0=now_floor, x1=now_floor,
y0=0, y1=1,
xref="x", yref="paper",
line=dict(color="red", dash="dot") # 赤色+破線
)
# 注釈を追加して「Now」と表示
fig_p.add_annotation(
x=now_floor,
y=1.05, # yref="paper" の上端に近い位置
xref="x", yref="paper",
text="Now",
showarrow=False,
font=dict(color="red", size=12),
align="center"
)
one_day_ms = 24 * 60 * 60 * 1000
six_hours_ms = 6 * 60 * 60 * 1000 # 6時間ごとに縦グリッド
fig_p = style_figure(
fig_p,
x_title="日時",
y_title="価格 [円/kWh]",
x_dtick=one_day_ms,
x_minor_dtick=six_hours_ms, # グリッドを細かく
y_dtick=5,
)
fig_p.update_traces(
# ホバーのヘッダー部分(時間)の形式を指定
hovertemplate="<b>%{x|%Y/%m/%d %H:%M}</b><br>" +
"%{fullData.name}: %{y:,.0f} 円<extra></extra>"
)
return fig_p
# ========================
# セクション2: 価格(実績+分位点予測)予測
# ========================
st.subheader("2. JEPX 東京スポット価格と分位点予測")
price_fc = load_parquet("price_forecast", version_key=version) # price / predicted_price(10/50/90) / tokyo_price_jpy_per_kwh
if price_fc is None:
st.stop()
print("price_fc: \n", price_fc)
fig_p = plot_price(price_fc=price_fc, today=today, now_floor=now_floor)
st.plotly_chart(fig_p)
st.markdown(
f"""
<h4 style="text-align:center; margin-top: 1rem;">
予測対象期間:{price_from:%Y年%m月%d日} 〜 {price_to:%Y年%m月%d日}(6日間)
</h4>
""",
unsafe_allow_html=True,
)
st.markdown("---")
3. 最適化結果(電源構成)
最適化は「数値一覧」よりも「積み上げ帯」のほうが直感的です。
PV/風力/水力/火力/…が1日を通じてどう切り替わるかを見せます。
plot_energy_mix関数
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from visualize.style_figure import style_figure
COLOR_MAP = {
"lng": "#F4A6A6", # パステルレッド(LNG)
"coal": "#F7C1C1", # パステルピンク(石炭)
"oil": "#FFD6E8", # 明るいピンク(石油)
"th_other": "#FFEAF2", # ごく淡いピンク(火力その他)
"biomass": "#D9F9B1", # パステル黄緑(バイオマス)
"wind_net": "#B7E4B6", # パステルグリーン(風力)
"hydro": "#B3D9FF", # パステルブルー(水力)
"pstorage_gen": "#CDEEFF", # 淡い水色(揚水発電)
"pv_net": "#FFFACD", # レモンシフォン(太陽光)
"battery_dis": "#FFDAB3", # パステルオレンジ(蓄電池放電)
"import": "#D3D3D3", # ライトグレー(連系線受電)
"misc": "#D8BFD8", # パステルパープル(その他)
}
def plot_energy_mix(out_df: pd.DataFrame, now_floor):
df = out_df.copy()
df["timestamp"] = pd.to_datetime(df["timestamp"])
df = df.sort_values("timestamp")
# 数値にしておく(存在する列だけ)
num_cols = [
"pv","wind","hydro","coal","oil","lng","th_other", "biomass", "misc"
"curtail_pv","curtail_wind",
"pstorage_gen","pstorage_pump",
"battery_dis","battery_ch",
"import",
"reserve","shed","predicted_demand","total_cost"
]
for c in num_cols:
if c in df:
df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0.0)
# --- 純発電(抑制を引いた分) ---
df["pv_net"] = np.maximum(df.get("pv", 0) - df.get("curtail_pv", 0), 0)
df["wind_net"] = np.maximum(df.get("wind", 0) - df.get("curtail_wind", 0), 0)
# --- バランス残差チェック ---
# モデルの需給式:
# PV + 風 + 水力 + 火力 + 揚水発電 + 電池放電 + 連系線 + Shed
# = 需要 + 揚水揚水 + 電池充電
gen_total = (
df.get("pv", 0)
+ df.get("wind", 0)
+ df.get("hydro", 0)
+ df.get("coal", 0)
+ df.get("oil", 0)
+ df.get("lng", 0)
+ df.get("th_other", 0)
+ df.get("biomass", 0)
+ df.get("misc", 0)
+ df.get("pstorage_gen", 0)
+ df.get("battery_dis", 0)
+ df.get("import", 0)
)
demand_total = (
df.get("load_MW", 0)
+ df.get("pstorage_pump", 0)
+ df.get("battery_ch", 0)
)
resid = gen_total + df.get("shed", 0) - demand_total
max_abs_resid = float(np.abs(resid).max())
if max_abs_resid > 50: # 50MW超えなら警告
print(f"[warn] balance residual max |resid| = {max_abs_resid:.1f} MW (確認推奨)")
fig = go.Figure()
# --- 積み上げ(供給側:純発電+揚水発電+電池放電+連系線) ---
stack = "netgen"
def add_stack(col, name):
if col in df and df[col].abs().sum() > 0:
fig.add_trace(go.Scatter(
x=df["timestamp"], y=df[col],
name=name, mode="lines",
stackgroup=stack,
line=dict(width=0.0),
fillcolor=COLOR_MAP.get(col, "#cccccc") # fallback色
))
# 下から積む順序(ベースに近いものを下に)
add_stack("pstorage_gen", "揚水(発電)")
add_stack("hydro", "水力")
add_stack("oil", "石油")
add_stack("coal", "石炭")
add_stack("lng", "LNG")
add_stack("pv_net", "太陽光(純)")
add_stack("battery_dis", "蓄電池(放電)")
add_stack("wind_net", "風力(純)")
add_stack("import", "連系線(流入)")
add_stack("th_other", "火力(その他)")
add_stack("biomass", "バイオマス")
add_stack("misc", "その他")
# --- 需要ライン(本来の需要) ---
if "predicted_demand" in df:
fig.add_trace(go.Scatter(
x=df["timestamp"], y=df["predicted_demand"], name="需要(予測)",
mode="lines", line=dict(color="#4FC3F7", width=2.5)
))
# --- 予備力帯(load → load+reserve を塗る) ---
if "reserve" in df and df["reserve"].abs().sum() > 0:
# 下側(需要)をダミーで描く
fig.add_trace(go.Scatter(
x=df["timestamp"], y=df["predicted_demand"],
showlegend=False, line=dict(width=0), hoverinfo="skip",
hoverlabel=None, name="", hovertemplate=None
))
# 上側(需要+予備力)
fig.add_trace(go.Scatter(
x=df["timestamp"], y=df["predicted_demand"] + df["reserve"],
name="予備力",
fill="tonexty", fillcolor="rgba(255,165,0,0.15)",
line=dict(width=0),
hovertemplate="%{x|%H:%M}<br>予備力: %{customdata:.0f} MW",
customdata=df["reserve"]
))
# --- 抑制は別レイヤ(純発電の上に薄く重ねる) ---
if "curtail_pv" in df and df["curtail_pv"].sum() > 0:
fig.add_trace(go.Scatter(
x=df["timestamp"], y=df["pv_net"] + df["curtail_pv"],
line=dict(width=0), showlegend=False, hoverinfo="skip"
))
fig.add_trace(go.Scatter(
x=df["timestamp"], y=df["pv_net"],
name="出力抑制(PV)",
fill="tonexty", fillcolor="rgba(255,0,0,0.12)",
line=dict(width=0),
hovertemplate="%{x|%H:%M}<br>PV抑制: %{customdata:.0f} MW",
customdata=df["curtail_pv"]
))
if "curtail_wind" in df and df["curtail_wind"].sum() > 0:
fig.add_trace(go.Scatter(
x=df["timestamp"], y=df["wind_net"] + df["curtail_wind"],
line=dict(width=0), showlegend=False, hoverinfo="skip"
))
fig.add_trace(go.Scatter(
x=df["timestamp"], y=df["wind_net"],
name="出力抑制(風力)",
fill="tonexty", fillcolor="rgba(0,0,255,0.12)",
line=dict(width=0),
hovertemplate="%{x|%H:%M}<br>風力抑制: %{customdata:.0f} MW",
customdata=df["curtail_wind"]
))
# --- 供給不足ライン(実際に賄えた需要=需要 - Shed) ---
if "shed" in df and df["shed"].max() > 0:
fig.add_trace(go.Scatter(
x=df["timestamp"], y=df["predicted_demand"] - df["shed"],
name="供給不足(実供給)",
mode="lines",
line=dict(color="crimson", dash="dot"),
hovertemplate="%{x|%H:%M}<br>不足: %{customdata:.0f} MW",
customdata=df["shed"]
))
# --- 総コストは右軸 ---
if "total_cost" in df:
fig.add_trace(go.Scatter(
x=df["timestamp"], y=df["total_cost"],
name="総コスト",
mode="lines",
line=dict(width=2.5, color="#FFCA28", dash="dot"),
yaxis="y2"
))
# 現在時刻の縦線
fig.add_shape(
type="line",
x0=now_floor, x1=now_floor,
y0=0, y1=1,
xref="x", yref="paper",
line=dict(color="red", dash="dot") # 赤色+破線
)
# 注釈を追加して「Now」と表示
fig.add_annotation(
x=now_floor,
y=1.05, # yref="paper" の上端に近い位置
xref="x", yref="paper",
text="Now",
showarrow=False,
font=dict(color="red", size=12),
align="center"
)
one_day_ms = 24 * 60 * 60 * 1000
one_hours_ms = 60 * 60 * 1000 # 1時間ごとに縦グリッド
fig = style_figure(
fig,
x_title="時刻",
y_title="電力 [MW]",
x_dtick=one_day_ms,
x_minor_dtick=one_hours_ms, # グリッドを細かく
y_dtick=2000,
)
# そのあとで、このグラフ固有のレイアウトを上書き
fig.update_layout(
# title=title,
yaxis2=dict(title="コスト", overlaying="y", side="right", showgrid=False),
hovermode="x unified",
margin=dict(l=60, r=60, t=60, b=40),
height=560,
)
fig.update_traces(
hovertemplate="<b>%{fullData.name}</b>: %{y:,.1f} MW<extra></extra>"
)
fig.update_xaxes(hoverformat="%Y/%m/%d %H:%M")
fig.for_each_trace(
lambda trace: trace.update(hovertemplate="<b>%{fullData.name}</b>: %{y:,.0f} 円<extra></extra>")
if trace.name and "コスト" in trace.name else ()
)
fig.for_each_trace(
lambda trace: trace.update(hoverinfo="skip", hovertemplate=None)
if not trace.name else ()
)
return fig
# ========================
# セクション3: 発電ミックス最適化結果
# ========================
st.subheader("3. エネルギーミックス(最適化結果)")
dispatch = load_parquet("dispatch_optimal", version_key=version)
if dispatch is None:
st.stop()
print("dispatch: \n", dispatch)
fig_balance = plot_energy_mix(dispatch, now_floor)
st.plotly_chart(fig_balance)
st.markdown(
f"""
<h4 style="text-align:center; margin-top: 1rem;">
予測対象日:{today:%Y年%m月%d日}
</h4>
""",
unsafe_allow_html=True,
)
全体コード
全体のコードはこちらです(app.py)。
https://github.com/nakamin/portfolio8/blob/main/app.py
明日
GitHub Actionsで自動更新する仕組みを設定します!![]()