PythonはGIL (グローバルインタプリタロック)というものがかかっており、基本的にただコードを書いただけでは複数のCPUコアがある場合にそのリソースを全て使い切ることが出来ません。
しかし、大量のデータを処理する時などにはマシンのCPUリソースを全て使って出来るだけ高速に計算させたい時がありますね。
Python 並列処理などのキーワードで検索すると標準ライブラリによるmultiprocessingモジュールの解説がよく見られるので、こちらを利用されている方も多いのではないでしょうか。
がっつり並列処理を組み込んだシステムを作る際にはこちらのモジュールを使って作り込みたいところですが、書き捨てのコードを書くだけの時には正直ちょっと面倒です…
そこで、よりお手軽にサクっと並列処理を実行出来るJoblibというモジュールについて紹介したいと思います。
コード量が減るのはもちろんのこと、他の利点として
- 子プロセスで吐いたエラーも表示してくれる
- Ctrl+cで終了した時に子プロセスも終了してくれる
- 自動で全体の進捗を表示してくれるオプションがある
などがあります。
どれもmultiprocessingでは自前で実装しなければならないので自分は大変重宝しています。
インストール
まずはインストールです。pipで一発です。
この記事を書いた時点のバージョンは0.9.3となっています。
pip install joblib
Successfully installed joblib-0.9.3
また、テストに用いた環境は仮想4コア、物理2コアのMacBookProです。
テストに利用するコード
単純に計算時間を比較するため、以下のようなコードでテストを行います。計算内容自体に特に意味はありません。
まずは単純な計算を繰り返してtimeモジュールで計算時間を測定し、表示してみます。
# -*- coding: utf-8 -*-
from time import time
def process(n):
return sum([i*n for i in range(100000)])
start = time()
# 繰り返し計算
total = 0
for i in range(10000):
total += process(i)
print(total)
print('{}秒かかりました'.format(time() - start))
249972500250000000
78.2647480965秒かかりました
計算の答えは249972500250000000、時間は78秒かかっています。
Joblibで並列化する
上記のコードの繰り返し計算の部分のみ変更を加え、並列化してみます。
Parallelとdelayedを組み合わせることによってマルチプロセスの並列化を実現することが出来ます。
# -*- coding: utf-8 -*-
from joblib import Parallel, delayed
from time import time
def process(n):
return sum([i*n for i in range(100000)])
start = time()
# 繰り返し計算 (並列化)
r = Parallel(n_jobs=-1)( [delayed(process)(i) for i in range(10000)] )
print(sum(r))
print('{}秒かかりました'.format(time() - start))
249972500250000000
37.5521140099秒かかりました
37秒に短縮されました!計算の答えも合っています。
コードの一部を書き換えるだけでサクっと計算時間を短縮することが出来ました。
Parallelの引数のn_jobsというのが利用するコア数で、これを-1にしておくと常に利用できる最大のコア数で実行してくれます。
1にすると並列化せずに実行しているのと同じことになるので元に戻すのも簡単です。
繰り返し処理は基本的にParallelで書いてしまっても問題なさそうです。
また、引数verboseに0~10の数値を指定すると指定した頻度に従って進捗を出力してくれます。
(0で出力無し、10で最頻)
r = Parallel(n_jobs=-1, verbose=10)( [delayed(process)(i) for i in range(10000)] )
おまけ:メソッド外の変数を操作する
上記のコードの例だと並列実行されているprocessメソッド内から外部のスコープに存在する変数を参照することは可能ですが、新たな値を代入することが出来ません。
プロセス毎に参照可能なメモリ領域が異なるからです。
以下のコードは通常は動作しますが、並列化したコードではエラーとなって実行が出来ないのです。
# processメソッドの中から外部の変数numberを参照、操作している
number = 0
def process(n):
number = 3.14
return sum([i*n for i in range(100000)])
これを解決するためにはプロセス間でメモリ領域を共有した変数を作ることが必要です。
multiprocessingモジュールに用意されているので使ってみましょう。
上記のコードは下記のように実現することが出来ます。
# -*- coding: utf-8 -*-
from joblib import Parallel, delayed
from multiprocessing import Value, Array
shared_int = Value('i', 1)
def process(n):
shared_int.value = 3.14
return sum([i*n for i in range(100000)])
# 繰り返し計算 (並列化)
Parallel(n_jobs=-1)( [delayed(process)(i) for i in range(10000)] )
print(shared_int.value)
3.14
Valueクラスを使うと、intやdoubleなどの数値を共有変数として用意することが出来ます。予め第一引数で型を指定することが必要なので注意してください。
また、Array('d', [0.0, 0.0, 0.0])
などとすることで指定した型のリストを用意することも可能です。
自分でカスタムした進捗表示をしたい時などに便利そうですね!
最後に
Pythonで並列処理、意外とやってみるとすんなり実装できてびっくりしました。
筆者はTwitterで日々技術情報もろもろの投稿をしています。是非フォローしてもらえると嬉しいです。