multiprocessing
python3
高速化
並列処理

Python高速化 【multiprocessing】【並列処理】

はじめに

pythonのプログラムの実行を高速化する上で並列処理が挙げられると思います。

自分自身、並列処理をよくわかっていなかったので、調べてまとめました。

pythonで並列処理を調べてみると、よくあげられるモジュールがmultiprocessingとjoblibだったので、ここでは、multiprocessingについてまとめてみました。

まずは確認

並列処理をする上で、重要なのが自身のマシンスペックです。

まずは自分自身のマシンのCPU、コア数を確認しましょう。

macであれば下記のコマンドで確認できます。

下記なら

CPU:12-Core Intel Xeon E5

コア数:12

$ system_profiler SPHardwareDataType
Hardware:

    Hardware Overview:
      ...
      Processor Name: 12-Core Intel Xeon E5
      Number of Processors: 1
      Total Number of Cores: 12
      ...

このマシンだと、12コア24スレッドでした。24というのが総スレッド数というものです。総スレッド数が、並列化できる最大数と考えていいと思います。ただ、プログラムを回す時の最大スレッド数とは異なります。

並列処理と単純実装の実行時間比較

引数として受け取った値を二乗して返すメソッドをひたすら回すプログラムで実行時間の比較をしたいと思います。それを0~(n-1)までの数で二乗計算を行います。nの値はプログラムの通り、10,1000,100000,10000000,100000000で実験しました。

sample.py
from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == "__main__":
    N = [10, 1000, 100000, 10000000, 100000000]
    print("単純実装")
    for n in N:
        start = time.time()
        for x in range(n):
            f(x)
        print("n:{} time:{}".format(n, time.time()-start))

    print("並列処理")
    for n in N:
        start = time.time()
        with Pool(processes=8) as pool:
            pool.map(f, range(n))
        print("n:{} time:{}".format(n, time.time()-start))   

上記のプログラムで並列化させている部分が下記になります。このコードは、スレッドを8でメソッドfに0,...,nを与えて並列計算しているプログラムです。processes=8でスレッド数を変更できます。指定しない場合、os.cpu_count()で返す値(総スレッド数)を使うらしいです。先ほど調べた総スレッド数です。上でも説明した通り、このos.cpu_count()は現状使用できる総スレッド数ではないことに注意しないといけません。

"""
ちなみにpool.map(f, range(n))の返り値は、
n=10の場合
[0, 1, 4,..., 81]のようなリストで返ってきます。
"""
with Pool(processes=8) as pool:
    pool.map(f, range(n))

結果

単純実装
n:10 time:5.9604644775390625e-06
n:1000 time:0.0002338886260986328
n:100000 time:0.025127172470092773
n:10000000 time:2.3324098587036133
n:100000000 time:23.278836011886597
並列処理
n:10 time:0.19309091567993164
n:1000 time:0.1981947422027588
n:100000 time:0.19348502159118652
n:10000000 time:2.2826850414276123
n:100000000 time:21.083350658416748

nが10000000になってから並列化の方が若干早くなりました。ある程度重い処理でないと並列化の効果を発揮しないようです。

ちなみに、processes=1で回すと...

並列処理
n:10 time:0.11752891540527344
n:1000 time:0.11726498603820801
n:100000 time:0.11333918571472168
n:10000000 time:4.054936170578003
n:100000000 time:58.32061314582825

もう少し複雑な処理

上の例だとあまりにも単純なので、少し複雑な例としてmovielensのデータを使って実験してみたいと思います。

movielensのデータセット

今回使うmovielensのデータセットは、943人のユーザーが、1682の映画からいくつかの映画を評価をしているデータセットです。それぞれのユーザーは最低でも20種の映画を評価しています。このデータセットをユーザーと映画の評価値の評価値に直して、ユーザーごとのcos類似度を計算するまでの時間を測ってみたいと思います。

データセットの中身

user id|item id|rating|timestamp
196 242 3   881250949
...
119 392 4   886176814
...
305 451 3   886324817

データダウンロードサイト

基準のプログラム

並列化せずに強引にリストで計算しました。

sample2.py
# 基準の実装
import time
import numpy as np

def cos_sim(v1, v2):
    v1_ = np.array(v1)
    v2_ = np.array(v2)
    return np.dot(v1_, v2_) / (np.linalg.norm(v1_) * np.linalg.norm(v2_))

class Sample:
    def __init__(self, user_size=943, item_size=1682, file_path="movielens-100k/u.data"):
        self.file_path = file_path
        # user数×アイテム数のリスト
        self.eval_table = [[0 for _ in range(item_size)] for _ in range(user_size)]
        # user数×user数のcos類似度テーブル
        self.sim_table = [[0 for _ in range(user_size)] for _ in range(user_size)]


    def distinguish_info(self, line):
        u_id, i_id, rating, timestamp = line.replace("\n", "").split("\t")
        # u_idとi_idはitemのindexを一つずらす
        return int(u_id)-1, int(i_id)-1, float(rating), timestamp


    def calc_cossim(self, target_u_id, target_user_eval):
        for u_id ,user_eval in enumerate(self.eval_table):
            self.sim_table[target_u_id][u_id] = cos_sim(target_user_eval, user_eval)


    def run(self):
        f = open(self.file_path , 'r')
        # userとitemのテーブル作成
        start = time.time()
        for line in f:
            u_id, i_id, rating, _ = self.distinguish_info(line)
            self.eval_table[u_id][i_id] = rating

        # テーブルに基づいてcos類似度作成
        for target_u_id, target_user_eval in enumerate(self.eval_table):
            self.calc_cossim(target_u_id, target_user_eval)
        time = time.time()-start
        print("総時間:{}".format(time))


if __name__ == "__main__":
    s = Sample()
    s.run()

結果

総時間:262.024218082428

並列化実装

上のプログラムに下記の二つのメソッドをクラス内に追加しました。簡単な実装の通り、ある程度の処理量がないと並列化の恩恵を受けられないと思ったので、userとitemテーブルの並列化は今回行いませんでした。なので、cos類似度の計算部分を並列化してみました。

sample2.py
class Sample:

    # ...

    def wrapper(self, args):
        self.calc_cossim(*args)

    def run_pool(self, processes=8):
        f = open(self.file_path , 'r')
        # userとitemのテーブル作成
        start = time.time()
        for line in f:
            u_id, i_id, rating, _ = self.distinguish_info(line)
            self.eval_table[u_id][i_id] = rating

        # テーブルに基づいてcos類似度作成
        # 複数の引数を扱えるように少し加工
        tmp = [(target_u_id, target_user_eval) for target_u_id, target_user_eval in enumerate(self.eval_table)] 
        # 並列処理
        with Pool(processes=processes) as pool:
            pool.map(self.wrapper, tmp)
        time = time.time()-start
        print("総時間:{}".format(time))

wrapperメソッドは、引数が二つ使えるように用意したメソッドです。そのために渡すリストも少し加工しました。

が...

これではself.sim_tableは初期状態のままで更新されていませんでした。さらに修正しました。

sample2.py
class Sample:

    # ...

    def calc_cossim(self, target_u_id, target_user_eval):
        for u_id ,user_eval in enumerate(self.eval_table):
            self.sim_table[target_u_id][u_id] = cos_sim(target_user_eval, user_eval)
        # 変更
        return self.sim_table[target_u_id]

    # ...

    def wrapper(self, args):
        # 変更
        return self.calc_cossim(*args)

    def run_pool(self, processes=8):
        f = open(self.file_path , 'r')
        # userとitemのテーブル作成
        start = time.time()
        for line in f:
            u_id, i_id, rating, _ = self.distinguish_info(line)
            self.eval_table[u_id][i_id] = rating

        # テーブルに基づいてcos類似度作成
        tmp = [(target_u_id, target_user_eval) for target_u_id, target_user_eval in enumerate(self.eval_table)] 
        with Pool(processes=processes) as pool:
            # 変更
            self.right_sim_table = pool.map(self.wrapper, tmp)
        time = time.time()-start
        print("総時間:{}".format(time))

このように変更すると正常に計算できました。プロセスの外側ではクラス変数が変更されないようなので注意が必要です。そして、重要な実行時間ですが、

結果

総時間:72.14553785324097

約3.6倍の速度向上!

Appendix

並列処理について学んでいて気になった点をまとめました。

multirocessingとjoblibの違い

multiprocessingよりもjoblibの方が...

  • 並列化する関数に引数に配列以外の形が取れる
  • Ctrl+cで終了した時に子プロセスも終了してくれる
  • 自動で全体の進捗を表示してくれるオプションがある

らしいです。

しかし、multiprocessingの方が実行速度が速いようです。

参考サイト

並列処理と並行処理

  • 並列処理:実際に処理が同時に起こっている
  • 並行処理:複数の処理が(少なくとも論理的に)同時に走っていて、処理やイベントが起きるタイミングが任意である
    • プログラムAとBが同時に走っているとする
    • a1,a2,a3,...とb1,b2,b3…という処理を並行処理で行うとする
    • 実行順:a1,b1,a2,a3,b2,...とかa1,a2,a3,b1,b2,...かもしれない
    • 決して同時ではない

参考サイト

プロセスとスレッド

並列処理する上でよく出る単語のプロセスとスレッドについて軽くまとめました。

プロセス

  • プログラムの実行単位のこと
  • プロセスからプロセスを起動できる
    • あるプロセスから起動されたプロセスを「子プロセス」
    • 「子プロセス」を起動したプロセスを「親プロセス」という
  • 親子関係のあるプロセスでも別の仮想メモリ空間を使う

スレッド

  • プログラム内で並列処理を行うための仕組み
  • プロセスがスレッドの環境(メモリとか)を準備する

マルチプロセスとマルチスレッドのイメージ

  • マルチプロセス:webブラウザを複数起動させて、それぞれで別の処理を行うイメージ
  • マルチスレッド:シューティングゲームで自分自身、敵、障害物がそれぞれで動作するイメージ

参考サイト