0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Python】aiohttpでストリーミングサーバを動かす

Last updated at Posted at 2025-06-08

はじめに

リアルタイム性が求められる Web アプリケーションにおいて、クライアントに対して継続的にデータを送信するストリーミングサーバを実装する方法があります。この記事では、Python の非同期 Web フレームワークである aiohttp を使用して、シンプルなストリーミングサーバとクライアントを実装する方法を解説します。

検証に使用した環境は以下のとおりです。

Python: 3.10.12
aiohttp: 3.12.4

実装

サーバ側

StreamResponse を使いレスポンスオブジェクトを生成します。
await response.prepare(request) を実行することで接続を確立します。
await response.write でクライアントにレスポンスを返します。
stream_handler 関数は引数に request を受け取るようになっています。これは aiohttp のルーティングシステムが自動生成・注入してくれるものであり、明示的に渡す必要はありません。

import asyncio
import logging
import json
from aiohttp import web
from aiohttp.client_exceptions import ClientConnectionError, ClientConnectionResetError

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - [%(levelname)s] - %(message)s',
)
logger = logging.getLogger(__name__)

async def stream_handler(request):
    # クエリパラメータ取得
    name = request.rel_url.query.get('name', 'Anonymous')
    client_ip = request.remote
    logger.info(f"接続を開始しました。クライアントIP={client_ip}, name={name}")

    response = web.StreamResponse(
        status=200,
        headers={
            'Content-Type': 'application/json',
        }
    )
    await response.prepare(request)

    counter = 0
    try:
        while True:
            data = {
                'message': f'Hello {name}, this is {counter} time stream message.',
                'timestamp': asyncio.get_event_loop().time()
            }
            json_data = json.dumps(data) + '\n'

            await response.write(json_data.encode('utf-8'))
            await asyncio.sleep(2)
            counter += 1

    except (ClientConnectionError, ClientConnectionResetError, ConnectionResetError) as e:
        logger.info(f"クライアント{name} ({client_ip}) との接続が切れました。{type(e).__name__}: {repr(e)}")
    except asyncio.CancelledError as e:
        logger.info(f"タスクがキャンセルされました。{type(e).__name__}: {repr(e)}")
    except Exception as e:
        logger.error(f"想定外のエラーが発生しました。{type(e).__name__}: {repr(e)}")
    finally:
        try:
            await response.write_eof()
        except Exception:
            pass  # クライアント切断時の write_eof 失敗は無視

    return response

def main():
    app = web.Application()
    app.router.add_get('/stream', stream_handler)
    web.run_app(app, port=8080)

if __name__ == '__main__':
    main()

クライアント側

クライアントは非同期でサーバに接続し、JSON データを受信します。
async for line in response.content: を使うことで、レスポンスが返ってくるごとに処理が可能です。
ストリーミング接続を想定しているため、timeout の totalsock_readNone を渡します。これで接続が永続化します。

import aiohttp
import asyncio
import json
import logging
from aiohttp.client_exceptions import ClientConnectorError

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - [%(levelname)s] - %(message)s',
)
logger = logging.getLogger(__name__)

async def stream_client():
    name = "Alice"
    url = f'http://localhost:8080/stream?name={name}'

    try:
        timeout = aiohttp.ClientTimeout(
            total=None,
            connect=20,
            sock_connect=20,
            sock_read=None
        )
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as response:
                logger.info("接続成功。ストリーム受信中...")

                async for line in response.content:
                    try:
                        data = json.loads(line.decode())
                        logger.info(f"[{data['timestamp']:.2f}] {data['message']}")
                    except json.JSONDecodeError:
                        logger.error(f"JSONの読み込みに失敗しました: {line}")

    except aiohttp.ClientPayloadError as e:
        logger.error(f"リクエストエラーが発生しました。{type(e).__name__}: {repr(e)}")
    except ClientConnectorError as e:
        logger.error(f"接続に失敗しました。{type(e).__name__}: {repr(e)}")
    except Exception as e:
        logger.error(f"想定外のエラーが発生しました。{type(e).__name__}: {e}")

def main():
    asyncio.run(stream_client())

if __name__ == '__main__':
    main()

実行結果

サーバ側

======== Running on http://0.0.0.0:8080 ========
(Press CTRL+C to quit)
2025-06-08 09:48:47,646 - [INFO] - 接続を開始しました。クライアントIP=127.0.0.1, name=Alice
2025-06-08 09:48:51,651 - [INFO] - クライアントAlice (127.0.0.1) との接続が切れました。ClientConnectionResetError: ClientConnectionResetError('Cannot write to closing transport')
2025-06-08 09:48:51,651 - [INFO] - 127.0.0.1 [08/Jun/2025:09:48:47 +0900] "GET /stream?name=Alice HTTP/1.1" 200 0 "-" "Python/3.10 aiohttp/3.12.4"
2025-06-08 09:48:52,283 - [INFO] - 接続を開始しました。クライアントIP=127.0.0.1, name=Alice
2025-06-08 09:51:24,509 - [INFO] - クライアントAlice (127.0.0.1) との接続が切れました。ClientConnectionResetError: ClientConnectionResetError('Cannot write to closing transport')
2025-06-08 09:51:24,511 - [INFO] - 127.0.0.1 [08/Jun/2025:09:48:52 +0900] "GET /stream?name=Alice HTTP/1.1" 200 0 "-" "Python/3.10 aiohttp/3.12.4"
2025-06-08 09:51:24,687 - [INFO] - 接続を開始しました。クライアントIP=127.0.0.1, name=Alice

クライアント側

2025-06-08 09:55:18,008 - [INFO] - 接続成功。ストリーム受信中...
2025-06-08 09:55:18,008 - [INFO] - [2765.24] Hello Alice, this is 0 time stream message.
2025-06-08 09:55:20,011 - [INFO] - [2767.24] Hello Alice, this is 1 time stream message.
2025-06-08 09:55:22,014 - [INFO] - [2769.24] Hello Alice, this is 2 time stream message.
2025-06-08 09:55:24,017 - [INFO] - [2771.25] Hello Alice, this is 3 time stream message

また、curl の実行結果も確認します。

$ curl http://localhost:8080/stream?name=Bob
{"message": "Hello Bob, this is 0 time stream message.", "timestamp": 4522.007941714}
{"message": "Hello Bob, this is 1 time stream message.", "timestamp": 4524.010992717}
{"message": "Hello Bob, this is 2 time stream message.", "timestamp": 4526.014284811}
{"message": "Hello Bob, this is 3 time stream message.", "timestamp": 4528.017576009}

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?