0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【メモ】非同期関数のタスクとイベントループ

Posted at

概略

  • 非同期処理では、イベントループタスクの実行順序を管理し、効率的な処理を実現する
  • タスクは非同期処理(コルーチン)を実行するためのオブジェクトであり、イベントループに登録することで並行実行される
  • asyncio.new_event_loop()でイベントループを作成し、create_taskでタスクを登録する

実行ファイル

記事と同じ内容を含むnotebookをgithubにアップしました。実際に手を動かしながら実行される際に利用してみてください。
github: https://github.com/Tsucreator/learn_asynchronous

バックナンバー

前置き

まだまだ駆け出しの初学者です。記載内容に誤りや読者へ誤解を招く記載がありましたら、後学のためご指摘いただけるととても励みとなります。有識者の方よろしくお願いします。

非同期関数の実行タスクの管理

タスクとイベントループの概念

非同期処理は時間のかかる処理を後回しにすることでその利点を受け取ることができます。ただし、処理の実行順序を適宜入れ替えるには実行順序を管理することができる機能が必要です。この"実行順序を管理する機能"が**イベントループ(Event Loop)です。

イベントループでは、実行順序を管理する対象を
タスク(Task)**と呼びます。タスクを作成しイベントループへ登録し、イベントループ内にて処理の順序を割り当て効率的に処理を完遂します。以下にまとめます。

  • イベントループ (Event Loop): タスクの実行を管理する中心的なコンポーネント。タスクの実行順序を決定し、I/O待ちが発生した場合は他のタスクに処理を切り替える
  • タスク (Task): コルーチンを実行するためのオブジェクト。イベントループに登録することで、並行実行が可能となる

実装

より具体的な理解につながるよう実装をしていきます。唐突に利用しているメソッドがいくつかありますので簡単な説明を記載します。

以下の例では

  • asyncio.new_event_loop(): イベントループ(new_loop)を作成
  • set_event_loop(new_loop): 作成したイベントループのセット(別のループが動いている時、指定したイベントループをアクティブにする)
  • create_task: イベントループにタスクを登録。create_taskは引数に渡されたコルーチン(awaitableオブジェクト)を、タスクとしてイベントループに登録
  • run_until_complete(my_task): イベントループ内のタスクが完了するまで実行する

※notebook上ではエラーとなりますので、実行される場合は下方のコードを利用して下さい。

import asyncio

# 時間のかかる処理を模擬的に表現
async def my_task():
    print("Task is running")
    await asyncio.sleep(2) # 2秒かかる
    print("Task is done")

# 明示的にイベントループを作成する
new_loop = asyncio.new_event_loop()
# 作成したイベントループを実行するようにセットする
asyncio.set_event_loop(new_loop)
# my_task()を登録
my_task = new_loop.create_task(my_task())
# イベントループの実行
new_loop.run_until_complete(my_task)
# イベントループを閉じる
new_loop.close()

# 結果
>>>Task is running
>>>Task is done


※notebook用はこちら

"""
 jupyter notebook用のスクリプト。イベントループの作成はせずに内部のイベントループを明示的に取得して"作成した"ものとしています
 jupyter notebookでは内部のイベントループが実行されており、新しくイベントループを作成すると競合します
"""
import asyncio

# 時間のかかる処理を模擬的に表現
async def my_task():
    print("Task is running")
    await asyncio.sleep(2) # 2秒かかる
    print("Task is done")

# get_running_loop()で実行中のイベントループを取得
loop = asyncio.get_running_loop() 
# my_task()を登録
my_task = loop.create_task(my_task())
# イベントループの実行(run_until_complete()もすでに動いているイベントループと競合するのでエラーとなる)
await my_task

# 結果
>>>Task is running
>>>Task is done

syncioライブラリには、asyncio.run()というタスク実行関数があります。こちらを用いると明示的にイベントループを取得せずとも、内部的(暗黙的)にイベントループを作成してタスクを管理します。上記のサンプルコード例では、イベントループとタスクの理解をすることを目的としてイベントループを作成しています。

また、asyncio.new_event_loop()を用いてイベントループを明示的に作成するとき、作成 > イベントループのセット > イベントループを閉じる、という手順となります。asyncio.run()ではloopを閉じる処理も不要です。

基本的な非同期処理の考え方・実行については以上です。まとめると以下の通りです。

  • 非同期処理では、イベントループがタスクの実行順序を管理し、効率的な処理を実現する
  • タスクは非同期処理(コルーチン)を実行するためのオブジェクトであり、イベントループに登録することで並行実行される
  • asyncio.new_event_loop()でイベントループを作成し、create_taskでタスクを登録する

実装例2(おまけ)

サンプルコードがとても簡素なものであったため、もう少し具体的に利用シーンを想定して実装をしてみたいと思います。非同期処理が用いられるケースの一つに、いくつかのリンクにリクエストを送りデータを取得する、といった処理があります。

そちらを模擬的に実装してみます。

  • https://example.com/1, https://example.com/2, https://example.com/3, の3つのurlからデータを取得する
  • 一つのurlからデータを取得するには3秒必要となる
  • 同期処理と非同期処理、で実行速度を比較する

※同期処理

import asyncio
import time

"""同期処理の場合"""
print("####同期処理の場合#####")

# urlにアクセスしてデータを取得する処理
def fetch_data(url):
    print(f"{url}, からデータを取得中...")
    time.sleep(3)  # データを取得する処理を模擬的に再現(時間がかかる部分)
    print(f"{url}, からのデータ取得が完了")
    return f"Data from {url}"

def main():
    # 対象のurlを設定(今回は模擬的に実装しているので仮のurlを指定)
    urls = [
        "https://example.com/1",
        "https://example.com/2",
        "https://example.com/3",
    ]

    # 時間計測(開始時刻)
    start_time = time.time()

    # urlごとにタスクリストを作成する
    results = []
    for url in urls:
      result = fetch_data(url)
      results.append(result)  # 全てのタスクの完了を待つ

    # 時間計測(終了時刻)
    end_time = time.time()

    print(f"消費時間: {end_time - start_time:.2f}")
    print(f"結果: {results}")

if __name__ == "__main__":
    main()

# 結果
>>>####同期処理の場合#####
>>>https://example.com/1, からデータを取得中...
>>>https://example.com/1, からのデータ取得が完了
>>>https://example.com/2, からデータを取得中...
>>>https://example.com/2, からのデータ取得が完了
>>>https://example.com/3, からデータを取得中...
>>>https://example.com/3, からのデータ取得が完了
>>>消費時間: 9.00 
>>>結果: ['Data from https://example.com/1', 'Data from https://example.com/2', 'Data from https://example.com/3']


※非同期処理

import asyncio
import time

"""非同期処理の場合"""
print("####非同期処理の場合#####")

# urlにアクセスしてデータを取得する処理
async def fetch_data(url):
    print(f"{url}, からデータを取得中...")
    await asyncio.sleep(3)  # データを取得する処理を模擬的に再現(時間がかかる部分)
    print(f"{url}, からのデータ取得が完了")
    return f"Data from {url}"

async def main():
    # 対象のurlを設定(今回は模擬的に実装しているので仮のurlを指定)
    urls = [
        "https://example.com/1",
        "https://example.com/2",
        "https://example.com/3",
    ]

    # 時間計測(開始時刻)
    start_time = time.time()

    # get_running_loop()で実行中のイベントループを取得
    loop = asyncio.get_running_loop() # 明示的にイベントループを可視化
    # urlごとにタスクリストを作成 > イベントループへ登録
    tasks = [loop.create_task(fetch_data(url)) for url in urls]
    # asyncio.gatherを使って、複数のタスクを同時に実行し、結果を待つ
    results = await asyncio.gather(*tasks)

    # 時間計測(終了時刻)
    end_time = time.time()

    print(f"消費時間: {end_time - start_time:.2f}")
    print(f"結果: {results}")

if __name__ == "__main__":
    await main()
    
# 結果
>>>####同期処理の場合#####
>>>https://example.com/1, からデータを取得中...
>>>https://example.com/1, からのデータ取得が完了
>>>https://example.com/2, からデータを取得中...
>>>https://example.com/2, からのデータ取得が完了
>>>https://example.com/3, からデータを取得中...
>>>https://example.com/3, からのデータ取得が完了
>>>消費時間: 3.01 
>>>結果: ['Data from https://example.com/1', 'Data from https://example.com/2', 'Data from https://example.com/3']
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?