10
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Python高速化【並列処理】【マルチスレッド】【マルチプロセス】

Last updated at Posted at 2021-11-14

はじめに

 pythonのプロダクトの一部処理(I/Oバウンドなプログラム)を高速化する必要があり、初めて並列処理を学習しました!今回は学習した内容をアウトプットしていきたいと思います。
初学者のため、理解不足な点が多々あると思います。気になった点やアドバイスなどありましたら、教えていただけますと幸いです。

前提知識

  • プロセス
      プログラムの実行単位、固有のメモリを持つ。

  • スレッド
     プロセス内の処理の実行単位、プロセス内のメモリを共有。

  • I/Oバウンド

    • プログラムのディスクとの入出力による負担のこと。
    • ファイルの読み書き、DBへの接続、ネットワーク通信で発生することが多い。
    • 今回高速化したいプログラムはI/Oバウンドなプログラム
  • CPUバウンド

    • CPUに負荷がかかること。
    • 数値計算のようにCPUを使い続けるような処理とかで発生することが多い。
  • GIL(グローバルインタプリタロック)

    • 例えば、二つのスレッドを同時に実行する場合、スレッドAが変数Cを書き換えている間にスレッドBも同じ変数Cを書き換えようとしてエラーが発生する。このようなスレッド間の競合を防ぐため、 スレッドA実行中はスレッドBを待機させる仕組みがGIL。
    • GILロックを取得したスレッドのみが処理を実行する。

マルチスレッドとマルチプロセス

image.png
画像引用:https://www.youtube.com/watch?v=LQRMX-1Rzew

マルチスレッド

  • 1つのプロセス内で複数のスレッドを実行。
  • スレッド同士、同じメモリ空間にあるためデータの受け渡しが容易
  • GILロックを取得したスレッドが処理を実行している間、他のスレッドは待機、ある期間経過したらGILロックが解放されてGILロックを取得した別のスレッドが処理を実行していく。スレッド同士でGILロックを取り合ってるイメージ。
  • 同時にスレッドが実行されているわけではない。
  • 高速化したい処理がI/OバウンドかCPUバウンドかで高速化できるかどうかが変わってくる。

マルチスレッドのI/OバウンドとCPUバウンド

image.png

  • I/Oバウンドの処理には効果的。
  • スレッド数がマルチプロセスより増やせるのでスケールしやすい。
    • コア数により並列化できるプロセスが決まってくるから。
  • CPUバウンドの処理には不向き。

マルチプロセス

  • 複数のプロセスを立ち上げて処理を実行
  • 全プロセスが同時に走る
  • コアが別なので、GILロックを取得する必要なし
  • スレッド生成よりプロセス生成は時間とメモリ消費が大きくなる。
    • プロセス同士、メモリを共有していないので必要なデータを複製する必要があるため。

マルチプロセスのI/OバウンドとCPUバウンド

image.png

  • GILの影響を受けない
  • CPUのコア数を超えて並列化できないのでスケールしない。
  • プロセス間通信しないとデータの受け渡しができない。

使用したライブラリ concurrent.futures

  • Python3.2 から追加されたマルチスレッド、マルチプロセス用のライブラリ。
  • マルチスレッドは ThreadPoolExecutor()

    マルチプロセスはProcessPoolExecutor()と書き換えるだけ。

サンプルコード

test1.py
# I/Oバウンドの処理をシングルスレッド、マルチスレッド、マルチプロセスでそれぞれ実行
import concurrent.futures
import os
import time


LEARGE_TEXT = 'string' * 100000000

def io_bound(file_name):
    with open(file_name, 'w+') as f:
        # ファイルにLEARGE_TEXTを書き込み
        f.write(LEARGE_TEXT)
        # ファイルの先頭に移動
        f.seek(0)
        # ファイル読み込み
        f.read()
    # ファイル削除
    os.remove(file_name)
    return 'Future is done.'

if __name__ == '__main__':
    start = time.time()
    print(io_bound('1.text'))
    print(io_bound('2.text'))
    end = time.time()
    print('シングルスレッド: TIME {:.4f}\n'.format(end - start))

    # マルチスレッド ThreadPoolExecutor
    # max_workersでスレッド数指定
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        start = time.time()
        # submitで第一引数に処理対象の関数、第二引数以降に引数指定
        future1 = executor.submit(io_bound, '1.text')
        future2 = executor.submit(io_bound, '2.text')
        print(future1.result())
        print(future2.result())
        end = time.time()
        print('マルチスレッド: TIME {:.4f}\n'.format(end - start))
    
    # マルチプロセス ProcessPoolExecutor
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        start = time.time()
        future1 = executor.submit(io_bound, '1.text')
        future2 = executor.submit(io_bound, '2.text')
        print(future1.result())
        print(future2.result())
        end = time.time()
        print('CPUの数:', os.cpu_count())
        print('マルチプロセス: TIME {:.4f}\n'.format(end - start))

ターミナル
% python test1.py
Future is done.
Future is done.
シングルスレッド: TIME 1.6771

Future is done.
Future is done.
マルチスレッド: TIME 1.2766

Future is done.
Future is done.
CPUの数: 8
マルチプロセス: TIME 1.5025
test2.py
# CPUバウンドの処理をシングルスレッド、マルチスレッド、マルチプロセスでそれぞれ実行
import concurrent.futures
import os
import time


def cpu_bound():
    i = 0
    while i < 10000000:
        i = i + 1 - 2 + 3 - 4 + 5
    return 'Future is done.'

if __name__ == '__main__':
    start = time.time()
    print(cpu_bound())
    print(cpu_bound())
    end = time.time()
    print('シングルスレッド: TIME {:.4f}\n'.format(end - start))

    # マルチスレッド ThreadPoolExecutor
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        start = time.time()
        # submitで第一引数に処理対象の関数、第二引数以降に引数指定
        future1 = executor.submit(cpu_bound)
        future2 = executor.submit(cpu_bound)
        print(future1.result())
        print(future2.result())
        end = time.time()
        print('マルチスレッド: TIME {:.4f}\n'.format(end - start))
    
    # マルチプロセス ProcessPoolExecutor
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        start = time.time()
        future1 = executor.submit(cpu_bound)
        future2 = executor.submit(cpu_bound)
        print(future1.result())
        print(future2.result())
        end = time.time()
        print('CPUの数:', os.cpu_count())
        print('マルチプロセス: TIME {:.4f}\n'.format(end - start))

ターミナル
% python test2.py
Future is done.
Future is done.
シングルスレッド: TIME 1.1014

Future is done.
Future is done.
マルチスレッド: TIME 1.1036

Future is done.
Future is done.
CPUの数: 8
マルチプロセス: TIME 0.6459

  • I/Oバウンドの処理ではマルチスレッド、マルチプロセス、どちらでも高速化できる。
  • ただ、何回かに1回シングルスレッドよりマルチプロセスの方が処理時間がかかることがある。
  • おそらくプロセス生成に時間がかかっているためだと思うので、一つの処理時間が短い場合は、マルチプロセスよりマルチスレッドの方が安定して効果を発揮すると思われる。
  • CPUバウンドの処理ではシングルスレッドよりマルチスレッドの方が処理時間がかかるため、マルチプロセスで実装した方が良い。

実装結果

  • 今回私が高速化したいプログラムもI/Oバウンドなプログラムであったため、マルチスレッドで実装し、最終的に処理時間が60分から5分まで高速化できました。
  • マルチプロセスでも実装してみたのですが、プロセス間のデータのやり取りがうまくいかず、毎回エラー内容が変わるという事象が発生し、断念。マルチスレッドで実装しました。

まとめ

  • 高速化したい処理がI/OバウンドなのかCPUバウンドなのかによって、マルチスレッド、マルチプロセス、どちらが適しているのかが変わってくるということに行き着くまで試行錯誤しました。
  • 今回はマルチスレッドで高速化できましたが、マルチプロセスでも高速化を実現できるように引き続き勉強していきたいと思います。

参考文献

10
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?