LoginSignup
34
39

More than 3 years have passed since last update.

【Python】asyncio(非同期I/O)のイベントループをこねくり回す

Posted at

目的

Python公式ドキュメントを参考にasyncioライブラリのイベントループを理解する。

環境

python 3.7.4

注意

import asyncioを忘れずに。本稿のサンプルコードでは省略しています。

イベントループメソッド

イベントループの取得

asyncio.get_event_loop()

カレントイベントループを取得。カレントスレッドにカレントイベントループがなければ自動的にイベントループを作り,それをカレントイベントループに設定する。一度設定し,再度作っても同じカレントイベントループになる。

loop = asyncio.get_event_loop()
loop_2 = asyncio.get_event_loop()
print(loop is loop_2) # True
asyncio.new_event_loop()

新しいイベントループを作る。ただしカレントイベントループではない。

new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

引数loopをカレントイベントループに設定する。

new_loop = asyncio.new_event_loop()
loop = asyncio.set_event_loop(new_loop)
asyncio.get_running_loop()

カレントスレッドで実行中のイベントループを返す。もしイベントループがなければ例外(RuntimeError)を発生させる。この関数はコルーチンもしくはコールバックからでのみ使用可能。

async def main():
    print(asyncio.get_running_loop()) # <_UnixSelectorEventLoop running=True closed=False debug=False>
    print('Hello') # Hello
    await asyncio.sleep(1)
    print('World') # World

asyncio.run(main())

ループの実行と停止

loop.run_until_complete(future)

引数future(Futureのインスタンス)が完了するまで実行する。引数がコルーチンの場合は,asyncio.Taskとして実行するまで予約される。Futureの戻り値か例外を返す。

async def main():
    print('Hello') # Hello
    await asyncio.sleep(1)
    print('World') # World

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
loop.run_forever()

loop.stop()が呼ばれるまでループし続ける。

def main(loop):
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()
loop.call_soon(main, loop)

try:
    loop.run_forever()
finally:
    loop.close()
loop.stop()

イベントループを停止する。

loop.is_running()

イベントループが実行中ならTrueを返す。

loop.is_closed()

イベントループが閉じられたらTrueを返す。

loop.close()

イベントループを閉じる。

(コルーチン関数)loop.shutdown_asyncgens()

非同期ジェネレーターオブジェクトを閉じる。

async def my_async_generator(iterable):
    for num in iterable:
        await asyncio.sleep(1)
        yield num

async def main():
    generator = my_async_generator([1, 2, 3, 4, 5])
    async for item in generator:
        print(item)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.call_soon(loop.shutdown_asyncgens)
    loop.close()

コールバックのスケジューリング

loop.call_soon(callback, *args, context=None)

引数callbackをイベントキューに入れる。引数*argscallbackの引数。 callbackは入れた順に呼び出される(FIFO)。スレッドセーフではないのでマルチスレッドでの使用する際は注意が必要。

loop = asyncio.get_event_loop()
loop.call_soon(print, 'Hello') # Hello
loop.call_soon(print, 'World') # World
loop.call_soon(loop.stop)
loop.run_forever()
loop.close()
loop.call_soon_threadsafe(callback, *args, context=None)

マルチスレッドでスレッドセーフにcallbackを呼び出す時に使用。

import threading
import asyncio

def say_hello():
    print('Hello')

def say_world():
    print('World')

loop_thread1 = asyncio.new_event_loop()
loop_thread2 = asyncio.new_event_loop()

def run(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

loop_thread1.call_soon_threadsafe(say_hello)
loop_thread2.call_soon_threadsafe(say_world)

loop_thread1.call_soon_threadsafe(loop_thread1.stop)
loop_thread2.call_soon_threadsafe(loop_thread2.stop)

thread1 = threading.Thread(target=run, args=(loop_thread1,))
thread2 = threading.Thread(target=run, args=(loop_thread2,))
thread1.start()
thread2.start()

# Hello
# World

遅延コールバックのスケジューリング

loop.call_later(delay, callback, *args, context=None)

引数delayに遅らせたい時間(秒)を設定する。そうすることでdelay秒後にcallbackが呼ばれる。この関数を用いる事で自前のasyncio.sleep()を作ることができる。

def set_result_unless_cancelled(future, result):
    if future.cancelled():
        return result
    future.set_result(result)

# 自前のasyncio.sleep()
async def my_asyncio_sleep(delay, result=None):
    if delay <= 0:
        return result

    loop = asyncio.get_event_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                    set_result_unless_cancelled,
                    future,
                    result)
    try:
        return await future
    finally:
        h.cancel()
loop.call_at(when、callback、* args、context = None)

loop.time()からの経過時間後にcallbackを返す。call_later()は内部でcall_at()を呼び出しているので実質的にcall_later()と同じ。

async def my_asyncio_sleep(delay, result=None):
    if delay <= 0:
        return result

    loop = asyncio.get_event_loop()
    future = loop.create_future()
    # h = loop.call_later(delay, set_result_unless_cancelled, future, result)
    h = loop.call_at(loop.time() + delay, set_result_unless_cancelled, future, result)
    try:
        return await future
    finally:
        h.cancel()
loop.time()

モノトニック時刻を返す。

FutureとTaskオブジェクトの作製

loop.create_future()

Futureオブジェクトを作製する。

loop = asyncio.get_event_loop()
loop.create_future()
loop.create_task(coro)

引数coroはコルーチンを指す。コルーチンを実行予約のリストに格納する。Taskオブジェクトを返す。

import asyncio
import time

# firstからend-1までの和を求めて
# asyncio.sleep()で1秒待機
async def async_do_sum(first, end):
    result = sum(range(first, end))
    print(result)
    await asyncio.sleep(1)

async def async_main():
    t1 = time.time()

    # Taskオブジェクトの作製
    task1 = asyncio.create_task(async_do_sum(1, 10 ** 2 + 1))
    task2 = asyncio.create_task(async_do_sum(1, 10 ** 4 + 1))
    task3 = asyncio.create_task(async_do_sum(1, 10 ** 6 + 1))

    await asyncio.wait({task1, task2, task3})

    t2 = time.time()

    print('計算に{:.2f}秒かかりました'.format((t2 - t1)))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_main())

# 5050
# 50005000
# 500000500000
# 計算に1.04秒かかりました
loop.set_task_factory(factory)

タスクファクトリーの作製。引数factoryの引数は(loop, coro)でなければいけない。詳しくはこちら

import asyncio

def set_value(value):
    task = asyncio.current_task()
    setattr(task, 'value', value)

def get_value():
    task = asyncio.current_task()
    return getattr(task, 'value', None)

async def do_sum():
    value = get_value()
    sum_value = sum(range(1, value + 1))
    print(sum_value)
    await asyncio.sleep(1)

async def do_fibonacci():
    value = get_value()
    print([i for i in fibonacci(value)])
    await asyncio.sleep(1)

def fibonacci(n):
    a, b = 0, 1

    for _ in range(n):
        yield a
        a, b = b, a + b

def task_factory(loop, coro):
    child_task = asyncio.Task(coro, loop=loop)
    parent_task = asyncio.current_task(loop=loop)
    current_task = getattr(parent_task, 'value', None)
    setattr(child_task, 'value', current_task)

    return child_task

async def main(value):
    set_value(value)

    await asyncio.gather(do_sum(),
                         do_fibonacci(),
                         )

loop = asyncio.get_event_loop()
loop.set_task_factory(task_factory)
loop.run_until_complete(main(value=10))
loop.close()

# 55
# [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
loop.get_task_factory()

現在のタスクファクトリーを返す。


ここで一旦アップします。時間を見つけて更新します。

参考文献

Python公式リファレンス(イベントループ)
Context information storage for asyncio

34
39
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
34
39