はじめに
並列処理では、実行中に特定のタスクをキャンセルしたい場面が出てきます。その方法を書きます。
タスクキャンセル
class asyncio.Task
にはタスクをキャンセルするためのメソッド cancel
があります。
公式ドキュメント:
以下の検証コードでは Alice と Bob がリクエストを並列で送り続け、以下の条件でそのタスクをキャンセルします。
- 5秒後に Alice のタスクをキャンセル
- 10秒後に Bob のタスクをキャンセル
検証コード:
import asyncio
from datetime import datetime
import aiohttp
async def req_aiohttp(name):
try:
while True:
print(f"\n------ {datetime.now().strftime('%H:%M:%S.%f')} - {name} のリクエストが開始しました。 ------\n")
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/delay/5") as response:
print(f"{name} Status: {response.status}")
print(f"\n------ {datetime.now().strftime('%H:%M:%S.%f')} - {name} のリクエストが完了しました。 ------\n")
except asyncio.CancelledError:
print(f"{name} のタスクはキャンセルされました。")
async def cancel_tasks(task_map):
"""秒数をカウントし、指定されたタイミングでタスクをキャンセルする"""
print(f"\n------ {datetime.now().strftime('%H:%M:%S.%f')} - タスク監視が開始しました。 ------\n")
for n in range(20):
await asyncio.sleep(1)
print(f"{n + 1} 秒経過")
if n + 1 == 10:
# タスク開始5秒後にAliceのタスクをキャンセルする
print("Alice のタスクをキャンセルします。")
task_map["Alice"].cancel()
if n + 1 == 15:
# タスク開始10秒後にBobのタスクをキャンセルする
print("Bob のタスクをキャンセルします。")
task_map["Bob"].cancel()
print(f"\n------ {datetime.now().strftime('%H:%M:%S.%f')} - タスク監視が完了しました。 ------\n")
async def main():
users = ["Alice", "Bob"]
task_map = {user: asyncio.create_task(req_aiohttp(user)) for user in users}
# キャンセルタスクが完了するまで待機
await cancel_tasks(task_map)
# リクエストタスクが完了するまで待機
await asyncio.gather(*task_map.values(), return_exceptions=True)
if __name__ == "__main__":
asyncio.run(main())
実行結果:
------ 14:19:25.957475 - タスク監視が開始しました。 ------
------ 14:19:25.957534 - Alice のリクエストが開始しました。 ------
------ 14:19:25.957669 - Bob のリクエストが開始しました。 ------
1 秒経過
2 秒経過
3 秒経過
4 秒経過
5 秒経過
Bob Status: 200
6 秒経過
------ 14:19:32.106096 - Bob のリクエストが完了しました。 ------
------ 14:19:32.106715 - Bob のリクエストが開始しました。 ------
Alice Status: 200
------ 14:19:32.819046 - Alice のリクエストが完了しました。 ------
------ 14:19:32.819634 - Alice のリクエストが開始しました。 ------
7 秒経過
8 秒経過
9 秒経過
10 秒経過
Alice のタスクをキャンセルします。
Alice のタスクはキャンセルされました。
11 秒経過
Bob Status: 200
12 秒経過
------ 14:19:38.140351 - Bob のリクエストが完了しました。 ------
------ 14:19:38.140709 - Bob のリクエストが開始しました。 ------
13 秒経過
14 秒経過
15 秒経過
Bob のタスクをキャンセルします。
Bob のタスクはキャンセルされました。
16 秒経過
17 秒経過
18 秒経過
19 秒経過
20 秒経過
------ 14:19:45.996533 - タスク監視が完了しました。 ------
以下のように動作していることが確認できました。
[Alice リクエスト開始] [Bob リクエスト開始]
↓
--- 10秒 ---
↓
[Alice リクエストキャンセル]
↓
--- 20秒 ---
↓
[Bob リクエストキャンセル]
考察
タスクの実行開始タイミング
以下のコードでは先にリクエストタスクを作成し、次にキャンセルタスクを実行しています。
users = ["Alice", "Bob"]
task_map = {user: asyncio.create_task(req_aiohttp(user)) for user in users}
# キャンセルタスクが完了するまで待機
await cancel_tasks(task_map)
しかし、出力結果は先にタスク監視のメッセージが表示され、逆になっています。
------ 14:19:25.957475 - タスク監視が開始しました。 ------
------ 14:19:25.957534 - Alice のリクエストが開始しました。 ------
------ 14:19:25.957669 - Bob のリクエストが開始しました。 ------
このことから、await asyncio.sleep(1)
が実行されるまでリクエストタスクが実行されていないことがわかります。asyncio.create_task
は実行をスケジュールする(実行待ちの状態にする)だけで、実行自体を行うメソッドではありません。asyncio.create_task
で作成されたタスクが実行されるのはどこかで await
が呼ばれた後です。
公式ドキュメント:
またはタスクグループという方法もあるようです(未検証):
コードに記載したとおりリクエストタスクを開始してからキャンセルタスクを開始したい場合、タスク作成後に await asyncio.sleep(0)
を呼びます。
async def main():
users = ["Alice", "Bob"]
task_map = {user: asyncio.create_task(req_aiohttp(user)) for user in users}
# スケジュールされたタスクを実行する
await asyncio.sleep(0)
await cancel_tasks(task_map)
await asyncio.gather(*task_map.values(), return_exceptions=True)
参考:
sleep() は常に現在の Task を一時中断し、他の Task が実行されるのを許可します。
delay を 0 に設定することで、他のタスクを実行可能にする最適な方針を提供します。この方法は、実行時間の長い関数が、その実行時間全体にわたってイベントループをブロックしないようにするために利用できます。
このようにすることで、以下のとおりリクエストタスクが先に動くようになります。
------ 14:52:57.659309 - Alice のリクエストが開始しました。 ------
------ 14:52:57.659471 - Bob のリクエストが開始しました。 ------
------ 14:52:57.659555 - タスク監視が開始しました。 ------
補足
asyncio に関する一般的なことではなく今回の検証コードに限ったことですが、キャンセルタスクを create_task
する方法もあります。
import asyncio
from datetime import datetime
import aiohttp
import time
async def req_aiohttp(name):
try:
while True:
print(f"\n------ {datetime.now().strftime('%H:%M:%S.%f')} - {name} のリクエストが開始しました。 ------\n")
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/delay/5") as response:
print(f"{name} Status: {response.status}")
print(f"\n------ {datetime.now().strftime('%H:%M:%S.%f')} - {name} のリクエストが完了しました。 ------\n")
except asyncio.CancelledError:
print(f"{name} のタスクはキャンセルされました。")
async def cancel_tasks(task_map):
"""秒数をカウントし、指定されたタイミングでタスクをキャンセルする"""
print(f"\n------ {datetime.now().strftime('%H:%M:%S.%f')} - タスク監視が開始しました。 ------\n")
for n in range(20):
await asyncio.sleep(1)
print(f"{n + 1} 秒経過")
if n + 1 == 10:
# タスク開始5秒後にAliceのタスクをキャンセルする
print("Alice のタスクをキャンセルします。")
task_map["Alice"].cancel()
if n + 1 == 15:
# タスク開始10秒後にBobのタスクをキャンセルする
print("Bob のタスクをキャンセルします。")
task_map["Bob"].cancel()
print(f"\n------ {datetime.now().strftime('%H:%M:%S.%f')} - タスク監視が完了しました。 ------\n")
async def main():
users = ["Alice", "Bob"]
task_map = {user: asyncio.create_task(req_aiohttp(user)) for user in users}
- await cancel_tasks(task_map)
- await asyncio.gather(*task_map.values(), return_exceptions=True)
+ canceler = asyncio.create_task(cancel_tasks(task_map))
+ await asyncio.gather(*task_map.values(), canceler, return_exceptions=True)
if __name__ == "__main__":
asyncio.run(main())
こちらのコードのほうがよいと思います。キャンセルタスクで await するのは少しおかしな感じがします。キャンセルタスクは監視タスクでもあるため、リクエストタスクと並行して動かすほうが適切だと考えられるためです。
まとめ
- asyncio のタスクキャンセルを使用して実行結果を確認した
-
create_task
はタスクスケジュールを行う(タスクを実行待ちにする)
参考
タスクキャンセルメソッド:
タスクキャンセル解説:
タスク作成:
タスクグループ:
スリープ: