はじめに
asyncio を使用しているとき、インスタンス変数などクラスの共有リソースを各並列処理タスクが更新する場合があります。その際、同時に更新されると競合状態(Race Condition)が発生し不整合を引き起こします。これを防ぐためにあるタスクが共有リソースを更新する際、他のタスクが更新できないようにロックを行うことができます。
サンプルコード
以下はフェッチタスクを100個並列で作成し、各タスクがフェッチ回数をインクリメントするコードです。同時にフェッチ回数が更新されないように、更新中は with
構文による asyncio.Lock()
を使用します。
import asyncio
import logging
import httpx
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - [%(levelname)s] - %(message)s',
)
logging.getLogger("httpx").setLevel(logging.INFO)
logger = logging.getLogger(__name__)
TASK_COUNT = 100
class Fetcher:
def __init__(self):
self.client = self._create_client()
self.fetch_count = 0
self._lock = asyncio.Lock()
def _create_client(self):
"""httpxクライアントを作成する"""
client = httpx.AsyncClient()
return client
async def close_connection(self):
"""クライアントの接続を閉じる"""
await self.client.aclose()
async def _increment_count(self):
"""フェッチ回数をインクリメントする"""
# インクリメント中は排他的アクセスを行う
async with self._lock:
# NOTE: 通常以下のコードを使用する
# self.fetch_count += 1
# NOTE: 排他制御の確認のため、以下のコードを使用する
tmp = self.fetch_count
await asyncio.sleep(0)
self.fetch_count = tmp + 1
async def fetch(self, url):
"""指定されたURLにGETリクエストを送信する"""
try:
response = await self.client.get(url)
await self._increment_count()
response.raise_for_status()
except httpx.HTTPError as e:
logger.error(f"エラーが発生しました。")
async def main():
fetcher = Fetcher()
url = "https://httpbin.org/get"
# GETリクエストタスクをスケジューリング
tasks = [
asyncio.create_task(fetcher.fetch(url))
for _ in range(TASK_COUNT)
]
# タスクを並列実行
await asyncio.gather(*tasks)
await fetcher.close_connection()
logger.info(f"フェッチ回数: {fetcher.fetch_count}")
if __name__ == "__main__":
asyncio.run(main())
結果
100回になりました。
2025-08-10 09:58:18,271 - [INFO] - フェッチ回数: 100
排他制御が行われないサンプルコード
_increment_count
メソッドをロックが実装されていない以下のコードに置き換えます。
async def _increment_count(self):
"""フェッチ回数をインクリメントする"""
# NOTE: 通常以下のコードを使用する
# self.fetch_count += 1
# NOTE: 排他制御の確認のため、以下のコードを使用する
tmp = self.fetch_count
await asyncio.sleep(0)
self.fetch_count = tmp + 1
結果
100回ではなく45回となりました。
2025-08-10 09:58:30,087 - [INFO] - フェッチ回数: 45
解説
- タスクA:
self.fetch_count
が 0 の時に値を読み込み、tmp
に 0 を保持する - タスクA:
await
で処理を中断し、イベントループはタスクBに制御を渡す - タスクB:
self.fetch_count
はまだ 0 なので、値を読み込み、tmp
に 0 を保持する - タスクB: 処理を最後まで実行し、
self.fetch_count
を0 + 1
で 1 に更新する - その後、制御がタスクAに戻る
- タスクA: 自身の
tmp
(値は 0 )を使って処理を再開し、self.fetch_count
を0 + 1
で 1 に更新する
この結果、2つのタスクが実行されたにもかかわらず、fetch_count は 2 ではなく 1 になってしまいます。これが、更新が失われる「競合状態」の正体です。asyncio.Lock()
は、この一連の処理(読み込み〜書き込み)が完了するまで、他のタスクが割り込むことを防ぐ役割を果たします。
ここでは、競合状態を意図的に発生させるために await asyncio.sleep(0)
を入れています。これは「現在のタスクを一度中断し、他の待機中タスクに実行機会を与える」という働きをします。これにより、値を読み取ってから書き込むまでの間に、他のタスクが割り込む状況を擬似的に作り出しています。
参考