concurrent.futures
http://docs.python.jp/3/library/concurrent.futures.html
Python3.2で新たに標準となったパッケージで、マルチスレッド、マルチプロセスによる並列タスク処理を簡単に実装できる。
Executorという基底クラスを継承する形でThreadPoolExecutor、ProcessPoolExecutorが実装されており、どちらを使ってもほぼ同じインタフェースで書ける。
インストール
Python3.2では標準パッケージなのでインストールの必要無し。
2.6以上向けにBackportが提供されている。
pip install futures
サンプルコード
import concurrent.futures
import hashlib
def digest(t): # 適当にCPU資源を消費するための関数
hash = hashlib.sha256()
for i in range(t*1000000):
hash.update('hogehoge')
return hash.hexdigest()
if __name__=='__main__':
task_list = [1,1,1,2,2,3]
# Executorオブジェクトを作成
executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
# Executorオブジェクトにタスクをsubmitし、同数だけfutureオブジェクトを得る。
# タスクの実行は、submit()を呼び出した瞬間から開始される。
futures = [executor.submit(digest,t) for t in task_list]
# 各futureの完了を待ち、結果を取得。
# as_completed()は、与えられたfuturesの要素を完了順にたどるイテレータを返す。
# 完了したタスクが無い場合は、ひとつ完了するまでブロックされる。
for future in concurrent.futures.as_completed(futures):
print(future.result()) # digest()の戻り値が表示される。
# すべてのタスクの完了を待ち、後始末をする。
# 完了していないタスクがあればブロックされる。
# (上でas_completedをすべてイテレートしているので、実際にはこの時点で完了していないタスクは無いはず。)
executor.shutdown()
ProcessPoolExecutor
をThreadPoolExecutor
に置き換えれば、そのままマルチプロセスでなくマルチスレッドで動作する。
注意点
-
ProcessPoolExecutorはプロセス間通信によって実現しているため、いくつか制限がある。
- 関数の引数および戻り値は、pickleを使ってシリアライズ可能なオブジェクトでなければならない。
- 関数自体もプロセス間で渡せなければならない。インスタンスメソッドはだめ。ラムダ式はOK。
- 関数の中で副作用としてグローバル変数を書き換えるなどしても、呼び元のプロセスには反映されない。
-
CPythonはGlobal Interpreter Lockを実装しているので、ひとつのプロセス内で複数のスレッドがPythonコードを同時に実行することができない。上のサンプルコードのようにPythonコードをガリガリ実行するタスクの場合はThreadPoolExecutorで並列化してもほぼ逐次処理になるので、実行時間の恩恵はあまり無い。(通信待ちやIO待ちの多い処理なら効果がある。)