python2.7

Python Multiprocessingにおける共有変数の扱い方

multiprocessingの基本的な使い方

pythonの標準ライブラリであるmultiprocessingを使うことで簡単に並列処理をすることができます。multiprocessingでは関数を定義し、その定義した関数を並列処理します。

sample.py
import multiprocessing                      
from multiprocessing import Pool   

def function(arg):                                  
        print(arg)

print("make subprocess")                                                                     
p = Pool(processes=multiprocessing.cpu_count())
print("start subprocess")
p.map(function, range(10))                                                               
print("end subprocess")
p.close()

最初にPoolクラスを生成しますが、その際に使用するコア数を指定します。今回はマシンの最大使用可能コア数を指定しました。
並列処理したい内容を定義したfunction関数をPoolクラスのmapメソッドで実行することができます。
mapメソッドの第二引数が並列処理する関数の引数となります。
終了したらPoolクラスのcloseメソッドでプロセスを閉じます。closeを忘れると並列処理していたメモリが解放されずに残り続けるので気をつけてください。

実行結果
make subprocess
start subprocess
0
1
2
3
4
5
6
7
8
9
end subprocess

共有変数の扱い方

並列処理をしているので基本的にメモリの共有は行いません。ただし、プロセス間でデータのやりとりや共有が必要な時はValueクラスかArrayクラスを用いることで、共有メモリ上でデータを共有することができます。

sample_shared_memory_Process.py
import multiprocessing
import os
from multiprocessing import  Value, Array, Process

def function(count):
        print('parent process:', os.getppid())
        print('process id:', os.getpid())    
        for i in range(10):
                count.value += 1
                print(count.value)


count = Value("d",0.0)
print("make subprocess")
p = Process(target=function, args=[count])
print("start subprocess")
p.start()
p.join()
print("end subprocess")

Valueクラスで指定したdは型のタイプで今回はdouble型です。他にもf(float)やi(int)が指定可能です。

実行結果
make subprocess
start subprocess
('parent process:', 11108)
('process id:', 11109)
1.0
2.0
3.0
4.0
5.0
6.0
7.0
8.0
9.0
10.0
end subprocess

実行結果の通り親プロセスと子プロセスでPIDが違うのが分かります。しかし、Processクラスで並列処理する場合subprocessが一つしか作成することができません。複数プロセスにて共有変数を使用したい場合はPoolクラスを用いますが少しややこしい書き方が必要になります。

sample_shared_memory_Pool.py
import multiprocessing
import os
from multiprocessing import Pool, Value, Array



def function(i):
        print('parent process:', os.getppid())
        print('process id:', os.getpid())
        shared_count.value += 1
        print(shared_count.value)
        shared_array[i] = -shared_array[i]

def init(count,array):
        global shared_count, shared_array
        shared_count = count
        shared_array = array

count = Value("d",0.0)
array = Array("d",range(10))
print(array[:])

print("make subprocess")
p = Pool(processes=multiprocessing.cpu_count(),
         initializer=init, initargs=(count,array))
print("start subprocess")
p.map(function, range(10))
print("end subprocess")
p.close()                                                                                             

print(array[:])

Poolクラスのprocessesでプロセスをいくつ作成するかを指定しinitializerで共有する変数をglobalで定義します。init関数の引数をinitargsで渡します。

実行結果
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
make subprocess
start subprocess
('parent process:', 11793)
('process id:', 11796)
('parent process:', 11793)
('process id:', 11795)
('parent process:', 11793)
('process id:', 11797)
('parent process:', 11793)
('process id:', 11798)
1.0
2.0
2.0
2.0
('parent process:', 11793)
('process id:', 11796)
3.0
('parent process:', 11793)
('process id:', 11795)
4.0
('parent process:', 11793)
('process id:', 11797)
5.0
('parent process:', 11793)
('process id:', 11798)
('parent process:', 11793)
('process id:', 11796)
6.0
7.0
('parent process:', 11793)
('process id:', 11795)
8.0
end subprocess
[-0.0, -1.0, -2.0, -3.0, -4.0, -5.0, -6.0, -7.0, -8.0, -9.0]

arrayに関しては正しく機能しているがcountに関しては変数にアクセスするタイミングの問題で値が正しく更新されていないことが分かります。
アクセスするタイミングの問題を解決するために変数をロックしそれぞれのプロセスからのアクセスを制御します。

sample_shared_memory_Pool_with_lock.py
import multiprocessing
import os
from multiprocessing import Pool, Value, Array



def function(i):
        shared_count.acquire()
        print('parent process:', os.getppid())
        print('process id:', os.getpid())
        shared_count.value += 1
        print(shared_count.value)
        shared_array[i] = -shared_array[i]
        shared_count.release()

def init(count,array):
        global shared_count, shared_array
        shared_count = count
        shared_array = array

count = Value("d",0.0)
array = Array("d",range(10))
print(array[:])

print("make subprocess")
p = Pool(processes=multiprocessing.cpu_count(),initializer=init, initargs=(count,array))
print("start subprocess")
p.map(function, range(10))
print("end subprocess")
p.close()                                                                                             

print(array[:])

lockしたい共有変数のacquireメソッドを使ってlockします。解放したい時はreleaseメソッドを使います。
今回は変数をインクリメントするタイミングが問題だったのでその前後でacquireとreleaseしました。

実行結果
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
make subprocess
start subprocess
('parent process:', 11803)
('process id:', 11806)
1.0
('parent process:', 11803)
('process id:', 11805)
2.0
('parent process:', 11803)
('process id:', 11807)
3.0
('parent process:', 11803)
('process id:', 11808)
4.0
('parent process:', 11803)
('process id:', 11806)
5.0
('parent process:', 11803)
('process id:', 11805)
6.0
('parent process:', 11803)
('process id:', 11807)
7.0
('parent process:', 11803)
('process id:', 11808)
8.0
('parent process:', 11803)
('process id:', 11806)
9.0
('parent process:', 11803)
('process id:', 11805)
10.0
end subprocess
[-0.0, -1.0, -2.0, -3.0, -4.0, -5.0, -6.0, -7.0, -8.0, -9.0]

これでPIDが11805-11808の四つのプロセスで正しく動作させることができました。