TL;DR
Python の asyncio で並行処理をするには asyncio.as_completed
が書きやすい。
前置き
asyncio は async/await 構文を用いた非同期処理ライブラリです。
Python 3.4 以降で導入され、3.7 でいくつか改良が入っています。
async/await のちゃんとした説明は世の中に優れた記事がたくさんありますのでそちらに譲ります。例えば、
- Pythonにおける非同期処理: asyncio逆引きリファレンス - Qiita
- Pythonの非同期通信(asyncioモジュール)入門を書きました - ゆくゆくは有へと
- [Python] 🐰 なんとなく理解するasyncio 🐢 - くろのて
- async/await 入門(JavaScript) - Qiita
など。
こういった記事を読んでも、実際自分が行いたい処理として実装するには書いてみないと分からないことも多いです。
今回は、「重い処理やネットワークの遅延などでユーザーが待たされるストレスを軽減するために、X秒経っても終わらなかったら先に別のメッセージを出す」みたいなことをしたくなり、asyncio でどうやるのかを調べました。これと等価な状況を、寿司を使って説明することを思い付いたため本記事にまとめます1。
全て、以下の公式ドキュメントに書いてあった内容です。
コードと実行結果
先にどん、と長いコードを出す感じになってしまうのですが、以下のようなプログラムを書きました。非同期とか並列処理とかは動かしてみないとイメージがつかめないので、とりあえず以下のコードをコピペして動かしてもらえればと思います (Python 3.7+ で動きます) 2。
import asyncio
import datetime
import time
import sys
async def cook_sushi():
"""sushiを握るのには sushi_cook_time 秒かかる"""
print_with_time('sushi職人「sushi一丁!」')
await asyncio.sleep(sushi_cook_time)
print_with_time('sushi職人「sushiお待ち!」')
return 'sushi'
async def cook_miso():
"""misoは1秒でできる"""
print_with_time('sushi職人「miso一丁!」')
await asyncio.sleep(1)
print_with_time('sushi職人「misoお待ち!」')
return 'miso'
def eat(dish):
print_with_time(f"客「{dish}うまあ😋」")
def print_with_time(str):
print(f"{datetime.datetime.now().strftime('%H:%M:%S')} {str}")
async def case1():
"""Case1: 寿司も味噌汁も頼んでおとなしく待ち、来た順に食う"""
for future in asyncio.as_completed([cook_sushi(), cook_miso()]):
result = await future
eat(result)
return None
async def case2():
"""Case2: 寿司を頼んでから5秒かかっても来なかったら店を出る"""
try:
result = await asyncio.wait_for(cook_sushi(), timeout=5.0)
eat(result)
except asyncio.TimeoutError:
print_with_time('客「sushiが来ないなら帰らせて頂く」')
return None
async def case3():
"""Case3: 寿司を頼んでから5秒かかっても来なかったら味噌汁を頼む"""
for future in asyncio.as_completed([cook_sushi(), asyncio.sleep(5)]):
result = await future
if result == 'sushi':
eat(result)
break
else:
# sushi が来なかった場合 (asyncio.sleep(5)の戻り値は None)
result = await cook_miso()
eat(result)
return None
if __name__ == '__main__':
sushi_cook_time = int(sys.argv[1])
print(case1.__doc__)
time.sleep(1)
asyncio.run(case1())
print('')
print(case2.__doc__)
time.sleep(1)
asyncio.run(case2())
print('')
print(case3.__doc__)
time.sleep(1)
asyncio.run(case3())
実行結果
sushi_cook_time = 3
として実行すると以下のようになります。
$ python3 awaited_sushi.py 3
Case1: 寿司も味噌汁も頼んでおとなしく待ち、来た順に食う
22:57:37 sushi職人「miso一丁!」
22:57:37 sushi職人「sushi一丁!」
22:57:38 sushi職人「misoお待ち!」
22:57:38 客「misoうまあ😋」
22:57:40 sushi職人「sushiお待ち!」
22:57:40 客「sushiうまあ😋」
Case2: 寿司を頼んでから5秒かかっても来なかったら店を出る
22:57:41 sushi職人「sushi一丁!」
22:57:44 sushi職人「sushiお待ち!」
22:57:44 客「sushiうまあ😋」
Case3: 寿司を頼んでから5秒かかっても来なかったら味噌汁を頼む
22:57:45 sushi職人「sushi一丁!」
22:57:48 sushi職人「sushiお待ち!
sushi_cook_time = 7
すると以下のようになります。
$ python3 awaited_sushi.py 7
Case1: 寿司も味噌汁も頼んでおとなしく待ち、来た順に食う
22:57:51 sushi職人「miso一丁!」
22:57:51 sushi職人「sushi一丁!」
22:57:53 sushi職人「misoお待ち!」
22:57:53 客「misoうまあ😋」
22:57:59 sushi職人「sushiお待ち!」
22:57:59 客「sushiうまあ😋」
Case2: 寿司を頼んでから5秒かかっても来なかったら店を出る
22:58:00 sushi職人「sushi一丁!」
22:58:05 客「sushiが来ないなら帰らせて頂く」
Case3: 寿司を頼んでから5秒かかっても来なかったら味噌汁を頼んで引き続き寿司を待つ
22:58:06 sushi職人「sushi一丁!」
22:58:11 sushi職人「miso一丁!」
22:58:12 sushi職人「misoお待ち!」
22:58:12 客「misoうまあ😋」
22:58:13 sushi職人「sushiお待ち!」
22:58:13 客「sushiうまあ😋」
説明
ここでは case1、case2、case3 という3種類の行動を取る客を考えて、それぞれコルーチンを作りました。コルーチンは、async def case1():
のように定義され、 Python 3.7+ では
asyncio.run(case1())
のように実行できます。さらに、これらのコルーチンの中で、cook_sushi()
, cook_miso()
の2つのコルーチンを呼び出しています。
case1
「寿司も味噌汁も頼んでおとなしく待ち、来た順に食う」場合です。実行順序などは気にせず、ただ並列にすれば良いだけで、以下のように書けます。
for future in asyncio.as_completed([cook_sushi(), cook_miso()]):
result = await future
eat(result)
asyncio.as_completed
にコルーチンのリストを渡しているだけで、シンプルです。asyncio.gather
や asyncio.wait
を使っても書くことはできます。
case2
次に、少し時間を考慮して、「寿司を頼んでから5秒かかっても来なかったら店を出る」場合です。これは要するにタイムアウト処理なので、そのために用意された asyncio.wait_for()
を使います。
try:
result = await asyncio.wait_for(cook_sushi(), timeout=5.0)
except asyncio.TimeoutError:
print('客「sushiが来ないなら帰らせて頂く」')
このようにすると、5秒を過ぎると TimeoutError
が出るので、例外処理してやれば良いです。
これも色々な書き方で書けますが、asyncio.wait_for
を使うのがが一番分かりやすいと思います。
case3
いよいよやりたかったことですが、「寿司を頼んでから5秒かかっても来なかったら味噌汁を頼んで引き続き寿司を待つ」で表されます。
これについても、 case1 と同様に、 asyncio.as_completed
で書けます。
for future in asyncio.as_completed([cook_sushi(), asyncio.sleep(5)]):
result = await future
if result == 'sushi':
eat(result)
break
else:
# sushi が来なかった場合 (asyncio.sleep(5)の戻り値は None)
result = await cook_miso()
eat(result)
今回は、cook_sushi
と asyncio.sleep
を並行に走らせて、先に返ってきたのが sushi じゃなかったら味噌汁をオーダーします。sushi が先に来たら勝手に for から break して asyncio.sleep
をキャンセルせずに放置しているのが若干気になりますが、今回は重たい処理じゃないので気にしないことにします。
はじめ、 case3
は以下のような書き方をしていました。
async def case3():
done, pending = await asyncio.wait(
[cook_sushi(), asyncio.sleep(5)], return_when=asyncio.FIRST_COMPLETED)
for d in done:
result = d.result()
if result == 'sushi':
eat(result)
else:
# sushi が来なかった場合 (asyncio.sleep(5)の戻り値は None)
result = await cook_miso()
eat(result)
if len(pending) > 0:
done, pending = await asyncio.wait(
pending, return_when=asyncio.ALL_COMPLETED)
for d in done:
eat(d.result())
asyncio.wait
を使ってこのように書けなくもないですが、asyncio.as_completed
を使うことで簡潔に、分かりやすく書けました。なお以下の記事では as_completed
の他に callback を使う方法も紹介されています。
また、より複雑な処理をしたければ asyncio.Queue
を使うと良さそうでしたが今回は試しませんでした。
並行処理する際の注意
もう一点ハマったのは、「コルーチンの中で非同期ではない重たい処理をすると他のタスクをブロックしてしまう」ということです。例えば、 cook_sushi()
コルーチンの中にある asyncio.sleep
を time.sleep
に変えると、結果が変わってしまいます。
同様に、「HTTP request を投げてその間に別のことをする」には非同期の aiohttp などを使わないと実現できません。はじめ Requests を使っていてハマりました。上で挙げた記事にも書かれていました。
その他
-
ラーメンで理解するasync/await - Qiita
- "寿司 非同期" とかでググって何も引っかからなかったので安心していたら、記事を大方書き終わったところでこちらを見つけてしまいました。言語と食べ物が違うので許してください。
-
await って言う単語 | ++C++; // 未確認飛行 C ブログ
- async/await や関連の英単語について書いてあり、勉強になりました。
-
Rebuild: 206: Make Ruby Differentiable (omo)
- Erik Meijer 氏が色んな言語に async/await 組み込むコンサルをしたという話が面白かった。