使い勝手のよい Python ですが WEB画面の開発となると億劫です。Gradio や Streamlit という手もありますが、AI が生成するコンテンツを動的に埋め込むことなんかもやってみたい。
ということで FastAPI と Bootstrap を用いて Python のための WEB画面開発ツールのプロトタイプ pyboots(仮称)を作成したので紹介します。
ざっくりした仕組みは、生成した HTMLコンテンツをブラウザへ送り、以後はブラウザが click などのイベントをサーバへ通知します。1000行に満たない小さなコードです。
なお、今回は AI によるコンテンツ生成までは行いません。以降に AI は登場しませんのであしからず。
導入
まずは、画面とその Python コードをご覧ください。
Hello World
この画面を作成する Python コードは次の通りです。Hallo World の割にコード量が多いですが URLパスを内部に隠蔽しない方針のためです。
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import pyboots as pb
app = FastAPI()
templates = Jinja2Templates(directory="templates")
# Bootstrap の CSS と JS を公開
app.mount("/ui/static", StaticFiles(directory="static"), name="static")
# 画面ページのレイアウト
pg = pb.Page()
with pg.Row():
pg.P(text="Hello World!")
# 画面ページの HTML を返すパスハンドラ
@app.get("/ui/index", response_class=HTMLResponse)
async def ui_index(request: Request):
return templates.TemplateResponse(
request=request, name="index.html", context={"content": pg.get_content()}
)
# 画面ページの JavaScript を返すパスハンドラ
@app.get("/ui/index/js/pyboot.js")
async def ui_index_script():
return StreamingResponse(pg.iter_script(), media_type="application/javascript")
画面ページのレイアウトは次のように with 文で構成します。Streamlit や Gradio からインスピレーションを得ました。
# 画面ページのレイアウト
pg = pb.Page()
with pg.Row():
pg.P(text="Hello World!")
HTMLテンプレート index.html
は、レイアウトしたコンテンツを HTML の body に埋め込むシンプルなテンプレートです。
<!DOCTYPE HTML>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>PyBoots</title>
<link rel="stylesheet" href="/ui/static/css/bootstrap.min.css">
<script type="module" src="/ui/static/js/bootstrap.bundle.min.js"></script>
<script type="module" src="/ui/index/js/pyboot.js"></script>
</head>
<body>
{{ content|safe }}
</body>
</html>
ここで <script type="module" src="/ui/index/js/pyboot.js">
は、本ツールが生成するスクリプトです。
グリッド
Bootstrap のグリッドシステムです。
この画面を作成する Python コードは次の通りです。
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import pyboots as pb
app = FastAPI()
templates = Jinja2Templates(directory="templates")
# Bootstrap の CSS と JS を公開
app.mount("/ui/static", StaticFiles(directory="static"), name="static")
# 画面ページのレイアウト
pg = pb.Page()
with pg.Container():
with pg.Row():
with pg.Col():
pg.P(text="Row-1 Col-1")
with pg.Col():
pg.P(text="Row-1 Col-2")
with pg.Row():
with pg.Col():
pg.P(text="Row-2 Col-1")
with pg.Col():
pg.P(text="Row-2 Col-2")
with pg.Col():
pg.P(text="Row-2 Col-3")
# 画面ページの HTML を返すパスハンドラ
@app.get("/ui/index", response_class=HTMLResponse)
async def ui_index(request: Request):
return templates.TemplateResponse(
request=request, name="index.html", context={"content": pg.get_content()}
)
# 画面ページの JavaScript を返すパスハンドラ
@app.get("/ui/index/js/pyboot.js")
async def ui_index_script():
return StreamingResponse(pg.iter_script(), media_type="application/javascript")
サイドバーとクリックイベント
確認ボタンをクリックするとサイドバーに入力された内容がメインコンテンツに表示されます。
この画面を作成する Python コードは次の通りです。
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import pyboots as pb
app = FastAPI()
templates = Jinja2Templates(directory="templates")
# Bootstrap の CSS と JS を公開
app.mount("/ui/static", StaticFiles(directory="static"), name="static")
# 画面ページのレイアウト
pg = pb.Page()
with pg.Header(style='height: 80px;'):
with pg.Container():
with pg.Row():
pg.H1(text="Hello World!")
with pg.Main():
with pg.LeftSidebar(style='width: 280px; min-height: calc(100vh - 80px);'):
with pg.Nav():
with pg.NavItem():
c_name = pg.Input(id='name', value='', label='名前', placeholder='太郎')
with pg.NavItem():
c_area = pg.Select(id='area', label='お住まいの地域', value='東日本', choices=[
('東日本', '東日本'),
('西日本', '西日本'),
])
with pg.NavItem():
c_pet = pg.CheckboxGroup(id='pet', label='ペット', choices=[
('犬', 'dog', True),
('猫', 'cat', False),
('猿', 'monkey', True)
])
with pg.NavItem():
c_confirm = pg.Button(id='btn', label='確認')
with pg.Container():
with pg.Row():
with pg.Col():
c_greeting = pg.P()
# 確認ボタンクリック
@c_confirm.event('click', app, nodes=[c_name, c_area, c_pet])
@app.post("/ui/index/confirm/onclick")
async def on_btn_click(event: pb.NodeEvent, name: pb.Input, area: pb.Select, pet: pb.CheckboxGroup):
pets = [key for key, val, chk in pet.choices if chk]
text = f'''こんにちは {name.value} さん!
{name.value} さんのお住まいは{area.value}です.
{name.value} さんが飼っているペットは{pets}です.'''
data = c_greeting.set(text=text)
return data
# 画面ページの HTML を返すパスハンドラ
@app.get("/ui/index", response_class=HTMLResponse)
async def ui_index(request: Request):
return templates.TemplateResponse(
request=request, name="index.html", context={"content": pg.get_content()}
)
# 画面ページの JavaScript を返すパスハンドラ
@app.get("/ui/index/js/pyboot.js")
async def ui_index_script():
return StreamingResponse(pg.iter_script(), media_type="application/javascript")
ここで次の処理は、部品 c_confirm
に click
イベントを設定し、それを URLパス "/ui/index/confirm/onclick" のパスハンドラに紐づけます。ブラウザからサーバへ送信する部品を引数 nodes
に列挙します。
# 確認ボタンクリック
@c_confirm.event('click', app, nodes=[c_name, c_area, c_pet])
@app.post("/ui/index/confirm/onclick")
async def on_btn_click(event: pb.NodeEvent, name: pb.Input, area: pb.Select, pet: pb.CheckboxGroup):
この場合、ブラウザで確認ボタンがクリックされるとサーバの URLパスハンドラ on_btn_click()
が呼ばれます。パスハンドラは更新する部品の内容を返信します。
ストリーミング(ダウンストリーム)
ストリームボタンをクリックすると HTTPストリーミングレスポンスの内容がメインコンテンツに逐次表示されます。
この画面を作成する Python コードは次の通りです。
+import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import pyboots as pb
app = FastAPI()
templates = Jinja2Templates(directory="templates")
# Bootstrap の CSS と JS を公開
app.mount("/ui/static", StaticFiles(directory="static"), name="static")
# 画面ページのレイアウト
pg = pb.Page()
with pg.Header(style='height: 80px;'):
with pg.Container():
with pg.Row():
pg.H1(text="Hello World!")
with pg.Main():
with pg.LeftSidebar(style='width: 280px; min-height: calc(100vh - 80px);'):
with pg.Nav():
with pg.NavItem():
c_name = pg.Input(id='name', value='', label='名前', placeholder='太郎')
with pg.NavItem():
c_area = pg.Select(id='area', label='お住まいの地域', value='東日本', choices=[
('東日本', '東日本'),
('西日本', '西日本'),
])
with pg.NavItem():
c_pet = pg.CheckboxGroup(id='pet', label='ペット', choices=[
('犬', 'dog', True),
('猫', 'cat', False),
('猿', 'monkey', True)
])
with pg.NavItem():
c_confirm = pg.Button(id='btn', label='確認')
c_start = pg.Button(id='btn2', label='ストリーム', classes='btn btn-secondary')
with pg.Container():
with pg.Row():
with pg.Col():
c_greeting = pg.P()
+ c_message = pg.StreamExample()
# 確認ボタンクリック
@c_confirm.event('click', app, nodes=[c_name, c_area, c_pet])
@app.post("/ui/index/confirm/onclick")
async def on_btn_click(event: pb.NodeEvent, name: pb.Input, area: pb.Select, pet: pb.CheckboxGroup):
pets = [key for key, val, chk in pet.choices if chk]
text = f'''こんにちは {name.value} さん!
{name.value} さんのお住まいは{area.value}です.
{name.value} さんが飼っているペットは{pets}です.'''
data = c_greeting.set(text=text)
return data
+# ストリームボタンクリック
+@c_start.event('click', app, outs=[c_message])
+@app.post("/ui/index/start/onclick")
+async def on_btn2_click():
+ text = '吾輩は猫である。名前はまだ無い。どこで生れたかとんと見当けんとうがつかぬ。何でも薄暗いじめじめした所でニャーニャー泣いていた事だけは記憶している。'
+ async def generate_data():
+ for c in text:
+ yield c
+ await asyncio.sleep(0.05)
+
+ headers = {'x-pyboots-response': 'stream'};
+ return StreamingResponse(generate_data(), media_type="text/plain", headers=headers)
# 画面ページの HTML を返すパスハンドラ
@app.get("/ui/index", response_class=HTMLResponse)
async def ui_index(request: Request):
return templates.TemplateResponse(
request=request, name="index.html", context={"content": pg.get_content()}
)
# 画面ページの JavaScript を返すパスハンドラ
@app.get("/ui/index/js/pyboot.js")
async def ui_index_script():
return StreamingResponse(pg.iter_script(), media_type="application/javascript")
ストリームレスポンスを用いる場合は、そのことをブラウザ側へ通知する必要があります。レスポンスの HTTPヘッダに {'x-pyboots-response': 'stream'}
を設定します。加えて次のようにストリームを受信する部品をデコレータ @c_start.event()
の引数 outs
に指定します。
@c_start.event('click', app, outs=[c_message])
静的なカスタム部品
独自の部品を作成できます。
部品が満たすべき必須要件
- Pydantic の
BaseModel
クラスを継承する - ユーザが指定できる部品属性を
BaseModel
形式で定義する - 部品属性
id
を持つ - HTML要素(Jinja2テンプレート)を内部属性
_x_enter_template
に定義する - 部品属性の
id
とHTML要素のid
属性を一致させる
クリックなどのイベントリスナーは、id
が付与されたHTML要素に追加されます。id
は HTML要素の識別のみならずブラウザとサーバ間の紐づけなど部品を識別する唯一不変の識別子としてシステム内で利用されます。
カスタム部品のコード例
from pydantic import BaseModel
# カスタム部品
class H3(BaseModel):
id: str = ''
text: str
_x_enter_template = '<h3 id="{{ id }}">{{ text }}</h3>'
# カスタム部品クラスを画面ページの extra_parts に指定
pg = pb.Page(extra_parts=[H3])
pg.H3(text="Hello World!")
関数 pb.Page()
の引数 extra_parts
にカスタム部品を指定すればページレイアウトで利用できます。
他の部品を包含するコンテナ部品を作成する場合は、開始タグを内部属性 _x_enter_template
に、終了タグを内部属性 _x_leave_template
に指定します。
コンテナ部品のコード例
# コンテナ部品を定義
class Div(BaseModel):
id: str = ''
classes: str = ''
_x_enter_template = '<div id="{{ id }}" class="{{ classes }}">'
_x_leave_template = '</div>'
コンテナ部品は with文にて他の部品を包含できます。
pg = pb.Page(extra_parts=[H3, Div])
with pg.Div():
title = pg.H3(text="Hello World!")
次のような HTML が生成されるでしょう。
<div id="x_12345" class="">
<h3 id="x_67890">Hello World!</h3>
</div>
データ検証
HTTPリクエスト Body の JSON は、FastAPI と Pydantic により検証されます。
次は、click イベントのパスハンドラーの引数にカスタム部品 H3
を指定する例です。引数の変数名を部品 id の x_67890
とし、その型を部品の型 H3
とします。そうすることでリクエスト Body の JSON が検証され当該部品の属性値が引数に格納されます。詳しくは FastAPI ドキュメントの「複数のボディパラメータ」を参照ください。
# 確認ボタンクリック
@c_confirm.event('click', app, nodes=[title])
@app.post("/ui/index/confirm/onclick")
async def on_btn_click(event: pb.NodeEvent, x_67890: H3)
print(x_67890.text)
実行結果
Hello World!
FastAPI の APIドキュメントで API のスキーマを確認できます。
部品グループ
部品数が増えるとパスハンドラの引数にそれらを1つずつ並べるのは手間です。その場合は、次のように部品をまとめたグループを作成することができます。
from typing_extensions import Annotated
from pyboots.fastapi_helper import NodeGroup
pg = pb.Page()
with pg.Container(classes='container'):
with pg.Row():
c_name = pg.Input(id='name', value='', label='名前', placeholder='太郎')
c_area = pg.Select(id='area', label='お住まいの地域', value='東日本', choices=[
('東日本', '東日本'),
('西日本', '西日本'),
])
c_pet = pg.CheckboxGroup(id='pet', label='ペット', choices=[
('犬', 'dog', True),
('猫', 'cat', False),
('猿', 'monkey', True)
])
c_confirm = pg.Button(id='btn', label='確認')
# 部品グループ
sb = NodeGroup('Form', app, nodes=[c_name, c_area, c_pet])
Form = Annotated[dict, sb.var_type]
# 確認ボタンクリック
@sb.event('click', c_confirm)
@app.post("/ui/index/confirm/onclick")
async def on_btn_click(form: Form):
name, area, pet = form.name, form.area, form.pet
ここでは、部品グループのモデル sb.var_type
を辞書型 dict
に Annotated することで新たな型 Form を定義しそれをパスハンドラの引数で用います。
動的なカスタム部品
ユーザーインタラクションにより部品の値を書き換えたいことがあります。本ツールはそのためのインタフェースを用意しています。部品はそれらのインタフェースを実装することで動的に内容を書き換えることができます。
インタフェース一覧
名前 | 呼び出し | 想定用途 |
---|---|---|
init(id, data) |
DOMContentLoaded | 部品の初期化 |
handler(id, event, post) |
画面イベント発生時 | イベント委譲の取捨選択。サーバへ通知する場合は post(callback) を呼ぶ |
get(id, event) |
画面イベントをサーバへ通知するとき | 現在の部品の値を返す。入力値の検証。エラーを throw するとサーバへの通知はキャンセルされる |
set(id, data) |
サーバからレスポンスを受信したとき | 指定された値を部品に設定する |
downstream(id, stream) |
サーバからストリーミングレスポンスを受信したとき | ストリーミングレスポンスの受信処理 |
先ほどの静的なカスタム部品を動的な部品に書き換えます。
from pydantic import BaseModel
# カスタム部品
class H3(BaseModel):
id: str = ''
text: str
_x_enter_template = '<div><h3 id="{{ id }}"></h3></div>'
_x_script = '''
{
init: function(id, data) { this.set(id, data); },
handler: function(id, event, post) {
event.preventDefault();
if (event.target.matches('h3')) post();
},
get: function(id, event) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
return {
id: elem.id,
text: elem.textContent,
};
},
set: function(id, data) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
if ('text' in data) elem.textContent = data.text;
},
downstream: async function(id, stream) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
const decoder = new TextDecoder();
const reader = stream.getReader();
elem.textContent = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
elem.textContent = elem.textContent + decoder.decode(value);
}
}
}
'''
イベントリスナーはページロード時にのみ指定の id
の要素に設定されます。要素を書き換える際は注意してください
動作確認環境
- bootstrap v5.3
- python 3.12
Pythonパッケージ
- fastapi 0.110.1
- Jinja2 3.1.3
- pydantic 2.7.0
- uvicorn 0.29.0
セットアップ
フォルダ構成
./
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── app/
├── main.py
├── pyboots/ # 今回、作成したツール
│ ├── __init__.py
│ ├── fastapi_helper.py
│ ├── node.py
│ ├── page.py
│ └── pyboots.js
├── static/ # Bootstrap のサイトからダウンロード
│ ├── css/
| | └── bootstrap.min.css
│ └── js/
| └── bootstrap.bundle.min.js
└── templates/
└── index.html
Bootstrap ダウンロード
Bootstrapのダウンロードページ から zip ファイルをダウンロード。bootstrap.min.css
と bootstrap.bundle.min.js
を取り出し上記フォルダに配置する。
Docker関連ファイル
Dockerfile
FROM public.ecr.aws/docker/library/python:3.12-slim
WORKDIR /app
COPY ./requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY app/. ./
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
docker-compose.yml
services:
pyboots:
image: pyboots:0.1
build:
context: .
dockerfile: ./Dockerfile
container_name: pyboots
working_dir: /app
command: ["uvicorn", "main:app", "--reload", "--port", "8000", "--host", "0.0.0.0", "--log-level", "debug"]
ports:
- "0.0.0.0:8000:8000"
volumes:
- ./app:/app
requirements.txt
fastapi
uvicorn[standard]
pydantic
jinja2
Pythonスクリプトと JavaScript
以降が今回、作成したツール pyboots(仮称)です。
app/pyboots/page.py
本機能のサーバ側処理の中核となる Page
クラスと NodeBroker
クラスを定義しています。
import os
import re
import html
from jinja2 import Environment, select_autoescape
from . import node
# HTL要素の属性 id の正規表現パターン
PATTERN_ID_RAW = r'^[a-zA-Z][a-zA-Z0-9_]*$'
PATTERN_ID = re.compile(PATTERN_ID_RAW)
class Page:
"""WEB ページ"""
def __init__(self, jinja2_env=None, extra_parts=None):
if not jinja2_env:
jinja2_env = Environment(
autoescape=select_autoescape(['html']),
trim_blocks=True, # 行の先頭の空白を削除
lstrip_blocks=True # ブロックの前の空白を削除
)
extra_parts = extra_parts or []
self.contents = []
self.extra_scripts = []
self.num_nodes = 0
self.jinja2_env = jinja2_env
self.script_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'pyboots.js')
# ユーザ定義の画面部品
self.extra_parts = {cls.__name__: cls for cls in extra_parts}
# Jinja2 ユーザ定義フィルタ
if 'nl2br' not in jinja2_env.filters:
# 改行文字を <br> へ変換
jinja2_env.filters['nl2br'] = lambda text: html.escape(text).replace('\n', '<br>')
def add_content(self, content):
self.contents.append(content)
def get_content(self):
return ''.join(self.contents)
def add_script(self, content):
self.extra_scripts.append(content)
def iter_script(self):
with open(self.script_file, 'r') as fp:
yield fp.read()
for script in self.extra_scripts:
yield script
def __getattr__(self, attr_name):
"""文字列 attr_name に合致するクラスのインスタンスを作成する関数を返す
使用例
pg = pyboot.Page(jinja2_env=templates.env)
with pg.Header():
with pg.Container():
with pg.Row():
pg.H1(text="Hello")
"""
cls = self.extra_parts.get(attr_name) or getattr(node, attr_name, None)
if cls is None:
raise AttributeError("'Page' object has no attribute '{}'".format(attr_name))
def new(*args, **kwargs):
"""ノードとそのページャを作成"""
node = cls(*args, **kwargs)
node_id = 'x_{}'.format(self.num_nodes)
self.num_nodes += 1
return NodeBroker(self, node, node_id, jinja2_env=self.jinja2_env)
return new
class NodeBroker:
"""ツリーの始点と終端ノードをページに追加"""
def __init__(self, page, node, default_id, jinja2_env):
if not hasattr(node, 'id'):
raise TypeError("expected 'id' attribute, not {}".format(node))
if not hasattr(node, '_x_enter_template'):
raise TypeError("expected '_x_enter_template' attribute, not {}".format(node))
if not callable(getattr(node, 'model_dump', None)):
raise TypeError("expected 'model_dump()' method, not {}".format(node))
if jinja2_env is None:
raise RuntimeError("required jinja2_env")
self.page = page
self.node = node
self.jinja2_env = jinja2_env
self.leave_render = None
# ノードに id が指定されていなければデフォルト値を設定
if not node.id:
node.id = default_id
elif node.id.startswith('x_'):
raise ValueError("'{}' is not allowed for user-specified id. starts with 'x_' are system reserved".format(node.id))
# ノード id 検証
prohibitions = 'event',
if node.id in prohibitions:
raise ValueError("'{}' is not a valid id. cannot be used the following words {} ".format(node.id, prohibitions))
if not PATTERN_ID.match(node.id):
raise ValueError("'{}' is not a valid id. must match the following pattern '{}'".format(node.id, PATTERN_ID_RAW))
# HTML要素 (開始) をページに追加
context = node.model_dump()
page.add_content(NodeRender(node._x_enter_template, context, jinja2_env).render())
# スクリプトをページに追加
if hasattr(self.node, '_x_script'):
page.add_script(NodeScriptRender(node._x_script, context, jinja2_env).render())
# HTML要素 (終了)
leave_template = getattr(self.node, '_x_leave_template', None)
if leave_template:
self.leave_render = NodeRender(leave_template, context, jinja2_env)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
"""HTML要素 (終了) をページに追加"""
if self.leave_render:
self.page.add_content(self.leave_render.render())
@property
def var_type(self):
return self.node.__class__
@property
def id(self):
return self.node.id
def set(self, **kwargs):
if 'id' in kwargs:
raise ValueError("'id' attribute cannot be changed")
data = self.node.copy(update=kwargs)
return data
def add_event_listener(self, url_path, event_name, nodes, outs):
"""画面イベントをサーバへ通知するブラウザ側のスクリプトをページに追加"""
template = '''
nodeBroker.setup.push(() => {
nodeBroker.addNodeEventListener({{ url_path | tojson }}, {{ event_name | tojson }}, {{ event_target | tojson }}, {{ depends | tojson }}, {{ outs | tojson }});
});
'''
context = {
'url_path': url_path,
'event_name': event_name,
'event_target': self.id,
'depends': [x.id for x in nodes],
'outs': [x.id for x in outs],
}
# ページにスクリプトを追加
self.page.add_script(NodeRender(template, context, self.jinja2_env).render())
return
def event(self, event_name, app, nodes=None, outs=None):
"""Nodeイベントリスナーのデコレータ"""
nodes = nodes or []
outs = outs or []
def decorator(handler):
routes = get_routes_by_name(app, handler.__name__)
if not routes:
raise ValueError("not found route name '{}' in api routes".format(handler.__name__))
elif len(routes) > 1:
raise ValueError("duplicate route name '{}' in api routes".format(handler.__name__))
url_path = routes[0].path
self.add_event_listener(url_path, event_name, nodes, outs)
return handler
return decorator
def get_routes_by_name(app, name):
"""API のルート取得"""
routes = []
for route in app.routes:
if getattr(route, 'name', '') == name:
routes.append(route)
return routes
class NodeRender:
"""画面部品の HTML をレンダリング"""
def __init__(self, template, context, jinja2_env):
self.template = template
self.context = context
self.jinja2_env = jinja2_env
def render(self):
template = self.jinja2_env.from_string(self.template)
raw_text = template.render(self.context)
return raw_text
class NodeScriptRender:
"""画面部品の JavaScript をレンダリング"""
def __init__(self, template, context, jinja2_env):
self.context = context
self.jinja2_env = jinja2_env
self.template = '''
{
const nodeId = {{ id | tojson }};
const node = ''' + template + ''';
const context = {{ _context | tojson }};
nodeBroker.push(nodeId, node, context);
}
'''
def render(self):
template = self.jinja2_env.from_string(self.template)
raw_text = template.render(_context=self.context, **(self.context))
return raw_text
app/pyboots/pyboots.js
本機能のブラウザ側処理の中核となるスクリプトです。普段は JS を使わないので雑ですみません。
// 画面部品毎の init(), get(), set() 関数を格納する
const nodeBroker = {
nodes: {}, // ノードを保持
context: {}, // ノードを初期化するコンテキスト
setup: [], // イベントリスナー設定などの初期化を行う関数リスト
// 初期化処理
boot: async function () {
// Initialize each nodes
const promises = Object.entries(this.nodes).map(([id, node]) => node.init(id, this.context[id]));
const results = await Promise.allSettled(promises);
results.filter(r => r.status === 'rejected').forEach(r => console.error(r.reason));
this.setup.forEach(f => f());
},
// ノード追加
push: function (nodeId, node, context) {
if (node.init === undefined) node.init = (id, data) => { };
if (node.handler === undefined) node.handler = (id, ev, cb) => cb();
if (node.get === undefined) node.get = (id, ev) => { };
if (node.set === undefined) node.set = (id, data) => { };
this.nodes[nodeId] = node;
this.context[nodeId] = context;
},
// 画面部品にイベントハンドラを設定
addNodeEventListener: function (url, eventName, eventTargetId, dependencyIds, outIds) {
const elem = document.getElementById(eventTargetId);
if (!elem) {
console.error(`element with ID ${eventTargetId} not found`);
return;
}
elem.addEventListener(eventName, async function (event) {
const nodeHandler = nodeBroker.nodes[eventTargetId].handler;
await nodeHandler(eventTargetId, event, function (callback) {
nodeBroker.eventHandler(url, event, dependencyIds, outIds)
.catch(error => {
console.error('Error:', error);
}).finally(() => {
if (callback) callback();
});
});
});
},
// イベントハンドラ
eventHandler: async function (url, ev, dependencyIds, outIds) {
let payload;
const event = { id: ev.target.id, pointer_type: ev.pointerType, type: ev.type };
if (dependencyIds.length == 0) {
payload = event;
} else {
const [numRejected, nodeData] = await this.getNodes(dependencyIds, ev);
if (numRejected > 0) return;
nodeData.event = event;
payload = nodeData;
}
const response = await nodeBroker.postJSON(url, payload);
if (response.headers.get('x-pyboots-response') == 'stream') {
// Streaming response の場合
await this.downstreamNodes(response.body, outIds)
} else {
let data = await response.json();
if (!data) return;
if (!Array.isArray(data)) data = [data];
// List[node, node, ...] to Object( {nodeId: node, nodeId: node, ...} )
const nodeData = data
.filter(node => (outIds.length == 0 || node.id in outIds))
.reduce((d, node) => {
d[node.id] = node;
return d;
}, {});
await this.setNodes(nodeData);
}
},
// nodeIds の部品の属性を取得
// Return: nodeData = {nodeId: {...}, nodeId: {...}, ...}
getNodes: async function (nodeIds, ev) {
let numRejected = 0;
let nodeData = {};
if (nodeIds.length == 0) return [numRejected, nodeData];
const promises = nodeIds.map(id => this.nodes[id].get(id, ev));
const results = await Promise.allSettled(promises);
results.forEach((r, index) => {
r.id = nodeIds[index];
if (r.status === 'fulfilled') {
nodeData[r.id] = r.value;
} else {
console.info('Node error:', r.reason);
numRejected++;
nodeData[r.id] = null; // ここでエラーに対応したデフォルト値を設定
}
});
return [numRejected, nodeData];
},
// nodeData を部品に設定
// nodeData = {nodeId: {...}, nodeId: {...}, ...}
setNodes: async function (nodeData) {
const promises = Object.entries(nodeData).map(([id, value]) => this.nodes[id].set(id, value));
const results = await Promise.allSettled(promises);
results.filter(r => r.status === 'rejected').forEach(r => console.log('node error:', r.reason));
},
// 部品の downstream
downstreamNodes: async function (stream, nodeIds) {
// stream を複製
const streams = await this.splitStream(stream, nodeIds.length);
const validIds = nodeIds.filter(id => 'downstream' in this.nodes[id]);
const promises = validIds.map((id, index) => this.nodes[id].downstream(id, streams[index]));
const results = await Promise.allSettled(promises);
results.filter(r => r.status === 'rejected').forEach(r => console.log('node error:', r.reason));
},
// サーバにPOSTリクエストを送信する関数
postJSON: async function (url, payload) {
const requestOptions = {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(payload)
};
const response = await fetch(url, requestOptions);
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
return response;
},
// ReadableStream オブジェクト reader を n 個に複製
splitStream: async function (stream, n) {
let newStreams = [stream];
for (let i = 1; i < n; i++) {
const [left, right] = newStreams[0].tee();
newStreams[0] = left;
newStreams.push(right);
}
return newStreams;
}
};
document.addEventListener('DOMContentLoaded', async function () {
await nodeBroker.boot();
});
app/pyboots/node.py
画面部品の定義です。
from typing import List, Tuple, Optional
from pydantic import BaseModel, Field
BASIC_SCRIPT = '''
{
init: function(id, data) { this.set(id, data); },
get: function(id, event) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
return {
id: elem.id,
classes: elem.getAttribute('class'),
style: elem.style,
};
},
set: function(id, data) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
if ('classes' in data) elem.setAttribute('class', data.classes);
if ('style' in data) elem.style = data.style;
}
}
'''
class Header(BaseModel):
id: Optional[str] = ''
classes: Optional[str] = 'container-fluid d-grid gap-3 align-items-center'
style: Optional[str] = 'height: 15vh; max-height: 15vh;'
_x_enter_template = '<div id="{{ id }}">'
_x_leave_template = '</div>'
_x_script = BASIC_SCRIPT
class Main(BaseModel):
id: Optional[str] = ''
classes: Optional[str] = 'd-flex flex-nowrap'
style: Optional[str] = ''
_x_enter_template = '<main id="{{ id }}" class="{{ classes }}" style="{{ style }}">'
_x_leave_template = '</main>'
_x_script = BASIC_SCRIPT
class LeftSidebar(BaseModel):
id: Optional[str] = ''
classes: Optional[str] = 'd-flex flex-column flex-shrink-0 p-3 text-bg-dark'
style: Optional[str] = 'width: 280px; height: 100vh'
_x_enter_template = '<div id="{{ id }}">'
_x_leave_template = '</div>'
_x_script = BASIC_SCRIPT
class Container(BaseModel):
id: Optional[str] = ''
classes: Optional[str] = 'container-fluid'
style: Optional[str] = ''
_x_enter_template = '<div id="{{ id }}">'
_x_leave_template = '</div>'
_x_script = BASIC_SCRIPT
class Row(BaseModel):
id: Optional[str] = ''
classes: Optional[str] = 'row'
style: Optional[str] = ''
_x_enter_template = '<div id="{{ id }}">'
_x_leave_template = '</div>'
_x_script = BASIC_SCRIPT
class Col(BaseModel):
id: Optional[str] = ''
classes: Optional[str] = 'col'
style: Optional[str] = 'heigh'
_x_enter_template = '<div id="{{ id }}">'
_x_leave_template = '</div>'
_x_script = BASIC_SCRIPT
class Nav(BaseModel):
id: Optional[str] = ''
classes: Optional[str] = 'nav nav-pills flex-column mb-auto'
style: Optional[str] = ''
_x_enter_template = '<ul id="{{ id }}">'
_x_leave_template = '</ul>'
_x_script = BASIC_SCRIPT
class NavItem(BaseModel):
id: Optional[str] = ''
classes: Optional[str] = 'nav-item'
style: Optional[str] = ''
_x_enter_template = '<li id="{{ id }}">'
_x_leave_template = '</li>'
_x_script = BASIC_SCRIPT
class H1(BaseModel):
id: Optional[str] = ''
classes: Optional[str] = 'display-5 fw-bold text-body-emphasis'
style: Optional[str] = ''
text: Optional[str] = ''
_x_enter_template = '<h1 id="{{ id }}"></h1>'
_x_script = '''
{
init: function(id, data) { this.set(id, data); },
get: function(id, event) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
return {
id: elem.id,
classes: elem.getAttribute('class'),
style: elem.style,
text: elem.textContent,
};
},
set: function(id, data) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
if ('classes' in data) elem.setAttribute('class', data.classes);
if ('style' in data) elem.style = data.style;
if ('text' in data) elem.textContent = data.text;
}
}
'''
class P(BaseModel):
id: Optional[str] = ''
classes: Optional[str] = 'mb-1'
text: str = ''
_x_enter_template = '<div id="{{ id }}"><p></p></div>'
_x_script = '''
{
init: function(id, data) { this.set(id, data); },
get: function(id, event) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
const childs = elem.children;
return {
id: elem.id,
classes: childs[0].getAttribute('class'),
text: Array.from(childs).map(p => p.textContent).join('\\n'),
};
},
set: function(id, data) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
if ('text' in data) {
while (elem.firstChild) elem.removeChild(elem.firstChild);
const lines = data.text.split('\\n');
lines.forEach(line => {
const p = document.createElement('p');
p.setAttribute('class', data.classes);
p.textContent = line;
elem.appendChild(p);
});
}
}
}
'''
class Button(BaseModel):
id: Optional[str] = ''
classes: Optional[str] = 'btn btn-primary'
label: Optional[str] = ''
_x_enter_template = '<button type="button" id="{{ id }}" style="margin-right:0.5em;"></button>'
_x_script = '''
{
init: function(id, data) { this.set(id, data); },
handler: function(id, event, post) {
event.target.disabled = true;
post(() => event.target.disabled = false);
},
get: function(id, event) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
return {
id: elem.id,
classes: elem.getAttribute('class'),
label: elem.textContent,
};
},
set: function(id, data) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
if ('classes' in data) elem.setAttribute('class', data.classes);
if ('label' in data) elem.textContent = data.label;
}
}
'''
class Input(BaseModel):
id: Optional[str] = ''
label: Optional[str] = ''
type: Optional[str] = 'text'
value: Optional[str] = ''
disabled: Optional[bool] = False
placeholder: Optional[str] = ''
_x_enter_template = '''
<div id="{{ id }}" class="mb-3">
<label for="{{ id }}_input" class="form-label"></label>
<input type="text" class="form-control" id="{{ id }}_input">
</div>
'''
_x_script = '''
{
init: function(id, data) { this.set(id, data); },
handler: function(id, event, post) {
event.preventDefault();
// input 要素のときだけサーバへ通知
if (event.target.matches('input')) post();
},
get: function(id, event) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
const label = elem.children[0];
const input = elem.children[1];
return {
id: elem.id,
label: label.textContent,
type: input.type,
value: input.value,
disabled: input.disabled,
placeholder: input.placeholder,
};
},
set: function(id, data) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
let label = elem.children[0];
let input = elem.children[1];
if ('label' in data) label.textContent = data.label;
if ('type' in data) input.type = data.type;
if ('value' in data) input.value = data.value;
if ('disabled' in data) input.disabled = data.disabled;
if ('placeholder' in data) input.placeholder = data.placeholder;
}
}
'''
class Select(BaseModel):
id: Optional[str] = ''
label: Optional[str] = Field(default='', examples=['Your name'])
value: Optional[str] = ''
choices: Optional[List[Tuple[str, str]]] = [] # list of (label, value)
disabled: Optional[bool] = False
model_config = {
'json_schema_extra': {
'examples': [
{
'id': 'id-my-favorite',
'label': '好きな動物は?',
'value': 'cat',
'choices': [('犬', 'dog'), ('猫', 'cat'), ('猿', 'monkey')],
'disabled': True,
}
]
}
}
_x_enter_template = '''
<div id="{{ id }}" class="mb-3">
<label for="{{ id }}_select" class="form-label"></label>
<select class="form-select" id="{{ id }}_select">
<option value="" selected></option>
</select>
</div>
'''
_x_script = '''
{
// Select
init: function(id, data) { this.set(id, data); },
get: function(id, event) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
const label = elem.children[0];
const select = elem.children[1];
const options = select.children;
return {
id: elem.id,
label: label?.textContent,
value: select?.value,
disabled: select?.disabled,
choices: Array.from(options).map(o => [o.textContent, o.value]),
};
},
set: function(id, data) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
let label = elem.children[0];
let select = elem.children[1];
if ('choices' in data) {
while (select.firstChild) select.removeChild(select.firstChild);
data.choices.forEach(([caption, value]) => {
const opt = document.createElement('option');
opt.value = value;
opt.textContent = caption;
select.appendChild(opt);
});
}
if ('label' in data) label.textContent = data.label;
if ('value' in data) select.value = data.value;
if ('disabled' in data) select.disabled = data.disabled;
}
}
'''
class CheckboxGroup(BaseModel):
id: Optional[str] = ''
label: Optional[str] = ''
choices: Optional[List[Tuple[str, str, bool]]] = [] # list of (label, value, checked)
_x_enter_template = '''
<div id="{{ id }}" class="mb-3">
<label class="form-label"></label>
<div class="form-check">
<input type="checkbox">
<label class="form-check-label"></label>
</div>
</div>
'''
_x_script = '''
{
// CheckboxGroup
init: function(id, data) { this.set(id, data); },
get: function(id, event) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
const labelGroup = elem.firstElementChild;
const childs = elem.querySelectorAll('.form-check');
const label = labelGroup.textContent;
const choices = Array.from(childs).map(div => {
const input = div.children[0];
return [div.children[1].textContent, input.value, Boolean(input.checked)]
});
return {
id: elem.id,
label: label,
choices: choices,
};
},
set: function(id, data) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
elem.querySelectorAll('.form-check').forEach(child => {
elem.removeChild(child);
});
if ('label' in data) elem.firstElementChild.textContent = data.label;
if ('choices' in data) {
data.choices.forEach(([caption, value, checked]) => {
let div = document.createElement('div');
let input = document.createElement('input');
let label = document.createElement('label');
div.className = "form-check";
input.type = "checkbox";
input.className = "form-check-input";
input.value = value;
input.checked = checked;
label.className = "form-check-label";
label.textContent = caption;
div.appendChild(input);
div.appendChild(label);
elem.appendChild(div);
});
}
}
}
'''
class StreamExample(BaseModel):
id: Optional[str] = ''
text: str = ''
_x_enter_template = '<p id="{{ id }}" class="mb-3"></p>'
_x_script = '''
{
init: function(id, data) { this.set(id, data); },
get: function(id, event) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
return {
id: elem.id,
text: elem.textContent,
};
},
set: function(id, data) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
if ('text' in data) elem.textContent = data.text;
},
downstream: async function(id, stream) {
const elem = document.getElementById(id);
if (elem === undefined) throw new Error("element '${id}' is not found");
const decoder = new TextDecoder();
const reader = stream.getReader();
elem.textContent = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
elem.textContent = elem.textContent + text;
}
}
}
'''
app/pyboots/__init__.py
お約束の __init__.py
from pydantic import BaseModel
from typing import List, Dict, Union, Optional
from .page import (
Page,
NodeBroker,
)
from .node import (
Main,
Container,
Header,
Row,
Col,
LeftSidebar,
Nav,
NavItem,
H1,
P,
Button,
Input,
Select,
CheckboxGroup,
StreamExample,
)
class NodeEvent(BaseModel):
id: str
pointer_type: Optional[Union[str, None]] = None
type: str
class NodeEventMessage(BaseModel):
event: NodeEvent
nodes: List[Union[Dict[str, Dict], None]] = []
app/pyboots/fastapi_helper.py
fastapi_helper.py
はちょっとしたスニペットなので必須ではありません。 実行時に複数の部品を組み合わせた型(Pydantic モデル)を生成します。前述の部品グループを参照。
from typing import List, Any, Dict, Union, Callable, Optional
from pydantic import create_model
from fastapi import FastAPI
from starlette.routing import BaseRoute
from typing_extensions import Annotated
from . import NodeEvent, NodeBroker
class NodeGroup:
def __init__(self, name: str, app: FastAPI, nodes: List[NodeBroker]=[], outs: List[NodeBroker]=[]) -> None:
self.name = name
self.app = app
self.nodes = nodes
self.outs = outs
@property
def var_type(self):
"""型定義を生成"""
if not self.nodes:
return NodeEvent
else:
nodes_fields = {x.id: (Union[x.var_type, None], None) for x in self.nodes}
model = create_model(
self.name,
event=(NodeEvent, ...),
**nodes_fields,
)
return model
def __call__(self, body: Dict) -> Any:
return self.var_type.model_validate(body)
def event(self, event_name: str, event_target: NodeBroker) -> Callable:
"""Nodeイベントリスナーのデコレータ"""
return event_target.event(event_name, self.app, nodes=self.nodes, outs=self.outs)
実行方法
コンテナイメージを作成し実行。
docker compose build
docker compose up -d
ブラウザで http://localhost:8000/ui/index
を開く。
今後の展望
ブラウザとサーバのいずれも画面部品のインターフェースでアクセスすることがこのツールの特徴ですが、Python と JavaScript でそれぞれ実装が必要なことが課題です。本ツールを利用しながらブラッシュアップを続けたいと思います。
最後まで目を通して頂きありがとうございます。