13
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【初心者向け】Pythonの非同期,並行処理であるasyncioを理解して使いこなす

Last updated at Posted at 2022-10-24

0. はじめに

pythonでスクレイピングといったらどのライブラリを使いますか?
多分大多数の人は「requests+BeautifulSoup」「Selenium」と答えると思うし、ググったらまずそうなると思うし、私もそうでした。

もちろんそれでOKなんですが、最近「Pyppeteer」とか「Requests-HTML」,「Playwright」なんてのを徐々に耳にするようになった。
しかしそれらの記事を詳細に調査していくと必ず今度は「asyncio」なる概念が登場する。

そこで2回に分けて記事にすることにします。
(本記事へ反響あれば次回記事書きます)

第1回:python asyncioを理解して使いこなす(本記事)
第2回:Requests-HTML(puppeteer)でスクレイピング(次回予定記事※本記事が需要あれば)
ます

  • 動作環境
    • OS : Windows10 pro
    • python: 3.9.6 ※3.7以上が必須
    • 基本は.pyファイルで説明。※いつもの記事と違い、jupyter環境だとasyncio関連でエラーになるので注意(3で回避方法を説明)

なおasyncioはプレインストールされてますのでpip不要です

1. asyncioとは?

1-1.まずは用語に関して(プロセス/スレッド/処理)

ここらへんが基本だけれどわかりづらい部分だとも思うので、少し雑な説明だが私なりの理解を記してしておく
(文字が続きますがご容赦ください)

1-1-1.プロセス

プログラム実行~終了までのこと。要するにメモ帳やパワポなど独立して動作するプログラム(アプリ)を指す
Windowsでいうと、タスクマネージャを起動(Esc+Shift+Ctrl)すると最初にこのプロセスが表示されるはずである。
※CPUは1コアに対して一度に1つのプロセスしか実行できない

1-1-2.マルチプロセス

プログラムを複数立ち上げて実行すること。
Excelとかメモ帳とか同時に動かせる状態はマルチプロセスと言える

1-1-3.(シングル)スレッド/マルチスレッド

プロセスから生成されるもの(プロセスよりも小さな実行単位)で、処理を順番に実行するもの。
途中で処理効率を上げるために並列で処理させれば「マルチスレッド」となる

※マルチプロセスとの違いは、スレッド間でメモリが共有されるか否か。
プロセス1とプロセス2はメモリが共有されないが、スレッド1とスレッド2は共有される。
別プロセスのメモリ情報を直接参照することはできない

1-1-4.同期処理とは

複数タスク(スレッド)を実行する際に1個ずつ「順番に」実行される方式のこと。

1-1-5.非同期処理とは

複数タスクを実行する際に、あるタスクが動いていても「処理を止めずに別のタスクを実行」できる方式のこと

1-1-6.逐次処理とは

1つずつ順番に処理する方法。処理Aが完全に終わったら処理Bを処理すること

1-1-7.並行処理とは

処理AとBを切り替えながら処理すること

1-1-8.並列処理とは

処理AとBを同時並列で処理すること

※なお、ここで使用している「タスク」という単語に関しては一般的にスレッドとイコールとされることが多いですが、
asyncioの理解の中にも「タスク」という単語が出てきます。

1-2.asyncioとは?

さて、基本的な用語の説明は終わったので本筋に行きます。

asyncioは上で説明したものの中でどれにあたるのよ?って話ですが、
結論から言うと「asyncioはシングルスレッド + 非同期処理 + 並行処理」にあたります。

これだけ聞くと、なんだよマルチスレッドの方がいいんじゃないの?と思うかもしれませんが、マルチスレッドには難しい点があります。
それは「スレッドセーフ」等各スレッドを管理する必要性です(詳細は本筋でないので省略します)。

まずは以下で、以後asyncioと比較する為の簡単な例として「普通の関数(≒逐次処理)」の処理を記載します。

普通の関数
import time

def one_print():
    time.sleep(3) #3秒の何かしらの処理時間
    print(1)

def two_print():
    print(2)

one_print()
two_print()

実行結果は当たり前だが、one_print関数が完了した後にtwo_print関数が動くだけである。

実行結果
1
2

1-3. イベントループ/コルーチン/タスク

・asyncioは「イベントループという仕組みを利用して、シングルスレッドで並列処理を実行するライブラリ」である。
・asyncioはネットワークやDBの入出力を待ってる時間でほかのタスクを並列に実行できる。
  ※あくまで「外部」へのI/Oの待ち時間に有効で、内部処理(CPUの計算等)の待ち時間には利用できない。
  ※asyncioの待ち時間部分は「専用命令」しか対応していないので注意

1-3-1.イベントループとは?

何かしらリクエストがあった時に「決められた(登録された)順番に従って処理を進め、次のリクエストを待つループを繰り返すもの」と私はざっくり理解しました。詳しい意味は長くなりますので、ご自身でも調べてみてください。
asyncio的に言うなら、後述するタスクやコルーチンの実行/管理を行い、並列処理を実現できるものと言えそう。

asyncio.runで、イベントループを作ることができる
※ただし、asyncio.runの中で新たにasyncio.runを行うことはできない。

1-3-2.コルーチン(async関数)とは?

コルーチンとは、処理を途中で中断して再開できるルーチン(処理の固まり≒関数)のこと。
関数の定義にasync(非同期:asynchronousの略)を付ける
※コルーチンの定義は、関数「def ・・」の手前に「async def ・・」をつけるだけでOK

1-3-3.コルーチン(await関数)とは?

コルーチンの中で中断/再開する処理を指定するもの。
でその関数の中で時間がかかるI/O処理にawaitをつける。※awaitをつけられるのはコルーチンやタスクのみ

なお、コルーチン関数を普通の関数みたいに実行した場合は通常の関数と違い、関数の最後にreturnがあっても値は帰ってこない為結果を受け取れず、代わりにオブジェクトが返ってくる

ではどうやってreturnを受け取るのか?といえば、イベントループにコルーチン関数をのせればいいのである。

しかし、以下サンプルのようにただawaitしただけでは並列処理にはならない。※1個のコルーチンが終わったら次のコルーチンを処理するだけになってしまう
また、待ち時間の個所がtime.sleepではないことに注意(asyncioの待ち時間部分は「専用命令」しか対応していない)

イベントループとコルーチン
import asyncio

async def one_print():
    await asyncio.sleep(3) #asyncの中の待ち時間はtime.sleepではなくてasyncio.sleepを使う
    print(1)

async def two_print():
    print(2)

async def main(): #コルーチンを定義
    #中断とか再開したい処理にはいちいちawaitをつける
    await one_print() #コルーチンの中で中断/再開する処理その1
    await two_print() #コルーチンの中で中断/再開する処理その2

asyncio.run(main()) #イベントループを作成

※これを実行しても3秒の待ち時間に並列処理をしてくれるわけでもなく、以下のように並列処理にならない・・・

実行結果
1
2

1-3-4.タスクとは?

そこでタスクの登場なのである。

タスクとはコルーチンをラップして実行状態を待つもので、戻り値の取得もできるもの
コルーチンの並列処理を行う為には、このタスクをイベントループに登録(create_task)する必要がある。

イベントループとコルーチンとタスク
import asyncio

async def one_print():
    await asyncio.sleep(3) #待ち時間はtime.sleepではなくてasyncio.sleepを使う
    print(1)

async def two_print():
    print(2)

async def main(): #コルーチンを定義
    #タスクを作成
    task1 = asyncio.create_task(one_print())
    task2 = asyncio.create_task(two_print())
    await task1
    await task2

asyncio.run(main()) #イベントループを作成

すると、今回は「2」が先に出力されていることがわかる。
これはタスク1の待ち時間にタスク2を実行したから出力される結果であり、狙い通りの並行処理ができていることがわかる

実行結果
2
1

もう少しわかりやすい例でもやってみようと思う。

並列処理の中身がわかるような例(一斉に入店し、メニューで迷ってる客)
import asyncio

#引数は考察時間、座席番号
async def c_order(n, sheet):
    print(f'座席{sheet}は考え中')
    await asyncio.sleep(n)
    print(f'座席{sheet}のオーダー入ります')
    return f'座席{sheet}注文確定'

#コルーチン
async def order():
    print('お客さんたち注文は?')
    task1 = asyncio.create_task(c_order(3, 1)) #座席1の客の考察時間3秒
    task2 = asyncio.create_task(c_order(5, 2)) #座席2の客の考察時間5秒(一番長い)
    task3 = asyncio.create_task(c_order(2, 3)) #座席1の客の考察時間2秒(一番短い)
    menu1 = await task1
    menu2 = await task2
    menu3 = await task3
    
    print(menu1, menu2, menu3)
    
asyncio.run(order())

これを実行すると、以下のように処理が短い順に注文が決定している様子がよくわかる。

実行結果
お客さんたち注文は?
座席1は考え中
座席2は考え中
座席3は考え中
座席3のオーダー入ります ←まず考察時間が一番短い座席3が決定
座席1のオーダー入ります ←次に考察時間が短い座席1が決定
座席2のオーダー入ります ←一番考察時間が長い座席2が決定
座席1注文確定 座席2注文確定 座席3注文確定 ←全タスク完了後に実行

1-4. gather

gatherは引数にコルーチン、またはタスクを受け取って、すべてが終わり次第、結果を投入した順で返す。
「tasks」というところに登録してあるコルーチンをまとめて実行して、結果を渡した順で返って来るというものになっています。

gatherで一括task登録
import asyncio

#引数は考察時間、座席番号
async def c_order(n, sheet):
    print(f'座席{sheet}は考え中')
    await asyncio.sleep(n)
    print(f'座席{sheet}のオーダー入ります')
    return f'座席{sheet}注文確定'

async def order():
    print('お客さんたち注文は?')
    task = {
        c_order(3, 1),
        c_order(5, 2),
        c_order(2, 3)
    }
    #gatherで一括task登録可能
    result = await asyncio.gather(*task)
    print(result)
    
asyncio.run(order())

同様に並列処理が実行されていることがわかる。

実行結果
お客さんたち注文は?
座席2は考え中
座席1は考え中
座席3は考え中
座席3のオーダー入ります
座席1のオーダー入ります
座席2のオーダー入ります
['座席2注文確定', '座席1注文確定', '座席3注文確定']

2. 実例として、ポケモンの例で処理時間を比較してみる

実例??として「pokeapi」というAPIを使用して、asyncioの効果を測定したいと思う。

pokeapiのサンプル(requests使用)
import requests
import time
import pandas as pd

result = []

def get_poke(id):
    r = requests.get(f'https://pokeapi.co/api/v2/pokemon/{id}')
    result.append([r.json()['id'], r.json()['name']])
    
def main():
    start = time.time() 
    for poke_id in range(1,152):
        get_poke(poke_id)
    end = time.time()
    print(f"processing time:{end-start}") #処理時間を測定する
    
main()
print(pd.DataFrame(result,columns=['id','name']).sort_values('id').reset_index(drop=True))

これを実行すると、かかった処理時間とデータフレームにした初代ポケモンの151匹の図鑑IDとポケモン名称(英語)が取得できる。
普通のrequestsで実行すると、10秒かかっていることがわかる

実行結果
processing time:10.358959913253784 ★処理時間は10秒!

      id        name
0      1   bulbasaur  ※フシギダネ(ポケモン図鑑No.001)
1      2     ivysaur  ※フシギソウ(ポケモン図鑑No.002)
2      3    venusaur  ※フシギバナ(ポケモン図鑑No.003)
3      4  charmander  ※ヒトカゲ(以下略)
4      5  charmeleon
..   ...         ...
146  147     dratini
147  148   dragonair
148  149   dragonite
149  150      mewtwo
150  151         mew

今度は同様の内容を「asyncio:並行処理」で実行して処理時間がどう変わるか?を測定する。

pokeapiのサンプル
import httpx #asyncio版のrequests
import time
import asyncio
import pandas as pd

result = []

async def get_poke(id):
    async with httpx.AsyncClient() as client:
        r = await client.get(f'https://pokeapi.co/api/v2/pokemon/{id}')
        result.append([r.json()['id'], r.json()['name']])

#コルーチン
async def main():
    start = time.time() 
    tasks = []
    #タスクを設定する(151匹分)
    for poke_id in range(1,152):
        tasks.append(get_poke(poke_id))
    #タスク実行
    await asyncio.gather(*tasks)
    end = time.time()
    print(f"processing time:{end-start}") #処理時間を測定する
    
asyncio.run(main()) #イベントループ作成
print(pd.DataFrame(result,columns=['id','name']).sort_values('id').reset_index(drop=True))

すると、以下のように3秒にまで短縮できていることがわかる。

実行結果
processing time:3.65269136428833 ★処理時間は3秒!

※データフレームは同じなので省略

3. jupyterでasyncioを実行する為には?

今まで説明してきたコードをjupyterで実行するとエラーが発生する
「RuntimeError: asyncio.run() cannot be called from a running event loop」
なぜこんなエラーが発生するか?といえば、なんとjuputerそのものがイベントループで動いているらしい。
最初の方に述べたが、「イベントループの中でイベントループを発生させることはできない」からエラーになるのである。

解決策はいくつかあるが、簡単なのは以下のようにasyncio.run()の代わりにawait関数として実行させてやればいい。
(一応コード内にそれ以外の実行方法も書いておいた)

先程の座席注文の例をjupyterで実行する
import asyncio

#引数は考察時間、座席番号
async def c_order(n, sheet):
    print(f'座席{sheet}は考え中')
    await asyncio.sleep(n)
    print(f'座席{sheet}の注文決定')
    return f'座席{sheet}注文確定'

async def order():
    print('お客さんたち注文は?')
    task = {
        c_order(3, 1),
        c_order(5, 2),
        c_order(2, 3)
    }
    #gatherで一括task登録可能
    result = await asyncio.gather(*task)
    print(result)
    
#asyncio.run(order()) ←jupiterではここでエラーになる
await order() #よって代わりに「await」を使用する

"""(おまけ)もしくはawaitの代わりに以下(その1/2)でもOK"""
#その1
# loop = asyncio.get_event_loop() #jupyterのループを取得
# loop.create_task(order())

#その2
# loop = asyncio.get_event_loop() #jupyterのループを取得
# asyncio.run_coroutine_threadsafe(order(), loop)

4. さいごに

どうだったであろうか?
かなり細かくわかりやすい例で記事を書いたつもりなので、なんとなくは理解できたと思う。
本記事への反響があれば次回はスクレイピングにこのasyncioを応用しているrequests_htmlを解説する予定です。反響なければ第一回で終わりますのであしからず。

なお、nest-asyncioというものを使用すればイベントループをネストできるので興味ある方はこちらも調べてみてください

参考資料たち

13
12
6

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?