25
25

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

【Python】Executor,Futureクラスについて

Last updated at Posted at 2019-11-25

#本稿について
Pythonバージョン3.2から追加された,concurrent.futuresモジュールの使い方を備忘録としてまとめる.

concurrent.futuresモジュールは結論から言ってしまえば,マルチスレッド,マルチプロセス両方のインターフェースを提供する.

#どんな場面で使われるか?

Q. 並行問題に対して,非同期アプリケーションに合わない場合やどうしていいかわからない場合には,どうするか?

A. 問題対象となる部分の処理をスレッドあるいはプロセスに委譲します.委譲した処理をコルーチンのように見せかけて,制御をイベントループに解放し,最終的な結果を処理します.

このAを実現するために用意されたのが concurrent.future モジュールです.

#なにができるのか?

concurrent.future モジュールは asycio モジュールにも統合されており,この2つのモジュールを組み合わせることで,マルチスレッドやマルチプロセスで実行されるブロッキング関数を非同期のノンブロッキングコルーチンのように使用できます.

どのように使うのか?

concurrent.futureモジュールには,ExecutorオブジェクトとFutureオブジェクトがあります.

Executor クラスについて

Executor は並列に作業項目を処理できるリソースのプールを表しています. multiprocessing モジュールの Pool クラスの目的と似ているようですが,インターフェスと設計が異なります.

Executor クラスはインスタンス化されていない基底クラスですExecutorクラスには,以下の2つのサブクラスが存在します.

  • ThreadPoolExecutor :スレッドプールを指定して,非同期処理を行います
  • ProcessPollExecutor :プロセスプールを指定して,非同期処理を行います.

これらのことから,Executorクラスからマルチスレッドとマルチプロセスの両方のインターフェスを提供しているといえます.

これらのクラスは3つメソッドを提供します.

  • submit(fn, *args, **kwargs) :fn関数をリソースプールで実行するようにスケジュールし, Future オブジェクトを返す
  • map(func, *iterables, timeout=None, chunksize=1) : multiprocessing.Pool.map() メソッドと同様に, func 関数をiterableなオブジェクトの要素をそれぞれに対して実行します.
  • shutdown(wait=True) : Executor をシャットダウンし,全てのリソースを解放します.

以下,実装例.

geocoding_by_concurrentfutures.py
from gmaps import Geocoding
from concurrent.futures import ThreadPoolExecutor

api = Geocoding(api_key='maruhi')

PLACES = (
    'Reykjavik', 'Vien', 'Zadar',
    'Venice', 'Wrocow', 'Bolognia',
    'Berlin', 'Dehil', 'New York',
    'Osaka'
)

POOL_SIZE = 4


def fetch_place(place):
    return api.geocode(place)[0]


def present_result(geocoded):
    print("{:s}, {:6.2f}, {:6.2f}".format(
        geocoded['formatted_address'],
        geocoded['geometry']['location']['lat'],
        geocoded['geometry']['location']['lng'],
    ))


def main():

    with ThreadPoolExecutor(POOL_SIZE) as executor:
        results = executor.map(fetch_place, PLACES)
        print(type(results))
        print(results)
    for result in results:
        present_result(result)


if __name__ == "__main__":
    main()
$python geocoding_by_concurrentfutures.py

<class 'generator'>
<generator object _chain_from_iterable_of_lists at 0x000001E2A3CED9C8>
Reykjavík, Iceland,  64.15, -21.94
3110 Glendale Blvd, Los Angeles, CA 90039, USA,  34.12, -118.26
Zadar, Croatia,  44.12,  15.23
Venice, Metropolitan City of Venice, Italy,  45.44,  12.32
Wrocław, Poland,  51.11,  17.04
Bologna, Metropolitan City of Bologna, Italy,  44.49,  11.34
Berlin, Germany,  52.52,  13.40
Delhi, India,  28.70,  77.10
New York, NY, USA,  40.71, -74.01
Osaka, Japan,  34.69, 135.50

Futureクラスについて

FutureクラスはExecutor.submit()関数によって生成されます.
Future オブジェクトは呼び出し可能なオブジェクトの非同期実行を管理し,処理結果を示します.登録した呼び出し可能オブジェクトの戻り値は, Future.result() メソッドで取得します.終了していない場合,結果が準備できるまでブロックします.result() メソッドによる結果の取得は,処理の終了後である必要はなく, result() は終了を待って,値を返します.

[(公式)Futureオブジェクトについて]https://docs.python.org/ja/3/library/concurrent.futures.html#future-objects

sample_concurrent_futures.py
from concurrent.futures import ThreadPoolExecutor

def loudy_return():

    print("processing")
    return 42

with ThreadPoolExecutor(1) as executor:

    future = executor.submit(loudy_return)

print(future)
print(future.result())


$python sample_concurrent_futures.py
processing
<Future at 0x27f17bd76c8 state=finished returned int>
42

##参考文献

25
25
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
25
25

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?