本稿について
Pythonのマルチプロセスについて学習したことを自分なりにまとめた記事になります
Pythonにおけるマルチプロセスについて
マルチプロセスとはどんな場面で用いられるか.
⇒並列処理を実現するとき,実現手段としてプロセスを分けることが挙げられます.
マルチコアCPU上でCPUの負荷の大きいタスクを実行するアプリケーションでは,現状マルチコアCPU上を活用するにはマルチプロセスを使用する必要があります.
https://docs.python.org/ja/3/faq/library.html#can-t-we-get-rid-of-the-global-interpreter-lock
Pythonにおけるマルチプロセスのメリット
- GILの制限がかからないため,より多くのリソースを活用できる
- メモリコンテキストを共有しないため,データの破損,デッドロックが発生するリスクが軽減される
##プロセスを開始するとは
マルチプロセスを使用したソースコードに言及する前に新しいプロセスを開始する方法について言及します.
あらゆるプログラミング言語においても,新しいプロセスを開始する方法はプログラムをフォークすることです.
Pythonではos.fork()
を実行することにより,メモリコンテキストが子プロセスをコピーされた後,各プロセスはそれぞれ別のアドレス空間で動作します.以下,ソース.
import os
pid_list = []
def main():
pid_list.append(os.getpid())
child_pid = os.fork()
if child_pid == 0:
pid_list.append(os.getpid())
print()
print("子: こんにちは,私は子プロセスです")
print("子: 私が知っているPID番号は %s です" % pid_list)
else:
pid_list.append(os.getpid())
print()
print("親:こんにちは,私は親プロセスです")
print("親:子プロセスのPID番号は %d です"%child_pid)
print("親:私が知っているPID番号は %s です"%pid_list)
if __name__ == "__main__":
main()
$python fork.py
親:こんにちは,私は親プロセスです
親:子プロセスのPID番号は 321 です
親:私が知っているPID番号は [320, 320] です
子: こんにちは,私は子プロセスです
子: 私が知っているPID番号は [320, 321] です
初期のプロセスは同じ320のPIDですが,子プロセスでは321を追加していることがわかり,二つのプロセス間でメモリコンテキストを共有していないことがわかります.
プロセス間通信の実装
プロセスのメモリはデフォルトでは共有されません.プロセス間で通信したい場合は,いくつかの作業が必要です.
これを簡単にするためにmultiprocessing
モジュールはプロセス間で通信する方法をいくつか提供しています.
ここで紹介するのは以下の2つの方法.
-
multiprocessing.Pipe
を使う方法 -
multiprocessing.sharedctypes
を使う方法
multiprocessing.Pipe
について
PipeクラスはUnixやLinuxのパイプと似たような概念を持っています.
multiprocessing.Pipe()
はパイプの両端を表すConnection
オブジェクトをペアで返します.下記(pipesample.py)の例では,parent_conn, child_conn = Pipe()
が該当.デフォルトであるPipe(True)
により,双方向になります.Pipe(False)
ならパイプは一方向性であり,conn1,conn2=Pipe()
とすると,conn1
はメッセージの受信専用,conn2
は送信専用になります.
またPipeクラスはpickle可能なオブジェクトを送受信します.
参考URL:https://docs.python.org/ja/2.7/library/multiprocessing.html#pipes-and-queues
from multiprocessing import Process, Pipe
class CustomClass:
pass
def work(connection):
while True:
instance = connection.recv()
if instance:
print("子:受信:{}".format(instance))
else:
return
def main():
parent_conn, child_conn = Pipe()
child = Process(target=work, args=(child_conn,))
for item in (
42,
'some string',
{'one':1},
CustomClass(),
None,
):
print("親: 送信:{}".format(item))
parent_conn.send(item)
child.start()
child.join()
if __name__ == "__main__":
main()
$python pipesample.py
親: 送信:42
親: 送信:some string
親: 送信:{'one': 1}
親: 送信:<__main__.CustomClass object at 0x7fc785a34ac8>
親: 送信:None
子:受信:42
子:受信:some string
子:受信:{'one': 1}
子:受信:<__main__.CustomClass object at 0x7fc785268978>
for item in (42,...,None,):
で生成したインスタンスを親.send()
の引数に渡してあげると,子.recv()
の受信によってペアであるプロセスにデータの状態を引き渡しています.またプロセスのアドレスが異なっていることもわかります.
multiprocessing.sharedctypes
を用いた実装
multiprocessing.sharedctypes
クラスでは,共有メモリを作成し,そこにデータ型(int型,double型等)
を入れる方法を提供しています.データの型はCの型に従います.最も基本的なものはValue(typecode_or_type, *arg, lock=True)
とArray(typecode_or_type, size_or_initializer, *, lock=True)
です.typecode_or_type
は返されるオブジェクトの型を決めます。それは ctypes の型か array モジュールで使用されるような1文字の型コードかのどちらか一方です。list,dictionary, Namespace, Lock等は記述が困難なため,その場合はmultiprocessing.Manager
を使用します.
参考:https://docs.python.org/ja/3/library/multiprocessing.html#sharing-state-between-processes
from multiprocessing import Process, Value, Array
def f(n,a):
n.value = 3.141592
for i in range(len(a)):
a[i] = -a[i]
if __name__ == "__main__":
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
$python valuearray.py
3.141592
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
プロセスプールを使用する
スレッドの代わりにマルチプロセスを使用すると,オーバーヘッドが大幅に増加します.特に各プロセスに独立したメモリコンテキストが存在する場合,メモリの使用量が増加します.これにより,子プロセスが大量に生成する場合においては,スレッドを用いた処理よりも弊害が大きいことになります.
マルチプロセスを用いたアプリケーションにおいて,リソースの使用率を制御するよい方法としてプロセスプールを構築することが挙げられます.
プロセスプールの基本的な考え方としてあらかじめ指定されたプロセスを用意し,そこでキューからアイテムを取り出して処理していくという様な考え方です.処理すべきタスクが到着してからプロセスを起動するというのではなく,プロセスをあらかじめ起動しておき,タスクが割り振らてからすぐに処理を開始するようにします.
Pool
クラスについて
このクラスは複数のプロセスを管理する複雑な処理をすべて負担してくれます.
下記のソースコードは,GCP(GoogleCloudPlatfrom)のGoogleMapのAPIを利用して,都市名にヒットする緯度と経度を取得しています.
POOL_SIZE=4
とすることで,並列に動作するプロセスを4つに指定しています.またPool
クラスはコンテキストマネージャーを使用できます.
from multiprocessing import Pool
from gmaps import Geocoding
api = Geocoding(api_key='ひみつ')
PLACES = (
'Reykjavik','Vien','Zadar',
'Venice','Wrocow','Bolognia',
'Berlin','Dehil','New York',
'Osaka'
)
POOL_SIZE = 4
def fetch_place(place):
return api.geocode(place)[0]
def present_result(geocoded):
print("{:s}, {:6.2f}, {:6.2f}".format(
geocoded['formatted_address'],
geocoded['geometry']['location']['lat'],
geocoded['geometry']['location']['lng'],
).encode('utf-8'))
def main():
with Pool(POOL_SIZE) as pool:
results = pool.map(fetch_place, PLACES)
for result in results:
present_result(result)
if __name__ == "__main__":
main()
$ python geocoding_by_multiprocessing.py
b'Reykjav\xc3\xadk, Iceland, 64.15, -21.94'
b'3110 Glendale Blvd, Los Angeles, CA 90039, USA, 34.12, -118.26'
b'Zadar, Croatia, 44.12, 15.23'
b'Venice, Metropolitan City of Venice, Italy, 45.44, 12.32'
b'Wroc\xc5\x82aw, Poland, 51.11, 17.04'
b'Bologna, Metropolitan City of Bologna, Italy, 44.49, 11.34'
b'Berlin, Germany, 52.52, 13.40'
b'Delhi, India, 28.70, 77.10'
b'New York, NY, USA, 40.71, -74.01'
b'Osaka, Japan, 34.69, 135.50'
所感
並列処理の勉強大変.(笑)
参考文献
- エキスパートPythonプログラミング第二版
- https://docs.python.org/ja/3/