Python並列処理(multiprocessingとJoblib)

  • 31
    Like
  • 2
    Comment

はじめに

Python並列処理で検索するとまずでてくるのがmultiprocessingJoblibです.

両者とも様々に解説記事が上がっていますが,multiprocessingよりもJoblibの方が,

  • 並列化する関数に引数に配列以外の形が取れる
  • Ctrl+cで終了した時に子プロセスも終了してくれる
  • 自動で全体の進捗を表示してくれるオプションがある

等々便利な機能が多いため,普段はそちらを使っていました.
ただ,ひょんなことからmultiprocessingを使う機会があり,両者の速度を比較してみたところ,理由はわからなかったのですがmultiprocessingでの並列化の方が速かったため,備忘録を残しておきます.

実行環境

  • Ubuntu 16.04.1
  • Intel(R) Core(TM) i7-6700K CPU @ 4.00GHz
  • 仮想8コア、物理4コア
  • Python 3.5.2

Joblib

インストールはpipでおこなえます.

使い方を簡単に示すと,

def process(i):
    return [{'id': j, 'sum': sum(range(i*j))} for j in range(100)]

のような関数があったとして,

from joblib import Parallel, delayed

result = Parallel(n_jobs=-1)([delayed(process)(n) for n in range(1000)])

のように並列化処理を書くことで,process関数を並列実行できます.n_jobsの中身を変えることで,利用するコア数を変更することができます.-1で利用できる最大のコア数で実行してくれ,1だとシングル実行と同じ状況になります.お手軽でいいですね.

利用できるコア数以上の数字をn_jobsに指定しても落ちることはなく,適宜利用可能コアに処理を振り分けてくれるようです.

また,n_jobs以外にも引数verboseを取ることができ,0~10の数値を指定すると指定した頻度に従って進捗を出力してくれます.

multiprocessing

multiprocessingはPythonの標準ライブラリなので特にインストールなど行わずに使うことができます.いろいろと機能は多いのですが,一番簡単な使い方を示すと

from multiprocessing import Pool
import multiprocessing as multi

p = Pool(multi.cpu_count())
p.map(process, list(range(1000)))
p.close()

のようになります.Poolで並列数を指定し,mapで関数を並列実行します.multi.cpu_count()で最大実行可能コア数を取得してくれます.なお作成したPoolはclose()で終了させないと,メモリを食って大変なことになるので気をつけてください.

使い勝手の違い

multiprocessingに比べjoblibの最大の利点は,並列化する関数の引数に配列以外を取れるという点です.

例えば,

def process(i, s):
    return [{'id': i, 'str': s, 'sum': sum(range(i*j))} for j in range(100)]

のように複数の引数を取る関数があったとして,
joblibでは,

strs = ['a', 'b', 'c']
result = Parallel(n_jobs=job)([delayed(process)(i,s) for i,s in enumerate(strs * 1000)])

のように実行できますが,同じようにmultiprocessingで走らせようとしても,

p = Pool(multi.cpu_count())
strs = ['a', 'b', 'c']
p.map(process, enumerate(strs * 1000))
p.close()

引数で怒られてエラーがはかれてしまいます.

TypeError: process() missing 1 required positional argument:

この場合は実行関数の方を,

def process(v):
    return [{'id': v[0], 'str': v[1], 'sum': sum(range(v[0]*j))} for j in range(100)]

のように,配列一つを引数にとるように変更する必要があります.

速度比較

さて,本題の速度比較ですが,
実行関数に対して,

def process(n):
    return sum([i*n for i in range(100000)])

それぞれ並列実行を試し,速度を計測します.

def usejoblib(job, num):
    Parallel(n_jobs=job)([delayed(process)(n) for n in range(num)])


def usemulti(job, num):
    p = Pool(multi.cpu_count() if job < 0 else job)
    p.map(process, list(range(num)))
    p.close()

ループ数を変えて10回計測し,それぞれ平均を出した結果がこちら(job数は8, 単位はsec)

loop_n normal Joblib multiprocessing
10 0.0441 0.113 0.0217
100 0.414 0.211 0.139
1000 4.16 1.32 1.238
10000 41.1 12.5 12.2
100000 430 123 119

並列化なしと比べると両者格段に速くなっています.両者の速度差はあまりない(若干multiprocessing版のほうが速い?)ようです.

1ループ内の処理量を上げてみると

def process(i):
    return [{'id': j, 'sum': sum(range(i*j))} for j in range(1000)]
loop_n normal Joblib multiprocessing
10 0.25 0.21 0.07
100 28.4 13.2 7.53
1000 - 737 701

こちらでも若干multiprocessingの方が速くなりましたが,理由まではわかりませんでした. この程度の差であれば実行環境や関数の処理負荷によって結果が逆転する可能性もありますが,どちらにせよ通常のループ処理よりは格段に速くなるので,Pythonでループ処理をする際には積極的に使っていきたいですね.

最後に今回実行したコードを載せておきます.

import time
from joblib import Parallel, delayed
from multiprocessing import Pool
import multiprocessing as multi
from more_itertools import flatten
import sys
import functools


def process(i):
    return [{'id': j, 'sum': sum(range(i*j))} for j in range(1000)]


#def process(n):
#    return sum([i*n for i in range(100000)])


def usejoblib(job, num):
    result =Parallel(n_jobs=job)([delayed(process)(n) for n in range(num)])
    return result


def usemulti(job, num):
    p = Pool(multi.cpu_count() if job < 0 else job)
    result = p.map(process, range(num))
    p.close()
    return result

if __name__ == '__main__':
    argv = sys.argv
    total = 0
    n = 1

    for i in range(n):
        s = time.time()
        if argv[1] == 'joblib':
            result = usejoblib(int(argv[2]),int(argv[3]))
        elif argv[1] == 'multi':
            result = usemulti(int(argv[2]),int(argv[3]))
        else:
            result = [process(j) for j in range(int(argv[3]))]
        elapsed = time.time()-s
        print('time: {0} [sec]'.format(elapsed))
        total += elapsed

    print('--------')
    print('average: {0} [sec]'.format(total/n))

    sums = functools.reduce(lambda x, y: x + y['sum'], list(flatten(result)), 0)
    print('total: {0}'.format(sums))
#    print('total: {0}'.format(sum(result)))