0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Python】並列処理モジュールmultiprocessingを使うと、毎回結果が異なるケースがあるのか?競合状態とは

Posted at

概要

Pythonで並列処理を行うことができるmultiprocessing.Pool()を使ってみました。

並列処理を使うと、場合によっては、

「同じプログラムを走らせても、毎回異なる結果になる」

というケースがあること(=競合状態ともいう)がわかりましたので、そちらをサンプルコード付きで紹介します。

検証内容と前提

今回は、

「100レコードを保持するデータベースのテーブルから、100のレコードidを一定の塊(チャンク)に分割して、チャンクごとに並列処理でリストに代入していく」

という並列処理を想定します。

例えば、100件のレコードがある場合で、チャンクは10とします。
すると、[0 - 9], [10 - 19]...といった形でまず塊を作成していって、それが4つたまったら、並列処理で一斉にリストの中の要素を別のリストに代入していく、という流れです。

この例で言うと、[0 - 9], [10 - 19][20 - 29][30 - 39]の4つになったら、それぞれのリストの要素を共有リスト(ここではshared_listとします)に並列処理で代入していきます。
この時点では、0から39までの数字が共有リストに入っています。

これをMAXのレコード数を超えるまで続けます。同じように実施すると[40 - 49][50 - 59][60 - 69][70 - 79]です。ここで4つたまったので、これも一斉に並列処理でshared_listへ代入。

今回は100がMAXレコード数なので、ここで処理は終了、と言うプログラムです。
これにより、4つのリストごとに並列処理で別のリストへの代入が行われたことになります。

さて、この場合、リストの中身は綺麗に0から79まで並ぶでしょうか?
並ばない場合、毎回順番は同じになるのでしょうか?

この辺の調査として、以下のサンプルコードを用意しました。
やっていることは、上述した通りです。

サンプルコード

import multiprocessing
import time

def process_chunk(args):
    chunk_data, shared_list = args
    for item in chunk_data:
        shared_list.append(item)

if __name__ == "__main__":
    # 共有リスト
    manager = multiprocessing.Manager()
    shared_list = manager.list()

    # データベースのデータ
    total_records = 100
    chunk_size = 10
    num_chunks = int(total_records / chunk_size)
    current_index = 0

    # プロセスプールを作成。指定された数のワーカープロセスを管理し、タスクを並列に実行
    pool = multiprocessing.Pool()
    tasks = []
    chunk_counter = 0

    for chunk_id in range(num_chunks):

        chunk_data = list(range(current_index, (chunk_id + 1) * chunk_size))

        tasks.append((chunk_data, shared_list))
        current_index = (chunk_id + 1) * chunk_size
        chunk_counter += 1

        if chunk_counter == 4:
            pool.map(process_chunk, tasks)
            tasks = []
            chunk_counter = 0

    if total_records > current_index:
        chunk_data = list(range(current_index, total_records))
        tasks.append((chunk_data, shared_list))
        pool.map(process_chunk, tasks)

    pool.close()
    pool.join()

    print(list(shared_list))

処理の補足

少々ややこしいので、補足説明。不要な方はskipしてください。

まず、multiprocessing.Managerを使用して共有リスト(shared_list)を作成します。
普通のリストじゃダメなのか?と思いますが、通常のリストを使用すると、複数のプロセスが同時にリストにアクセスして変更を加える際に競合状態が発生してしまいます。実際に普通のリストにして実行するとわかりますが、空のままになってしまいます。manager.list()を利用して、複数のプロセス間で安全に共有できるリストを作成しましょう。

Manager() 関数により生成されたマネージャーオブジェクトはサーバープロセスを管理します。マネージャーオブジェクトは Python のオブジェクトを保持して、他のプロセスがプロキシ経由でその Python オブジェクトを操作することができます。
引用元:https://docs.python.org/ja/3/library/multiprocessing.html#sharing-state-between-processes

次に、pool = multiprocessing.Pool()により、並列処理で必要なプロセスプールを作成します。
指定された数のワーカープロセスを管理し、タスクを並列に実行することができます。

具体的にいくつのワーカープロセスがあるのか?を調べるには、
print(f"デフォルトのワーカープロセスの数: {pool._processes}")
と入れてあげます。すると私のマシンでは

デフォルトのワーカープロセスの数: 10

と出力されました。10個のCPUコアが利用可能であるため、デフォルトで10個のワーカープロセスが作成された、と言うことでしょう。

ここから、具体的な処理に入ります。
forループの最初から見ていきます。

例えば、current_indexが0、chunk_idが0、chunk_sizeが10の場合、range(0, 10)は0から9までの整数を生成し、それをリストに変換して変数chunk_dataに代入する、という処理になります。

chunk_dataは、tasksに追加されます。
具体的には以下のような形です。第一引数に先ほど生成されたchunk_dataがありますね。第二引数には最終的に出力されるshared_listが入っています。

[([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], <ListProxy object, typeid 'list' at 0x10d44b8b0>)]

current_index(chunk_id + 1) * chunk_sizeとすることで、先ほどの終わりの要素から次のforループを回します。上記の流れで言えば、9で終わっていたので、次は10から開始されます。

また、chunk_counterはカウンターとして+1していきます。

これを回し続け、chunk_counterが4になったら、並列処理を開始します。
chunk_counterが4になる、というのは、上記の流れで言えば、

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]

の4つのリスト(chunk_data)が生成された時点、です。

並列処理は、tasksの中身の要素をshared_listに適用していく、というもの。
process_chunk関数を実施すると、chunk_dataリストの中の要素を一つずつshared_listに入れていく、という処理が行われます。

これにより、一斉に4つのリストの中の要素が、次々に別のリストに代入されていきます。

ちなみに、ここで実行されているpool.mapは、指定された関数を並列に実行するために使用するmultiprocessingモジュールにおけるPoolクラスのメソッドです。指定された関数(この場合はprocess_chunk)を、指定されたイテラブル(この場合はtasks)の各要素に対して並列に適用します。

並列処理によって、結果が変わることはあるのか?

ここで、本題の「並列処理によって、結果が変わることはあるのか?」の問いに入ります。

実際にこのコードを実行すると、結果は毎回異なることがわかります。

[10, 0, 11, 1, 12, 2, 13, 3, 14, 20, 15, 4, 21, 5, 22, 6, 23, 16, 17, 24, 18, 25, 7, 8, 26, 19, 9, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 70, 60, 71, 61, 72, 62, 73, 63, 74, 64, 75, 65, 76, 66, 77, 67, 78, 68, 79, 69, 40, 41, 50, 42, 51, 43, 52, 44, 53, 45, 54, 46, 55, 47, 56, 48, 57, 49, 58, 59]

という結果もあれば、

[0, 1, 2, 3, 4, 5, 6, 10, 7, 11, 8, 9, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 60, 70, 61, 71, 62, 72, 63, 73, 64, 74, 65, 75, 66, 76, 67, 77, 68, 78, 69, 79, 50, 51, 52, 53, 40, 54, 55, 41, 56, 42, 57, 43, 58, 44, 59, 45, 46, 47, 48, 49]

という結果もあります。

これを並列処理の「競合状態(race condition)」と呼ぶらしいです。

競合状態(race condition)とは、電子回路やコンピュータプログラムの動作が、外部の別の要因によって左右されること。同じ資源に複数の主体が同時にアクセスした場合などに生じ、正しく動作することも誤った結果になることもある不安定な状態である。
引用元:https://e-words.jp/w/%E7%AB%B6%E5%90%88%E7%8A%B6%E6%85%8B.html#google_vignette

何が起きるか: 競合状態(race condition)が生じると、マルチスレッド・プログラムの実行結果は、各スレッド上での操作実行順序に依存する。言い換えると、プログラム実行のたびに 同一入力を与えても異なる結果を出力する という非決定的(indeterminate)な動作につながる。
引用元:https://qiita.com/yohhoy/items/00c6911aa045ef5729c6

上述のサンプルコードでは、4つのリストごとにshared_listへの代入を並列処理で実施していました。
このようにすると、それぞれのプロセス(スレッド)が行うデータ追加の順序が保証されないからです。

各プロセスがshared_list.append(item)を実行する順序が毎回異なるため、最終的なshared_listの順序も異なるものになるのです。

ということで以上、検証でした。
これは決して「誤り」のあるプログラム、と言うわけではないのですが、毎回結果が異なると言う意味では「不安定」であると言えるでしょう。

参考

公式ドキュメントはこちら

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?