LoginSignup
47
50

More than 3 years have passed since last update.

【Python】マルチプロセスについて

Last updated at Posted at 2019-11-11

本稿について

Pythonのマルチプロセスについて学習したことを自分なりにまとめた記事になります

Pythonにおけるマルチプロセスについて

マルチプロセスとはどんな場面で用いられるか.
⇒並列処理を実現するとき,実現手段としてプロセスを分けることが挙げられます.

マルチコアCPU上でCPUの負荷の大きいタスクを実行するアプリケーションでは,現状マルチコアCPU上を活用するにはマルチプロセスを使用する必要があります.

https://docs.python.org/ja/3/faq/library.html#can-t-we-get-rid-of-the-global-interpreter-lock

Pythonにおけるマルチプロセスのメリット

  • GILの制限がかからないため,より多くのリソースを活用できる
  • メモリコンテキストを共有しないため,データの破損,デッドロックが発生するリスクが軽減される

プロセスを開始するとは

マルチプロセスを使用したソースコードに言及する前に新しいプロセスを開始する方法について言及します.
あらゆるプログラミング言語においても,新しいプロセスを開始する方法はプログラムをフォークすることです.
Pythonではos.fork()を実行することにより,メモリコンテキストが子プロセスをコピーされた後,各プロセスはそれぞれ別のアドレス空間で動作します.以下,ソース.

fork.py
import os

pid_list = []

def main():
    pid_list.append(os.getpid())
    child_pid = os.fork()

    if child_pid == 0:
        pid_list.append(os.getpid())
        print()
        print("子: こんにちは,私は子プロセスです")
        print("子: 私が知っているPID番号は %s です" % pid_list)

    else:
        pid_list.append(os.getpid())
        print()
        print("親:こんにちは,私は親プロセスです")
        print("親:子プロセスのPID番号は %d です"%child_pid)
        print("親:私が知っているPID番号は %s です"%pid_list)

if __name__ == "__main__":
    main()

$python fork.py

親:こんにちは,私は親プロセスです
親:子プロセスのPID番号は 321 です
親:私が知っているPID番号は [320, 320] です

子: こんにちは,私は子プロセスです
子: 私が知っているPID番号は [320, 321] です

初期のプロセスは同じ320のPIDですが,子プロセスでは321を追加していることがわかり,二つのプロセス間でメモリコンテキストを共有していないことがわかります.

プロセス間通信の実装

プロセスのメモリはデフォルトでは共有されません.プロセス間で通信したい場合は,いくつかの作業が必要です.
これを簡単にするためにmultiprocessingモジュールはプロセス間で通信する方法をいくつか提供しています.
ここで紹介するのは以下の2つの方法.

  • multiprocessing.Pipeを使う方法
  • multiprocessing.sharedctypesを使う方法

multiprocessing.Pipeについて

PipeクラスはUnixやLinuxのパイプと似たような概念を持っています.
multiprocessing.Pipe()はパイプの両端を表すConnectionオブジェクトをペアで返します.下記(pipesample.py)の例では,parent_conn, child_conn = Pipe()が該当.デフォルトであるPipe(True)により,双方向になります.Pipe(False)ならパイプは一方向性であり,conn1,conn2=Pipe()とすると,conn1はメッセージの受信専用,conn2は送信専用になります.

またPipeクラスはpickle可能なオブジェクトを送受信します.

参考URL:https://docs.python.org/ja/2.7/library/multiprocessing.html#pipes-and-queues

pipesample.py
from multiprocessing import Process, Pipe

class CustomClass:
    pass

def work(connection):
    while True:
        instance = connection.recv()

        if instance:
            print("子:受信:{}".format(instance))

        else:
            return

def main():
    parent_conn, child_conn = Pipe()

    child = Process(target=work, args=(child_conn,))

    for item in (
        42,
        'some string',
        {'one':1},
        CustomClass(),
        None,
    ):
        print("親: 送信:{}".format(item))
        parent_conn.send(item)

    child.start()
    child.join()

if __name__ == "__main__":
    main()
$python pipesample.py
親: 送信:42
親: 送信:some string
親: 送信:{'one': 1}
親: 送信:<__main__.CustomClass object at 0x7fc785a34ac8>
親: 送信:None
子:受信:42
子:受信:some string
子:受信:{'one': 1}
子:受信:<__main__.CustomClass object at 0x7fc785268978>

for item in (42,...,None,):で生成したインスタンスを親.send()の引数に渡してあげると,子.recv()の受信によってペアであるプロセスにデータの状態を引き渡しています.またプロセスのアドレスが異なっていることもわかります.

multiprocessing.sharedctypesを用いた実装

multiprocessing.sharedctypesクラスでは,共有メモリを作成し,そこにデータ型(int型,double型等)
を入れる方法を提供しています.データの型はCの型に従います.最も基本的なものはValue(typecode_or_type, *arg, lock=True)Array(typecode_or_type, size_or_initializer, *, lock=True)です.typecode_or_typeは返されるオブジェクトの型を決めます。それは ctypes の型か array モジュールで使用されるような1文字の型コードかのどちらか一方です。list,dictionary, Namespace, Lock等は記述が困難なため,その場合はmultiprocessing.Managerを使用します.
参考:https://docs.python.org/ja/3/library/multiprocessing.html#sharing-state-between-processes

valuearray.py
from multiprocessing import Process, Value, Array

def f(n,a):
    n.value = 3.141592
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == "__main__":
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])
$python valuearray.py
3.141592
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

プロセスプールを使用する

スレッドの代わりにマルチプロセスを使用すると,オーバーヘッドが大幅に増加します.特に各プロセスに独立したメモリコンテキストが存在する場合,メモリの使用量が増加します.これにより,子プロセスが大量に生成する場合においては,スレッドを用いた処理よりも弊害が大きいことになります.
マルチプロセスを用いたアプリケーションにおいて,リソースの使用率を制御するよい方法としてプロセスプールを構築することが挙げられます.
プロセスプールの基本的な考え方としてあらかじめ指定されたプロセスを用意し,そこでキューからアイテムを取り出して処理していくという様な考え方です.処理すべきタスクが到着してからプロセスを起動するというのではなく,プロセスをあらかじめ起動しておき,タスクが割り振らてからすぐに処理を開始するようにします.

Poolクラスについて

このクラスは複数のプロセスを管理する複雑な処理をすべて負担してくれます.

下記のソースコードは,GCP(GoogleCloudPlatfrom)のGoogleMapのAPIを利用して,都市名にヒットする緯度と経度を取得しています.
POOL_SIZE=4とすることで,並列に動作するプロセスを4つに指定しています.またPoolクラスはコンテキストマネージャーを使用できます.

geocoding_by_multiprocessing.py

from multiprocessing import Pool

from gmaps import Geocoding

api = Geocoding(api_key='ひみつ')

PLACES = (
    'Reykjavik','Vien','Zadar',
    'Venice','Wrocow','Bolognia',
    'Berlin','Dehil','New York',
    'Osaka'
)

POOL_SIZE = 4

def fetch_place(place):
    return api.geocode(place)[0]

def present_result(geocoded):
    print("{:s}, {:6.2f}, {:6.2f}".format(
        geocoded['formatted_address'],
        geocoded['geometry']['location']['lat'],
        geocoded['geometry']['location']['lng'],
    ).encode('utf-8'))

def main():
    with Pool(POOL_SIZE) as pool:
        results = pool.map(fetch_place, PLACES)

    for result in results:
        present_result(result)

if __name__ == "__main__":
    main()

$ python geocoding_by_multiprocessing.py
b'Reykjav\xc3\xadk, Iceland,  64.15, -21.94'
b'3110 Glendale Blvd, Los Angeles, CA 90039, USA,  34.12, -118.26'
b'Zadar, Croatia,  44.12,  15.23'
b'Venice, Metropolitan City of Venice, Italy,  45.44,  12.32'
b'Wroc\xc5\x82aw, Poland,  51.11,  17.04'
b'Bologna, Metropolitan City of Bologna, Italy,  44.49,  11.34'
b'Berlin, Germany,  52.52,  13.40'
b'Delhi, India,  28.70,  77.10'
b'New York, NY, USA,  40.71, -74.01'
b'Osaka, Japan,  34.69, 135.50'

所感

並列処理の勉強大変.(笑)

参考文献

47
50
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
47
50