はじめに
この記事は,前回のAviUtlの拡張編集のスクリプトに関する記事 についた @yumetodo さんのコメント「並列化でC++11のstd::threadを使っていますが、一般にスレッドを作るコストは重いので、C++17の並列アルゴリズムないし、Intel TBBを使ってあげるとスレッドを内部で使いまわしてくれます」を受けて試してみたことの報告です.
サポート状況
まず,私は (おそらく多くの AviUtl ユーザーと異なり) MSYS2 上の g++ を使って AviUtl のプラグインおよび拡張編集スクリプトのコンパイルを行っています.なので C++17 のサポート状況 を確認すると,GCC で「Parallelism TS の標準化」は-ltbb
とのリンクが必須とのこと.なのでとりあえず TBB だけ考えればいいかなとなりました.(現在の MSYS2 の pacman で最新にした g++ のバージョンは 9.2.0 です)
ところが,MSYS2 の pacman でインストールできる Intel TBB を使うと,動的リンクしかできず,実行時に「tbb.dll」「libgcc_s_dw2-1.dll」「libstdc++-6.dll」「libwinpthread-1.dll」の4つもの DLL が必要で,パスの関係上,スクリプトフォルダではなく,aviutl.exe のルートなどにこれらを配置する必要があり,自分で使うのはともかく,配布用としてはかなり難があると言えます.
なので,スレッドを使いまわしさえすればいいのであれば,自前でそのような実装をすればよいだろうと考えました.似たようなことをしている例は探せば色々ありますが,とりあえずそれらは見ずに自分でやってみます.
実装例
もともとの,ナイーブに std::thread を使う例
#include <thread>
namespace KSA {
class ClipResize {
// 省略
public:
static void
invoke_interpolate(ClipResize *p, int y_start, int y_end)
{
for (int dy=y_start; dy<y_end; dy++) {
for (int dx=0; dx<(p->x.dest_size); dx++) {
// 出力側の (dx, dy) 座標の画素値を内挿計算する
p->interpolate(dx, dy);
}
}
}
};
static int
ksa_clip_resize(lua_State *L)
{
// 引数受け取りとか重み計算とかを省略
// スレッドの配列を確保
std::unique_ptr<std::unique_ptr<std::thread>[]> threads(
new std::unique_ptr<std::thread>[n_th] );
// スレッド番号ごとに引数をズラして,それぞれのスレッドをたてる
for (int t=0; t<n_th; t++) {
threads[t].reset(new std::thread(ClipResize::invoke_interpolate, p.get(),
( t*(p->y.dest_size) )/n_th, ( (t+1)*(p->y.dest_size) )/n_th));
}
// 全部のスレッドの終了を待ち合わせる
for (int t=0; t<n_th; t++) {
threads[t]->join();
}
return 0;
}
};
自前 ThreadPool の例
汎用的な実装としては,ジョブをキューで管理するものが主流のようだが,同期処理しかしないので,同期までを行うThreadPool::invoke
が使えればいいかな的な実装.データは関数オブジェクトに埋め込むほうが C++ 的だったかも知れないが,C 言語脳だったので,生ポインタをvoid *
にキャストして渡すという古めかしいパターンを使用.
なお,私のメイン PC では正常に動作するが,サブ PC では,AviUtl の終了時に static 変数のスレッドプールが正常に破棄されないらしく,ゾンビプロセスが残ってしまう事象が発生.回避策は不明.
以下は重要部分の抜粋,全体は feature/#3 ブランチ を参照.
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <cstdint>
namespace KSA {
class ThreadPool {
private:
enum class State { NotInitialized, JobPrepared, JobFinished };
class Thread {
public:
std::thread thread;
State state;
std::mutex mx;
std::condition_variable cv;
Thread() : state(State::NotInitialized)
{
}
};
std::unique_ptr<Thread[]> threads;
std::size_t size;
std::function<void(void *, std::size_t, std::size_t)> func;
void *data;
std::size_t n;
bool terminate;
void
listen(Thread *th, const std::size_t id)
{
for (;;) {
{
// ジョブが投入されるまで待機
std::unique_lock<std::mutex> lk(th->mx);
th->cv.wait(lk, [&]{ return (th->state==State::JobPrepared); });
}
if ( terminate ) {
// プールが破棄されるときにようやく返る
return;
}
// ジョブを実行
func(data, id, n);
// ジョブの終了を通知
{
std::lock_guard<std::mutex> lk(th->mx);
th->state = State::JobFinished;
}
th->cv.notify_one();
}
}
public:
ThreadPool() : size(std::thread::hardware_concurrency()), terminate(false)
{
threads.reset(new Thread[size]);
for (std::size_t i=0; i<size; i++) {
threads[i].thread = std::thread(listen, this, &threads[i], i);
}
}
~ThreadPool()
{
{
// std::thread を正常終了させるため,listen 関数を返させる
for (std::size_t i=0; i<size; i++) {
threads[i].mx.lock();
threads[i].state = State::JobPrepared;
}
terminate = true;
for (std::size_t i=0; i<size; i++) {
threads[i].mx.unlock();
threads[i].cv.notify_all();
}
}
for (std::size_t i=0; i<size; i++) {
// join でもどっちでもいいかな
threads[i].thread.detach();
}
}
void
invoke(std::function<void(void *, std::size_t, std::size_t)> f, void *p, std::size_t m)
{
// ジョブの準備
func = f;
data = p;
if ( size < m ) {
n = size;
} else {
n = m;
}
for (std::size_t i=0; i<n; i++) {
// ジョブ準備の完了をワーカーに通知
{
std::lock_guard<std::mutex> lk(threads[i].mx);
threads[i].state = State::JobPrepared;
}
threads[i].cv.notify_one();
}
for (std::size_t i=0; i<n; i++) {
// ジョブの完了を待ち合わせ
std::unique_lock<std::mutex> lk(threads[i].mx);
threads[i].cv.wait(lk, [&]{ return (threads[i].state==State::JobFinished); });
}
}
};
static std::unique_ptr<ThreadPool> TP;
class ClipResize {
// 略
static void
invoke_interpolate(void *data, std::size_t t, std::size_t n_th)
{
ClipResize *p = static_cast<ClipResize *>(data);
std::size_t y_start = ( t*(p->y.dest_size) )/n_th;
std::size_t y_end = ( (t+1)*(p->y.dest_size) )/n_th;
for (std::size_t dy=y_start; dy<y_end; dy++) {
for (int dx=0; dx<(p->x.dest_size); dx++) {
p->interpolate(dx, dy);
}
}
}
};
static int
ksa_clip_resize(lua_State *L)
{
// 略
TP->invoke(ClipResize::invoke_interpolate, p.get(), n_th);
return 0;
}
};
// 略
extern "C" {
int
luaopen_ksa_ext(lua_State *L)
{
KSA::TP.reset(new KSA::ThreadPool()); // エントリポイントでスレッドプールを作る
luaL_register(L, "ksa_ext", ksa_ext);
return 1;
}
}
Intel TBB を使う例
全体は feature/#3-tbb ブランチ を参照.
class ClipResize {
// 略
class InvokeInterpolate {
private:
ClipResize *const cr_data;
public:
void
operator()( const tbb::blocked_range<size_t>& r )
const {
ClipResize *p = cr_data;
for (std::size_t dy=r.begin(); dy<r.end(); dy++) {
for (int dx=0; dx<(p->x.dest_size); dx++) {
p->interpolate(dx, dy);
}
}
}
InvokeInterpolate( ClipResize *data ) : cr_data(data)
{
}
};
};
static int
ksa_clip_resize(lua_State *L)
{
// 略
parallel_for(tbb::blocked_range<size_t>(0, p->y.dest_size), ClipResize::InvokeInterpolate(p.get()));
return 0;
}
速度計測
大体の速さがわかればいいので,適当な動画に拡縮エフェクトを掛けて NVEnc で出力したときの「Aviutl 平均フレーム取得時間 (ms)」を見ることにします.一応ばらつきを考えて 3 回ずつくらい計測しておきましょう.
実装法 | 1 回目 | 2 回目 | 3 回目 |
---|---|---|---|
生 std::thread | 126.127 | 120.902 | 120.951 |
自前 ThreadPool | 122.053 | 126.009 | 121.103 |
Intel TBB | 112.265 | 113.690 | 110.645 |
リサイズなし | 16.259 | 16.118 | 16.115 |
基本効果>リサイズ | 21.211 | 20.849 | 21.232 |
なんと,もともとのstd::thread
をナイーブに使った場合と,これを使い回すようにした自前 ThreadPool は全く変わらないという結果に.その他にも最適化されているであろう Intel TBB を使うと少し速くはなりますが,スレッド作成に伴うオーバーヘッドとは別の部分の差と思われます.例えば私の実装では分割の均等性への配慮がイマイチな所,TBB はもう少しうまくやっているとか.
この結果を考えると,g++ (9.2.0) ではstd::thread
は内部的にはスレッドを使いまわす仕様であり,future
を含め,スレッド生成に伴うオーバーヘッドを気にせずに使えるということなのでしょうか.
「リサイズなし」はリサイズエフェクト以外の,フレーム生成にかかる時間の目安,「基本効果>リサイズ」はデフォルトのリサイズエフェクトです.「リサイズなし」をオフセットと考えて差し引いて考えると,私の実装における Lanczos3 リサイズはデフォルトのリサイズに比べて (今回の計測に使ったテスト動画とテスト環境では) 20 倍強も遅いことになります.
結論
- Intel TBB を使うと少し速くなるものの,DLL が必要となって取り回しが面倒になることを上回るほどのものではない.
- 自前 ThreadPool は,実装を少し変えると parallel_for と同様の引数で使えるようにでき,使う側のコードは簡単になるものの,環境によってゾンビプロセスが残る問題がある.
以上のことから,元のstd::thread
をそのまま使うのがよいだろうということに.スレッド生成からjoin
までを一体化したラッパー関数は作ってもいいかも知れませんが,明示的にstd::thread
を使い回す面倒な実装までは必要ないでしょう.