こんにちは!watnowに所属している27卒のshooooooma415です!
これが初めての投稿となります!
今回はFastAPIを用いたWebsocket通信の実装について書きたいと思います!
初めての記事なので多めにみてもらえると助かります!
そもそもWebsocket通信って何?
簡単にいうとコネクトして確立することで自由にクライアント側とサーバ側の双方向の通信を行うもので,リアルタイムなやり取りを実現するときに使います!
双方向の通信を実現することができるため,HTTP通信とは違ってサーバ側からやり取りを行うことができます!
HTTP通信じゃダメなの?何が素晴らしいの?
別にわざわざ複雑なWebsocket通信を使わなくてもHTTP通信で事足りるじゃん...そう思いますよね?
ここで何がWebsocket通信は素晴らしいのか少し書こうと思います.
サーバー側から能動的にメッセージを送ることができる!
これめちゃくちゃ素晴らしいです.HTTP通信ではクライアント側がサーバー側にリクエストを送ることで初めてサーバー側がメッセージを送ることができます.しかし!Websocket通信ではサーバー側から送りたいときに能動的にクライアント側へ送ることができるのです!
コネクションを確立させるのが1回で済む!
スマートですね.もし何回もリクエストを送りたい局面にあるとき,HTTP通信で行うと何回もコネクションを確立する必要があります.Websocket通信を用いると1回で済むんです.素晴らしい!
FastAPIで導入してみよう
今回は前述の通り,FastAPIを用いてWebsocket通信を実装します.FastAPIはインストールされている前提で話を進めていきます!
環境構築できていない方は以下のドキュメントに従って進めてください!
実行環境:
...
fastapi==0.114.0
uvicorn==0.30.6
...
さあまずはFastAPIでWebsocketを実装するのに必要なものを入れましょう!
pip install websockets
これでセットアップは完了です!実装を始めましょう!
実装しよう
基本的な実装はここに書かれています!公式ドキュメントを参考にして実装したい方はリンクからそうぞ!
WebSocketクラスのメソッドには何があるの?
ここでソースコードの説明に入る前に,先にWebsocketクラスにはどのようなメソッドがあるのかみてみましょう!今回用いたメソッドはこちらです!
accept()
WebSocket接続を受け入れるのに使います.このメソッドは接続が確立されるまでは待機し続けます.
close()
WebSocket接続を切ります.これによって接続を終了させることができます.
send_text()
クライアント側から送信されたテキストメッセージを受け取ります.
send_text(message: str)
クライアント側へテキストメッセージを送信します.
今回何を作りたかったのか
今回はリアルタイムランキング機能をWebsocket通信を用いて実装することを目指しました!特定のイベントに参加しているユーザーが現在地をサーバーに送信して他の参加者との位置情報を比較し,リアルタイムでランキングを表示する仕組みを実装しました!
Websocket通信で行う主な機能
・コネクションの確立:
イベントが始まると参加者とのコネクションを確立させます.
・リアルタイム位置情報の更新:
クライアント側が現在地をサーバーに送信.
サーバー側でそのデータを集計し,ランキングを計算して返します.
・イベント終了時の処理
イベントが終了すると,全参加者に「イベント終了通知」を送り,最終ランキングを確定させます.
目的地に到着したユーザーには通知を送信して,Websocket通信の接続を切ります.
ソースコードの説明
今回はwebsocket.py
という名でrouterを作成しました!
全体像
ソースコード全体はこんな感じです!
from fastapi import APIRouter,WebSocket
from typing import List, Dict
from datetime import datetime, timezone
import json
def get_websocket_router(supabase_url: str):
router = APIRouter(prefix="/ws", tags=["Websocket"])
@router.websocket("/ranking/{event_id}")
async def websocket_endpoint(event_id:int,websocket: WebSocket):
await websocket.accept()
connected_client: Dict[int, WebSocket] = {}
user_location: Dict[int, Location] = {}
event_deadline_time = calculate_deadline(event_id)
event_start_time = get_start_time(event_id)
try:
await websocket.send_text(json.dumps({
"action": "connected",
"message": "WebSocket connection established"
}))
while True:
now = datetime.now(timezone.utc)
if now >= event_deadline_time:
finish_message = {
"action": "event_finished",
"message": "イベントは終了しました。"
}
delete_all_distance() #データベース内の位置情報の削除
#接続の切断
connected_client.values().send_text(json.dumps(finish_message))
connected_client.values().close()
connected_client.clear()
break
data = await websocket.receive_text()
message = json.loads(data)
notified_users: set[int] = set()
if message["action"] == "update_location":
user_id = message["user_id"]
connected_clients[user_id] = websocket
latitude = float(message["latitude"])
longitude = float(message["longitude"])
user_location[user_id] = Location(latitude=latitude, longitude=longitude)
distance = calculate_distance(event_id, user_locations[user_id])
if is_distance_present(user_id) == True:
update_distance(distance,user_id)
else:
insert_distance(distance,user_id)
await send_ranking(websocket)
elif message["action"] == "get_ranking":
await send_ranking(websocket)
elif message["action"] == "arrival_notification":
finish_message = FinishMessage(
action=message["action"],
user_id=message['user_id'],
arrival_time=message['arrival_time']
)
delete_distance(finish_message.user_id)
await send_ranking(websocket)
add_arrival_time(finish_message.user_id, finish_message.arrival_time, event_id)
aliase_id = profile_service.judge_aliase(finish_message.user_id)
update_aliase_id(finish_message.user_id,aliase_id)
send_renew_aliase(finish_message.user_id)
except Exception as e:
print(f"WebSocket error: {e}")
return router
...といっても分かりにくい(綺麗にコードかけ)と思うので,要点ごとにまとめて解説を行おうと思います!
import文
Websocket通信を行うルーター,エンドポイントを作るのに必要なライブラリをインポートします.
from fastapi import APIRouter, WebSocket
ルーターを作成する
今回僕はSupabaseを用いたので,引数にurlをとりました.
def get_websocket_router(supabase_url: str):
router = APIRouter(prefix="/ws", tags=["Websocket"])
エンドポイントの作成
FastAPIでWebsocket通信のエンドポイントを作成するには次のように書きます!
REST とそんなに変わらないんです!簡単ですね!今回はevent_idをエンドポイントに含ませています.
@router.websocket("/ranking/{event_id}")
async def websocket_endpoint(event_id:int,websocket: WebSocket):
Websocket通信を行うにあたっての準備
await websocket.accept()
connected_client: Dict[int, WebSocket] = {}
user_location: Dict[int, Location] = {}
WebSocket通信は初期段階でHTTPリクエストからプロトコルを切り替える「ハンドシェイク」が必要なんです!この処理が完了された後にaccept()
を使って接続を確立します!
また,通信を行うにあたって,準備として通信をしているクライアントの情報を保持するため辞書を作っています.これを用意することで,接続しているクライアント内でランキングを作ることができたり,辞書から指定したクライアントを削除することで接続を切ったりすることができます!
⚠️この辞書は接続しているクライアントの辞書ではありません!サーバーとクライアントは1対一の通信を行なっているため,他のクライアントの情報が載ることはありません!
ですので,もし今回のようにランキングを作るとなるとDBに入れて取ってきてソートするという形にする必要があります!
Websocket接続の初期メッセージの送信
前提としてですが,action
に入っている内容で識別してクライアント側が処理を書けるようにしています!
await websocket.send_text(
json.dumps(
{
"action": "connected",
"message": "WebSocket connection established"
}
)
)
このメッセージが送信されてからWebsocket通信が始まります!サーバー側からメッセージを送信するにはsend_text()
を用います!
メインループの開始
while True:
now = datetime.now(timezone.utc)
接続されている間はメインループが回り続けています.
常時クライアントからのメッセージを受信して,それに応じた処理を実行するようにします.
このようにすることでクライアント側からの受信に対応することができます.
now
はイベント終了時刻になったかどうかを判定するために使います!後ほど出てきます!
サーバー側からの能動的な送信(イベント終了時の処理)
Websocket通信の雛形とも言えるサーバー側からの通信です!今回はある時刻になるとサーバー側から処理を行ってメッセージを送信する実装を行いました!
if now >= event_deadline_time:
finish_message = {
"action": "event_finished",
"message": "イベントは終了しました。"
}
delete_all_distance() #データベース内の位置情報の削除
#接続の切断
connected_clients.values().send_text(json.dumps(finish_message))
connected_clients.values().close()
connected_client.clear()
break
現在時刻と終了時刻を比較して,終了時刻に達していれば今接続されているクライアントとの接続を切流処理です.先ほど用意した辞書に入っているクライアントに対してメッセージをサーバー側から送信し,接続を切っています.
データベースを用いてクライアントの位置情報を扱っているので,イベント終了と同時にその位置情報もここで削除します!
クライアント側からのメッセージの受信
次にクライアント側からのメッセージを受け取るコードを書いていきましょう!
ここで,どのような工程で受信して処理を行うか先に述べておきます!
- クライアント側からのデータを非同期で受信
-
action
の中身で条件分岐を行う - クライアント側の要求に対しての処理を行う
簡単に述べるとこんな感じです!もしかするともっといい方法があるのかもしれませんが,今回はこのような内容で実装しました!
それではコードを見ていきましょう!
data = await websocket.receive_text() #メッセージの受信
message = json.loads(data)
WebSocket接続を通じてクライアントから送信されたテキストデータを非同期的に受信します!data
に格納されたテキストデータはreceive_text()
によってstr
型で受け取ります!素敵ですね!
if message["action"] == "update_location":
user_id = message["user_id"]
connected_clients[user_id] = websocket
latitude = float(message["latitude"])
longitude = float(message["longitude"])
user_locations[user_id] = Location(latitude=latitude, longitude=longitude)
distance = calculate_distance(event_id, user_locations[user_id]) #目的地までの距離を計算
#データベースに位置情報を格納
if is_distance_present(user_id) == True:
update_distance(distance,user_id)
else:
insert_distance(distance,user_id)
await send_ranking(websocket) #自分の位置情報を更新したので新しいランキング情報を送信
elif message["action"] == "get_ranking":
await send_ranking(websocket) #最新のランキングを送信
elif message["action"] == "arrival_notification":
finish_message = FinishMessage(
action=message["action"],
user_id=message['user_id'],
arrival_time=message['arrival_time']
)
delete_distance(finish_message.user_id) #位置情報をデータベースから削除
await websocket_service.send_ranking(websocket)
add_arrival_time(user_id, arrival_time, event_id) #到着時間をデータベースに格納
return router
if文で条件分岐を行って受信したデータを元に処理を行っています!サーバー側からデータを送信するときは非同期処理にしています!
ルーターをmain.py
に追加する
最後の仕上げです!作成したrouterをmain.py
に追加しましょう!
from fastapi import FastAPI
from routers.websocket import get_websocket_router
app = FastAPI()
supabase_url = os.getenv('SUPABASE_URL')
app.include_router(get_websocket_router(supabase_url))
最後に
改善点は多々あると思いますが,FastAPIを用いてWebsocket通信を実装することができました!リアルタイムな双方向の通信はものすごく魅力的なので使える幅は広いと思います!ぜひ使ってみてください!
参考