LoginSignup
154

Python の asyncio は超便利

Last updated at Posted at 2021-11-30

Python の asyncio (公式ページ) は超便利なので紹介します。

■何が便利?

要するに JavaScript ではもはや当たり前になっている async/await が実現できます。
つまり、非同期(処理をしている間、同期して完了を待つのでなく、次の処理を実行するやり方)を実現します。
非同期により、全体の 処理速度を爆上げ できる場合がよくあります。

例えば、外部サービスリクエストや、ファイル・DBへの読み書きなど、I/O関連は時間がかかる割にCPUは空いてたりするので、そこが有効に活用されるようになるわけです。

多くのシステムはI/Oが大量にあったりするので圧倒的です。

■それってスレッドでもできるのでは?

もちろん可能ですが、コンセプトが異なり、スレッドに比べて 圧倒的にお手軽 です。

スレッドと何が違うのかというと、主な違いは、スレッドは複数の処理が同時に走るということです。処理の裏で別の処理が同時に走るため、値の読み/書きでは必ずロックやスレッドセーフを考慮しなければなりません。

一方で asyncio は処理の裏で別の処理が走ることはあり得ません。(I/Oが走ってることはあり得ますけど。)
すべての処理は直列に走ります。走ってる処理が await になると、待ち行列に並んでいる次の処理が走り始めます。

これが最高に良いのです。ロックもスレッドセーフも考慮不要なのですから。
なぜなら、裏で別の処理が走ることはないわけですから、1つの処理を実行している間は(await するまでは)必ずその処理だけが全てを占有できるわけです。占有できるのでロックやスレッドセーフの考慮は不要となり、ロック/スレッドセーフの高コストな処理がない点も高速化に寄与します。それ以上にコードがシンプルになるのでメンテナンスが楽になります。

では逆に、スレッドが有利になる場合はなんでしょうか。複数のCPUがあり、それを各スレッドに割り当てた時ですかね。(プロセスと呼ぶべきか。)
ただ、ロックやスレッドセーフはそれなりにコストもかかるので、本当にCPUを複数使うことで効果が出るかは要件次第です。
デバッグもとても大変なものになりますので、まずは asyncio から検討するのが良いかと思います。

■具体的な使い方

import asyncio

async def main():
  print('Hello ...')
  await asyncio.sleep(1)
  print('... World!')

# Python 3.7+
asyncio.run(main())

標準機能なのでインストールは不要です。
必ず main 的な async メソッドを1つ用意し、一番外側で run する必要があります。

■よく使う機能の紹介

具体的な使い方は標準ドキュメントに大量の情報がありますのでそっちをみていただくとして、ここでは良く使う機能に絞って、かつ、要するに何者なのかが分かるようにまとめます。

sleep

await asyncio.sleep(1)

非同期に一定時間待ちます。

gather

※ 3.11以降はTaskGroupの利用をお勧め

await asyncio.gather(
  async_method1(),
  async_method2(),
)

複数同時に並行実行します。並行してI/Oを待つことができます。全てが終わったら完了します。処理自体は実際は直列ですが、処理の中で I/O を await した際に、自身の処理はIO待ちで止まってしまいますが、並行している別の処理に制御がわたるので、CPUが効率的に使われます。実際は直列ながらも並列処理ができる(I/Oが並列する)わけです。

create_task / Task

※ 3.11以降はTaskGroupの利用をお勧め

task1 = asyncio.create_task(async_method1())

# 終了を待つこともできる
# ※ async_method1()の実行でエラーが出ていればここで発生することになる
await task1

async メソッドの処理を別スレッド立てるみたいに実行します。完了をawaitするのでなく、並行して実行します。ただし、asyncio ですので真の平行なのではなく、await した際に、自分の順番が来たら実行されます。裏側に eventloop というキューのような仕組みがあり、誰かがawaitする度にeventloop から処理が1つ取り出されて実行されます。これを繰り返すのですが、このeventloopに追加されます。

Task.result

result1 = task1.result()

タスク元とのなった async メソッドの実行結果を取得する。

  • 実行でエラーが出ていればここで発生することになる
  • まだ終わっていないなら InvalidStateError が発生する

Task.done

is_done = task.done()

タスクが終了しているかどうかを bool で取得する。

  • 実行でエラーが出ていればここで発生することになる

TaskGroup

※ 3.11で追加

async with asyncio.TaskGroup() as tg:
  tg.create_task(async_method1())
  tg.create_task(async_method2())
# 例外発生 or すべてのタスク完了で抜ける
  • create_Task をより安全に便利にしたもの。
  • 例外が発生した場合、create_task 中の他のものはすべて自動的に Cancel される。
  • いずれかで例外が発生した場合、同時に発生した例外を個別に補足可能。同時というのは個々の Task の中で次の await までの間という意味。
try:
  async with asyncio.TaskGroup() as tg:
    tg.create_task(async_method1())
    tg.create_task(async_method2())
except* ValueError as e:
  for _e in e.exceptions:  # 発生したValueError型のエラーがtupleで入っている
    print(_e)
  • except* は 3.11 から導入された文法。これで個々に発生した例外を例外の型ごとに個別に取得できる。結果、print(e) は発生した回数だけ呼ばれることになる。これでもれなく例外処理が可能となる。

Lock

lock = asyncio.Lock()

# ... その後
async with lock:
  # 共有しているデータへのアクセス

ロックの仕組みを async で実現します。async with を抜けたら自動的にロックが解放されます。

Semaphore

sem = asyncio.Semaphore(10)

# ... 別のどこか
async with sem:
  # 共有しているデータへのアクセス

「セマフォ」と読みます。
基本、ロックと同じで、共有リソースへのアクセスを一定個数以内(上記なら10個以内)に保ちたい場合に使います。
上記なら async with sem の中に入れるのは 10 処理まで、ということになります。

Event

some_event = asyncio.Event()

# ... 別のどこか
# 発火を待つ
await some_event.wait()

# ... 別のどこか
# イベント発火
event.set()

何かのイベントのきっかけがトリガーされたことを通知します。
イベントのインスタンスごとに Event オブジェクトを生成します。
wait() したときにすでに発火済みだった場合も拾えます。
event オブジェクトは発火したかどうかのフラグ的なもので、一度発火すると、ずっと発火しっぱなし(フラグ立ちっぱなし)です。
clear() を呼ぶことでフラグを降ろして再利用可能ですが、前の発火なのか降ろした後のすぐの発火なのか見分けられないので、event オブジェクトを使い捨てながら使ったほうが現実的に思っています。(チャット欄などで希望頂ければ問題&解決策の事例記事をアップします。)

Queue

複数の async メソッド同士でやり取りするのに非常に便利なクラス。
非同期な処理の完了を一方が無限ループで queueget し続け、もう一方が完了し次第 put していく作りはよくやります。

hoge_queue: 'asyncio.Queue[Hoge]' = asyncio.Queue()

# queue から1つ取り出し
hoge = await hoge_queue.get()

# queue が空なら例外(QueueEmpty)が発生する取り方
# ※ await 不要! なので async でないメソッドからも呼べる
try:
  hoge = hoge_queue.get_nowait()
except asyncio.QueueEmpty:
  # 空だった場合の処理

# 事前に空かチェック
if queue.empty():
  # 空の場合の処理

# queue に追加
queue.put_nowait(hoge)
queueで保持可能なアイテム数を指定する場合
hoge_queue: 'asyncio.Queue[Hoge]' = asyncio.Queue(1)  # 1個まで

# queue に追加(いっぱいなら例外)
try:
  queue.put_nowait(hoge)
except asyncio.QueueFull:
  # いっぱいの場合の処理

# queue に追加(いっぱいなら待つ)
await queue.put(hoge)

# 事前にいっぱいかチェック
if queue.full():
  # いっぱいの場合の処理

無限ループの場合、終了でループが終わるつくりにするのに一工夫は必要です。アイテムとして終了フラグ的なものを渡すなど(なにしろ次が来るまで永久に待ち続けてしまいますので)。

型の指定で 'asyncio.Queue[Hoge]' とすることでアイテムの型指定が可能です。シングルクォート必須なので注意。

PriorityQueue

優先順位順に取り出せるキュー(基本は前述のキューと同じ)。

hoge_queue: 'asyncio.PriorityQueue[Hoge]' = asyncio.PriorityQueue()

他は Queue と完全に同じ。
ただし、アイテム(上記でいえば Hoge)は優先順位順に並んだ状態で管理される。優先順位の評価には __lt__ が使われる。

アイテムの指定例1
# アイテムの優先順位を1つに限定して定義可能な場合はこの方法

@dataclass
class Hoge:
  num: int
  
  def __lt__(self, other: 'Oricon'):
    # num が小さいものを優先
    return self.num < other.num

# ... 別のどこか
# num が小さいものから順に get できる
hoge = await hoge_queue.get()
アイテムの指定例2
# アイテムの優先順位が複数あったり、アイテムのクラスを触れない場合はこの方法

# put する際にタプルで順位を渡す
hoge_queue.put_nowait((item.num, item))

# num が小さいものから順に get できる
_, item = await hoge_queue.get()

よく使うのはこれくらい。

■ async メソッドの Test (pytest) 方法

pytest-asyncio のインストール

下記のように pipfile[dev-packages] ブロックに pytest 関連をインストール。
pytest-asyncio は pytest で asyncio を扱うためのモジュール。これを加えます。

pipfile
[dev-packages]
pytest = "*"
pytest-asyncio = "*"
pytest-mock = "*"

async なテストケースの実行方法

@pytest.mark.asyncio
async def test__call_async_method():
  # await が使える
  actual = await async_method1()

  # チェック
  assert actual == 123
  • @pytest.mark.asyncio をつける

async な fixture の実現方法

async def fixure1():
  return await async_method1()

def test__something(fixure1):
  assert fixure1 == 1234
  • fixture はアノテーションなど不要で async なメソッドが利用可能。
  • 使うときもawait不要。await 済みのデータを取得した状態でやってくる。
  • ただし yield をつかって setup/teardown をしたい場合、pytest-asyncio のバージョン 0.19 あたりから仕様が変わったようで、@pytest_asyncio.fixture が必須になる。
@pytest_asyncio.fixture
async def conn():
  async with connect() as conn:
    yield conn

def test__something(conn):
  assert conn.get() == 1234

async な mock / mocker.patch.object の実現方法

def test__mock(self):
  hoge = AsyncMock(Hoge)

  mocker.patch.object(hoge, 'async_method1', return_value=123)
  # → await hoge.async_method1() の結果 123 が返る

  mocker.patch.object(hoge, 'normal_method1', return_value=123)
  # → hoge.normal_method1() の結果 123 が返る
  • MagicMock の代わりに AsyncMock を使うことでメンバで持っているメソッドすべてが、async かどうかにあわせてモックメソッドになる。つまり、async なら await の後、return_valueside_effect の指定したものが反映される動きになる。
  • AsyncMock したオブジェクトに対して mocker.patch.object を使うことで await かそうでないかは内部で自動的に判断して吸収してくれる。

こちらもどうぞ

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
154