PythonのThread、Process等の振る舞いの確認(threading, multiprocessing, concurrent.futures)

  • 22
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

初めてQiitaに投稿です。別にこの情報は新しくもなんともないと思われますが、お試しで投稿してみます。説明も少なく、親切な投稿ではありません。

Pythonでスレッド(もしくはプロセス)を生成してやりたいことがあったので振る舞いを確認してみました。(CPythonのコードの中身を読まずに実験的にです。)

まず、本投稿とは関係ないのですが、ThreadingとProcessingの違いについての私の理解(間違っているかもしれないのでむやみに信じないでください。)に触れておきます。

  • Processは非同期のプロセスを生成するときに使用しますが、各生成プロセスごとに新しいメモリ空間を作り(実際はPickleして送ってるっぽい)、そこで処理を実行します。だから、あまり大きなメモリのコピーが発生するような場合は向かない。複数のプロセスが同時に動くことができる。
  • Threadは非同期のスレッドを生成し、同じメモリ空間上で処理を実行します。コピーが発生しないので、大きいデータを共有して実行するのに向く。しかしCPythonはGIL(ギル、Global Interpreter Lock:ごめんなさいぐぐってください)なので、I/Oの時にGILのリリースを行うとき以外は複数スレッドが同時に動くことはありません。そのため、I/O-boundなアプリケーションに向いています。

しかし、上の情報はあまり今回の投稿とは関係なく、この確認実験のモチベーションは、実際にProcessやThreadを使ってみたときに遭遇した謎の振る舞い(ドキュメント読めば書いてあるかも)を明らかにするためです。その謎の振る舞いですが、以下のとおりです。

  • スレッド/プロセス内で例外が発生したときに、呼んだAPIによっては、メインスレッドでは例外を投げることもなく、処理が走り続ける。
  • threading.active_count()関数で残存スレッドを見ると、スレッド処理が終了しているのに、何故かゾンビになって生き残り続ける奴らがいる。

これらの振る舞いを理解すべく、以下のスクリプトで振る舞いを確認してみました。今回はThread/Processについてmapで複数処理を投げた場合、startで単一処理を投げた場合、Python 3.2?から追加されたconcurrent.futures(Python 2.xではpip install futuresでインストール可)を利用して同等の処理を実行した場合について確認しています。

python_thread_process.py
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import multiprocessing
from multiprocessing.pool import ThreadPool
import threading
import time


def bar(i=0):
    if i == 0:
        raise ValueError("bar raise")
    return i ** 2


def main_Thread():
    thread = threading.Thread(target=bar)
    thread.start()
    thread.join()
    raise RuntimeError("Exception not caught")


def main_ThreadPool():
    p = ThreadPool(4)
    for i in p.map(bar, xrange(4)):
        print i
    raise RuntimeError("Exception not caught")


def main_ThreadPoolExecutorMap():
    with ThreadPoolExecutor(4) as ex:
        for i in ex.map(bar, xrange(4)):
            print i
    raise RuntimeError("Exception not caught")


def main_ThreadPoolExecutorSubmit():
    with ThreadPoolExecutor(4) as ex:
        s = ex.submit(bar)
        print s.result()
    raise RuntimeError("Exception not caught")


def main_Process():
    thread = multiprocessing.Process(target=bar)
    thread.start()
    thread.join()
    raise RuntimeError("Exception not caught")


def main_ProcessPool():
    p = multiprocessing.Pool(4)
    for i in p.map(bar, xrange(4)):
        print i
    raise RuntimeError("Exception not caught")


def main_ProcessPoolExecutorMap():
    with ProcessPoolExecutor(4) as ex:
        for i in ex.map(bar, xrange(4)):
            print i
    raise RuntimeError("Exception not caught")


def main_ProcessPoolExecutorSubmit():
    with ProcessPoolExecutor(4) as ex:
        s = ex.submit(bar, 0)
        print s.result()
    raise RuntimeError("Exception not caught")


def run(fun):
    ac = threading.active_count()
    try:
        fun()
    except RuntimeError:
        print fun.__name__, "[NOT raised]"
    except ValueError:
        print fun.__name__, "[RAISED]"
    time.sleep(1)
    print "Zombie thread:", threading.active_count() - ac

if __name__ == '__main__':
    run(main_Thread)
    run(main_ThreadPool)
    run(main_ThreadPoolExecutorMap)
    run(main_ThreadPoolExecutorSubmit)
    run(main_Process)
    run(main_ProcessPool)
    run(main_ProcessPoolExecutorMap)
    run(main_ProcessPoolExecutorSubmit)

各main関数では、確実にValueError例外を投げるbar関数を各種APIを使って実行させて、メインスレッドで特に例外が投げられていない場合にRuntimeErrorを投げるという仕組みになっており、run関数ではValueErrorRuntimeErrorのどちらを受け取ったかで、Thread/Processで例外がどのように扱われたかを見ています。また、ゾンビスレッドについてはthreading.active_count()を使って確認しています。(一応sleepを入れた。)

結果はコチラ。

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/Users/takuya/anaconda/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/Users/takuya/anaconda/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "example_thread_process.py", line 10, in bar
    raise ValueError("bar raise")
ValueError: bar raise

main_Thread [NOT raised]
Zombie thread: 0
main_ThreadPool [RAISED]
Zombie thread: 7
main_ThreadPoolExecutorMap [RAISED]
Zombie thread: 0
main_ThreadPoolExecutorSubmit [RAISED]
Zombie thread: 0
Process Process-1:
Traceback (most recent call last):
  File "/Users/takuya/anaconda/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/Users/takuya/anaconda/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "example_thread_process.py", line 10, in bar
    raise ValueError("bar raise")
ValueError: bar raise
main_Process [NOT raised]
Zombie thread: 0
main_ProcessPool [RAISED]
Zombie thread: 3
main_ProcessPoolExecutorMap [RAISED]
Zombie thread: 0
main_ProcessPoolExecutorSubmit [RAISED]
Zombie thread: 0

結果として、

  • threading.Threadmultiprocessing.Processはメインスレッドに例外を投げない。
  • multiprocessing.Poolmultiprocessing.pool.ThreadPoolはゾンビスレッドを残す。

ということがわかりました。

結論としては、concurrent.futuresを使っておくのが間違いなさそうだ、と言えるのではないでしょうか。何か間違っているところがあったら教えていただけるとありがたいです。

なお、本実験はMac OSX 10.10.2、Python 2.7.9(Anaconda)上で行いました。

gistにもスクリプトがあります)