99
Help us understand the problem. What are the problem?

More than 3 years have passed since last update.

posted at

updated at

Organization

Pythonでプロセス間の値の共有

PythonのMultiprocessでプロセス間での値の共有

Pythonにおいて、プログラム実行中にプロセス間での値のやり取りや、あるプロセスが他のプロセスの値を参照する必要がある場合は共有メモリやサーバープロセスを使います。

Pythonでマルチプロセス

まずは、ベースとなるマルチプロセスのソースコードです。3つのプロセスを起動し、プロセスごとに指定された秒間隔で0~4を表示します。使用しているPythonのバージョンは3.6.4です。python2.7でもprintの書き方を合わせれば動きます。

multiprocess.py
from multiprocessing import Process
import time

def process1():
    for i in range(5):
        time.sleep(0.5)
        print("process1:" + str(i))
        # python2.7では「print "process1:" + str(i)」でもOK

def process2():
    for i in range(5):
        time.sleep(0.7)
        print("process2:" + str(i))

def process3():
    for i in range(5):
        time.sleep(0.9)
        print("process3:" + str(i))

if __name__ == '__main__':

    process1 = Process(target=process1, args=())
    process2 = Process(target=process2, args=())
    process3 = Process(target=process3, args=())

    process1.start()
    process2.start()
    process3.start()

    process1.join()
    process2.join()
    process3.join()

    print("process ended")

実行結果は以下のようになります。

実行結果
$ python3 multiprocess.py
process1:0
process2:0
process3:0
process1:1
process2:1
process1:2
process3:1
process1:3
process2:2
process1:4
process3:2
process2:3
process2:4
process3:3
process3:4
process ended

Process1,2,3が並列で実行されていることが分かると思います。

共有メモリ(Shared memory)

プロセス間でデータを共有メモリ上に保持するためにValueクラスとArrayクラスが提供されています。
メインプロセスでValueクラスとArrayクラスを宣言し、3つのプロセスで共有変数を参照します。
ValueクラスとArrayクラスともに第一引数に変数の型を宣言します。ctypesの型か arrayモジュールで使用されるような1文字の型コードを指定可能で、iはint型、dはdouble型のように指定することができます。
Valueクラスの第二引数には初期値を、Arrayクラスの第二引数には配列のサイズもしくは初期値を設定することができます。

multiprocess.py
import time
from multiprocessing import Value, Array, Process


def process1(count, array):
    for i in range(5):
        time.sleep(0.5)
        # Valueオブジェクトの値を操作
        count.value += 1
        # Arrayオブジェクトの値を操作
        array[count.value - 1] = count.value
        print("process1:" + str(count.value))


def process2(count, array):
    for i in range(5):
        time.sleep(0.7)
        count.value += 1
        array[count.value - 1] = count.value
        print("process2:" + str(count.value))


def process3(count, array):
    for i in range(5):
        time.sleep(0.9)
        count.value += 1
        array[count.value - 1] = count.value
        print("process3:" + str(count.value))


if __name__ == '__main__':
    # 共有メモリの作成
    # Valueオブジェクトの生成
    count = Value('i', 0)
    # Arrayオブジェクトの生成
    array = Array('i', 15)
    print("count:" + str(type(count)))
    print("array:" + str(type(array)))
    print(array[:])

    process1 = Process(target=process1, args=[count, array])
    process2 = Process(target=process2, args=[count, array])
    process3 = Process(target=process3, args=[count, array])

    process1.start()
    process2.start()
    process3.start()

    process1.join()
    process2.join()
    process3.join()

    print(array[:])
    print("process ended")

実行結果は以下のようになります。

実行結果
$ python multiprocess.py
count:<class 'multiprocessing.sharedctypes.Synchronized'>
array:<class 'multiprocessing.sharedctypes.SynchronizedArray'>
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
process1:1
process2:2
process3:3
process1:4
process2:5
process1:6
process3:7
process1:8
process2:9
process1:10
process3:11
process2:12
process2:13
process3:14
process3:15
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
process ended

ValueクラスのcountオブジェクトとArrayクラスのarrayオブジェクトを各プロセスで参照し、インクリメントと値の配列への追加を行っていることが確認できます。

サーバープロセス(Server Process)

Pythonのオブジェクトを保持して、他のプロセスがプロキシ経由でそのPythonオブジェクトを操作するManagerクラスが提供されています。
Managerクラスはlist,dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array をサポートします。

multiprocess.py
import time
from multiprocessing import Manager,Value, Process


def process1(count, array):
    for i in range(5):
        time.sleep(0.5)
        # Valueオブジェクトの値を操作
        count.value += 1
        # Listを操作
        array.append(count.value)
        print("process1:" + str(count.value))


def process2(count, array):
    for i in range(5):
        time.sleep(0.7)
        count.value += 1
        array.append(count.value)
        print("process2:" + str(count.value))


def process3(count, array):
    for i in range(5):
        time.sleep(0.9)
        count.value += 1
        array.append(count.value)
        print("process3:" + str(count.value))


if __name__ == '__main__':
    # Managerオブジェクトの作成
    with Manager() as manager

      # マネージャーからValueクラスを作成
      count = manager.Value('i', 0)
      # マネージャーからListを作成
      array = manager.list()
      print("count:" + str(type(count)))
      print("array:" + str(type(array)))
      print(array[:])

      process1 = Process(target=process1, args=[count, array])
      process2 = Process(target=process2, args=[count, array])
      process3 = Process(target=process3, args=[count, array])

      process1.start()
      process2.start()
      process3.start()

      process1.join()
      process2.join()
      process3.join()

      print(array[:])
      print("process ended")

実行結果は以下のようになります。

実行結果
python multiprocess.py
count:<class 'multiprocessing.managers.ValueProxy'>
array:<class 'multiprocessing.managers.ListProxy'>
[]
process1:1
process2:2
process3:3
process1:4
process2:5
process1:6
process3:7
process1:8
process2:9
process1:10
process3:11
process2:12
process2:13
process3:14
process3:15
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
process ended

共有メモリのサンプルコードと同じような結果となりました。Managerクラスはリストを扱うことができるため、共有メモリのArrayクラスよりも使い勝手がいいと思います。Managerオブジェクトは共有メモリのオブジェクトよりも扱える型が多く柔軟ではありますが、共有メモリよりも動作が遅いという欠点があります。

Register as a new user and use Qiita more conveniently

  1. You can follow users and tags
  2. you can stock useful information
  3. You can make editorial suggestions for articles
What you can do with signing up
99
Help us understand the problem. What are the problem?