概要
Pythonで並列処理を行うことができるmultiprocessing.Pool()
を使ってみました。
並列処理を使うと、場合によっては、
「同じプログラムを走らせても、毎回異なる結果になる」
というケースがあること(=競合状態ともいう)がわかりましたので、そちらをサンプルコード付きで紹介します。
検証内容と前提
今回は、
「100レコードを保持するデータベースのテーブルから、100のレコードidを一定の塊(チャンク)に分割して、チャンクごとに並列処理でリストに代入していく」
という並列処理を想定します。
例えば、100件のレコードがある場合で、チャンクは10とします。
すると、[0 - 9]
, [10 - 19]
...といった形でまず塊を作成していって、それが4つたまったら、並列処理で一斉にリストの中の要素を別のリストに代入していく、という流れです。
この例で言うと、[0 - 9]
, [10 - 19]
、[20 - 29]
、[30 - 39]
の4つになったら、それぞれのリストの要素を共有リスト(ここではshared_list
とします)に並列処理で代入していきます。
この時点では、0から39までの数字が共有リストに入っています。
これをMAXのレコード数を超えるまで続けます。同じように実施すると[40 - 49]
、[50 - 59]
、[60 - 69]
、[70 - 79]
です。ここで4つたまったので、これも一斉に並列処理でshared_list
へ代入。
今回は100
がMAXレコード数なので、ここで処理は終了、と言うプログラムです。
これにより、4つのリストごとに並列処理で別のリストへの代入が行われたことになります。
さて、この場合、リストの中身は綺麗に0から79まで並ぶでしょうか?
並ばない場合、毎回順番は同じになるのでしょうか?
この辺の調査として、以下のサンプルコードを用意しました。
やっていることは、上述した通りです。
サンプルコード
import multiprocessing
import time
def process_chunk(args):
chunk_data, shared_list = args
for item in chunk_data:
shared_list.append(item)
if __name__ == "__main__":
# 共有リスト
manager = multiprocessing.Manager()
shared_list = manager.list()
# データベースのデータ
total_records = 100
chunk_size = 10
num_chunks = int(total_records / chunk_size)
current_index = 0
# プロセスプールを作成。指定された数のワーカープロセスを管理し、タスクを並列に実行
pool = multiprocessing.Pool()
tasks = []
chunk_counter = 0
for chunk_id in range(num_chunks):
chunk_data = list(range(current_index, (chunk_id + 1) * chunk_size))
tasks.append((chunk_data, shared_list))
current_index = (chunk_id + 1) * chunk_size
chunk_counter += 1
if chunk_counter == 4:
pool.map(process_chunk, tasks)
tasks = []
chunk_counter = 0
if total_records > current_index:
chunk_data = list(range(current_index, total_records))
tasks.append((chunk_data, shared_list))
pool.map(process_chunk, tasks)
pool.close()
pool.join()
print(list(shared_list))
処理の補足
少々ややこしいので、補足説明。不要な方はskipしてください。
まず、multiprocessing.Manager
を使用して共有リスト(shared_list
)を作成します。
普通のリストじゃダメなのか?と思いますが、通常のリストを使用すると、複数のプロセスが同時にリストにアクセスして変更を加える際に競合状態が発生してしまいます。実際に普通のリストにして実行するとわかりますが、空のままになってしまいます。manager.list()
を利用して、複数のプロセス間で安全に共有できるリストを作成しましょう。
Manager() 関数により生成されたマネージャーオブジェクトはサーバープロセスを管理します。マネージャーオブジェクトは Python のオブジェクトを保持して、他のプロセスがプロキシ経由でその Python オブジェクトを操作することができます。
引用元:https://docs.python.org/ja/3/library/multiprocessing.html#sharing-state-between-processes
次に、pool = multiprocessing.Pool()
により、並列処理で必要なプロセスプールを作成します。
指定された数のワーカープロセスを管理し、タスクを並列に実行することができます。
具体的にいくつのワーカープロセスがあるのか?を調べるには、
print(f"デフォルトのワーカープロセスの数: {pool._processes}")
と入れてあげます。すると私のマシンでは
デフォルトのワーカープロセスの数: 10
と出力されました。10個のCPUコアが利用可能であるため、デフォルトで10個のワーカープロセスが作成された、と言うことでしょう。
ここから、具体的な処理に入ります。
for
ループの最初から見ていきます。
例えば、current_index
が0、chunk_id
が0、chunk_size
が10の場合、range(0, 10)
は0から9までの整数を生成し、それをリストに変換して変数chunk_data
に代入する、という処理になります。
↓
chunk_data
は、tasks
に追加されます。
具体的には以下のような形です。第一引数に先ほど生成されたchunk_data
がありますね。第二引数には最終的に出力されるshared_list
が入っています。
[([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], <ListProxy object, typeid 'list' at 0x10d44b8b0>)]
↓
current_index
を(chunk_id + 1) * chunk_size
とすることで、先ほどの終わりの要素から次のfor
ループを回します。上記の流れで言えば、9で終わっていたので、次は10から開始されます。
また、chunk_counter
はカウンターとして+1
していきます。
↓
これを回し続け、chunk_counter
が4になったら、並列処理を開始します。
chunk_counter
が4になる、というのは、上記の流れで言えば、
・[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
・[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
・[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
・[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
の4つのリスト(chunk_data
)が生成された時点、です。
並列処理は、tasks
の中身の要素をshared_list
に適用していく、というもの。
process_chunk
関数を実施すると、chunk_data
リストの中の要素を一つずつshared_list
に入れていく、という処理が行われます。
これにより、一斉に4つのリストの中の要素が、次々に別のリストに代入されていきます。
ちなみに、ここで実行されているpool.map
は、指定された関数を並列に実行するために使用するmultiprocessing
モジュールにおけるPool
クラスのメソッドです。指定された関数(この場合はprocess_chunk
)を、指定されたイテラブル(この場合はtasks
)の各要素に対して並列に適用します。
並列処理によって、結果が変わることはあるのか?
ここで、本題の「並列処理によって、結果が変わることはあるのか?」の問いに入ります。
実際にこのコードを実行すると、結果は毎回異なることがわかります。
[10, 0, 11, 1, 12, 2, 13, 3, 14, 20, 15, 4, 21, 5, 22, 6, 23, 16, 17, 24, 18, 25, 7, 8, 26, 19, 9, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 70, 60, 71, 61, 72, 62, 73, 63, 74, 64, 75, 65, 76, 66, 77, 67, 78, 68, 79, 69, 40, 41, 50, 42, 51, 43, 52, 44, 53, 45, 54, 46, 55, 47, 56, 48, 57, 49, 58, 59]
という結果もあれば、
[0, 1, 2, 3, 4, 5, 6, 10, 7, 11, 8, 9, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 60, 70, 61, 71, 62, 72, 63, 73, 64, 74, 65, 75, 66, 76, 67, 77, 68, 78, 69, 79, 50, 51, 52, 53, 40, 54, 55, 41, 56, 42, 57, 43, 58, 44, 59, 45, 46, 47, 48, 49]
という結果もあります。
これを並列処理の「競合状態(race condition)」と呼ぶらしいです。
競合状態(race condition)とは、電子回路やコンピュータプログラムの動作が、外部の別の要因によって左右されること。同じ資源に複数の主体が同時にアクセスした場合などに生じ、正しく動作することも誤った結果になることもある不安定な状態である。
引用元:https://e-words.jp/w/%E7%AB%B6%E5%90%88%E7%8A%B6%E6%85%8B.html#google_vignette
何が起きるか: 競合状態(race condition)が生じると、マルチスレッド・プログラムの実行結果は、各スレッド上での操作実行順序に依存する。言い換えると、プログラム実行のたびに 同一入力を与えても異なる結果を出力する という非決定的(indeterminate)な動作につながる。
引用元:https://qiita.com/yohhoy/items/00c6911aa045ef5729c6
上述のサンプルコードでは、4つのリストごとにshared_list
への代入を並列処理で実施していました。
このようにすると、それぞれのプロセス(スレッド)が行うデータ追加の順序が保証されないからです。
各プロセスがshared_list.append(item)
を実行する順序が毎回異なるため、最終的なshared_list
の順序も異なるものになるのです。
ということで以上、検証でした。
これは決して「誤り」のあるプログラム、と言うわけではないのですが、毎回結果が異なると言う意味では「不安定」であると言えるでしょう。
参考
公式ドキュメントはこちら