python の multiprocess におけるプロセス間通信を実装していたら、
大量のデータをやり取りする場合にものすごい時間がかかりました。
なので、各手法について計測してみました。
pythonのバージョンは3.7です。
結果まとめ
結果を先にいうと以下です。
send time | recv time | データ制約 | 別途書き込み管理 | 備考 | |
---|---|---|---|---|---|
Queue | 0.00s | 5.33s | pickle化できるもの | 不要 | |
Pipe | 3.12s | 5.33s | pickle化できるもの | 不要 | |
共有メモリ(Array) | 3.02s | 2.55s | 1次元配列 | 必要 | データに制約がある |
Manager | 10.19s | 10.29s | 配列・辞書他 | 不要 | |
RawArray | 5.61s(numpy含む) | 0.18s | 1次元配列 | 必要 | 早いがコードが複雑 |
File | 3.86s | 5.26s | pickle化できるもの | 必要 | ファイル経由 |
socket | 4.13s(pickle含む) | 5.34s(pickle含む) | pickle化できるもの | 必要 | コードが複雑。 |
データ数による速度の違い
まずはデータ数でどこまで速度に違いがあるかを Pipe を例に見ていきます。
評価方法としてはデータ転送は1度のみ行い、転送時の配列のサイズを変化させます。
p1 プロセスはデータを送信する側で、1度送信するだけです。
p2 プロセスはデータを待ち続け、受信したら終了するプロセスとなります。
コード
import multiprocessing as mp
import time
def pipe1(*args):
p = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p.send(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
def pipe2(*args):
p = args[0]
key = args[1]
while True:
if not p.poll():
continue
print("[{}]recv start".format(key))
t0 = time.time()
p.recv()
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_pipe(key, size=100000000):
reciver, sender = mp.Pipe(duplex=False)
p1 = mp.Process(target=pipe1, args=(sender, key, size))
p2 = mp.Process(target=pipe2, args=(reciver, key))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
main_pipe("pipe10", 10)
main_pipe("pipe10^5", 100000)
main_pipe("pipe10^6", 1000000)
main_pipe("pipe10^7", 10000000)
main_pipe("pipe10^8", 100000000)
一応データが受信できているかを確認するために assert 文も入れてあります。
実行結果
送信時間 | 受信時間 | |
---|---|---|
pipe10 | 0.0s | 0.0s |
pipe10^5 | 0.0109s | 0.0039s |
pipe10^6 | 0.0324s | 0.0708s |
pipe10^7 | 0.3240s | 0.6425s |
pipe10^8 | 3.2228s | 6.4095s |
※下4桁でまるめています。
見事にデータサイズと比例してますね。
各データ送信方法による比較
コードは、各データの送受信方法だけを抜粋して記載しています。
データサイズは 10^8 で固定です。
コードの全容は記事の最後を見てください。
参考: Pythonのthreadingとmultiprocessingを完全理解
Queue
import multiprocessing as mp
# 作成(抜粋)
q = mp.Queue()
# 送信(抜粋)
p.put(d)
# 受信(抜粋)
if not p.empty():
d = p.get()
実行結果
[queue]send start
[queue]send end: 0.0s
[queue]recv start
[queue]recv end: 5.337700128555298s
Pipe
Pipeは単方向の実装をしています。
import multiprocessing as mp
# 作成(抜粋)
reciver, sender = mp.Pipe(duplex=False)
# 送信(抜粋)
sender.send(d)
# 受信(抜粋)
if p.poll():
reciver = p.recv()
実行結果
[pipe]send start
[pipe]recv start
[pipe]send end: 3.121206045150757s
[pipe]recv end: 5.337015151977539s
共有メモリ(Array)
共有メモリ自体に書き込みが終わったかどうかを判断するものがないので、
別途 Value(共有メモリ)変数を追加しています。
import multiprocessing as mp
import ctypes
# 作成(抜粋)
arr = mp.Array(ctypes.c_int, [0]*size )
flag = mp.Value(ctypes.c_bool, False)
# 送信(抜粋)
p[:] = d
flag.value = True
# 受信(抜粋)
if flag.value:
d = p[:]
実行結果
[array]send start
[array]send end: 3.0218513011932373s
[array]recv start
[array]recv end: 2.5539581775665283s
Manager
import multiprocessing as mp
# 作成(抜粋)
with mp.Manager() as manager:
shared_arr = manager.list()
# 送信(抜粋)
shared_arr.append(d)
# 受信(抜粋)
if len(p) > 0:
d = p.pop()
実行結果
[manager]send start
[manager]send end: 10.194092750549316s
[manager]recv start
[manager]recv end: 10.295690059661865s
RawArray
以下を参考にしています。
共有メモリ(Array)と同じで別途送信完了のflag変数を用意しています。
また、numpy化にかかる時間を別で計測しています。
import multiprocessing.sharedctypes
import ctypes
import numpy as np
# 作成(抜粋)
byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
flag = mp.Value(ctypes.c_bool, False)
# 送信(抜粋)
d = np.asarray(d)
np.asarray(p)[:] = d
flag.value = True
# 受信(抜粋)
if flag.value:
d = np.array(p, dtype=np.uint8, copy=True)
実行結果
[raw]cast numpy time: 5.4573814868927s
[raw]send start
[raw]send end: 0.15658187866210938s
[raw]recv start
[raw]recv end: 0.18018245697021484s
File
方向性を変えてメモリではなくファイル経由でやり取りする場合です。
また、共有メモリと同様に書き込み終了を flag 変数で管理しています。
import multiprocessing as mp
import pickle
import tempfile
import os
# 作成(抜粋)
with tempfile.TemporaryDirectory() as tmpdir:
flag = mp.Value(ctypes.c_bool, False)
# 送信(抜粋)
with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
pickle.dump(d, f)
flag.value = True
# 受信(抜粋)
if flag.value:
with open(os.path.join(p, 'testfile'), 'r+b') as f:
d = pickle.load(f)
実行結果
[file]send start
[file]send end: 3.8698363304138184s
[file]recv start
[file]recv end: 5.267671585083008s
socket
さらに方向性を変えてsocket通信によるやり取りです。
サーバ起動確認用に flag 変数を使用しています。
また、pickle化を別途計測としています。
import multiprocessing as mp
import socket
import pickle
# 作成(抜粋)
flag = mp.Value(ctypes.c_bool, False)
# 送信(クライアント)(抜粋)
d = pickle.dumps(d)
if flag.value:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('127.0.0.1', 50007))
s.sendall(d)
# 受信(サーバ)(抜粋)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 50007))
s.listen(1)
flag.value = True
while True:
conn, addr = s.accept()
d = b""
with conn:
while True:
# データを受け取る
data = conn.recv(1024*1024*1024)
if not data:
break
d += data
d = pickle.loads(d)
実行結果
[socket]server wait start
[socket]pickle pack time: 3.798427104949951s
[socket]send start
[socket]recv start
[socket]send end: 0.3363354206085205s
[socket]recv end: 0.5375902652740479s
[socket]pickle unpack time: 4.91701340675354s
コード全体
import multiprocessing as mp
import multiprocessing.sharedctypes
import time
import ctypes
import numpy as np
import tempfile
import pickle
import os
import socket
def queue1(*args):
p = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p.put(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
def queue2(*args):
p = args[0]
key = args[1]
while True:
if p.empty():
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = p.get()
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_queue(key, size=10000*10000):
q = mp.Queue()
p1 = mp.Process(target=queue1, args=(q, key, size))
p2 = mp.Process(target=queue2, args=(q, key))
p1.start()
p2.start()
p1.join()
p2.join()
def pipe1(*args):
p = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p.send(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
def pipe2(*args):
p = args[0]
key = args[1]
while True:
if not p.poll():
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = p.recv()
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_pipe(key, size=10000*10000):
reciver, sender = mp.Pipe(duplex=False)
p1 = mp.Process(target=pipe1, args=(sender, key, size))
p2 = mp.Process(target=pipe2, args=(reciver, key))
p1.start()
p2.start()
p1.join()
p2.join()
def array1(*args):
p = args[0]
flag = args[1]
key = args[2]
size = args[3]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p[:] = d
flag.value = True
print("[{}]send end: {}s".format(key, time.time()-t0))
def array2(*args):
p = args[0]
flag = args[1]
key = args[2]
while True:
if not flag.value: # データが変わるまで待つ
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = p[:]
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_array(key, size=10000*10000):
arr = mp.Array(ctypes.c_int, [0]*size )
flag = mp.Value(ctypes.c_bool, False)
p1 = mp.Process(target=array1, args=(arr, flag, key, size))
p2 = mp.Process(target=array2, args=(arr, flag, key))
p1.start()
p2.start()
p1.join()
p2.join()
def manager1(*args):
p = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
p.append(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
def manager2(*args):
p = args[0]
key = args[1]
while True:
if len(p) == 0:
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = p.pop()
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_manager(key, size=10000*10000):
with mp.Manager() as manager:
shared_arr = manager.list()
p1 = mp.Process(target=manager1, args=(shared_arr, key, size))
p2 = mp.Process(target=manager2, args=(shared_arr, key))
p1.start()
p2.start()
p1.join()
p2.join()
def raw1(*args):
p = args[0]
flag = args[1]
key = args[2]
size = args[3]
d = [i for i in range(size)]
t0 = time.time()
d = np.asarray(d)
print("[{}]cast numpy time: {}s".format(key, time.time()-t0))
print("[{}]send start".format(key))
t0 = time.time()
np.asarray(p)[:] = d
flag.value = True
print("[{}]send end: {}s".format(key, time.time()-t0))
def raw2(*args):
p = args[0]
flag = args[1]
key = args[2]
while True:
if not flag.value: # データが変わるまで待つ
continue
print("[{}]recv start".format(key))
t0 = time.time()
d = np.array(p, dtype=np.uint8, copy=True)
assert d[5]==5
print("[{}]recv end: {}s".format(key, time.time()-t0))
break
def main_raw(key, size=10000*10000):
byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
flag = mp.Value(ctypes.c_bool, False)
p1 = mp.Process(target=raw1, args=(byte_buf, flag, key, size))
p2 = mp.Process(target=raw2, args=(byte_buf, flag, key))
p1.start()
p2.start()
p1.join()
p2.join()
def file1(*args):
tmpdir = args[0]
flag = args[1]
key = args[2]
size = args[3]
d = [i for i in range(size)]
print("[{}]send start".format(key))
t0 = time.time()
with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
pickle.dump(d, f)
flag.value = True
print("[{}]send end: {}s".format(key, time.time()-t0))
def file2(*args):
tmpdir = args[0]
flag = args[1]
key = args[2]
while True:
if not flag.value: # データが変わるまで待つ
continue
print("[{}]recv start".format(key))
t0 = time.time()
with open(os.path.join(tmpdir, 'testfile'), 'rb') as f:
d = pickle.load(f)
print("[{}]recv end: {}s".format(key, time.time()-t0))
assert d[5]==5
break
def main_file(key, size=10000*10000):
with tempfile.TemporaryDirectory() as tmpdir:
flag = mp.Value(ctypes.c_bool, False)
p1 = mp.Process(target=file1, args=(tmpdir, flag, key, size))
p2 = mp.Process(target=file2, args=(tmpdir, flag, key))
p1.start()
p2.start()
p1.join()
p2.join()
def socket1(*args):
flag = args[0]
key = args[1]
size = args[2]
d = [i for i in range(size)]
t0 = time.time()
d = pickle.dumps(d)
print("[{}]pickle pack time: {}s".format(key, time.time()-t0))
while True:
if not flag.value:
continue
print("[{}]send start".format(key))
t0 = time.time()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('127.0.0.1', 50007))
s.sendall(d)
print("[{}]send end: {}s".format(key, time.time()-t0))
break
def socket2(*args):
flag = args[0]
key = args[1]
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 50007))
s.listen(1)
flag.value = True
while True:
print("[{}]server wait start".format(key))
conn, addr = s.accept()
print("[{}]recv start".format(key))
t0 = time.time()
d = b""
with conn:
while True:
# データを受け取る
data = conn.recv(1024*1024*1024)
if not data:
break
d += data
print("[{}]recv end: {}s".format(key, time.time()-t0))
t0 = time.time()
d = pickle.loads(d)
print("[{}]pickle unpack time: {}s".format(key, time.time()-t0))
assert d[5]==5
break
def main_socket(key, size=10000*10000):
flag = mp.Value(ctypes.c_bool, False)
p1 = mp.Process(target=socket1, args=(flag, key, size))
p2 = mp.Process(target=socket2, args=(flag, key,))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
main_pipe("pipe10", 10)
main_pipe("pipe10^5", 100000)
main_pipe("pipe10^6", 1000000)
main_pipe("pipe10^7", 10000000)
main_pipe("pipe10^8", 100000000)
main_queue("queue")
main_pipe("pipe")
main_array("array")
main_manager("manager")
main_raw("raw")
main_file("file")
main_socket("socket")