はじめに
Pythonで使えるWebSocketのライブラリといえばwebsocketsです。個人的にwebsocketsはシンプルで使いやすく気に入っています。一方でシンプルすぎて何やっているのかよく分からない(ブラックボックスな)部分もあるなと感じています。そういったブラックボックスな部分をクリアにするために、websocketsに関する疑問点をまとめてみました。
WebSocketとは
本稿ではWebSocket関する説明は省略します。気になる方は以下を参考にしてください。
Python websockets
websockets is a library for building WebSocket servers and clients in Python with a focus on correctness and simplicity.
和訳:websocketsとはPythonでWebSocketのサーバとクライアントを構築するための正確さとシンプルさを重視ライブラリです。
以下にechoサーバーの例を示します。確かにシンプルに記述できます。
#!/usr/bin/env python
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
await websocket.send(message)
async def main():
async with websockets.serve(echo, "localhost", 8765):
await asyncio.Future()
asyncio.run(main())
#!/usr/bin/env python
import asyncio
import websockets
async def hello():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
await websocket.send("Hello world!")
print(await websocket.recv())
asyncio.run(hello())
server.py
が起動している状態で、client.py
を実行すると"Hello world!"
が返ってきます。普通のechoサーバーです。
(Python==3.7.0, websockets==9.1)
python server.py
python client.py
Hello world!
疑問点
上記に記述したechoサーバーに関する疑問点と自分なりの回答です。個人的な予想もたくさん含まれるので参考程度にしてください。
なぜasyncio(async/await)を使っているの?
asyncioはasync/await構文を使いノンブロッキング処理を実装するためのライブラリです。ノンブロッキング処理は、I/Oの処理を待たずに次の処理を行う処理方法です(マルチスレッド、マルチプロセスとは異なるという点に注意です)。WebsocketにおけるI/Oの処理はネットワークI/O(Websocketのコネクション生成、メッセージの送信・受信)が該当します。ノンブロッキング処理を行うことによって、一つのプロセスで複数のクライアントとコネクションを張れます。ブロッキング処理だと一つのクライアントとしかコネクションが張れません(もしくはクライアントの数だけプロセスを立ち上げることになります)。また、ノンブロッキング処理によってPCのリソースを効率的に活用できます。
asyncioを用いてノンブロッキング処理を行う際に重要となるキーワードがコルーチンです。サブルーチンはエントリポイントが一つであり、エントリーからリターンまでを一つの処理単位とするのに対し、コルーチンはエントリポイントが複数あり、いったん処理を中断しても続きから処理を再開できます。コルーチンはサブルーチンを一般化したものと考えることもできます。サブルーチンのみではブロッキング処理しか記述できませんが、コルーチンを用いることでノンブロッキング処理を記述できるようになります。実際にwebsocketsはコルーチンを使用しています。以下の図はクライアント側の接続ライフサイクルの各状態において、どのコルーチンが動作しているかを示しています。
コルーチンオブジェクト(関数)を作成するためには、async def
を宣言に使います。生成されたコルーチンオブジェクトを実行するためにはawait
する必要があります。ちなみにawait
できるオブジェクトをawaitableオブジェクトと呼び、コルーチン以外にもTaskやFutureなどがあります(asyncioのAPIを使っていると勝手にコルーチンオブジェクトがFutureオブジェクトに変換されたりします。ややこしい、、、)。
結論、asyncioはWebsocketのネットワークI/Oをノンブロッキング処理するために使われているのだと思います。ノンブロッキング処理を行うことによって、一つのプロセスで複数のクライアントとコネクションを張れたり、PCのリソースを効率的に活用できます。その際にコルーチンを使用する必要があったため、async/await構文が記述されていたのだと思います。
永続通信?
永続通信です。なぜなら、メッセージが受信側に届くことは、ライブラリが保証しているからです。ちなみに永続通信とは、メッセージが受信側に届くまで通信システムが保持しつづける通信のことです。
下の図は、ウェブソケット上に構築されたアプリケーションとリモートエンドポイントの間でデータがどのように流れるのかを示しています。これはサーバー側とクライアント側のどちらにも当てはまります。受信部を見てみるとメッセージがメッセージキューに入っていることがわかります。recv() はメッセージキューから次のメッセージをフェッチしていることがわかります。これならメッセージの受信を失敗したりしなさそうです。
永続通信できているか実験をしてみます。以下のコードはクライアントが2つのメッセージを送信して、echoサーバーがそのメッセージをそのままクライアントに送り返します。クライアントは送信の5秒後にメッセージをrecv()し標準出力します。
#!/usr/bin/env python
import asyncio
import websockets
async def hello():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
await websocket.send("Hello world! 1")
await websocket.send("Hello world! 2")
await asyncio.sleep(5.0)
print(await websocket.recv())
print(await websocket.recv())
asyncio.run(hello())
server.py
が起動している状態で、client.py
を実行すると5秒後に"Hello world! 1"
と"Hello world! 2"
が出力されました。永続通信できてそうです。
python server.py
python client.py
Hello world! 1
Hello world! 2
このことから、私たちはメッセージの送信や受信のタイミングを過度に心配する必要がなくなりました!
サーバーのメッセージ受信はなんでループ?
以下の部分のことです。
async def echo(websocket, path):
async for message in websocket:
await websocket.send(message)
結論としては、接続ハンドラは1つのメッセージを処理した後に終了するため、複数のメッセージを処理するループを書く必要があるからです。
試しにループなしで受信してみます。
#!/usr/bin/env python
import asyncio
import websockets
async def echo(websocket, path):
message = await websocket.recv()
await websocket.send(message)
async def main():
async with websockets.serve(echo, "localhost", 8765):
await asyncio.Future()
asyncio.run(main())
python server_no_loop.py
クライアントから1つのメッセージを送信してみます。
#!/usr/bin/env python
import asyncio
import websockets
async def hello():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
await websocket.send("Hello world! 1")
print(await websocket.recv())
asyncio.run(hello())
python client_single_msg.py
Hello world! 1
一つのメッセージなら、問題ないです。
続いて、クライアントから2つのメッセージを送信してみます。
#!/usr/bin/env python
import asyncio
import websockets
async def hello():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
await websocket.send("Hello world! 1")
await websocket.send("Hello world! 2")
print(await websocket.recv())
print(await websocket.recv())
asyncio.run(hello())
python client_multi_msg.py
Hello world! 1
Traceback (most recent call last):
...
websockets.exceptions.ConnectionClosedOK: code = 1000 (OK), no reason
2つ以上のメッセージではエラーが発生します。1つ目のメッセージを受信した段階でコネクションが閉じているので、2つ目以降のメッセージは受信できません。
サーバーのawait asyncio.Future()
って何?
永久ループにするために記述されています。ちなみにasyncio.run
は最上位のエントリーポイントのコルーチンを実行するための関数です。高レベルAPIなのでよく使います。
async def main():
async with websockets.serve(echo, "localhost", 8765):
await asyncio.Future()
asyncio.run(main())
以下のように書き換えることも可能です。ちなみにasyncio.get_event_loop()
は低レベルAPIなので直接触ることは少ないらしいです。
start_server = websockets.serve(echo, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
最後に
間違っている箇所があったら、コメントでご指摘をお願いします。
参考
- https://qiita.com/south37/items/6f92d4268fe676347160
- https://youtu.be/8ARodQ4Wlf4
- https://websockets.readthedocs.io/en/stable/index.html
- https://docs.python.org/ja/3/library/asyncio.html
- https://qiita.com/icoxfog417/items/07cbf5110ca82629aca0
- https://qiita.com/maueki/items/8f1e190681682ea11c98
- https://tech.morikatron.ai/entry/2020/07/20/100000
- http://www-higashi.ist.osaka-u.ac.jp/~nakata/mobile-cp/chap-02j-2-6up.pdf