はじめに
リアルタイム性が求められる Web アプリケーションにおいて、クライアントに対して継続的にデータを送信するストリーミングサーバを実装する方法があります。この記事では、Python の非同期 Web フレームワークである aiohttp を使用して、シンプルなストリーミングサーバとクライアントを実装する方法を解説します。
検証に使用した環境は以下のとおりです。
Python: 3.10.12
aiohttp: 3.12.4
実装
サーバ側
StreamResponse
を使いレスポンスオブジェクトを生成します。
await response.prepare(request)
を実行することで接続を確立します。
await response.write
でクライアントにレスポンスを返します。
stream_handler
関数は引数に request
を受け取るようになっています。これは aiohttp のルーティングシステムが自動生成・注入してくれるものであり、明示的に渡す必要はありません。
import asyncio
import logging
import json
from aiohttp import web
from aiohttp.client_exceptions import ClientConnectionError, ClientConnectionResetError
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - [%(levelname)s] - %(message)s',
)
logger = logging.getLogger(__name__)
async def stream_handler(request):
# クエリパラメータ取得
name = request.rel_url.query.get('name', 'Anonymous')
client_ip = request.remote
logger.info(f"接続を開始しました。クライアントIP={client_ip}, name={name}")
response = web.StreamResponse(
status=200,
headers={
'Content-Type': 'application/json',
}
)
await response.prepare(request)
counter = 0
try:
while True:
data = {
'message': f'Hello {name}, this is {counter} time stream message.',
'timestamp': asyncio.get_event_loop().time()
}
json_data = json.dumps(data) + '\n'
await response.write(json_data.encode('utf-8'))
await asyncio.sleep(2)
counter += 1
except (ClientConnectionError, ClientConnectionResetError, ConnectionResetError) as e:
logger.info(f"クライアント{name} ({client_ip}) との接続が切れました。{type(e).__name__}: {repr(e)}")
except asyncio.CancelledError as e:
logger.info(f"タスクがキャンセルされました。{type(e).__name__}: {repr(e)}")
except Exception as e:
logger.error(f"想定外のエラーが発生しました。{type(e).__name__}: {repr(e)}")
finally:
try:
await response.write_eof()
except Exception:
pass # クライアント切断時の write_eof 失敗は無視
return response
def main():
app = web.Application()
app.router.add_get('/stream', stream_handler)
web.run_app(app, port=8080)
if __name__ == '__main__':
main()
クライアント側
クライアントは非同期でサーバに接続し、JSON データを受信します。
async for line in response.content:
を使うことで、レスポンスが返ってくるごとに処理が可能です。
ストリーミング接続を想定しているため、timeout の total
と sock_read
に None
を渡します。これで接続が永続化します。
import aiohttp
import asyncio
import json
import logging
from aiohttp.client_exceptions import ClientConnectorError
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - [%(levelname)s] - %(message)s',
)
logger = logging.getLogger(__name__)
async def stream_client():
name = "Alice"
url = f'http://localhost:8080/stream?name={name}'
try:
timeout = aiohttp.ClientTimeout(
total=None,
connect=20,
sock_connect=20,
sock_read=None
)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as response:
logger.info("接続成功。ストリーム受信中...")
async for line in response.content:
try:
data = json.loads(line.decode())
logger.info(f"[{data['timestamp']:.2f}] {data['message']}")
except json.JSONDecodeError:
logger.error(f"JSONの読み込みに失敗しました: {line}")
except aiohttp.ClientPayloadError as e:
logger.error(f"リクエストエラーが発生しました。{type(e).__name__}: {repr(e)}")
except ClientConnectorError as e:
logger.error(f"接続に失敗しました。{type(e).__name__}: {repr(e)}")
except Exception as e:
logger.error(f"想定外のエラーが発生しました。{type(e).__name__}: {e}")
def main():
asyncio.run(stream_client())
if __name__ == '__main__':
main()
実行結果
サーバ側
======== Running on http://0.0.0.0:8080 ========
(Press CTRL+C to quit)
2025-06-08 09:48:47,646 - [INFO] - 接続を開始しました。クライアントIP=127.0.0.1, name=Alice
2025-06-08 09:48:51,651 - [INFO] - クライアントAlice (127.0.0.1) との接続が切れました。ClientConnectionResetError: ClientConnectionResetError('Cannot write to closing transport')
2025-06-08 09:48:51,651 - [INFO] - 127.0.0.1 [08/Jun/2025:09:48:47 +0900] "GET /stream?name=Alice HTTP/1.1" 200 0 "-" "Python/3.10 aiohttp/3.12.4"
2025-06-08 09:48:52,283 - [INFO] - 接続を開始しました。クライアントIP=127.0.0.1, name=Alice
2025-06-08 09:51:24,509 - [INFO] - クライアントAlice (127.0.0.1) との接続が切れました。ClientConnectionResetError: ClientConnectionResetError('Cannot write to closing transport')
2025-06-08 09:51:24,511 - [INFO] - 127.0.0.1 [08/Jun/2025:09:48:52 +0900] "GET /stream?name=Alice HTTP/1.1" 200 0 "-" "Python/3.10 aiohttp/3.12.4"
2025-06-08 09:51:24,687 - [INFO] - 接続を開始しました。クライアントIP=127.0.0.1, name=Alice
クライアント側
2025-06-08 09:55:18,008 - [INFO] - 接続成功。ストリーム受信中...
2025-06-08 09:55:18,008 - [INFO] - [2765.24] Hello Alice, this is 0 time stream message.
2025-06-08 09:55:20,011 - [INFO] - [2767.24] Hello Alice, this is 1 time stream message.
2025-06-08 09:55:22,014 - [INFO] - [2769.24] Hello Alice, this is 2 time stream message.
2025-06-08 09:55:24,017 - [INFO] - [2771.25] Hello Alice, this is 3 time stream message
また、curl の実行結果も確認します。
$ curl http://localhost:8080/stream?name=Bob
{"message": "Hello Bob, this is 0 time stream message.", "timestamp": 4522.007941714}
{"message": "Hello Bob, this is 1 time stream message.", "timestamp": 4524.010992717}
{"message": "Hello Bob, this is 2 time stream message.", "timestamp": 4526.014284811}
{"message": "Hello Bob, this is 3 time stream message.", "timestamp": 4528.017576009}
参考