2
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

FastAPIでWebSocket入門

Last updated at Posted at 2024-08-04

はじめに

WebSocketという言葉は何度も聞いたことがありましたが、使うのは難しそうだなと思っていました。そのため、これまではFirebaseなどのBaaSを利用して、WebSocketの利用を避けていました。

しかし、FastAPIを使ってWebSocketを試してみたところ、意外にも簡単に実装できたので、練習として作成した簡単なチャットアプリを例に、FastAPIでのWebSocketの使い方を紹介してみたいと思います!

Qiita_Websocket.gif
(左のチャット画面と右のチャット画面は別々のクライアントです)

記事を読む上での注意点

一部おかしな記述や不正確な情報が含まれているかもしれません。

もしお気付きの点やご指摘があれば、コメントで教えていただけると幸いです。

この記事の対象読者

  • WebSocketって聞いたことあるけど、何なのかはあまり知らない人
  • WebSocketを使ってみたいと思っているけど、難しそうと思っている人
  • FastAPIを使ってチャットアプリを作りたい人

WebSocketとは

WebSocketとは、クライアントとサーバーの双方向通信を可能にするプロトコルです。HTTP通信を使用して初期接続を行った後、クライアントとサーバー間で持続的な接続を維持し、リアルタイムでデータを送受信することができます。

なんとなくこの説明を聞いても、「ふーん、そんなもんか」と思うかもしれませんが、特定のユースケースにおいては非常に強大な力を発揮します。

例えば、自分以外のクライアントがデータを書き換えたときに、その書き換えたデータを自分の画面にも即座に反映させたい場合を考えてみます。ここでは、WebSocketとよく比較されるポーリングという手法と比較する形で、WebSocketの力を説明してみたいと思います。

ポーリング

ポーリングとは、定期的にサーバーにリクエストを送信する手法です。この場合、サーバーにデータが更新されていないかを問い合わせるために、何度もリクエストを送信する必要があります。しかも、別のクライアントがデータを更新した後も、次のポーリングまで時間がある場合があり、リアルタイム性も失われてしまいます。

Screenshot 2024-08-04 at 18.52.35.png

WebSocket

一方でWebSocketは、最初に1度のHTTPリクエストでハンドシェイクしてしまえば、以降はサーバー側からもクライアントに通知を送ることができます。これにより、別のクライアントがデータを更新したら即座にサーバー側から通知を送信することができるため、リアルタイムにデータを更新することができます。

Screenshot 2024-08-04 at 19.01.28.png

FastAPIのWebsocket

公式ドキュメント

公式のドキュメントにもFastAPIのWebSocketの使い方の説明があります。ここで紹介されている例はFastAPI単体で動作するミニマルな構成になっています。使い方を知りたい方はぜひ見てみてください。

使い方

"http://"ではなく"ws://"から始まるURLでWebSocketを開通させることができます。

このエンドポイントでは、新たなWebSoceketを作成し、クライアントからのメッセージを受け付けています。クライアントからのメッセージを受信したら、そのクライアントに対してメッセージをそのまま送信するというシンプルなAPIエンドポイントです。

from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept() # Websocketを開通
    try:
        while True:
            # ここに接続中に行いたい処理を記述する
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        websocket.close()

WebSocketクラスのメソッド

FastAPIが提供しているWebSocketクラスにはいくつかのメソッドがあります。以下によく使用するメソッドを挙げます。

accept()
WebSocket接続を受け入れます。このメソッドは接続が確立されるまで待機します。

close()
WebSocket接続を閉じます。これにより、接続が終了し、リソースが解放されます。

receive_text()
クライアントから送信されたテキストメッセージを受け取ります。

receive_json()
クライアントから送信されたJSONメッセージを受け取ります。受け取ったメッセージはPythonのデータ型として返されます。

send_text(message: str)
クライアントにテキストメッセージを送信します。

send_json(message: Any)
クライアントにJSONメッセージを送信します。メッセージはPythonのデータ型(辞書、リストなど)として提供できます。

制作物(チャットアプリ)

FastAPIの主な使用用途として、その名前にもあるようにAPIサーバーを実装することが挙げられます。そのため、今回は公式ドキュメントでは紹介されていなかったフロントエンドとの連携を練習するために、Next.jsとFastAPIを使用してチャットアプリを作成しました。

ここでは、WebSocket通信に関する部分を抜粋して、FastAPIのWebSocketの使用方法を紹介します。また、完全なコードはGitHubにて公開しています。稚拙な部分が多いかと思いますが、参考程度に見ていただければ幸いです。

バックエンド

まず、WebSocket接続を管理するためのConnectionManagerクラスを作成します。このクラスはクライアントの接続管理とメッセージ送信のロジックをカプセル化しています。

まず、コンストラクタで接続するクライアントを保持しておく配列を準備し、connect()でその配列にクライアントを追加していきます。クライアントは接続時に作成されるWebSocketインスタンスで判別します。

今回作成したチャットアプリではユーザーの情報はDBに保持していないので、WebSocketインスタンスを利用して、メッセージの送信者か、そうでないかしか判別できません。したがって、送信者と送信者以外で別々のJSONを送信しています。これはクライアント側で表示の仕方を変えるためです。

冒頭に添付した動画では2つのクライアント間でチャットを行なっていましたが、このクラスから分かるように何人でも同時にチャットを行うことができます。

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str, sender: WebSocket):
        data = {
            "text": message,
            "type": "broadcast"
        }
        for connection in self.active_connections:
            if connection != sender:
                await connection.send_json(data)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        data = {
            "text": message,
            "type": "personal"
        }
        await websocket.send_json(data)

補足
このクラスの中身は、WebSocketを用いて行いたい処理に応じて書き換えてください。今回はチャットアプリなので、クライアントの接続管理に加えて、メッセージのブロードキャストの処理などが含まれています。

次は、WebSocketでリアルタイムに通信を行う、ある種の”グループ”を作成し、その”グループ”ごとに、ConnectionManagerを作成します。今回のチャットアプリの場合は、チャットの部屋ごとにメッセージの通知をクライアントへとリアルタイムに送信します。そのため、今回はルームごとにConnectionManagerインスタンスを作成します。

以下に示したのは、ルームの作成部分です。ここで、ルームごとに一意のIDを生成して、そのIDをもとにConnectionManagerインスタンスを作成して、配列に保持しています。

async def create_room(db: AsyncSession, name: str) -> ChatRoom:
    socket_id = str(uuid.uuid4()) # ここでIDを生成
    new_room = ChatRoom(name=name, socket_id=socket_id)
    room_managers[socket_id] = ConnectionManager() # ConnectionManagerを作成
    db.add(new_room)
    await db.commit()
    await db.refresh(new_room)
    return new_room

最後にWebSocketのエンドポイント部分の説明を行います。ここではまず、先ほどルームごとに生成したIDを用いて、そのルームのConnectionManagerを取得します。次に、接続要求のあったクライアントのWebSocketインスタンスをConnectionManagerに登録します。

あとは、メッセージを受け取ったらそのメッセージを接続しているクライアントに送信してあげます。

@router.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
    manager = get_room_manager(room_id)
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.send_personal_message(data, websocket)
            await manager.broadcast(data, websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket)

補足
全ての接続は、メモリのリストの中で管理されているため、プロセスの実行中にのみ機能し、単一のプロセスでのみ機能することに注意してください。

フロントエンド

フロントエンドでは、ルームに入室した時点からWebSocket通信を開始します。ここでは、ルームごとに定められたIDとともにAPIを叩くことでWebSocket通信を開始しています。ここで、作成されたWebSocketインスタンスが返ってくるので、これを保持しておきます。

ここで、接続が開始されると.onopenが呼び出され、終了すると.oncloseが呼び出されます。

useEffect(() => {
    const websocket = new WebSocket(`ws://localhost:8000/ws/${params.socket_id}`);
    setWs(websocket);

    websocket.onopen = () => {
        console.log("WebSocket connection established");
    };

    websocket.onclose = () => {
        console.log("WebSocket connection closed");
    };

    return () => {
        websocket.close();
    };
}, [params.socket_id]);

次に、メッセージを受信する部分の説明を行います。サーバー側から通知が来ると.onmessageが呼び出されます。ここでは受け取ったメッセージをJSONにパースしてメッセージに加えています。

useEffect(() => {
    if (websocket) {
        websocket.onmessage = (event) => {
            const data = JSON.parse(event.data);
            const newMessage: Message = data;
            setMessages(prevMessages => [...prevMessages, newMessage]);
        };
    }

    return () => {
        if (websocket) {
            websocket.onmessage = null;
        }
    };
}, [websocket]);

補足
今回はメッセージを受信する部分のコンポーネントを別にしているので、別のuseEffect内に記述していますが、もちろん接続する部分に書いてあげても大丈夫です。

最後に、メッセージを送信する部分の説明を行います。サーバーにメッセージを送信したいときは.sendを呼び出してあげればメッセージを送信することができます。ここで送信したメッセージは接続しているクライアントすべてに配信されます。

const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (websocket && message.trim()) {
        websocket.send(message);
        setMessage("");
    }
};

おわりに

なんだか難しそうだと思っていたWebSocketですが、FastAPIを使うと簡単に実装できることがわかってもらえたら嬉しいです。

今回はチャットアプリでしたが、例えば、オンラインオセロアプリを作成したかったら、ConnectionManagerをゲームごとに作成してあげて、クライアントを2人登録し、更新された盤面を送受信してあげれば良いです。また、時間のかかる処理にWebSocketを使いたかったら、処理ごとにConnectionManagerを作成し、依頼元のクライアントに必要であればタスク進捗状況を伝えたり、タスクが終了したらその結果を通知してあげれば良いと思います。

意外とシンプルで、選択肢の中にWebSocketがあれば、設計やアプリの選択肢も広がると思うので、ぜひ使ってみてください。

ご指摘やアドバイスがあればコメントで教えていただけると幸いです。

2
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?