0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Python で手軽にWEB画面を作りたい

Last updated at Posted at 2024-04-25

使い勝手のよい Python ですが WEB画面の開発となると億劫です。Gradio や Streamlit という手もありますが、AI が生成するコンテンツを動的に埋め込むことなんかもやってみたい。

ということで FastAPIBootstrap を用いて Python のための WEB画面開発ツールのプロトタイプ pyboots(仮称)を作成したので紹介します。

ざっくりした仕組みは、生成した HTMLコンテンツをブラウザへ送り、以後はブラウザが click などのイベントをサーバへ通知します。1000行に満たない小さなコードです。

なお、今回は AI によるコンテンツ生成までは行いません。以降に AI は登場しませんのであしからず。

導入

まずは、画面とその Python コードをご覧ください。

Hello World

image.png

この画面を作成する Python コードは次の通りです。Hallo World の割にコード量が多いですが URLパスを内部に隠蔽しない方針のためです。

main.py
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import pyboots as pb

app = FastAPI()

templates = Jinja2Templates(directory="templates")

# Bootstrap の CSS と JS を公開
app.mount("/ui/static", StaticFiles(directory="static"), name="static")

# 画面ページのレイアウト
pg = pb.Page()
with pg.Row():
    pg.P(text="Hello World!")


# 画面ページの HTML を返すパスハンドラ
@app.get("/ui/index", response_class=HTMLResponse)
async def ui_index(request: Request):
    return templates.TemplateResponse(
        request=request, name="index.html", context={"content": pg.get_content()}
    )


# 画面ページの JavaScript を返すパスハンドラ
@app.get("/ui/index/js/pyboot.js")
async def ui_index_script():
    return StreamingResponse(pg.iter_script(), media_type="application/javascript")

画面ページのレイアウトは次のように with 文で構成します。Streamlit や Gradio からインスピレーションを得ました。

# 画面ページのレイアウト
pg = pb.Page()
with pg.Row():
    pg.P(text="Hello World!")

HTMLテンプレート index.html は、レイアウトしたコンテンツを HTML の body に埋め込むシンプルなテンプレートです。

intex.html
<!DOCTYPE HTML>
<html>
  <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>PyBoots</title>
    <link rel="stylesheet" href="/ui/static/css/bootstrap.min.css">
    <script type="module" src="/ui/static/js/bootstrap.bundle.min.js"></script>
    <script type="module" src="/ui/index/js/pyboot.js"></script>
  </head>
  <body>
    {{ content|safe }}
  </body>
</html>

ここで <script type="module" src="/ui/index/js/pyboot.js"> は、本ツールが生成するスクリプトです。

グリッド

image.png

Bootstrap のグリッドシステムです。

この画面を作成する Python コードは次の通りです。

main.py
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import pyboots as pb

app = FastAPI()

templates = Jinja2Templates(directory="templates")

# Bootstrap の CSS と JS を公開
app.mount("/ui/static", StaticFiles(directory="static"), name="static")

# 画面ページのレイアウト
pg = pb.Page()
with pg.Container():
    with pg.Row():
        with pg.Col():
            pg.P(text="Row-1 Col-1")
        with pg.Col():
            pg.P(text="Row-1 Col-2")
    with pg.Row():
        with pg.Col():
            pg.P(text="Row-2 Col-1")
        with pg.Col():
            pg.P(text="Row-2 Col-2")
        with pg.Col():
            pg.P(text="Row-2 Col-3")


# 画面ページの HTML を返すパスハンドラ
@app.get("/ui/index", response_class=HTMLResponse)
async def ui_index(request: Request):
    return templates.TemplateResponse(
        request=request, name="index.html", context={"content": pg.get_content()}
    )


# 画面ページの JavaScript を返すパスハンドラ
@app.get("/ui/index/js/pyboot.js")
async def ui_index_script():
    return StreamingResponse(pg.iter_script(), media_type="application/javascript")

サイドバーとクリックイベント

image.png

確認ボタンをクリックするとサイドバーに入力された内容がメインコンテンツに表示されます。

この画面を作成する Python コードは次の通りです。

main.py
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import pyboots as pb

app = FastAPI()

templates = Jinja2Templates(directory="templates")

# Bootstrap の CSS と JS を公開
app.mount("/ui/static", StaticFiles(directory="static"), name="static")

# 画面ページのレイアウト
pg = pb.Page()
with pg.Header(style='height: 80px;'):
    with pg.Container():
        with pg.Row():
            pg.H1(text="Hello World!")
with pg.Main():
    with pg.LeftSidebar(style='width: 280px; min-height: calc(100vh - 80px);'):
        with pg.Nav():
            with pg.NavItem():
                c_name = pg.Input(id='name', value='', label='名前', placeholder='太郎')
            with pg.NavItem():
                c_area = pg.Select(id='area', label='お住まいの地域', value='東日本', choices=[
                    ('東日本', '東日本'),
                    ('西日本', '西日本'),
                ])
            with pg.NavItem():
                c_pet = pg.CheckboxGroup(id='pet', label='ペット', choices=[
                    ('', 'dog', True),
                    ('', 'cat', False),
                    ('', 'monkey', True)
                ])
            with pg.NavItem():
                c_confirm = pg.Button(id='btn', label='確認')
    with pg.Container():
        with pg.Row():
            with pg.Col():
                c_greeting = pg.P()


# 確認ボタンクリック
@c_confirm.event('click', app, nodes=[c_name, c_area, c_pet])
@app.post("/ui/index/confirm/onclick")
async def on_btn_click(event: pb.NodeEvent, name: pb.Input, area: pb.Select, pet: pb.CheckboxGroup):
    pets = [key for key, val, chk in pet.choices if chk]

    text = f'''こんにちは {name.value} さん!
{name.value} さんのお住まいは{area.value}です.
{name.value} さんが飼っているペットは{pets}です.'''

    data = c_greeting.set(text=text)
    return data


# 画面ページの HTML を返すパスハンドラ
@app.get("/ui/index", response_class=HTMLResponse)
async def ui_index(request: Request):
    return templates.TemplateResponse(
        request=request, name="index.html", context={"content": pg.get_content()}
    )


# 画面ページの JavaScript を返すパスハンドラ
@app.get("/ui/index/js/pyboot.js")
async def ui_index_script():
    return StreamingResponse(pg.iter_script(), media_type="application/javascript")

ここで次の処理は、部品 c_confirmclick イベントを設定し、それを URLパス "/ui/index/confirm/onclick" のパスハンドラに紐づけます。ブラウザからサーバへ送信する部品を引数 nodes に列挙します。

# 確認ボタンクリック
@c_confirm.event('click', app, nodes=[c_name, c_area, c_pet])
@app.post("/ui/index/confirm/onclick")
async def on_btn_click(event: pb.NodeEvent, name: pb.Input, area: pb.Select, pet: pb.CheckboxGroup):

この場合、ブラウザで確認ボタンがクリックされるとサーバの URLパスハンドラ on_btn_click() が呼ばれます。パスハンドラは更新する部品の内容を返信します。

ストリーミング(ダウンストリーム)

streaming.gif

ストリームボタンをクリックすると HTTPストリーミングレスポンスの内容がメインコンテンツに逐次表示されます。

この画面を作成する Python コードは次の通りです。

main.py
+import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import pyboots as pb

app = FastAPI()

templates = Jinja2Templates(directory="templates")

# Bootstrap の CSS と JS を公開
app.mount("/ui/static", StaticFiles(directory="static"), name="static")

# 画面ページのレイアウト
pg = pb.Page()
with pg.Header(style='height: 80px;'):
    with pg.Container():
        with pg.Row():
            pg.H1(text="Hello World!")
with pg.Main():
    with pg.LeftSidebar(style='width: 280px; min-height: calc(100vh - 80px);'):
        with pg.Nav():
            with pg.NavItem():
                c_name = pg.Input(id='name', value='', label='名前', placeholder='太郎')
            with pg.NavItem():
                c_area = pg.Select(id='area', label='お住まいの地域', value='東日本', choices=[
                    ('東日本', '東日本'),
                    ('西日本', '西日本'),
                ])
            with pg.NavItem():
                c_pet = pg.CheckboxGroup(id='pet', label='ペット', choices=[
                    ('', 'dog', True),
                    ('', 'cat', False),
                    ('', 'monkey', True)
                ])
            with pg.NavItem():
                c_confirm = pg.Button(id='btn', label='確認')
                c_start = pg.Button(id='btn2', label='ストリーム', classes='btn btn-secondary')
    with pg.Container():
        with pg.Row():
            with pg.Col():
                c_greeting = pg.P()
+                c_message = pg.StreamExample()


# 確認ボタンクリック
@c_confirm.event('click', app, nodes=[c_name, c_area, c_pet])
@app.post("/ui/index/confirm/onclick")
async def on_btn_click(event: pb.NodeEvent, name: pb.Input, area: pb.Select, pet: pb.CheckboxGroup):
    pets = [key for key, val, chk in pet.choices if chk]

    text = f'''こんにちは {name.value} さん!
{name.value} さんのお住まいは{area.value}です.
{name.value} さんが飼っているペットは{pets}です.'''

    data = c_greeting.set(text=text)
    return data


+# ストリームボタンクリック
+@c_start.event('click', app, outs=[c_message])
+@app.post("/ui/index/start/onclick")
+async def on_btn2_click():
+    text = '吾輩は猫である。名前はまだ無い。どこで生れたかとんと見当けんとうがつかぬ。何でも薄暗いじめじめした所でニャーニャー泣いていた事だけは記憶している。'
+    async def generate_data():
+        for c in text:
+            yield c
+            await asyncio.sleep(0.05)
+
+    headers = {'x-pyboots-response': 'stream'};
+    return StreamingResponse(generate_data(), media_type="text/plain", headers=headers)


# 画面ページの HTML を返すパスハンドラ
@app.get("/ui/index", response_class=HTMLResponse)
async def ui_index(request: Request):
    return templates.TemplateResponse(
        request=request, name="index.html", context={"content": pg.get_content()}
    )


# 画面ページの JavaScript を返すパスハンドラ
@app.get("/ui/index/js/pyboot.js")
async def ui_index_script():
    return StreamingResponse(pg.iter_script(), media_type="application/javascript")

ストリームレスポンスを用いる場合は、そのことをブラウザ側へ通知する必要があります。レスポンスの HTTPヘッダに {'x-pyboots-response': 'stream'} を設定します。加えて次のようにストリームを受信する部品をデコレータ @c_start.event() の引数 outs に指定します。

@c_start.event('click', app, outs=[c_message])

静的なカスタム部品

独自の部品を作成できます。

部品が満たすべき必須要件

  • Pydantic の BaseModel クラスを継承する
  • ユーザが指定できる部品属性を BaseModel 形式で定義する
  • 部品属性 id を持つ
  • HTML要素(Jinja2テンプレート)を内部属性 _x_enter_template に定義する
  • 部品属性の id とHTML要素の id 属性を一致させる

クリックなどのイベントリスナーは、id が付与されたHTML要素に追加されます。id は HTML要素の識別のみならずブラウザとサーバ間の紐づけなど部品を識別する唯一不変の識別子としてシステム内で利用されます。

カスタム部品のコード例

main.py
from pydantic import BaseModel

# カスタム部品
class H3(BaseModel):
    id: str = ''
    text: str
    _x_enter_template = '<h3 id="{{ id }}">{{ text }}</h3>'

# カスタム部品クラスを画面ページの extra_parts に指定
pg = pb.Page(extra_parts=[H3])
pg.H3(text="Hello World!")

関数 pb.Page() の引数 extra_parts にカスタム部品を指定すればページレイアウトで利用できます。

実行結果
image.png

他の部品を包含するコンテナ部品を作成する場合は、開始タグを内部属性 _x_enter_template に、終了タグを内部属性 _x_leave_template に指定します。

コンテナ部品のコード例

# コンテナ部品を定義
class Div(BaseModel):
    id: str = ''
    classes: str = ''
    _x_enter_template = '<div id="{{ id }}" class="{{ classes }}">'
    _x_leave_template = '</div>'

コンテナ部品は with文にて他の部品を包含できます。

pg = pb.Page(extra_parts=[H3, Div])
with pg.Div():
    title = pg.H3(text="Hello World!")

次のような HTML が生成されるでしょう。

<div id="x_12345" class="">
  <h3 id="x_67890">Hello World!</h3>
</div>

データ検証

HTTPリクエスト Body の JSON は、FastAPI と Pydantic により検証されます。

次は、click イベントのパスハンドラーの引数にカスタム部品 H3 を指定する例です。引数の変数名を部品 id の x_67890 とし、その型を部品の型 H3 とします。そうすることでリクエスト Body の JSON が検証され当該部品の属性値が引数に格納されます。詳しくは FastAPI ドキュメントの「複数のボディパラメータ」を参照ください。

# 確認ボタンクリック
@c_confirm.event('click', app, nodes=[title])
@app.post("/ui/index/confirm/onclick")
async def on_btn_click(event: pb.NodeEvent, x_67890: H3)
    print(x_67890.text)

実行結果

Hello World!

FastAPI の APIドキュメントで API のスキーマを確認できます。
image.png

部品グループ

部品数が増えるとパスハンドラの引数にそれらを1つずつ並べるのは手間です。その場合は、次のように部品をまとめたグループを作成することができます。

from typing_extensions import Annotated
from pyboots.fastapi_helper import NodeGroup

pg = pb.Page()
with pg.Container(classes='container'):
    with pg.Row():
        c_name = pg.Input(id='name', value='', label='名前', placeholder='太郎')
        c_area = pg.Select(id='area', label='お住まいの地域', value='東日本', choices=[
            ('東日本', '東日本'),
            ('西日本', '西日本'),
        ])
        c_pet = pg.CheckboxGroup(id='pet', label='ペット', choices=[
            ('', 'dog', True),
            ('', 'cat', False),
            ('', 'monkey', True)
        ])
        c_confirm = pg.Button(id='btn', label='確認')

# 部品グループ
sb = NodeGroup('Form', app, nodes=[c_name, c_area, c_pet])
Form = Annotated[dict, sb.var_type]

# 確認ボタンクリック
@sb.event('click', c_confirm)
@app.post("/ui/index/confirm/onclick")
async def on_btn_click(form: Form):
    name, area, pet = form.name, form.area, form.pet

ここでは、部品グループのモデル sb.var_type を辞書型 dict に Annotated することで新たな型 Form を定義しそれをパスハンドラの引数で用います。

動的なカスタム部品

ユーザーインタラクションにより部品の値を書き換えたいことがあります。本ツールはそのためのインタフェースを用意しています。部品はそれらのインタフェースを実装することで動的に内容を書き換えることができます。

インタフェース一覧

名前 呼び出し 想定用途
init(id, data) DOMContentLoaded 部品の初期化
handler(id, event, post) 画面イベント発生時 イベント委譲の取捨選択。サーバへ通知する場合は post(callback) を呼ぶ
get(id, event) 画面イベントをサーバへ通知するとき 現在の部品の値を返す。入力値の検証。エラーを throw するとサーバへの通知はキャンセルされる
set(id, data) サーバからレスポンスを受信したとき 指定された値を部品に設定する
downstream(id, stream) サーバからストリーミングレスポンスを受信したとき ストリーミングレスポンスの受信処理

先ほどの静的なカスタム部品を動的な部品に書き換えます。

from pydantic import BaseModel

# カスタム部品
class H3(BaseModel):
    id: str = ''
    text: str
    _x_enter_template = '<div><h3 id="{{ id }}"></h3></div>'
    _x_script = '''
{
    init: function(id, data) { this.set(id, data); },
    handler: function(id, event, post) {
        event.preventDefault();
        if (event.target.matches('h3')) post();
    },
    get: function(id, event) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        return {
            id: elem.id,
            text: elem.textContent,
        };
    },
    set: function(id, data) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        if ('text' in data)   elem.textContent = data.text;
    },
    downstream: async function(id, stream) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        const decoder = new TextDecoder();
        const reader = stream.getReader();
        elem.textContent = '';
        while (true) {
            const { done, value } = await reader.read();
            if (done) break;
            elem.textContent = elem.textContent + decoder.decode(value);
        }
    }
}
'''

イベントリスナーはページロード時にのみ指定の id の要素に設定されます。要素を書き換える際は注意してください

動作確認環境

  • bootstrap v5.3
  • python 3.12

Pythonパッケージ

  • fastapi 0.110.1
  • Jinja2 3.1.3
  • pydantic 2.7.0
  • uvicorn 0.29.0

セットアップ

フォルダ構成

./
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── app/
    ├── main.py
    ├── pyboots/   # 今回、作成したツール
    │   ├── __init__.py
    │   ├── fastapi_helper.py
    │   ├── node.py
    │   ├── page.py
    │   └── pyboots.js
    ├── static/    # Bootstrap のサイトからダウンロード
    │   ├── css/
    |   |   └── bootstrap.min.css
    │   └── js/
    |       └── bootstrap.bundle.min.js
    └── templates/
        └── index.html

Bootstrap ダウンロード

Bootstrapのダウンロードページ から zip ファイルをダウンロード。bootstrap.min.cssbootstrap.bundle.min.js を取り出し上記フォルダに配置する。

Docker関連ファイル

Dockerfile

Dockerfile
FROM public.ecr.aws/docker/library/python:3.12-slim
WORKDIR /app
COPY ./requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY app/. ./
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

docker-compose.yml

docker-compose.yml
services:
  pyboots:
    image: pyboots:0.1
    build:
      context: .
      dockerfile: ./Dockerfile
    container_name: pyboots
    working_dir: /app
    command: ["uvicorn", "main:app", "--reload", "--port", "8000", "--host", "0.0.0.0", "--log-level", "debug"]
    ports:
      - "0.0.0.0:8000:8000"
    volumes:
      - ./app:/app

requirements.txt

requirements.txt
fastapi
uvicorn[standard]
pydantic
jinja2

Pythonスクリプトと JavaScript

以降が今回、作成したツール pyboots(仮称)です。

app/pyboots/page.py

本機能のサーバ側処理の中核となる Page クラスと NodeBroker クラスを定義しています。

page.py
import os
import re
import html
from jinja2 import Environment, select_autoescape
from . import node

# HTL要素の属性 id の正規表現パターン
PATTERN_ID_RAW = r'^[a-zA-Z][a-zA-Z0-9_]*$'
PATTERN_ID = re.compile(PATTERN_ID_RAW)


class Page:
    """WEB ページ"""
    def __init__(self, jinja2_env=None, extra_parts=None):
        if not jinja2_env:
            jinja2_env = Environment(
                autoescape=select_autoescape(['html']),
                trim_blocks=True,  # 行の先頭の空白を削除
                lstrip_blocks=True  # ブロックの前の空白を削除
            )
        extra_parts = extra_parts or []
        self.contents = []
        self.extra_scripts = []
        self.num_nodes = 0
        self.jinja2_env = jinja2_env
        self.script_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'pyboots.js')
        # ユーザ定義の画面部品
        self.extra_parts = {cls.__name__: cls for cls in extra_parts}

        # Jinja2 ユーザ定義フィルタ
        if 'nl2br' not in jinja2_env.filters:
            # 改行文字を <br> へ変換
            jinja2_env.filters['nl2br'] = lambda text: html.escape(text).replace('\n', '<br>')

    def add_content(self, content):
        self.contents.append(content)

    def get_content(self):
        return ''.join(self.contents)

    def add_script(self, content):
        self.extra_scripts.append(content)

    def iter_script(self):
        with open(self.script_file, 'r') as fp:
            yield fp.read() 
        for script in self.extra_scripts:
            yield script

    def __getattr__(self, attr_name):
        """文字列 attr_name に合致するクラスのインスタンスを作成する関数を返す

        使用例
        pg = pyboot.Page(jinja2_env=templates.env)
        with pg.Header():
            with pg.Container():
                with pg.Row():
                    pg.H1(text="Hello")
        """
        cls = self.extra_parts.get(attr_name) or getattr(node, attr_name, None)
        if cls is None:
            raise AttributeError("'Page' object has no attribute '{}'".format(attr_name))

        def new(*args, **kwargs):
            """ノードとそのページャを作成"""
            node = cls(*args, **kwargs)
            node_id = 'x_{}'.format(self.num_nodes)
            self.num_nodes += 1
            return NodeBroker(self, node, node_id, jinja2_env=self.jinja2_env)

        return new


class NodeBroker:
    """ツリーの始点と終端ノードをページに追加"""
    def __init__(self, page, node, default_id, jinja2_env):
        if not hasattr(node, 'id'):
            raise TypeError("expected 'id' attribute, not {}".format(node))
        if not hasattr(node, '_x_enter_template'):
            raise TypeError("expected '_x_enter_template' attribute, not {}".format(node))
        if not callable(getattr(node, 'model_dump', None)):
            raise TypeError("expected 'model_dump()' method, not {}".format(node))
        if jinja2_env is None:
            raise RuntimeError("required jinja2_env")

        self.page = page
        self.node = node
        self.jinja2_env = jinja2_env
        self.leave_render = None

        # ノードに id が指定されていなければデフォルト値を設定
        if not node.id:
            node.id = default_id
        elif node.id.startswith('x_'):
            raise ValueError("'{}' is not allowed for user-specified id. starts with 'x_' are system reserved".format(node.id))

        # ノード id 検証
        prohibitions = 'event',
        if node.id in prohibitions:
            raise ValueError("'{}' is not a valid id. cannot be used the following words {} ".format(node.id, prohibitions))
        if not PATTERN_ID.match(node.id):
            raise ValueError("'{}' is not a valid id. must match the following pattern '{}'".format(node.id, PATTERN_ID_RAW))

        # HTML要素 (開始) をページに追加
        context = node.model_dump()
        page.add_content(NodeRender(node._x_enter_template, context, jinja2_env).render())

        # スクリプトをページに追加
        if hasattr(self.node, '_x_script'):
            page.add_script(NodeScriptRender(node._x_script, context, jinja2_env).render())

        # HTML要素 (終了)
        leave_template = getattr(self.node, '_x_leave_template', None)
        if leave_template:
            self.leave_render = NodeRender(leave_template, context, jinja2_env)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """HTML要素 (終了) をページに追加"""
        if self.leave_render:
            self.page.add_content(self.leave_render.render())

    @property
    def var_type(self):
        return self.node.__class__

    @property
    def id(self):
        return self.node.id

    def set(self, **kwargs):
        if 'id' in kwargs:
            raise ValueError("'id' attribute cannot be changed")
        data = self.node.copy(update=kwargs)
        return data

    def add_event_listener(self, url_path, event_name, nodes, outs):
        """画面イベントをサーバへ通知するブラウザ側のスクリプトをページに追加"""
        template = '''
        nodeBroker.setup.push(() => {
        nodeBroker.addNodeEventListener({{ url_path | tojson }}, {{ event_name | tojson }}, {{ event_target | tojson }}, {{ depends | tojson }}, {{ outs | tojson }});
        });
        '''

        context = {
            'url_path': url_path,
            'event_name': event_name,
            'event_target': self.id,
            'depends': [x.id for x in nodes],
            'outs': [x.id for x in outs],
        }

        # ページにスクリプトを追加
        self.page.add_script(NodeRender(template, context, self.jinja2_env).render())
        return

    def event(self, event_name, app, nodes=None, outs=None):
        """Nodeイベントリスナーのデコレータ"""
        nodes = nodes or []
        outs = outs or []
        def decorator(handler):
            routes = get_routes_by_name(app, handler.__name__)
            if not routes:
                raise ValueError("not found route name '{}' in api routes".format(handler.__name__))
            elif len(routes) > 1:
                raise ValueError("duplicate route name '{}' in api routes".format(handler.__name__))
            url_path = routes[0].path
            self.add_event_listener(url_path, event_name, nodes, outs)
            return handler
        return decorator


def get_routes_by_name(app, name):
    """API のルート取得"""
    routes = []
    for route in app.routes:
        if getattr(route, 'name', '') == name:
            routes.append(route)
    return routes


class NodeRender:
    """画面部品の HTML をレンダリング"""
    def __init__(self, template, context, jinja2_env):
        self.template = template
        self.context = context
        self.jinja2_env = jinja2_env

    def render(self):
        template = self.jinja2_env.from_string(self.template)
        raw_text = template.render(self.context)
        return raw_text


class NodeScriptRender:
    """画面部品の JavaScript をレンダリング"""
    def __init__(self, template, context, jinja2_env):
        self.context = context
        self.jinja2_env = jinja2_env
        self.template = '''
{
const nodeId = {{ id | tojson }};
const node = ''' + template + ''';
const context = {{ _context | tojson }};
nodeBroker.push(nodeId, node, context);
}
'''

    def render(self):
        template = self.jinja2_env.from_string(self.template)
        raw_text = template.render(_context=self.context, **(self.context))
        return raw_text

app/pyboots/pyboots.js

本機能のブラウザ側処理の中核となるスクリプトです。普段は JS を使わないので雑ですみません。

pyboots.js
// 画面部品毎の init(), get(), set() 関数を格納する
const nodeBroker = {
  nodes: {},    // ノードを保持
  context: {},  // ノードを初期化するコンテキスト
  setup: [],    // イベントリスナー設定などの初期化を行う関数リスト

  // 初期化処理
  boot: async function () {
    // Initialize each nodes
    const promises = Object.entries(this.nodes).map(([id, node]) => node.init(id, this.context[id]));
    const results = await Promise.allSettled(promises);
    results.filter(r => r.status === 'rejected').forEach(r => console.error(r.reason));
    this.setup.forEach(f => f());
  },

  // ノード追加
  push: function (nodeId, node, context) {
    if (node.init === undefined) node.init = (id, data) => { };
    if (node.handler === undefined) node.handler = (id, ev, cb) => cb();
    if (node.get === undefined) node.get = (id, ev) => { };
    if (node.set === undefined) node.set = (id, data) => { };
    this.nodes[nodeId] = node;
    this.context[nodeId] = context;
  },

  // 画面部品にイベントハンドラを設定
  addNodeEventListener: function (url, eventName, eventTargetId, dependencyIds, outIds) {
    const elem = document.getElementById(eventTargetId);
    if (!elem) {
      console.error(`element with ID ${eventTargetId} not found`);
      return;
    }

    elem.addEventListener(eventName, async function (event) {
      const nodeHandler = nodeBroker.nodes[eventTargetId].handler;
      await nodeHandler(eventTargetId, event, function (callback) {
        nodeBroker.eventHandler(url, event, dependencyIds, outIds)
          .catch(error => {
            console.error('Error:', error);
          }).finally(() => {
            if (callback) callback();
          });
      });
    });
  },

  // イベントハンドラ
  eventHandler: async function (url, ev, dependencyIds, outIds) {
    let payload;
    const event = { id: ev.target.id, pointer_type: ev.pointerType, type: ev.type };
    if (dependencyIds.length == 0) {
      payload = event;
    } else {
      const [numRejected, nodeData] = await this.getNodes(dependencyIds, ev);
      if (numRejected > 0) return;
      nodeData.event = event;
      payload = nodeData;
    }

    const response = await nodeBroker.postJSON(url, payload);

    if (response.headers.get('x-pyboots-response') == 'stream') {
      // Streaming response の場合
      await this.downstreamNodes(response.body, outIds)
    } else {
      let data = await response.json();
      if (!data) return;
      if (!Array.isArray(data)) data = [data];
      // List[node, node, ...] to Object( {nodeId: node, nodeId: node, ...} )
      const nodeData = data
        .filter(node => (outIds.length == 0 || node.id in outIds))
        .reduce((d, node) => {
          d[node.id] = node;
          return d;
        }, {});
      await this.setNodes(nodeData);
    }
  },

  // nodeIds の部品の属性を取得
  // Return: nodeData = {nodeId: {...}, nodeId: {...}, ...}
  getNodes: async function (nodeIds, ev) {
    let numRejected = 0;
    let nodeData = {};
    if (nodeIds.length == 0) return [numRejected, nodeData];
    const promises = nodeIds.map(id => this.nodes[id].get(id, ev));
    const results = await Promise.allSettled(promises);
    results.forEach((r, index) => {
      r.id = nodeIds[index];
      if (r.status === 'fulfilled') {
        nodeData[r.id] = r.value;
      } else {
        console.info('Node error:', r.reason);
        numRejected++;
        nodeData[r.id] = null; // ここでエラーに対応したデフォルト値を設定
      }
    });
    return [numRejected, nodeData];
  },

  // nodeData を部品に設定
  // nodeData = {nodeId: {...}, nodeId: {...}, ...}
  setNodes: async function (nodeData) {
    const promises = Object.entries(nodeData).map(([id, value]) => this.nodes[id].set(id, value));
    const results = await Promise.allSettled(promises);
    results.filter(r => r.status === 'rejected').forEach(r => console.log('node error:', r.reason));
  },

  // 部品の downstream
  downstreamNodes: async function (stream, nodeIds) {
    // stream を複製
    const streams = await this.splitStream(stream, nodeIds.length);
    const validIds = nodeIds.filter(id => 'downstream' in this.nodes[id]);
    const promises = validIds.map((id, index) => this.nodes[id].downstream(id, streams[index]));
    const results = await Promise.allSettled(promises);
    results.filter(r => r.status === 'rejected').forEach(r => console.log('node error:', r.reason));
  },

  // サーバにPOSTリクエストを送信する関数
  postJSON: async function (url, payload) {
    const requestOptions = {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify(payload)
    };
    const response = await fetch(url, requestOptions);
    if (!response.ok) {
      throw new Error(`HTTP error! status: ${response.status}`);
    }
    return response;
  },

  // ReadableStream オブジェクト reader を n 個に複製
  splitStream: async function (stream, n) {
    let newStreams = [stream];
    for (let i = 1; i < n; i++) {
      const [left, right] = newStreams[0].tee();
      newStreams[0] = left;
      newStreams.push(right);
    }
    return newStreams;
  }
};

document.addEventListener('DOMContentLoaded', async function () {
  await nodeBroker.boot();
});

app/pyboots/node.py

画面部品の定義です。

node.py
from typing import List, Tuple, Optional
from pydantic import BaseModel, Field


BASIC_SCRIPT = '''
{
    init: function(id, data) { this.set(id, data); },
    get: function(id, event) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        return {
            id: elem.id,
            classes: elem.getAttribute('class'),
            style: elem.style,
        };
    },
    set: function(id, data) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        if ('classes' in data) elem.setAttribute('class', data.classes);
        if ('style' in data) elem.style = data.style;
    }
}
'''


class Header(BaseModel):
    id: Optional[str] = ''
    classes: Optional[str] = 'container-fluid d-grid gap-3 align-items-center'
    style: Optional[str] = 'height: 15vh; max-height: 15vh;'

    _x_enter_template = '<div id="{{ id }}">'
    _x_leave_template = '</div>'
    _x_script = BASIC_SCRIPT


class Main(BaseModel):
    id: Optional[str] = ''
    classes: Optional[str] = 'd-flex flex-nowrap'
    style: Optional[str] = ''

    _x_enter_template = '<main id="{{ id }}" class="{{ classes }}" style="{{ style }}">'
    _x_leave_template = '</main>'
    _x_script = BASIC_SCRIPT


class LeftSidebar(BaseModel):
    id: Optional[str] = ''
    classes: Optional[str] = 'd-flex flex-column flex-shrink-0 p-3 text-bg-dark'
    style: Optional[str] = 'width: 280px; height: 100vh'

    _x_enter_template = '<div id="{{ id }}">'
    _x_leave_template = '</div>'
    _x_script = BASIC_SCRIPT


class Container(BaseModel):
    id: Optional[str] = ''
    classes: Optional[str] = 'container-fluid'
    style: Optional[str] = ''

    _x_enter_template = '<div id="{{ id }}">'
    _x_leave_template = '</div>'
    _x_script = BASIC_SCRIPT


class Row(BaseModel):
    id: Optional[str] = ''
    classes: Optional[str] = 'row'
    style: Optional[str] = ''

    _x_enter_template = '<div id="{{ id }}">'
    _x_leave_template = '</div>'
    _x_script = BASIC_SCRIPT


class Col(BaseModel):
    id: Optional[str] = ''
    classes: Optional[str] = 'col'
    style: Optional[str] = 'heigh'

    _x_enter_template = '<div id="{{ id }}">'
    _x_leave_template = '</div>'
    _x_script = BASIC_SCRIPT


class Nav(BaseModel):
    id: Optional[str] = ''
    classes: Optional[str] = 'nav nav-pills flex-column mb-auto'
    style: Optional[str] = ''

    _x_enter_template = '<ul id="{{ id }}">'
    _x_leave_template = '</ul>'
    _x_script = BASIC_SCRIPT


class NavItem(BaseModel):
    id: Optional[str] = ''
    classes: Optional[str] = 'nav-item'
    style: Optional[str] = ''

    _x_enter_template = '<li id="{{ id }}">'
    _x_leave_template = '</li>'
    _x_script = BASIC_SCRIPT


class H1(BaseModel):
    id: Optional[str] = ''
    classes: Optional[str] = 'display-5 fw-bold text-body-emphasis'
    style: Optional[str] = ''
    text: Optional[str] = ''

    _x_enter_template = '<h1 id="{{ id }}"></h1>'
    _x_script = '''
{
    init: function(id, data) { this.set(id, data); },
    get: function(id, event) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        return {
            id: elem.id,
            classes: elem.getAttribute('class'),
            style: elem.style,
            text: elem.textContent,
        };
    },
    set: function(id, data) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        if ('classes' in data) elem.setAttribute('class', data.classes);
        if ('style' in data)   elem.style      = data.style;
        if ('text' in data)   elem.textContent = data.text;
    }
}
'''


class P(BaseModel):
    id: Optional[str] = ''
    classes: Optional[str] = 'mb-1'
    text: str = ''

    _x_enter_template = '<div id="{{ id }}"><p></p></div>'
    _x_script = '''
{
    init: function(id, data) { this.set(id, data); },
    get: function(id, event) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        const childs = elem.children;
        return {
            id: elem.id,
            classes: childs[0].getAttribute('class'),
            text: Array.from(childs).map(p => p.textContent).join('\\n'),
        };
    },
    set: function(id, data) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        if ('text' in data) {
            while (elem.firstChild) elem.removeChild(elem.firstChild);
            const lines = data.text.split('\\n');
            lines.forEach(line => {
                const p = document.createElement('p');
                p.setAttribute('class', data.classes);
                p.textContent = line;
                elem.appendChild(p);
            });
        }
    }
}
'''


class Button(BaseModel):
    id: Optional[str] = ''
    classes: Optional[str] = 'btn btn-primary'
    label: Optional[str] = ''

    _x_enter_template = '<button type="button" id="{{ id }}" style="margin-right:0.5em;"></button>'
    _x_script = '''
{
    init: function(id, data) { this.set(id, data); },
    handler: function(id, event, post) {
        event.target.disabled = true;
        post(() => event.target.disabled = false);
    },
    get: function(id, event) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        return {
            id: elem.id,
            classes: elem.getAttribute('class'),
            label: elem.textContent,
        };
    },
    set: function(id, data) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        if ('classes' in data) elem.setAttribute('class', data.classes);
        if ('label' in data)   elem.textContent = data.label;
    }
}
'''


class Input(BaseModel):
    id: Optional[str] = ''
    label: Optional[str] = ''
    type: Optional[str] = 'text'
    value: Optional[str] = ''
    disabled: Optional[bool] = False
    placeholder: Optional[str] = ''

    _x_enter_template = '''
<div id="{{ id }}" class="mb-3">
<label for="{{ id }}_input" class="form-label"></label>
<input type="text" class="form-control" id="{{ id }}_input">
</div>
'''
    _x_script = '''
{
    init: function(id, data) { this.set(id, data); },
    handler: function(id, event, post) {
        event.preventDefault();
        // input 要素のときだけサーバへ通知
        if (event.target.matches('input')) post();
    },
    get: function(id, event) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        const label = elem.children[0];
        const input = elem.children[1];
        return {
            id:          elem.id,
            label:       label.textContent,
            type:        input.type,
            value:       input.value,
            disabled:    input.disabled,
            placeholder: input.placeholder,
        };
    },
    set: function(id, data) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        let label = elem.children[0];
        let input = elem.children[1];
        if ('label' in data)       label.textContent = data.label;
        if ('type' in data)        input.type        = data.type;
        if ('value' in data)       input.value       = data.value;
        if ('disabled' in data)    input.disabled    = data.disabled; 
        if ('placeholder' in data) input.placeholder = data.placeholder;
    }
}
'''


class Select(BaseModel):
    id: Optional[str] = ''
    label: Optional[str]  = Field(default='', examples=['Your name'])
    value: Optional[str] = ''
    choices: Optional[List[Tuple[str, str]]] = []  # list of (label, value)
    disabled: Optional[bool] = False

    model_config = {
        'json_schema_extra': {
            'examples': [
                {
                    'id': 'id-my-favorite',
                    'label': '好きな動物は?',
                    'value': 'cat',
                    'choices': [('', 'dog'), ('', 'cat'), ('', 'monkey')],
                    'disabled': True,
                }
            ]
        }
    }

    _x_enter_template = '''
<div id="{{ id }}" class="mb-3">
 <label for="{{ id }}_select" class="form-label"></label>
 <select class="form-select" id="{{ id }}_select">
  <option value="" selected></option>
 </select>
</div>
'''
    _x_script = '''
{
    // Select
    init: function(id, data) { this.set(id, data); },
    get: function(id, event) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        const label = elem.children[0];
        const select = elem.children[1];
        const options = select.children;
        return {
            id:       elem.id,
            label:    label?.textContent,
            value:    select?.value,
            disabled: select?.disabled,
            choices:  Array.from(options).map(o => [o.textContent, o.value]),
        };
    },
    set: function(id, data) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        let label = elem.children[0];
        let select = elem.children[1];
        if ('choices' in data) {
            while (select.firstChild) select.removeChild(select.firstChild);
            data.choices.forEach(([caption, value]) => {
                const opt = document.createElement('option');
                opt.value = value;
                opt.textContent = caption;
                select.appendChild(opt);
            });
        }
        if ('label' in data)    label.textContent = data.label;
        if ('value' in data)    select.value      = data.value;
        if ('disabled' in data) select.disabled   = data.disabled; 
    }
}
'''


class CheckboxGroup(BaseModel):
    id: Optional[str] = ''
    label: Optional[str]  = ''
    choices: Optional[List[Tuple[str, str, bool]]] = []  # list of (label, value, checked)

    _x_enter_template = '''
<div id="{{ id }}" class="mb-3">
 <label class="form-label"></label>
 <div class="form-check">
  <input type="checkbox">
  <label class="form-check-label"></label>
 </div>
</div>
'''
    _x_script = '''
{
    // CheckboxGroup
    init: function(id, data) { this.set(id, data); },
    get: function(id, event) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        const labelGroup = elem.firstElementChild;
        const childs = elem.querySelectorAll('.form-check');
        const label = labelGroup.textContent;
        const choices = Array.from(childs).map(div => {
            const input = div.children[0];
            return [div.children[1].textContent, input.value, Boolean(input.checked)]
        });
        return {
            id:      elem.id,
            label:   label,
            choices: choices,
        };
    },
    set: function(id, data) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        elem.querySelectorAll('.form-check').forEach(child => {
            elem.removeChild(child);
        });
        if ('label' in data) elem.firstElementChild.textContent = data.label;
        if ('choices' in data) {
            data.choices.forEach(([caption, value, checked]) => {
                let div = document.createElement('div');
                let input = document.createElement('input');
                let label = document.createElement('label');
                div.className = "form-check";
                input.type = "checkbox";
                input.className = "form-check-input";
                input.value = value;
                input.checked = checked;
                label.className = "form-check-label";
                label.textContent = caption;
                div.appendChild(input);
                div.appendChild(label);
                elem.appendChild(div);
            });       
        }
    }
}
'''


class StreamExample(BaseModel):
    id: Optional[str] = ''
    text: str = ''

    _x_enter_template = '<p id="{{ id }}" class="mb-3"></p>'
    _x_script = '''
{
    init: function(id, data) { this.set(id, data); },
    get: function(id, event) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        return {
            id: elem.id,
            text: elem.textContent,
        };
    },
    set: function(id, data) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        if ('text' in data) elem.textContent = data.text;
    },
    downstream: async function(id, stream) {
        const elem = document.getElementById(id);
        if (elem === undefined) throw new Error("element '${id}' is not found");
        const decoder = new TextDecoder();
        const reader = stream.getReader();
        elem.textContent = '';
        while (true) {
            const { done, value } = await reader.read();
            if (done) break;
            const text = decoder.decode(value);
            elem.textContent = elem.textContent + text;
        }
    }
}
'''

app/pyboots/__init__.py

お約束の __init__.py

__init__py
from pydantic import BaseModel
from typing import List, Dict, Union, Optional

from .page import (
    Page,
    NodeBroker,
)

from .node import (
    Main,
    Container,
    Header,
    Row,
    Col,
    LeftSidebar,
    Nav,
    NavItem,
    H1,
    P,
    Button,
    Input,
    Select,
    CheckboxGroup,
    StreamExample,
)

class NodeEvent(BaseModel):
    id: str
    pointer_type: Optional[Union[str, None]] = None
    type: str

class NodeEventMessage(BaseModel):
    event: NodeEvent
    nodes: List[Union[Dict[str, Dict], None]] = []

app/pyboots/fastapi_helper.py

fastapi_helper.py はちょっとしたスニペットなので必須ではありません。 実行時に複数の部品を組み合わせた型(Pydantic モデル)を生成します。前述の部品グループを参照。

fastapi_helper.py
from typing import List, Any, Dict, Union, Callable, Optional
from pydantic import create_model
from fastapi import FastAPI
from starlette.routing import BaseRoute
from typing_extensions import Annotated
from . import NodeEvent, NodeBroker


class NodeGroup:
    def __init__(self, name: str, app: FastAPI, nodes: List[NodeBroker]=[], outs: List[NodeBroker]=[]) -> None:
        self.name = name
        self.app = app
        self.nodes = nodes
        self.outs = outs

    @property
    def var_type(self):
        """型定義を生成"""
        if not self.nodes:
            return NodeEvent
        else:
            nodes_fields = {x.id: (Union[x.var_type, None], None) for x in self.nodes}
            model = create_model(
                self.name,
                event=(NodeEvent, ...),
                **nodes_fields,
            )
            return model

    def __call__(self, body: Dict) -> Any:
        return self.var_type.model_validate(body)

    def event(self, event_name: str, event_target: NodeBroker) -> Callable:
        """Nodeイベントリスナーのデコレータ"""
        return event_target.event(event_name, self.app, nodes=self.nodes, outs=self.outs)

実行方法

コンテナイメージを作成し実行。

docker compose build

docker compose up -d

ブラウザで http://localhost:8000/ui/index を開く。

今後の展望

ブラウザとサーバのいずれも画面部品のインターフェースでアクセスすることがこのツールの特徴ですが、Python と JavaScript でそれぞれ実装が必要なことが課題です。本ツールを利用しながらブラッシュアップを続けたいと思います。

最後まで目を通して頂きありがとうございます。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?