#目的
Python公式ドキュメントを参考にasyncioライブラリのイベントループを理解する。
#環境
python 3.7.4
#注意
import asyncio
を忘れずに。本稿のサンプルコードでは省略しています。
#イベントループメソッド
##イベントループの取得
#####asyncio.get_event_loop()
カレントイベントループを取得。カレントスレッドにカレントイベントループがなければ自動的にイベントループを作り,それをカレントイベントループに設定する。一度設定し,再度作っても同じカレントイベントループになる。
loop = asyncio.get_event_loop()
loop_2 = asyncio.get_event_loop()
print(loop is loop_2) # True
#####asyncio.new_event_loop()
新しいイベントループを作る。ただしカレントイベントループではない。
new_loop = asyncio.new_event_loop()
#####asyncio.set_event_loop(loop)
引数loopをカレントイベントループに設定する。
new_loop = asyncio.new_event_loop()
loop = asyncio.set_event_loop(new_loop)
#####asyncio.get_running_loop()
カレントスレッドで実行中のイベントループを返す。もしイベントループがなければ例外(RuntimeError)を発生させる。この関数はコルーチンもしくはコールバックからでのみ使用可能。
async def main():
print(asyncio.get_running_loop()) # <_UnixSelectorEventLoop running=True closed=False debug=False>
print('Hello') # Hello
await asyncio.sleep(1)
print('World') # World
asyncio.run(main())
##ループの実行と停止
#####loop.run_until_complete(future)
引数future(Futureのインスタンス)が完了するまで実行する。引数がコルーチンの場合は,asyncio.Task
として実行するまで予約される。Futureの戻り値か例外を返す。
async def main():
print('Hello') # Hello
await asyncio.sleep(1)
print('World') # World
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
#####loop.run_forever()
loop.stop()
が呼ばれるまでループし続ける。
def main(loop):
print('Hello World')
loop.stop()
loop = asyncio.get_event_loop()
loop.call_soon(main, loop)
try:
loop.run_forever()
finally:
loop.close()
#####loop.stop()
イベントループを停止する。
#####loop.is_running()
イベントループが実行中ならTrue
を返す。
#####loop.is_closed()
イベントループが閉じられたらTrue
を返す。
#####loop.close()
イベントループを閉じる。
#####(コルーチン関数)loop.shutdown_asyncgens()
非同期ジェネレーターオブジェクトを閉じる。
async def my_async_generator(iterable):
for num in iterable:
await asyncio.sleep(1)
yield num
async def main():
generator = my_async_generator([1, 2, 3, 4, 5])
async for item in generator:
print(item)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.call_soon(loop.shutdown_asyncgens)
loop.close()
##コールバックのスケジューリング
#####loop.call_soon(callback, *args, context=None)
引数callbackをイベントキューに入れる。引数**argsはcallback*の引数。 callbackは入れた順に呼び出される(FIFO)。スレッドセーフではないのでマルチスレッドでの使用する際は注意が必要。
loop = asyncio.get_event_loop()
loop.call_soon(print, 'Hello') # Hello
loop.call_soon(print, 'World') # World
loop.call_soon(loop.stop)
loop.run_forever()
loop.close()
#####loop.call_soon_threadsafe(callback, *args, context=None)
マルチスレッドでスレッドセーフにcallbackを呼び出す時に使用。
import threading
import asyncio
def say_hello():
print('Hello')
def say_world():
print('World')
loop_thread1 = asyncio.new_event_loop()
loop_thread2 = asyncio.new_event_loop()
def run(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
loop_thread1.call_soon_threadsafe(say_hello)
loop_thread2.call_soon_threadsafe(say_world)
loop_thread1.call_soon_threadsafe(loop_thread1.stop)
loop_thread2.call_soon_threadsafe(loop_thread2.stop)
thread1 = threading.Thread(target=run, args=(loop_thread1,))
thread2 = threading.Thread(target=run, args=(loop_thread2,))
thread1.start()
thread2.start()
# Hello
# World
##遅延コールバックのスケジューリング
#####loop.call_later(delay, callback, *args, context=None)
引数delayに遅らせたい時間(秒)を設定する。そうすることでdelay秒後にcallbackが呼ばれる。この関数を用いる事で自前のasyncio.sleep()
を作ることができる。
def set_result_unless_cancelled(future, result):
if future.cancelled():
return result
future.set_result(result)
# 自前のasyncio.sleep()
async def my_asyncio_sleep(delay, result=None):
if delay <= 0:
return result
loop = asyncio.get_event_loop()
future = loop.create_future()
h = loop.call_later(delay,
set_result_unless_cancelled,
future,
result)
try:
return await future
finally:
h.cancel()
#####loop.call_at(when、callback、* args、context = None)
loop.time()
からの経過時間後にcallbackを返す。call_later()
は内部でcall_at()
を呼び出しているので実質的にcall_later()
と同じ。
async def my_asyncio_sleep(delay, result=None):
if delay <= 0:
return result
loop = asyncio.get_event_loop()
future = loop.create_future()
# h = loop.call_later(delay, set_result_unless_cancelled, future, result)
h = loop.call_at(loop.time() + delay, set_result_unless_cancelled, future, result)
try:
return await future
finally:
h.cancel()
#####loop.time()
モノトニック時刻を返す。
##FutureとTaskオブジェクトの作製
#####loop.create_future()
Futureオブジェクトを作製する。
loop = asyncio.get_event_loop()
loop.create_future()
#####loop.create_task(coro)
引数coroはコルーチンを指す。コルーチンを実行予約のリストに格納する。Taskオブジェクトを返す。
import asyncio
import time
# firstからend-1までの和を求めて
# asyncio.sleep()で1秒待機
async def async_do_sum(first, end):
result = sum(range(first, end))
print(result)
await asyncio.sleep(1)
async def async_main():
t1 = time.time()
# Taskオブジェクトの作製
task1 = asyncio.create_task(async_do_sum(1, 10 ** 2 + 1))
task2 = asyncio.create_task(async_do_sum(1, 10 ** 4 + 1))
task3 = asyncio.create_task(async_do_sum(1, 10 ** 6 + 1))
await asyncio.wait({task1, task2, task3})
t2 = time.time()
print('計算に{:.2f}秒かかりました'.format((t2 - t1)))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(async_main())
# 5050
# 50005000
# 500000500000
# 計算に1.04秒かかりました
#####loop.set_task_factory(factory)
タスクファクトリーの作製。引数factoryの引数は(loop, coro)
でなければいけない。詳しくはこちら。
import asyncio
def set_value(value):
task = asyncio.current_task()
setattr(task, 'value', value)
def get_value():
task = asyncio.current_task()
return getattr(task, 'value', None)
async def do_sum():
value = get_value()
sum_value = sum(range(1, value + 1))
print(sum_value)
await asyncio.sleep(1)
async def do_fibonacci():
value = get_value()
print([i for i in fibonacci(value)])
await asyncio.sleep(1)
def fibonacci(n):
a, b = 0, 1
for _ in range(n):
yield a
a, b = b, a + b
def task_factory(loop, coro):
child_task = asyncio.Task(coro, loop=loop)
parent_task = asyncio.current_task(loop=loop)
current_task = getattr(parent_task, 'value', None)
setattr(child_task, 'value', current_task)
return child_task
async def main(value):
set_value(value)
await asyncio.gather(do_sum(),
do_fibonacci(),
)
loop = asyncio.get_event_loop()
loop.set_task_factory(task_factory)
loop.run_until_complete(main(value=10))
loop.close()
# 55
# [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
#####loop.get_task_factory()
現在のタスクファクトリーを返す。
ここで一旦アップします。時間を見つけて更新します。
#参考文献
Python公式リファレンス(イベントループ)
Context information storage for asyncio