はじめに
メッセージをポーリングしながらほかの処理を動かし、メッセージ受信をトリガーに別の処理を動かすというケースはよくあります。本記事では asyncio を用いてこのような並列処理のデモを行います。
デモの内容
以下を想定します。
- ある API から長時間レスポンスを受信し続ける(以降リクエストタスクと呼ぶ)
- SQS にポーリングを行い、メッセージを受信したらリクエストタスクを再起動する(以降ポーリングタスクと呼ぶ)
実装
構成
次の 2 つのコンポーネントを構成します。
- タスクマネージャー(リクエストタスク、リクエストタスクの再起動を管理)
- ポーリングタスク
これらを asyncio で並列に動かします。
各コンポーネントの解説とコード
タスクマネージャー
import asyncio
import logging
import aiohttp
class RequestTaskManager:
def __init__(self, users: list[str]):
self.users: list[str] = users # リクエストを行うユーザーを格納する配列
self.req_tasks: list[asyncio.Task] = [] # asyncio の Task を格納する配列
self.execution_count: int = 1 # タスク実行回数
async def _req_aiohttp(self, name: str, n: int):
"""リクエストを実行する"""
while True:
logger.info(f"[リクエスト] {n}回目の起動タスク: {name} リクエストを開始します。")
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/delay/3") as response:
logger.info(f"[リクエスト] {n}回目の起動タスク: {name} Status is {response.status}")
logger.info(f"[リクエスト] {n}回目の起動タスク: {name} リクエストが完了しました。")
await asyncio.sleep(1)
async def start_tasks(self):
"""リクエストタスクを起動する"""
logger.info(f"[タスクマネージャー] {self.execution_count}回目のリクエストタスクを起動します。")
self.req_tasks = [
asyncio.create_task(self._req_aiohttp(user, self.execution_count))
for user in self.users
]
await asyncio.sleep(0)
async def cancel_tasks(self):
"""リクエストタスクを停止する"""
for task in self.req_tasks:
task.cancel()
await asyncio.gather(*self.req_tasks, return_exceptions=True)
self.req_tasks = []
async def reset_tasks(self):
"""リクエストタスクをリセットする"""
logger.info("[タスクマネージャー] リクエストタスクをリセットします。")
# すべてのタスクが停止するまで待機
await self.cancel_tasks()
self.execution_count += 1
# リクエストタスクを常駐化
await self.start_tasks()
リクエストタスクの開始、停止、再起動を管理します。
-
start_tasks()
でリクエストタスクを開始 -
cancel_tasks()
でリクエストタスクを停止 -
reset_tasks()
でstart_tasks()
とcancel_tasks()
を実行(再起動)
メッセージ受信
import boto3
from botocore.exceptions import ClientError
boto3_session = boto3.Session(profile_name=PROFILE_NAME)
sqs_client = boto3_session.client("sqs", region_name="ap-northeast-1")
def _receive_message():
"""SQSからメッセージを受信する"""
try:
res = sqs_client.receive_message(QueueUrl=QUEUE_URL)
if 'Messages' in res:
messages = res['Messages']
for i, message in enumerate(messages):
logger.info(f"[SQSクライアント] {i + 1}件目のメッセージ: {message['Body']}")
return True
else:
logger.info("[SQSクライアント] 受信したメッセージはありません。")
return False
except ClientError as e:
logger.error(f"[SQSクライアント] メッセージの受信に失敗しました: {e}")
raise e
boto3 を使って SQS キューをポーリングします。
メッセージの受信有無をポーリングタスクに伝えます。
メッセージを受信したら削除する必要がありますが、ここでは未実装です。
ポーリングタスク
async def polling(task_manager: RequestTaskManager):
"""ポーリングしてメッセージを受信したらリクエストタスクをリセット"""
logger.info("[ポーリング] ポーリングタスクを起動します。")
m = 1
while True:
logger.info(f"[ポーリング] {m}回目のポーリングを開始します。")
is_receive_message = _receive_message()
if is_receive_message:
logger.info("[ポーリング] メッセージを受信しました。")
# タスクをリセットするまで待機
await task_manager.reset_tasks()
await asyncio.sleep(POLLING_INTERVAL)
m += 1
ポーリング実行を常駐化するためのタスクです。
メッセージを受信したらリクエストタスクを再起動するため、タスクマネージャーを引数で受け取ります。
エントリーポイント
async def main():
users = ["Alice", "Bob"]
# タスクマネージャーを起動
task_manager = RequestTaskManager(users)
# タスクを開始(リクエストタスクを常駐化)
await task_manager.start_tasks()
# ポーリングを開始(ポーリングタスクを常駐化)
await polling(task_manager)
タスクマネージャーとポーリングタスクを起動します。
2つともで await
実行することで並列処理が実現します。
ポイント
- 他の処理と並列で動かしたい処理は
asyncio.create_task
で実行する。このときawait
を付与するかどうかは、処理完了を待ちたいかどうかによる - 複数の非同期処理の完了を待ちたいときは
await asyncio.gather
で待機 -
await asyncio.sleep(0)
ですぐにタスクを実行する
実行結果
リクエストタスクとポーリングタスクが並列で動いています。
メッセージを受信した場合、リクエストタスクがリセットされたことを確認できました。
2025-06-07 12:07:20,265 - [INFO] - [タスクマネージャー] 1回目のリクエストタスクを起動します。
2025-06-07 12:07:20,265 - [INFO] - [リクエスト] 1回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:20,266 - [INFO] - [リクエスト] 1回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:20,266 - [INFO] - [ポーリング] ポーリングタスクを起動します。
2025-06-07 12:07:20,266 - [INFO] - [ポーリング] 1回目のポーリングを開始します。
2025-06-07 12:07:20,271 - [INFO] - Loading cached SSO token for my-sso
2025-06-07 12:07:21,240 - [INFO] - [SQSクライアント] 1件目のメッセージ: テストメッセージ
2025-06-07 12:07:21,240 - [INFO] - [ポーリング] メッセージを受信しました。
2025-06-07 12:07:21,240 - [INFO] - [タスクマネージャー] リクエストタスクをリセットします。
2025-06-07 12:07:21,241 - [INFO] - [タスクマネージャー] 2回目のリクエストタスクを起動します。
2025-06-07 12:07:21,241 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:21,241 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:25,042 - [INFO] - [リクエスト] 2回目の起動タスク: Alice Status is 200
2025-06-07 12:07:25,112 - [INFO] - [リクエスト] 2回目の起動タスク: Bob Status is 200
2025-06-07 12:07:25,235 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストが完了しました。
2025-06-07 12:07:25,306 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストが完了しました。
2025-06-07 12:07:26,239 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:26,307 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:30,207 - [INFO] - [リクエスト] 2回目の起動タスク: Bob Status is 200
2025-06-07 12:07:30,397 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストが完了しました。
2025-06-07 12:07:30,654 - [INFO] - [リクエスト] 2回目の起動タスク: Alice Status is 200
2025-06-07 12:07:30,844 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストが完了しました。
2025-06-07 12:07:31,243 - [INFO] - [ポーリング] 2回目のポーリングを開始します。
2025-06-07 12:07:31,330 - [INFO] - [SQSクライアント] 受信したメッセージはありません。
2025-06-07 12:07:31,400 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:31,846 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:35,446 - [INFO] - [リクエスト] 2回目の起動タスク: Bob Status is 200
2025-06-07 12:07:35,637 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストが完了しました。
2025-06-07 12:07:36,639 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:37,248 - [INFO] - [リクエスト] 2回目の起動タスク: Alice Status is 200
2025-06-07 12:07:37,438 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストが完了しました。
2025-06-07 12:07:38,440 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:40,463 - [INFO] - [リクエスト] 2回目の起動タスク: Bob Status is 200
2025-06-07 12:07:40,655 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストが完了しました。
2025-06-07 12:07:41,332 - [INFO] - [ポーリング] 3回目のポーリングを開始します。
2025-06-07 12:07:41,445 - [INFO] - [SQSクライアント] 受信したメッセージはありません。
2025-06-07 12:07:41,657 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:42,335 - [INFO] - [リクエスト] 2回目の起動タスク: Alice Status is 200
2025-06-07 12:07:42,532 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストが完了しました。
2025-06-07 12:07:43,534 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:46,193 - [INFO] - [リクエスト] 2回目の起動タスク: Bob Status is 200
2025-06-07 12:07:46,387 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストが完了しました。
2025-06-07 12:07:47,386 - [INFO] - [リクエスト] 2回目の起動タスク: Alice Status is 200
2025-06-07 12:07:47,389 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:47,576 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストが完了しました。
2025-06-07 12:07:48,579 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:51,450 - [INFO] - [ポーリング] 4回目のポーリングを開始します。
2025-06-07 12:07:51,492 - [INFO] - [SQSクライアント] 1件目のメッセージ: テストメッセージ
2025-06-07 12:07:51,492 - [INFO] - [ポーリング] メッセージを受信しました。
2025-06-07 12:07:51,493 - [INFO] - [タスクマネージャー] リクエストタスクをリセットします。
2025-06-07 12:07:51,493 - [INFO] - [タスクマネージャー] 3回目のリクエストタスクを起動します。
参考