search
LoginSignup
4
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

posted at

updated at

PythonのProcessPoolExecutorで即座にプロセスを終了させる方法

はじめに

Pythonでマルチプロセスで並列タスク実行する方法がいくつかあります。
その中の1つにconcurrent.futures.ProcessPoolExecutorがあります。

Python3.2から追加されたビルドインのクラスです。
内部ではMultiProcessingを使いつつ、IFを整理(制限)することでユーザーに優しい作りになっています。
簡単にマルチプロセスを使いたい場合にはおすすめのライブラリです。

一方で複雑な処理を行いたい場合にはIFが乏しいため、かゆい所に手が届かないということもしばしばです。
その中でもProcessPoolExecutorが行ってくれているプロセス管理に関して困ったことがあったので書き記します。

困ったこと

ProcessPoolExecutorで生成したFutureの完了を待つconcurrent.futures.as_completed(fs, timeout=None)という関数があります。
見ての通り第二引数に秒数を指定してタイムアウトさせることができ、タイムアウトした場合、concurrent.futures.TimeoutErrorが送出されます。

タイムアウトした場合、実行しているFutureや実行待ちのFutureをキャンセルすると思います。
が、キャンセルは実行中のFutureに関しては効きません。
(ドキュメントに記載されていますが、日本語ドキュメントでキャンセルの説明文だけ英語のままなのが謎です)

実行している処理が無限待ちしている、即座に終了させたいとなった場合に困ります。
(実行している処理内でIOをロックしているなどなど ← メソッドの場合は__del__で解放処理を書いてあげると理想ですね)
無限待ちに関しては作りが悪いという話もありますが、メインプロセス内のタイムアウトで制御したい場合もあるかと思います。

実際のコードはこちら。

import time
import concurrent
from concurrent.futures.process import ProcessPoolExecutor


def test(value: int) -> int:
    """
    Sleepして指定された値を返す
    Args:
        value: 指定値
    """
    time.sleep(100)
    return value

def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Future作成
        futures = []
        for index in range(5):
            future = executor.submit(test, index)
            futures.append(future)

        # 実行
        try:
            timeout = 5
            for future in concurrent.futures.as_completed(futures, timeout):
                result = future.result()
                print(result)

        except concurrent.futures.TimeoutError as _:
            # 現在のfutureの状態を表示
            print("Timeout -----")
            for future in futures:
                print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")

            # Futureをキャンセル
            for future in futures:
                if not future.running():
                    future.cancel()

    # 実行後のfutureの状態を確認
    print("Executor Shutdown -----")
    for future in futures:
        print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")

Workerを2つ用意して、100秒待機して指定した値を返す関数を5タスク実行する処理です。
これを実行すると、以下のようになります。

Timeout -----
2579991022224 running: True cancelled: False
2579991067424 running: True cancelled: False
2579991117104 running: True cancelled: False
2579991116480 running: False cancelled: False
2579991117536 running: False cancelled: False
Executor Shutdown -----
2579991022224 running: True cancelled: False
2579991067424 running: True cancelled: False
2579991117104 running: True cancelled: False
2579991116480 running: False cancelled: True
2579991117536 running: False cancelled: True

最初のFuture3つが実行中のまま処理が終了せずにメインプロセスが固まっています(下2つはキャンセルされている)。
これは実行中のFutureの処理を待っているためです。
今の処理だと約200秒後にメインプロセスが終了します。

解決策

内部的に管理しているプロセスにアクセスできるので、直接Killしてあげます。
実際のコードはこちら。

import time
import concurrent
from concurrent.futures.process import ProcessPoolExecutor


def test(value: int) -> int:
    """
    Sleepして指定された値を返す
    Args:
        value: 指定値
    """
    time.sleep(100)
    return value

def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Future作成
        futures = []
        for index in range(5):
            future = executor.submit(test, index)
            futures.append(future)

        # 実行
        try:
            timeout = 5
            for future in concurrent.futures.as_completed(futures, timeout):
                result = future.result()
                print(result)

        except concurrent.futures.TimeoutError as _:
            # 現在のfutureの状態を表示
            print("Timeout -----")
            for future in futures:
                print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")

            # Futureをキャンセル
            for future in futures:
                if not future.running():
                    future.cancel()

            # プロセスをKill
            # !! ここを追加 !!
            for process in executor._processes.values():
                process.kill()

    # 実行後のfutureの状態を確認
    print("Executor Shutdown -----")
    for future in futures:
        print(id(future), f"running: {future.running()}", f"cancelled: {future.cancelled()}")

Futureをキャンセルした後にProcessをKillしています。
ProcessPoolExecutorではProtectedプロパティになっているので本来はアクセスされない意図でしょうが...
MultiProcessingを使っている場合は普通にやっていますよね。
※パイプのデータがぶっ壊れたりするので気を付けてください。

これを実行すると、以下のようになります。

Timeout -----
1348685908624 running: True cancelled: False
1348685953824 running: True cancelled: False
1348686003552 running: True cancelled: False
1348686003888 running: False cancelled: False
1348686004128 running: False cancelled: False
Executor Shutdown -----
1348685908624 running: False cancelled: False
1348685953824 running: False cancelled: False
1348686003552 running: False cancelled: False
1348686003888 running: False cancelled: True
1348686004128 running: False cancelled: True

最初の3つのFutureのrunnningがFalseの状態になっていますね。
これでFutureの実行が止まっているので、即座にメインプロセスが終了します。

まとめ

裏技的な方法でしたがProcessPoolExecutorでプロセスを即座に終了させる方法を紹介しました。
あまり使うことはないと思いますが、稀に困ることがあると思うので参考にしてみてください。

余談

ProcessPoolExecutorの内部の処理は、~\lib\concurrent\futures\process.pyにあります。
先頭に説明文が図と共に記載されていてこれがとても分かりやすいので一読をオススメします。
ProcessPoolExecutor.png

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
4
Help us understand the problem. What are the problem?