Poolオブジェクトによる基本的なプロセス管理と、Pipeオブジェクトを使った返り値処理を記載する。
multiprocessing Documentation(accessed: 2022/01/31)
実行環境
Mac OS Monterey 12.1
Python 3.9.2
iMac (Retina 5K, 27-inch, 2017)
プロセッサ 3.4 GHz クアッドコアIntel Core i5
Pool Object
引数 processesに整数を与えるとその数のプロセスを生成して並列処理を行う。何も与えないと自動的にos.cpu_count()が返す値を使ってプロセスを生成する。
imap_unorderd(func, iterable[, chunksize])
プロセスにタスクを渡す方法は複数あるが、タスクの処理順を気にしない場合はimap_unordered()が最も速い。
funcに処理したい関数を渡し、iterableにその引数を渡す。chunksizeはタスクの分割サイズを決めるものだがunorderdの場合処理が終わったプロセスから順番にタスクが振られるので気にする必要はない。
pool = Pool()
pool.imap_unordered(api_task, params) # これだけで並列処理してくれる。
# tqdm(プログレスバー)を組み合わせたバージョン
with tqdm(total=len(params)) as t:
for _ in pool.imap_unordered(api_task, params):
t.update(1)
Pipe Object
他プロセスからの返り値を受け取るために使う。
片方の端で読み書きを同時に行うとPipe内データが破損する恐れがある。
引数にduplexを取り、 duplex=Trueで双方向性,Falseで一方向性のPipeを生成できる。
データの破損を避けるためにFalseで運用するのが無難。
from multiprocessing import Process, Pipe
def double(x, sender): # 並列処理に渡す関数
sleep(3)
y = x*x
sender.send(y)
return None
nums = range(5)
process_list = []
connections = [] # Pipe
for name in names:
receiver, sender = Pipe(False) # 受信側、送信側の順で生成される
p = Process(target=double, args=(nums, sender))
connections.append(receiver)
process_list.append(p)
p.start() # プロセスの処理を開始する。
for process in process_list: # タスクの終了待ち処理 この間プロセスはロックされる。
process.join(1) # 1秒間のタイムアウトをとる タスクひとつひとつが十分に高速に処理できる場合いらないかもしれない。(要動作確認)
results = [conn.recv() for conn in connections]
print(results)
for process in process_list:
process.terminate() # プロセスを終了させる
終わりに
プロセスの学問的理解をしたいと考えながら既に数年経ちました。
参考文献に載せたCPUのコアとスレッド、プロセスの関係性の記事がわかりやすかったです。
ハードウェアの勉強とソフトウェアの勉強は交互にすべきですね。
参考文献
multiprocessing の基本(accessed 2022/01/31)
【図解】CPUのコアとスレッドとプロセスの違い(accessed 2022/01/31)