Python
非同期処理
Python3
並列処理
AsyncAwait

Pythonの非同期I/O(asyncio)を試す

多くのプログラマが、以下構文に馴染んでいることでしょう。


  • forやif、whileといった、構造化プログラミングの構文

  • instance = class()やx = instance.method()、instance.property = yといった、オブジェクト指向の構文

しかし、asyncやawaitといった構文には、あまり馴染みがない人も多いのではないでしょうか。私もその一人です。

Pythonには、並列処理に利用できる構文が三つ用意されており、asyncやawaitはそのうちの一つ、非同期I/Oの構文となります。


  • multiprocessing:マルチプロセス

  • threading:マルチスレッド

  • asyncio:非同期I/O

本記事は、以下の内容を含みます。


  • Sleep Sort、Hello, World、"MapReduce"の実装

  • コルーチンやフューチャについての簡単な説明

  • 非同期I/Oでマルチプロセスの恩恵を受ける方法

ご理解の一助になれば幸いです。


Sleep Sort

Sleep Sortとは、ソートアルゴリズムの一種です。


具体的な手順は以下のように超シンプルなもの。


  1. 配列から値を読み込み、その値だけの時間スリープする

  2. スリープし終わったらその値を出力する

これを配列の全ての値に対して並列処理してやると、値の小さい方から順に出力される。

--@tobigitsuneさんの記事より


一般にはネタと思われているようです。しかし、並列処理を書かなければならないので、ハードルは高いのではないでしょうか。Qiitaでは、以下の人たちがSleep Sortを実装しているようです。Pythonでの実装には★をつけました。async / awaitを使った例はまだ無いようです。

早速ですが、以下、Pythonでの実装例と実行結果となります。コンソールからでも、Jupyter環境からでも実行できるように考慮してあります。

import asyncio

async def sleep_and_append(e, sorted_values):
await asyncio.sleep(e)
sorted_values.append(e)

def sleep_sort(array):
loop = asyncio.get_event_loop()
loop_was_not_running = not loop.is_running()
sorted_values = []
def print_sorted_values(future):
print(sorted_values)
if loop_was_not_running:
loop.stop()
coroutines = (sleep_and_append(e, sorted_values) for e in array)
future = asyncio.gather(*coroutines)
asyncio.ensure_future(future)
future.add_done_callback(print_sorted_values)
if loop_was_not_running:
loop.run_forever()

if __name__ == '__main__':
array = [5, 3, 6, 3, 6, 3, 1, 4, 7]
sleep_sort(array)

[1, 3, 3, 3, 4, 5, 6, 6, 7]

async defによって定義されるコルーチンは、ジェネレータと似ていて、コールした時点では、処理が実行されず、コルーチンオブジェクトが生成されます。ファクトリ関数と捉えることもできるでしょうが——「実行が遅延される」と捉えたほうが、個人的にはより良く理解できると思います。コルーチンが実行されるのは、イベントループが走った時点です。コルーチンは、await式で実行が一時停止し、またそこから再開するという、サブルーチンには無い性質を持ちます。await式に指定できるのは、awaitableなオブジェクトに限ります。

並列実行を行うため、asyncio.gatherにより、複数のコルーチンオブジェクトから、一つのフューチャオブジェクトを生成しています。フューチャは、繰り延べられる(deferred)もの——言うなれば「将来の結果」といったところでしょうか。生成された時点では結果が入っていません。asyncio.ensure_futureによって実行をスケジューリング。イベントループが走り、コルーチンの実行が完了し次第、結果が格納されます。Sleep Sortの例ではフューチャ内の結果を参照していませんが、のちに結果を参照する例も見ます。結果を参照する代わりにフューチャに完了時コールバック関数を登録し、コールバック関数内でソート済配列をプリントしています。print_sorted_valuesが、いわゆるクロージャであり、自身が定義された環境内の変数への参照をもつ点にはご留意ください。


Hello, World

世界に挨拶してみましょう。構造は先ほどと同じです。実装例と実行結果を示します。コルーチン内でコンソールに出力すると共に、値を返して、コールバック関数print_hello_world内でフューチャから結果を取り出し、プリントしています。結果はどうなるでしょうか。

import asyncio

import sys

async def sleep_and_write(char, time=0):
await asyncio.sleep(time)
sys.stdout.write(char)
return char

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop_was_not_running = not loop.is_running()
def print_hello_world(future):
print(''.join(future.result()))
if loop_was_not_running:
loop.stop()
coroutines = (
sleep_and_write("w", 0.8),
sleep_and_write("l", 1.1),
sleep_and_write("r", 1.0),
sleep_and_write("o", 0.9),
sleep_and_write("d", 1.2),
sleep_and_write("l", 0.3),
sleep_and_write("e", 0.1),
sleep_and_write(",", 0.6),
sleep_and_write("h"),
sleep_and_write(" ", 0.7),
sleep_and_write("o", 0.5),
sleep_and_write("l", 0.2),
sleep_and_write("\n", 1.3))
future = asyncio.gather(*coroutines)
asyncio.ensure_future(future)
future.add_done_callback(print_hello_world)
if loop_was_not_running:
loop.run_forever()

hello, world

wlrodle,h ol

異なる結果が得られましたね。asyncio.gatherのドキュメントには、以下の記載があります。


the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival)


直訳すると、返されるフューチャの結果は、結果のリストである(元々のシーケンスの順番であり、必ずしも結果が到着した順番ではない)ということです。並列処理でありながら、順番を保持してくれるのは、なかなか有用な性質ではないでしょうか。


"MapReduce"

以下のコードは、0から10億の数字を四分割して並列実行で合計し、それぞれの結果を最後に合計する例です。asyncioでありながら、マルチプロセスで処理を実行するテクニックを使っています。いわば、PC内で、MapReduceを行なうようなものです。マルチコア環境では、normal版とasync版とで、実行時間が異なるはずです。

import asyncio

import concurrent.futures
from datetime import datetime

async def sum_(start, stop):
print('{}, start[{}-{}]'.format(datetime.now(), start, stop))
loop = asyncio.get_event_loop()
with concurrent.futures.ProcessPoolExecutor() as executor:
x = await loop.run_in_executor(executor, sum, range(start, stop))
print('{}, end[{}-{}]'.format(datetime.now(), start, stop))
return x

def async_sum():
print('{}, async_sum start'.format(datetime.now()))
loop = asyncio.get_event_loop()
loop_was_not_running = not loop.is_running()
def reduce(future):
print(sum(future.result()))
if loop_was_not_running:
loop.stop()
print('{}, async_sum end'.format(datetime.now()))
coroutines = (
sum_(0, 250000000),
sum_(250000000, 500000000),
sum_(500000000, 750000000),
sum_(750000000, 1000000000))
future = asyncio.gather(*coroutines)
asyncio.ensure_future(future)
future.add_done_callback(reduce)
if loop_was_not_running:
loop.run_forever()

def normal_sum():
print('{}, normal_sum start'.format(datetime.now()))
print(sum(range(0, 1000000000)))
print('{}, normal_sum end'.format(datetime.now()))

if __name__ == '__main__':
normal_sum()
async_sum()

2018-10-09 18:32:05.336274, normal_sum start

499999999500000000
2018-10-09 18:32:22.588707, normal_sum end
2018-10-09 18:32:22.588747, async_sum start
2018-10-09 18:32:22.594261, start[250000000-500000000]
2018-10-09 18:32:22.612808, start[0-250000000]
2018-10-09 18:32:22.638820, start[750000000-1000000000]
2018-10-09 18:32:22.673976, start[500000000-750000000]
2018-10-09 18:32:31.142530, end[250000000-500000000]
2018-10-09 18:32:31.211867, end[750000000-1000000000]
2018-10-09 18:32:31.220176, end[0-250000000]
2018-10-09 18:32:31.236131, end[500000000-750000000]
499999999500000000
2018-10-09 18:32:31.236306, async_sum end

期待どおり、筆者の環境(コア数2)では、normal版が17.3秒、async版が8.53秒と、実行時間が半分となりました。

実装のポイントは、ProcessPoolExecutorオブジェクトを生成し、run_in_executorの第一引数に指定することです。run_in_executorにより、通常の関数をawaitableなオブジェクト(コルーチン)に変換しているのも、テクニックと言えるでしょう。


まとめ

馴染んでしまえば、有用そうです。道具箱にお加えになってはいかがでしょうか。本記事がその一助になれば幸いです。


参考サイト