#本稿について
Pythonバージョン3.2から追加された,concurrent.futuresモジュールの使い方を備忘録としてまとめる.
concurrent.futures
モジュールは結論から言ってしまえば,マルチスレッド,マルチプロセス両方のインターフェースを提供する.
#どんな場面で使われるか?
Q. 並行問題に対して,非同期アプリケーションに合わない場合やどうしていいかわからない場合には,どうするか?
A. 問題対象となる部分の処理をスレッドあるいはプロセスに委譲します.委譲した処理をコルーチンのように見せかけて,制御をイベントループに解放し,最終的な結果を処理します.
このAを実現するために用意されたのが concurrent.future
モジュールです.
#なにができるのか?
concurrent.future
モジュールは asycio
モジュールにも統合されており,この2つのモジュールを組み合わせることで,マルチスレッドやマルチプロセスで実行されるブロッキング関数を非同期のノンブロッキングコルーチンのように使用できます.
どのように使うのか?
concurrent.future
モジュールには,Executor
オブジェクトとFuture
オブジェクトがあります.
Executor
クラスについて
Executor
は並列に作業項目を処理できるリソースのプールを表しています. multiprocessing
モジュールの Pool
クラスの目的と似ているようですが,インターフェスと設計が異なります.
Executor
クラスはインスタンス化されていない基底クラスです.Executor
クラスには,以下の2つのサブクラスが存在します.
-
ThreadPoolExecutor
:スレッドプールを指定して,非同期処理を行います -
ProcessPollExecutor
:プロセスプールを指定して,非同期処理を行います.
これらのことから,Executor
クラスからマルチスレッドとマルチプロセスの両方のインターフェスを提供しているといえます.
これらのクラスは3つメソッドを提供します.
-
submit(fn, *args, **kwargs)
:fn関数をリソースプールで実行するようにスケジュールし,Future
オブジェクトを返す -
map(func, *iterables, timeout=None, chunksize=1)
:multiprocessing.Pool.map()
メソッドと同様に,func
関数をiterableなオブジェクトの要素をそれぞれに対して実行します. -
shutdown(wait=True)
:Executor
をシャットダウンし,全てのリソースを解放します.
以下,実装例.
from gmaps import Geocoding
from concurrent.futures import ThreadPoolExecutor
api = Geocoding(api_key='maruhi')
PLACES = (
'Reykjavik', 'Vien', 'Zadar',
'Venice', 'Wrocow', 'Bolognia',
'Berlin', 'Dehil', 'New York',
'Osaka'
)
POOL_SIZE = 4
def fetch_place(place):
return api.geocode(place)[0]
def present_result(geocoded):
print("{:s}, {:6.2f}, {:6.2f}".format(
geocoded['formatted_address'],
geocoded['geometry']['location']['lat'],
geocoded['geometry']['location']['lng'],
))
def main():
with ThreadPoolExecutor(POOL_SIZE) as executor:
results = executor.map(fetch_place, PLACES)
print(type(results))
print(results)
for result in results:
present_result(result)
if __name__ == "__main__":
main()
$python geocoding_by_concurrentfutures.py
<class 'generator'>
<generator object _chain_from_iterable_of_lists at 0x000001E2A3CED9C8>
Reykjavík, Iceland, 64.15, -21.94
3110 Glendale Blvd, Los Angeles, CA 90039, USA, 34.12, -118.26
Zadar, Croatia, 44.12, 15.23
Venice, Metropolitan City of Venice, Italy, 45.44, 12.32
Wrocław, Poland, 51.11, 17.04
Bologna, Metropolitan City of Bologna, Italy, 44.49, 11.34
Berlin, Germany, 52.52, 13.40
Delhi, India, 28.70, 77.10
New York, NY, USA, 40.71, -74.01
Osaka, Japan, 34.69, 135.50
Future
クラスについて
Future
クラスはExecutor.submit()
関数によって生成されます.
Future
オブジェクトは呼び出し可能なオブジェクトの非同期実行を管理し,処理結果を示します.登録した呼び出し可能オブジェクトの戻り値は, Future.result()
メソッドで取得します.終了していない場合,結果が準備できるまでブロックします.result()
メソッドによる結果の取得は,処理の終了後である必要はなく, result()
は終了を待って,値を返します.
[(公式)Futureオブジェクトについて]https://docs.python.org/ja/3/library/concurrent.futures.html#future-objects
from concurrent.futures import ThreadPoolExecutor
def loudy_return():
print("processing")
return 42
with ThreadPoolExecutor(1) as executor:
future = executor.submit(loudy_return)
print(future)
print(future.result())
$python sample_concurrent_futures.py
processing
<Future at 0x27f17bd76c8 state=finished returned int>
42
##参考文献