PythonのMultiprocessでプロセス間での値の共有
Pythonにおいて、プログラム実行中にプロセス間での値のやり取りや、あるプロセスが他のプロセスの値を参照する必要がある場合は共有メモリやサーバープロセスを使います。
Pythonでマルチプロセス
まずは、ベースとなるマルチプロセスのソースコードです。3つのプロセスを起動し、プロセスごとに指定された秒間隔で0~4を表示します。使用しているPythonのバージョンは3.6.4です。python2.7でもprintの書き方を合わせれば動きます。
from multiprocessing import Process
import time
def process1():
    for i in range(5):
        time.sleep(0.5)
        print("process1:" + str(i))
        # python2.7では「print "process1:" + str(i)」でもOK
def process2():
    for i in range(5):
        time.sleep(0.7)
        print("process2:" + str(i))
def process3():
    for i in range(5):
        time.sleep(0.9)
        print("process3:" + str(i))
if __name__ == '__main__':
    process1 = Process(target=process1, args=())
    process2 = Process(target=process2, args=())
    process3 = Process(target=process3, args=())
    process1.start()
    process2.start()
    process3.start()
    process1.join()
    process2.join()
    process3.join()
    print("process ended")
実行結果は以下のようになります。
$ python3 multiprocess.py
process1:0
process2:0
process3:0
process1:1
process2:1
process1:2
process3:1
process1:3
process2:2
process1:4
process3:2
process2:3
process2:4
process3:3
process3:4
process ended
Process1,2,3が並列で実行されていることが分かると思います。
共有メモリ(Shared memory)
プロセス間でデータを共有メモリ上に保持するためにValueクラスとArrayクラスが提供されています。
メインプロセスでValueクラスとArrayクラスを宣言し、3つのプロセスで共有変数を参照します。
ValueクラスとArrayクラスともに第一引数に変数の型を宣言します。ctypesの型か arrayモジュールで使用されるような1文字の型コードを指定可能で、iはint型、dはdouble型のように指定することができます。
Valueクラスの第二引数には初期値を、Arrayクラスの第二引数には配列のサイズもしくは初期値を設定することができます。
import time
from multiprocessing import Value, Array, Process
def process1(count, array):
    for i in range(5):
        time.sleep(0.5)
        # Valueオブジェクトの値を操作
        count.value += 1
        # Arrayオブジェクトの値を操作
        array[count.value - 1] = count.value
        print("process1:" + str(count.value))
def process2(count, array):
    for i in range(5):
        time.sleep(0.7)
        count.value += 1
        array[count.value - 1] = count.value
        print("process2:" + str(count.value))
def process3(count, array):
    for i in range(5):
        time.sleep(0.9)
        count.value += 1
        array[count.value - 1] = count.value
        print("process3:" + str(count.value))
if __name__ == '__main__':
    # 共有メモリの作成
    # Valueオブジェクトの生成
    count = Value('i', 0)
    # Arrayオブジェクトの生成
    array = Array('i', 15)
    print("count:" + str(type(count)))
    print("array:" + str(type(array)))
    print(array[:])
    process1 = Process(target=process1, args=[count, array])
    process2 = Process(target=process2, args=[count, array])
    process3 = Process(target=process3, args=[count, array])
    process1.start()
    process2.start()
    process3.start()
    process1.join()
    process2.join()
    process3.join()
    print(array[:])
    print("process ended")
実行結果は以下のようになります。
$ python multiprocess.py
count:<class 'multiprocessing.sharedctypes.Synchronized'>
array:<class 'multiprocessing.sharedctypes.SynchronizedArray'>
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
process1:1
process2:2
process3:3
process1:4
process2:5
process1:6
process3:7
process1:8
process2:9
process1:10
process3:11
process2:12
process2:13
process3:14
process3:15
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
process ended
ValueクラスのcountオブジェクトとArrayクラスのarrayオブジェクトを各プロセスで参照し、インクリメントと値の配列への追加を行っていることが確認できます。
サーバープロセス(Server Process)
Pythonのオブジェクトを保持して、他のプロセスがプロキシ経由でそのPythonオブジェクトを操作するManagerクラスが提供されています。
Managerクラスはlist,dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array をサポートします。
import time
from multiprocessing import Manager,Value, Process
def process1(count, array):
    for i in range(5):
        time.sleep(0.5)
        # Valueオブジェクトの値を操作
        count.value += 1
        # Listを操作
        array.append(count.value)
        print("process1:" + str(count.value))
def process2(count, array):
    for i in range(5):
        time.sleep(0.7)
        count.value += 1
        array.append(count.value)
        print("process2:" + str(count.value))
def process3(count, array):
    for i in range(5):
        time.sleep(0.9)
        count.value += 1
        array.append(count.value)
        print("process3:" + str(count.value))
if __name__ == '__main__':
    # Managerオブジェクトの作成
    with Manager() as manager
      # マネージャーからValueクラスを作成
      count = manager.Value('i', 0)
      # マネージャーからListを作成
      array = manager.list()
      print("count:" + str(type(count)))
      print("array:" + str(type(array)))
      print(array[:])
      process1 = Process(target=process1, args=[count, array])
      process2 = Process(target=process2, args=[count, array])
      process3 = Process(target=process3, args=[count, array])
      process1.start()
      process2.start()
      process3.start()
      process1.join()
      process2.join()
      process3.join()
      print(array[:])
      print("process ended")
実行結果は以下のようになります。
python multiprocess.py
count:<class 'multiprocessing.managers.ValueProxy'>
array:<class 'multiprocessing.managers.ListProxy'>
[]
process1:1
process2:2
process3:3
process1:4
process2:5
process1:6
process3:7
process1:8
process2:9
process1:10
process3:11
process2:12
process2:13
process3:14
process3:15
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
process ended
共有メモリのサンプルコードと同じような結果となりました。Managerクラスはリストを扱うことができるため、共有メモリのArrayクラスよりも使い勝手がいいと思います。Managerオブジェクトは共有メモリのオブジェクトよりも扱える型が多く柔軟ではありますが、共有メモリよりも動作が遅いという欠点があります。
