0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Python】asyncioで排他制御を行う

Posted at

はじめに

asyncio を使用しているとき、インスタンス変数などクラスの共有リソースを各並列処理タスクが更新する場合があります。その際、同時に更新されると競合状態(Race Condition)が発生し不整合を引き起こします。これを防ぐためにあるタスクが共有リソースを更新する際、他のタスクが更新できないようにロックを行うことができます。

サンプルコード

以下はフェッチタスクを100個並列で作成し、各タスクがフェッチ回数をインクリメントするコードです。同時にフェッチ回数が更新されないように、更新中は with 構文による asyncio.Lock() を使用します。

import asyncio
import logging

import httpx

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - [%(levelname)s] - %(message)s',
)
logging.getLogger("httpx").setLevel(logging.INFO)
logger = logging.getLogger(__name__)

TASK_COUNT = 100

class Fetcher:
    def __init__(self):
        self.client = self._create_client()
        self.fetch_count = 0
        self._lock = asyncio.Lock()

    def _create_client(self):
        """httpxクライアントを作成する"""
        client = httpx.AsyncClient()
        return client

    async def close_connection(self):
        """クライアントの接続を閉じる"""
        await self.client.aclose()
    
    async def _increment_count(self):
        """フェッチ回数をインクリメントする"""
        # インクリメント中は排他的アクセスを行う
        async with self._lock:
            # NOTE: 通常以下のコードを使用する
            # self.fetch_count += 1

            # NOTE: 排他制御の確認のため、以下のコードを使用する
            tmp = self.fetch_count
            await asyncio.sleep(0)
            self.fetch_count = tmp + 1

    async def fetch(self, url):
        """指定されたURLにGETリクエストを送信する"""
        try:
            response = await self.client.get(url)
            await self._increment_count()

            response.raise_for_status()

        except httpx.HTTPError as e:
            logger.error(f"エラーが発生しました。")

async def main():
    fetcher = Fetcher()
    url = "https://httpbin.org/get"

    # GETリクエストタスクをスケジューリング
    tasks = [
        asyncio.create_task(fetcher.fetch(url))
        for _ in range(TASK_COUNT)
    ]

    # タスクを並列実行
    await asyncio.gather(*tasks)

    await fetcher.close_connection()

    logger.info(f"フェッチ回数: {fetcher.fetch_count}")

if __name__ == "__main__":
    asyncio.run(main())

結果

100回になりました。

2025-08-10 09:58:18,271 - [INFO] - フェッチ回数: 100

排他制御が行われないサンプルコード

_increment_count メソッドをロックが実装されていない以下のコードに置き換えます。

    async def _increment_count(self):
        """フェッチ回数をインクリメントする"""
        # NOTE: 通常以下のコードを使用する
        # self.fetch_count += 1

        # NOTE: 排他制御の確認のため、以下のコードを使用する
        tmp = self.fetch_count
        await asyncio.sleep(0)
        self.fetch_count = tmp + 1

結果

100回ではなく45回となりました。

2025-08-10 09:58:30,087 - [INFO] - フェッチ回数: 45

解説

  1. タスクA: self.fetch_count が 0 の時に値を読み込み、tmp に 0 を保持する
  2. タスクA: await で処理を中断し、イベントループはタスクBに制御を渡す
  3. タスクB: self.fetch_count はまだ 0 なので、値を読み込み、tmp に 0 を保持する
  4. タスクB: 処理を最後まで実行し、self.fetch_count0 + 1 で 1 に更新する
  5. その後、制御がタスクAに戻る
  6. タスクA: 自身の tmp(値は 0 )を使って処理を再開し、self.fetch_count0 + 1 で 1 に更新する

この結果、2つのタスクが実行されたにもかかわらず、fetch_count は 2 ではなく 1 になってしまいます。これが、更新が失われる「競合状態」の正体です。asyncio.Lock() は、この一連の処理(読み込み〜書き込み)が完了するまで、他のタスクが割り込むことを防ぐ役割を果たします。

ここでは、競合状態を意図的に発生させるために await asyncio.sleep(0) を入れています。これは「現在のタスクを一度中断し、他の待機中タスクに実行機会を与える」という働きをします。これにより、値を読み取ってから書き込むまでの間に、他のタスクが割り込む状況を擬似的に作り出しています。

参考

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?