20
20

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

python3のmultiprocessingにおけるプロセス間通信の各種手法を計測してみた

Posted at

python の multiprocess におけるプロセス間通信を実装していたら、
大量のデータをやり取りする場合にものすごい時間がかかりました。

なので、各手法について計測してみました。
pythonのバージョンは3.7です。

結果まとめ

結果を先にいうと以下です。

send time recv time データ制約 別途書き込み管理 備考
Queue 0.00s 5.33s pickle化できるもの 不要
Pipe 3.12s 5.33s pickle化できるもの 不要
共有メモリ(Array) 3.02s 2.55s 1次元配列 必要 データに制約がある
Manager 10.19s 10.29s 配列・辞書他 不要
RawArray 5.61s(numpy含む) 0.18s 1次元配列 必要 早いがコードが複雑
File 3.86s 5.26s pickle化できるもの 必要 ファイル経由
socket 4.13s(pickle含む) 5.34s(pickle含む) pickle化できるもの 必要 コードが複雑。

データ数による速度の違い

まずはデータ数でどこまで速度に違いがあるかを Pipe を例に見ていきます。

評価方法としてはデータ転送は1度のみ行い、転送時の配列のサイズを変化させます。
p1 プロセスはデータを送信する側で、1度送信するだけです。
p2 プロセスはデータを待ち続け、受信したら終了するプロセスとなります。

コード

import multiprocessing as mp
import time

def pipe1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p.send(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))


def pipe2(*args):
    p = args[0]
    key = args[1]
    
    while True:
        if not p.poll():
            continue

        print("[{}]recv start".format(key))
        t0 = time.time()
        p.recv()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break


def main_pipe(key, size=100000000):
    reciver, sender = mp.Pipe(duplex=False)
    p1 = mp.Process(target=pipe1, args=(sender, key, size))
    p2 = mp.Process(target=pipe2, args=(reciver, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    main_pipe("pipe10", 10)
    main_pipe("pipe10^5", 100000)
    main_pipe("pipe10^6", 1000000)
    main_pipe("pipe10^7", 10000000)
    main_pipe("pipe10^8", 100000000)

一応データが受信できているかを確認するために assert 文も入れてあります。

実行結果

送信時間 受信時間
pipe10 0.0s 0.0s
pipe10^5 0.0109s 0.0039s
pipe10^6 0.0324s 0.0708s
pipe10^7 0.3240s 0.6425s
pipe10^8 3.2228s 6.4095s

※下4桁でまるめています。
見事にデータサイズと比例してますね。

各データ送信方法による比較

コードは、各データの送受信方法だけを抜粋して記載しています。
データサイズは 10^8 で固定です。

コードの全容は記事の最後を見てください。

参考: Pythonのthreadingとmultiprocessingを完全理解

Queue

import multiprocessing as mp

# 作成(抜粋)
q = mp.Queue()

# 送信(抜粋)
p.put(d)

# 受信(抜粋)
if not p.empty():
    d = p.get()

実行結果

[queue]send start
[queue]send end: 0.0s
[queue]recv start
[queue]recv end: 5.337700128555298s

Pipe

Pipeは単方向の実装をしています。

import multiprocessing as mp

# 作成(抜粋)
reciver, sender = mp.Pipe(duplex=False)

# 送信(抜粋)
sender.send(d)

# 受信(抜粋)
if p.poll():
    reciver = p.recv()

実行結果

[pipe]send start
[pipe]recv start
[pipe]send end: 3.121206045150757s
[pipe]recv end: 5.337015151977539s

共有メモリ(Array)

共有メモリ自体に書き込みが終わったかどうかを判断するものがないので、
別途 Value(共有メモリ)変数を追加しています。

import multiprocessing as mp
import ctypes

# 作成(抜粋)
arr = mp.Array(ctypes.c_int, [0]*size )
flag = mp.Value(ctypes.c_bool, False)

# 送信(抜粋)
p[:] = d
flag.value = True

# 受信(抜粋)
if flag.value:
    d = p[:]

実行結果

[array]send start
[array]send end: 3.0218513011932373s
[array]recv start
[array]recv end: 2.5539581775665283s

Manager

import multiprocessing as mp

# 作成(抜粋)
with mp.Manager() as manager:
    shared_arr = manager.list()
    
# 送信(抜粋)
shared_arr.append(d)

# 受信(抜粋)
if len(p) > 0:
    d = p.pop()

実行結果

[manager]send start
[manager]send end: 10.194092750549316s
[manager]recv start
[manager]recv end: 10.295690059661865s

RawArray

以下を参考にしています。

共有メモリ(Array)と同じで別途送信完了のflag変数を用意しています。
また、numpy化にかかる時間を別で計測しています。

import multiprocessing.sharedctypes
import ctypes
import numpy as np

# 作成(抜粋)
byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
flag = mp.Value(ctypes.c_bool, False)

# 送信(抜粋)
d = np.asarray(d)
np.asarray(p)[:] = d
flag.value = True

# 受信(抜粋)
if flag.value:
    d = np.array(p, dtype=np.uint8, copy=True)

実行結果

[raw]cast numpy time: 5.4573814868927s
[raw]send start
[raw]send end: 0.15658187866210938s
[raw]recv start
[raw]recv end: 0.18018245697021484s

File

方向性を変えてメモリではなくファイル経由でやり取りする場合です。
また、共有メモリと同様に書き込み終了を flag 変数で管理しています。

import multiprocessing as mp
import pickle
import tempfile
import os

# 作成(抜粋)
with tempfile.TemporaryDirectory() as tmpdir:
    flag = mp.Value(ctypes.c_bool, False)

# 送信(抜粋)
with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
    pickle.dump(d, f)
flag.value = True

# 受信(抜粋)
if flag.value:
    with open(os.path.join(p, 'testfile'), 'r+b') as f:
        d = pickle.load(f)

実行結果

[file]send start
[file]send end: 3.8698363304138184s
[file]recv start
[file]recv end: 5.267671585083008s

socket

さらに方向性を変えてsocket通信によるやり取りです。
サーバ起動確認用に flag 変数を使用しています。
また、pickle化を別途計測としています。

import multiprocessing as mp
import socket
import pickle

# 作成(抜粋)
flag = mp.Value(ctypes.c_bool, False)

# 送信(クライアント)(抜粋)
d = pickle.dumps(d)
if flag.value:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(('127.0.0.1', 50007))
        s.sendall(d)

# 受信(サーバ)(抜粋)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind(('127.0.0.1', 50007))
    s.listen(1)
    flag.value = True
    while True:
        conn, addr = s.accept()
        d = b""
        with conn:
            while True:
                # データを受け取る
                data = conn.recv(1024*1024*1024)
                if not data:
                    break
                d += data
        d = pickle.loads(d)

実行結果

[socket]server wait start
[socket]pickle pack time: 3.798427104949951s
[socket]send start
[socket]recv start
[socket]send end: 0.3363354206085205s
[socket]recv end: 0.5375902652740479s
[socket]pickle unpack time: 4.91701340675354s

コード全体

import multiprocessing as mp
import multiprocessing.sharedctypes
import time
import ctypes
import numpy as np
import tempfile
import pickle
import os
import socket


def queue1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p.put(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))

def queue2(*args):
    p = args[0]
    key = args[1]
    
    while True:
        if p.empty():
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.get()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_queue(key, size=10000*10000):
    q = mp.Queue()
    p1 = mp.Process(target=queue1, args=(q, key, size))
    p2 = mp.Process(target=queue2, args=(q, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()




def pipe1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p.send(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))

def pipe2(*args):
    p = args[0]
    key = args[1]
    
    while True:
        if not p.poll():
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.recv()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_pipe(key, size=10000*10000):
    reciver, sender = mp.Pipe(duplex=False)
    p1 = mp.Process(target=pipe1, args=(sender, key, size))
    p2 = mp.Process(target=pipe2, args=(reciver, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



def array1(*args):
    p = args[0]
    flag = args[1]
    key = args[2]
    size = args[3]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    p[:] = d
    flag.value = True
    print("[{}]send end: {}s".format(key, time.time()-t0))

def array2(*args):
    p = args[0]
    flag = args[1]
    key = args[2]

    while True:
        if not flag.value:  # データが変わるまで待つ
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p[:]
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_array(key, size=10000*10000):
    arr = mp.Array(ctypes.c_int, [0]*size )
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=array1, args=(arr, flag, key, size))
    p2 = mp.Process(target=array2, args=(arr, flag, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



def manager1(*args):
    p = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]
    
    print("[{}]send start".format(key))
    t0 = time.time()
    p.append(d)
    print("[{}]send end: {}s".format(key, time.time()-t0))

def manager2(*args):
    p = args[0]
    key = args[1]

    while True:
        if len(p) == 0:
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = p.pop()
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_manager(key, size=10000*10000):
    with mp.Manager() as manager:
        shared_arr = manager.list()
        p1 = mp.Process(target=manager1, args=(shared_arr, key, size))
        p2 = mp.Process(target=manager2, args=(shared_arr, key))
        p1.start()
        p2.start()
        p1.join()
        p2.join()



def raw1(*args):
    p = args[0]
    flag = args[1]
    key = args[2]
    size = args[3]

    d = [i for i in range(size)]
    
    t0 = time.time()
    d = np.asarray(d)
    print("[{}]cast numpy time: {}s".format(key, time.time()-t0))

    print("[{}]send start".format(key))
    t0 = time.time()
    np.asarray(p)[:] = d
    flag.value = True
    print("[{}]send end: {}s".format(key, time.time()-t0))

def raw2(*args):
    p = args[0]
    flag = args[1]
    key = args[2]

    while True:
        if not flag.value:  # データが変わるまで待つ
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        d = np.array(p, dtype=np.uint8, copy=True)
        assert d[5]==5
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        break

def main_raw(key, size=10000*10000):
    byte_buf = multiprocessing.sharedctypes.RawArray(ctypes.c_int, size)
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=raw1, args=(byte_buf, flag, key, size))
    p2 = mp.Process(target=raw2, args=(byte_buf, flag, key))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



def file1(*args):
    tmpdir = args[0]
    flag = args[1]
    key = args[2]
    size = args[3]

    d = [i for i in range(size)]

    print("[{}]send start".format(key))
    t0 = time.time()
    with open(os.path.join(tmpdir, 'testfile'), 'wb') as f:
        pickle.dump(d, f)
    flag.value = True
    print("[{}]send end: {}s".format(key, time.time()-t0))

def file2(*args):
    tmpdir = args[0]
    flag = args[1]
    key = args[2]

    while True:
        if not flag.value:  # データが変わるまで待つ
            continue
        print("[{}]recv start".format(key))
        t0 = time.time()
        with open(os.path.join(tmpdir, 'testfile'), 'rb') as f:
            d = pickle.load(f)
        print("[{}]recv end: {}s".format(key, time.time()-t0))
        assert d[5]==5
        break

def main_file(key, size=10000*10000):
    with tempfile.TemporaryDirectory() as tmpdir:
        flag = mp.Value(ctypes.c_bool, False)
        p1 = mp.Process(target=file1, args=(tmpdir, flag, key, size))
        p2 = mp.Process(target=file2, args=(tmpdir, flag, key))
        p1.start()
        p2.start()
        p1.join()
        p2.join()



def socket1(*args):
    flag = args[0]
    key = args[1]
    size = args[2]

    d = [i for i in range(size)]

    t0 = time.time()
    d = pickle.dumps(d)
    print("[{}]pickle pack time: {}s".format(key, time.time()-t0))

    while True:
        if not flag.value:
            continue
        print("[{}]send start".format(key))
        t0 = time.time()
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect(('127.0.0.1', 50007))
            s.sendall(d)
        print("[{}]send end: {}s".format(key, time.time()-t0))
        break

def socket2(*args):
    flag = args[0]
    key = args[1]

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('127.0.0.1', 50007))
        s.listen(1)
        flag.value = True
        while True:
            print("[{}]server wait start".format(key))
            conn, addr = s.accept()
            print("[{}]recv start".format(key))
            t0 = time.time()

            d = b""
            with conn:
                while True:
                    # データを受け取る
                    data = conn.recv(1024*1024*1024)
                    if not data:
                        break
                    d += data
            print("[{}]recv end: {}s".format(key, time.time()-t0))

            t0 = time.time()
            d = pickle.loads(d)
            print("[{}]pickle unpack time: {}s".format(key, time.time()-t0))

            assert d[5]==5
            break

def main_socket(key, size=10000*10000):
    flag = mp.Value(ctypes.c_bool, False)
    p1 = mp.Process(target=socket1, args=(flag, key, size))
    p2 = mp.Process(target=socket2, args=(flag, key,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()



if __name__ == '__main__':
    main_pipe("pipe10", 10)
    main_pipe("pipe10^5", 100000)
    main_pipe("pipe10^6", 1000000)
    main_pipe("pipe10^7", 10000000)
    main_pipe("pipe10^8", 100000000)

    main_queue("queue")
    main_pipe("pipe")
    main_array("array")
    main_manager("manager")
    main_raw("raw")
    main_file("file")
    main_socket("socket")
20
20
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
20
20

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?