参考にしました: https://www.bettercodebytes.com/theadpoolexecutor-with-a-bounded-queue-in-python/
概要
Executorでmax_workersを指定すると並列度が調整できますが、並列数を上回るペースでsubmitした場合にどうなるかというとブロックは起きません。その代わりメモリにため込むようです。この動きのため、大量に実行するとメモリを多く消費してしまうことがあります。
with ThreadPoolExecutor(max_workers=10) as executor:
for i in range(0, 1024*1024): # たくさん
executor.submit(fn, i) # つくる
# forループはすぐ終わるが、消費メモリがすごいことになっている
実際、100万ループするようなコードを書くとメモリを2GBぐらい消費します。なので、対応を考えることにしました。
内部実装と原因
内部実装を確認したところ、ThreadPoolExecutorは内部でキューを持っており、submitするとWorkItemというオブジェクトを作りキューに入れます。この内部のキューには上限が無くブロックすることもないため、際限なくsubmitできてしまいます。
ちなみに、Workerスレッドはキューに詰めたタイミングで作っていて、Workerスレッドはキューからデータを取り出し、実行、というのを無限ループでやっています。
確認コード
実際にその動きを観察してみます。例えば0.01秒かかる関数を5000回実行します。これをmax_workers=10で回してみます。
for文内での進捗としてタイムスタンプとメモリ(今回はmaxrss)を見ます。
タイムスタンプからsubmitでブロッキングは起きていないことがわかります(forループのsubmitする処理はすぐに終えてほとんどshutdown待ちになっている)
しかし、処理が進むにつれメモリ消費が増えていることがわかります。
対応
案1. キューをサイズ付きにする
最初に考えたのはこの方法です。
ThreadPoolExecutorの内部で使うキューをサイズ付きのキューにします。継承してインスタンス変数をすり替えます。
タイムスタンプからループ中にブロッキングが起きていることがわかります。しかしトータルの時間はさほど変わらずメモリの消費は非常に緩やかです。
コードは単純ですが、内部実装に手を出すのは若干いまいちな感じがしますし、ProcessPoolExecutorではこういったキューは持っていないので、この方法は使えません。
案2. Semaphoreで同時実行数を制御する
案1がいまいちだったのでどうにかできないか探していたところ参考元の記事を見つけました。
参考元を参考にPoolExecutorをラップしたクラスBoundedExecutorを作成します。API互換(map以外)なので差し替えて使えます。
内部実装はsubmit時にsemaphoreのカウントダウンを行い、ワーカーの処理が完了したときはsemaphoreのカウントアップをすること同時実行を制御します。「ワーカーの処理が完了したとき」というのは「futureのadd_done_callbackで登録した関数が完了時に呼ばれるとき」とします。(ワーカーの処理が完了した時もraise Exceptionした時もコールバックは呼ばれるので辻褄は合うはず)
こちらも、案1のときと同様な結果になりました。
ちなみに、max_workersよりも多くなるようにキューのサイズを決めたほうが良いです(コードで言えばbounded_ratio=1の箇所がbounded_ratio=2になるように引数を与えるor変える)
「並列数==キューのサイズ」にしてしまうと、キューに空きができてしまうタイミングが発生してしまい、ワーカーが遊んでしまい全体の完了が若干遅くなります。なので、少し多くなるようにするのがよいです。