0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

FastAPIでWebSocketの超入門

Posted at

FastAPIを使って簡単にWebSocketで送受信する

websocketsをインストール

FastAPIでWebSocketを実装するために、websocketsをインストールする。

pip install websockets

やってみる

acceptで受信待ちして、クライアントと接続したら、receive_textsend_textで送受信する。

await websocket.accept()
while True:
    data = await websocket.receive_text()
    await websocket.send_text(f"Message text was: {data}")

コネクションが切断した場合は例外が発生する

コネクションが切断されると、receive_textメソッドでWebSocketDisconnectの例外が発生する。
相手がすでにコネクションを切断しているとsend_textでも同様に例外が発生する。

try:
    while True:
        response = await websocket.receive_text()
        logger.info(f"receive {response}")
        await asyncio.sleep(10)
        await websocket.send_text(f"Message text was {response}")
        logger.info("send message")
except WebSocketDisconnect:
    logger.error(traceback.format_exc())

パスやクエリー情報も取得可能

PathやQueryの情報も取得できる。

async def get_cookie_or_token(
    websocket: WebSocket,
    session: Union[str, None] = Cookie(default=None),
    token: Union[str, None] = Query(default=None),
):
    if session is None and token is None:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
    return session or token


@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    item_id: str,
    q: Union[int, None] = None,
    cookie_or_token: str = Depends(get_cookie_or_token),
):

最後に

FastAPIを使って簡単にサーバ側がWebSocketの実装ができることがわかった。FastAPIのページにセッションの管理の
サンプルも載っているので参考にしたらいいかと思う。

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?