0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Python】asyncioでSQSのポーリングタスクを並列で動かしてみた

Posted at

はじめに

メッセージをポーリングしながらほかの処理を動かし、メッセージ受信をトリガーに別の処理を動かすというケースはよくあります。本記事では asyncio を用いてこのような並列処理のデモを行います。

デモの内容

以下を想定します。

  • ある API から長時間レスポンスを受信し続ける(以降リクエストタスクと呼ぶ)
  • SQS にポーリングを行い、メッセージを受信したらリクエストタスクを再起動する(以降ポーリングタスクと呼ぶ)

実装

構成

次の 2 つのコンポーネントを構成します。

  1. タスクマネージャー(リクエストタスク、リクエストタスクの再起動を管理)
  2. ポーリングタスク

これらを asyncio で並列に動かします。

各コンポーネントの解説とコード

タスクマネージャー

import asyncio
import logging

import aiohttp

class RequestTaskManager:
    def __init__(self, users: list[str]):
        self.users: list[str] = users  # リクエストを行うユーザーを格納する配列
        self.req_tasks: list[asyncio.Task] = []  # asyncio の Task を格納する配列
        self.execution_count: int = 1  # タスク実行回数

    async def _req_aiohttp(self, name: str, n: int):
        """リクエストを実行する"""
        while True:
            logger.info(f"[リクエスト] {n}回目の起動タスク: {name} リクエストを開始します。")
            async with aiohttp.ClientSession() as session:
                async with session.get("https://httpbin.org/delay/3") as response:
                    logger.info(f"[リクエスト] {n}回目の起動タスク: {name} Status is {response.status}")
            logger.info(f"[リクエスト] {n}回目の起動タスク: {name} リクエストが完了しました。")
            await asyncio.sleep(1)

    async def start_tasks(self):
        """リクエストタスクを起動する"""
        logger.info(f"[タスクマネージャー] {self.execution_count}回目のリクエストタスクを起動します。")

        self.req_tasks = [
            asyncio.create_task(self._req_aiohttp(user, self.execution_count))
            for user in self.users
        ]

        await asyncio.sleep(0)

    async def cancel_tasks(self):
        """リクエストタスクを停止する"""
        for task in self.req_tasks:
            task.cancel()
        await asyncio.gather(*self.req_tasks, return_exceptions=True)
        self.req_tasks = []

    async def reset_tasks(self):
        """リクエストタスクをリセットする"""
        logger.info("[タスクマネージャー] リクエストタスクをリセットします。")

        # すべてのタスクが停止するまで待機
        await self.cancel_tasks()

        self.execution_count += 1

        # リクエストタスクを常駐化
        await self.start_tasks()

リクエストタスクの開始、停止、再起動を管理します。

  • start_tasks() でリクエストタスクを開始
  • cancel_tasks() でリクエストタスクを停止
  • reset_tasks()start_tasks()cancel_tasks() を実行(再起動)

メッセージ受信

import boto3
from botocore.exceptions import ClientError

boto3_session = boto3.Session(profile_name=PROFILE_NAME)
sqs_client = boto3_session.client("sqs", region_name="ap-northeast-1")

def _receive_message():
    """SQSからメッセージを受信する"""
    try:
        res = sqs_client.receive_message(QueueUrl=QUEUE_URL)
        if 'Messages' in res:
            messages = res['Messages']
            for i, message in enumerate(messages):
                logger.info(f"[SQSクライアント] {i + 1}件目のメッセージ: {message['Body']}")
            return True
        else:
            logger.info("[SQSクライアント] 受信したメッセージはありません。")
            return False
    except ClientError as e:
        logger.error(f"[SQSクライアント] メッセージの受信に失敗しました: {e}")
        raise e

boto3 を使って SQS キューをポーリングします。
メッセージの受信有無をポーリングタスクに伝えます。
メッセージを受信したら削除する必要がありますが、ここでは未実装です。

ポーリングタスク

async def polling(task_manager: RequestTaskManager):
    """ポーリングしてメッセージを受信したらリクエストタスクをリセット"""
    logger.info("[ポーリング] ポーリングタスクを起動します。")

    m = 1
    while True:
        logger.info(f"[ポーリング] {m}回目のポーリングを開始します。")
        is_receive_message = _receive_message()

        if is_receive_message:
            logger.info("[ポーリング] メッセージを受信しました。")

            # タスクをリセットするまで待機
            await task_manager.reset_tasks()

        await asyncio.sleep(POLLING_INTERVAL)
        m += 1

ポーリング実行を常駐化するためのタスクです。
メッセージを受信したらリクエストタスクを再起動するため、タスクマネージャーを引数で受け取ります。

エントリーポイント

async def main():
    users = ["Alice", "Bob"]

    # タスクマネージャーを起動
    task_manager = RequestTaskManager(users)

    # タスクを開始(リクエストタスクを常駐化)
    await task_manager.start_tasks()

    # ポーリングを開始(ポーリングタスクを常駐化)
    await polling(task_manager)

タスクマネージャーとポーリングタスクを起動します。
2つともで await 実行することで並列処理が実現します。

ポイント

  • 他の処理と並列で動かしたい処理は asyncio.create_task で実行する。このとき await を付与するかどうかは、処理完了を待ちたいかどうかによる
  • 複数の非同期処理の完了を待ちたいときは await asyncio.gather で待機
  • await asyncio.sleep(0) ですぐにタスクを実行する

実行結果

リクエストタスクとポーリングタスクが並列で動いています。
メッセージを受信した場合、リクエストタスクがリセットされたことを確認できました。

2025-06-07 12:07:20,265 - [INFO] - [タスクマネージャー] 1回目のリクエストタスクを起動します。
2025-06-07 12:07:20,265 - [INFO] - [リクエスト] 1回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:20,266 - [INFO] - [リクエスト] 1回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:20,266 - [INFO] - [ポーリング] ポーリングタスクを起動します。
2025-06-07 12:07:20,266 - [INFO] - [ポーリング] 1回目のポーリングを開始します。
2025-06-07 12:07:20,271 - [INFO] - Loading cached SSO token for my-sso
2025-06-07 12:07:21,240 - [INFO] - [SQSクライアント] 1件目のメッセージ: テストメッセージ
2025-06-07 12:07:21,240 - [INFO] - [ポーリング] メッセージを受信しました。
2025-06-07 12:07:21,240 - [INFO] - [タスクマネージャー] リクエストタスクをリセットします。
2025-06-07 12:07:21,241 - [INFO] - [タスクマネージャー] 2回目のリクエストタスクを起動します。
2025-06-07 12:07:21,241 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:21,241 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:25,042 - [INFO] - [リクエスト] 2回目の起動タスク: Alice Status is 200
2025-06-07 12:07:25,112 - [INFO] - [リクエスト] 2回目の起動タスク: Bob Status is 200
2025-06-07 12:07:25,235 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストが完了しました。
2025-06-07 12:07:25,306 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストが完了しました。
2025-06-07 12:07:26,239 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:26,307 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:30,207 - [INFO] - [リクエスト] 2回目の起動タスク: Bob Status is 200
2025-06-07 12:07:30,397 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストが完了しました。
2025-06-07 12:07:30,654 - [INFO] - [リクエスト] 2回目の起動タスク: Alice Status is 200
2025-06-07 12:07:30,844 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストが完了しました。
2025-06-07 12:07:31,243 - [INFO] - [ポーリング] 2回目のポーリングを開始します。
2025-06-07 12:07:31,330 - [INFO] - [SQSクライアント] 受信したメッセージはありません。
2025-06-07 12:07:31,400 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:31,846 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:35,446 - [INFO] - [リクエスト] 2回目の起動タスク: Bob Status is 200
2025-06-07 12:07:35,637 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストが完了しました。
2025-06-07 12:07:36,639 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:37,248 - [INFO] - [リクエスト] 2回目の起動タスク: Alice Status is 200
2025-06-07 12:07:37,438 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストが完了しました。
2025-06-07 12:07:38,440 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:40,463 - [INFO] - [リクエスト] 2回目の起動タスク: Bob Status is 200
2025-06-07 12:07:40,655 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストが完了しました。
2025-06-07 12:07:41,332 - [INFO] - [ポーリング] 3回目のポーリングを開始します。
2025-06-07 12:07:41,445 - [INFO] - [SQSクライアント] 受信したメッセージはありません。
2025-06-07 12:07:41,657 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:42,335 - [INFO] - [リクエスト] 2回目の起動タスク: Alice Status is 200
2025-06-07 12:07:42,532 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストが完了しました。
2025-06-07 12:07:43,534 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:46,193 - [INFO] - [リクエスト] 2回目の起動タスク: Bob Status is 200
2025-06-07 12:07:46,387 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストが完了しました。
2025-06-07 12:07:47,386 - [INFO] - [リクエスト] 2回目の起動タスク: Alice Status is 200
2025-06-07 12:07:47,389 - [INFO] - [リクエスト] 2回目の起動タスク: Bob リクエストを開始します。
2025-06-07 12:07:47,576 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストが完了しました。
2025-06-07 12:07:48,579 - [INFO] - [リクエスト] 2回目の起動タスク: Alice リクエストを開始します。
2025-06-07 12:07:51,450 - [INFO] - [ポーリング] 4回目のポーリングを開始します。
2025-06-07 12:07:51,492 - [INFO] - [SQSクライアント] 1件目のメッセージ: テストメッセージ
2025-06-07 12:07:51,492 - [INFO] - [ポーリング] メッセージを受信しました。
2025-06-07 12:07:51,493 - [INFO] - [タスクマネージャー] リクエストタスクをリセットします。
2025-06-07 12:07:51,493 - [INFO] - [タスクマネージャー] 3回目のリクエストタスクを起動します。

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?