#はじめに
これは,OpenCV Advent Calendar 2015の記事です.関連記事は,リンク先に目次としてまとめられています.
OpenCVの関数の多くは,入力データのチャネル数やビット深度に応じて呼び出す関数をスイッチしています.つまり,OpenCVの関数(やIPPの関数)は,各型とチャネル数に応じて徹底的にチューンされた関数を持っています.PythonやMatlabといった,現代に合わせた富豪的プログラミングを行う環境から真逆の発想を行っている感覚もありますが,結局,PythonやMatlabから呼び出される関数がOpenCVなどのチューンされたライブラリ関数であるため,どこかの段階では最適化された関数群を用意する必要があります.
ここでは,
- OpenCVの例
- IPPの例
- 実際に自分で関数を作る例
の3つの例を挙げて説明します.
#例1:OpenCVのボックスフィルタ
例1では,全て同じ重みで平滑化するボックスフィルタの一部であるgetColumnSumFilter
を例にとって説明します.(この関数とgetRowSumFilterをあわせて縦横のフィルタを作ってボックスフィルタにします.)下記に関数を抜粋します(一つ目のコード).この関数では,入力するMatのビット深度と出力するMatのビット深度の組み合わせごとにテンプレート関数ColumnSum
を用意して切り替える実装をしています.ColumnSumは,画像を横方向に積分する関数です.
明らかに入出力の組み合わせが爆発していますが,特定の良く使われる組み合わせに対してテンプレート関数の特殊化をしており,良く使う関数は高速化されています.例えば,int入力float出力の場合の例も表示します(2つ目のコード).
getColumnSumFilter
//smooth.cpp
cv::Ptr<cv::BaseColumnFilter> cv::getColumnSumFilter(int sumType, int dstType, int ksize,
int anchor, double scale)
{
int sdepth = CV_MAT_DEPTH(sumType), ddepth = CV_MAT_DEPTH(dstType);
CV_Assert( CV_MAT_CN(sumType) == CV_MAT_CN(dstType) );
if( anchor < 0 )
anchor = ksize/2;
if( ddepth == CV_8U && sdepth == CV_32S )
return makePtr<ColumnSum<int, uchar> >(ksize, anchor, scale);
if( ddepth == CV_8U && sdepth == CV_64F )
return makePtr<ColumnSum<double, uchar> >(ksize, anchor, scale);
if( ddepth == CV_16U && sdepth == CV_32S )
return makePtr<ColumnSum<int, ushort> >(ksize, anchor, scale);
if( ddepth == CV_16U && sdepth == CV_64F )
return makePtr<ColumnSum<double, ushort> >(ksize, anchor, scale);
if( ddepth == CV_16S && sdepth == CV_32S )
return makePtr<ColumnSum<int, short> >(ksize, anchor, scale);
if( ddepth == CV_16S && sdepth == CV_64F )
return makePtr<ColumnSum<double, short> >(ksize, anchor, scale);
if( ddepth == CV_32S && sdepth == CV_32S )
return makePtr<ColumnSum<int, int> >(ksize, anchor, scale);
if( ddepth == CV_32F && sdepth == CV_32S )
return makePtr<ColumnSum<int, float> >(ksize, anchor, scale);
if( ddepth == CV_32F && sdepth == CV_64F )
return makePtr<ColumnSum<double, float> >(ksize, anchor, scale);
if( ddepth == CV_64F && sdepth == CV_32S )
return makePtr<ColumnSum<int, double> >(ksize, anchor, scale);
if( ddepth == CV_64F && sdepth == CV_64F )
return makePtr<ColumnSum<double, double> >(ksize, anchor, scale);
CV_Error_( CV_StsNotImplemented,
("Unsupported combination of sum format (=%d), and destination format (=%d)",
sumType, dstType));
return Ptr<BaseColumnFilter>();
}
int入力float出力のtemplateの特殊化
template<>
struct ColumnSum<int, float> :
public BaseColumnFilter
{
ColumnSum( int _ksize, int _anchor, double _scale ) :
BaseColumnFilter()
{
ksize = _ksize;
anchor = _anchor;
scale = _scale;
sumCount = 0;
}
virtual void reset() { sumCount = 0; }
virtual void operator()(const uchar** src, uchar* dst, int dststep, int count, int width)
{
int i;
int* SUM;
bool haveScale = scale != 1;
double _scale = scale;
#if CV_SSE2
bool haveSSE2 = checkHardwareSupport(CV_CPU_SSE2);
#endif
if( width != (int)sum.size() )
{
sum.resize(width);
sumCount = 0;
}
SUM = &sum[0];
if( sumCount == 0 )
{
memset((void *)SUM, 0, sizeof(int) * width);
for( ; sumCount < ksize - 1; sumCount++, src++ )
{
const int* Sp = (const int*)src[0];
i = 0;
#if CV_SSE2
if(haveSSE2)
{
for( ; i < width-4; i+=4 )
{
__m128i _sum = _mm_loadu_si128((const __m128i*)(SUM+i));
__m128i _sp = _mm_loadu_si128((const __m128i*)(Sp+i));
_mm_storeu_si128((__m128i*)(SUM+i), _mm_add_epi32(_sum, _sp));
}
}
#elif CV_NEON
for( ; i <= width - 4; i+=4 )
vst1q_s32(SUM + i, vaddq_s32(vld1q_s32(SUM + i), vld1q_s32(Sp + i)));
#endif
for( ; i < width; i++ )
SUM[i] += Sp[i];
}
}
else
{
CV_Assert( sumCount == ksize-1 );
src += ksize-1;
}
for( ; count--; src++ )
{
const int * Sp = (const int*)src[0];
const int * Sm = (const int*)src[1-ksize];
float* D = (float*)dst;
if( haveScale )
{
i = 0;
#if CV_SSE2
if(haveSSE2)
{
const __m128 scale4 = _mm_set1_ps((float)_scale);
for( ; i < width-4; i+=4)
{
__m128i _sm = _mm_loadu_si128((const __m128i*)(Sm+i));
__m128i _s0 = _mm_add_epi32(_mm_loadu_si128((const __m128i*)(SUM+i)),
_mm_loadu_si128((const __m128i*)(Sp+i)));
_mm_storeu_ps(D+i, _mm_mul_ps(scale4, _mm_cvtepi32_ps(_s0)));
_mm_storeu_si128((__m128i*)(SUM+i), _mm_sub_epi32(_s0,_sm));
}
}
#elif CV_NEON
float32x4_t v_scale = vdupq_n_f32((float)_scale);
for( ; i <= width-8; i+=8 )
{
int32x4_t v_s0 = vaddq_s32(vld1q_s32(SUM + i), vld1q_s32(Sp + i));
int32x4_t v_s01 = vaddq_s32(vld1q_s32(SUM + i + 4), vld1q_s32(Sp + i + 4));
vst1q_f32(D + i, vmulq_f32(vcvtq_f32_s32(v_s0), v_scale));
vst1q_f32(D + i + 4, vmulq_f32(vcvtq_f32_s32(v_s01), v_scale));
vst1q_s32(SUM + i, vsubq_s32(v_s0, vld1q_s32(Sm + i)));
vst1q_s32(SUM + i + 4, vsubq_s32(v_s01, vld1q_s32(Sm + i + 4)));
}
#endif
for( ; i < width; i++ )
{
int s0 = SUM[i] + Sp[i];
D[i] = (float)(s0*_scale);
SUM[i] = s0 - Sm[i];
}
}
else
{
i = 0;
#if CV_SSE2
if(haveSSE2)
{
for( ; i < width-4; i+=4)
{
__m128i _sm = _mm_loadu_si128((const __m128i*)(Sm+i));
__m128i _s0 = _mm_add_epi32(_mm_loadu_si128((const __m128i*)(SUM+i)),
_mm_loadu_si128((const __m128i*)(Sp+i)));
_mm_storeu_ps(D+i, _mm_cvtepi32_ps(_s0));
_mm_storeu_si128((__m128i*)(SUM+i), _mm_sub_epi32(_s0,_sm));
}
}
#elif CV_NEON
for( ; i <= width-8; i+=8 )
{
int32x4_t v_s0 = vaddq_s32(vld1q_s32(SUM + i), vld1q_s32(Sp + i));
int32x4_t v_s01 = vaddq_s32(vld1q_s32(SUM + i + 4), vld1q_s32(Sp + i + 4));
vst1q_f32(D + i, vcvtq_f32_s32(v_s0));
vst1q_f32(D + i + 4, vcvtq_f32_s32(v_s01));
vst1q_s32(SUM + i, vsubq_s32(v_s0, vld1q_s32(Sm + i)));
vst1q_s32(SUM + i + 4, vsubq_s32(v_s01, vld1q_s32(Sm + i + 4)));
}
#endif
for( ; i < width; i++ )
{
int s0 = SUM[i] + Sp[i];
D[i] = (float)(s0);
SUM[i] = s0 - Sm[i];
}
}
dst += dststep;
}
}
double scale;
int sumCount;
std::vector<int> sum;
};
#例2:IPPの場合
実際に,OpenCV高速化のために使われているIntelのIPPライブラリ関数でも,細かく高速化しており,例えば四則演算はかなり細かく定義されています.例として,OpenCVのフォルダ内の3rdparty\ippicv\unpack\ippicv_win\include\ipp.h
内の一部を抜粋します.このように,同じようで,少しだけことなる関数をいくつも持っています.各関数の8uはuchar用,16sはshort用,16uはushort用の関数をあらわしており,C1は1チャネル用C3は3チャネル,C4は4チャネルを表しています.(下記のAC4は4チャネル目がアルファチャネル付の場合だったような記憶がありますが,詳しくは覚えていません.調べれば出てきます.)
このIPPについての具体的な話は次回に説明します.
Name: ippiAddC_8u_C1IRSfs, ippiAddC_8u_C3IRSfs, ippiAddC_8u_C4IRSfs, ippiAddC_8u_AC4IRSfs,
ippiAddC_16s_C1IRSfs, ippiAddC_16s_C3IRSfs, ippiAddC_16s_C4IRSfs, ippiAddC_16s_AC4IRSfs,
ippiAddC_16u_C1IRSfs, ippiAddC_16u_C3IRSfs, ippiAddC_16u_C4IRSfs, ippiAddC_16u_AC4IRSfs,
ippiSubC_8u_C1IRSfs, ippiSubC_8u_C3IRSfs, ippiSubC_8u_C4IRSfs, ippiSubC_8u_AC4IRSfs,
ippiSubC_16s_C1IRSfs, ippiSubC_16s_C3IRSfs, ippiSubC_16s_C4IRSfs, ippiSubC_16s_AC4IRSfs,
ippiSubC_16u_C1IRSfs, ippiSubC_16u_C3IRSfs, ippiSubC_16u_C4IRSfs, ippiSubC_16u_AC4IRSfs,
ippiMulC_8u_C1IRSfs, ippiMulC_8u_C3IRSfs, ippiMulC_8u_C4IRSfs, ippiMulC_8u_AC4IRSfs,
ippiMulC_16s_C1IRSfs, ippiMulC_16s_C3IRSfs, ippiMulC_16s_C4IRSfs, ippiMulC_16s_AC4IRSfs
ippiMulC_16u_C1IRSfs, ippiMulC_16u_C3IRSfs, ippiMulC_16u_C4IRSfs, ippiMulC_16u_AC4IRSfs
#例3:チャネル毎にオフセットを加算する関数を自作する場合
各チャネル,ビット深度毎に定義される関数は,具体的にどのように違っているのでしょうか?ここでは,チャネル毎に適当な値を加算する関数funcを作ることを想定し,それらの違いを理解していきます.
##作成する関数
まず,想定する関数は下記のように実装できます.任意のサイズ,任意のチャネルを持つ配列に対してチャネル毎にオフセットを加算する関数です.
void func(double* src, double* dest, int size, double* offset, int channels)
{
for (int i = 0; i < size; ++i)
{
for (int c = 0; c < channels; ++c)
{
dest[channels*i + c] = src[channels*i + c] + offset[c] ;
}
}
}
使い捨てのコードならば,これで十分な実装です.任意サイズ・チャネルに対応しており,double型で計算できれば,ほとんどの状況に対応可能な関数になっていると思います.しかし,広く使われるライブラリ関数としては不適格です.
もし,チューンした関数が用意されているとしたら,以下のような関数群が定義されることになります.下記のサフィックスは,8Uは8bitのビット深度を表しており8UC3は3チャネルの8bitのデータを表しています.同様にして32FC4は4チャネルの32bit float型を指しています.(引数などで類推可能なものは省略)
func_8UC1(uchar* src, uchar* dest);
func_8UC2(uchar* src, uchar* dest);
func_8UC3(uchar* src, uchar* dest);
func_8UC4(uchar* src, uchar* dest);
func_8UC1(uchar* srcdest);
func_8UC2(uchar* srcdest);
func_8UC3(uchar* srcdest);
func_8UC4(uchar* srcdest);
...
func_16SC1(short* src, short* dest);
func_16SC2(short* src, short* dest);
...
func_32SC1(int* src, int * dest);
func_32SC2(int* src, int * dest);
...
func_32FC1(float* src, float* dest);
func_32FC2(float* src, float* dest);
...
func_64FC1(double* src, double* dest);
func_64FC2(double* src, double* dest);
...
##関数内での切り替え
入力されたデータのチャネル数やビット深度は,InputArrayやMatのメソッドであるchannels()
やdepth()
,もしくは両者を取れるtype()
という基本的な関数で情報の取得ができます.さらには,inplaceかどうか(入力と出力が同じものを入れている場合)で関数を用意している場合もあります.例えば,用意された最適化関数をチャネル数やビット深度に応じて切り替える関数funcOpt
は下記のように定義することが出来ます.(depthだけ,channlesだけの分岐ですむのであれば,条件数は少なくなります.)
void funcOpt(InputArray src, InputArray offset, OutputArray dest)
{
//srcとdestが同じかチェック.
if (src.getMat().data != dest.getMat().data)
{
switch (src.type())
{
case CV_8UC1: func_8UC1(); break;
case CV_8UC2: func_8UC2(); break;
case CV_8UC3: func_8UC3(); break;
case CV_8UC4: func_8UC4(); break;
...
case CV_32FC1: func_32FC1(); break;
...
}
}
else
{
switch (src.type())
{
case CV_8UC1: func_8UC1(); break;
case CV_8UC2: func_8UC2(); break;
case CV_8UC3: func_8UC3(); break;
case CV_8UC4: func_8UC4(); break;
...
case CV_32FC1: func_32FC1(); break;
...
}
}
##具体的な実装
実際に各条件の中身を書いてみましょう.各関数を高速に動作させるために,SIMDでベクトル化して定義します.詳しくは,昔に書いた,組み込み関数(intrinsic)によるSIMD入門を参照してください.
###8bitで1チャネルの場合
ucharの型が入力されることを前提にした命令セットを発行します.
void func_8UC1(uchar* src, uchar* dest, int size, uchar offset, int channels)
{
int sse_uchar_size = size / 16;
const __m128i moffset = _mm_set1_epi8(offset);
for (int i = 0; i < sse_uchar_size; ++i)
{
__m128i ms = _mm_load_si128((const __m128i*)src);
ms = _mm_add_epi8(ms, moffset);
_mm_store_si128((__m128i*)dest, ms);
src += 16; dest += 16;
}
}
###8bitで1チャネルの場合(inplaceの場合)
また,入力と出力が同じinplaceな実装の場合,メモリアクセスやキャッシュの操作が出来ます.メモリのポインタの移動が一回ですんだり,stream命令により無駄なキャッシュを防ぐことをCPUに命令することが出来ます.
void func_incpace_8UC1(uchar* srcdest, int size, uchar offset, int channels)
{
int sse_uchar_size = size / 16;
const __m128i moffset = _mm_set1_epi8(offset);
for (int i = 0; i < sse_uchar_size; ++i)
{
__m128i ms = _mm_load_si128((const __m128i*)srcdest);
ms = _mm_add_epi8(ms, moffset);
_mm_stream_si128((__m128i*)srcdest, ms);
srcdest += 16;
}
}
###32bit浮動小数点で1チャネルの場合
浮動小数点になると命令セットが代わり,ベクトル化の処理単位も小さくなります.
void func_32FC1(float* src, float* dest, int size, float offset, int channels)
{
int sse_float_size = size / 4;
const __m128 moffset = _mm_set1_ps(offset);
for (int i = 0; i < sse_float_size; ++i)
{
__m128 ms = _mm_load_ps(src);
ms = _mm_add_ps(ms, moffset);
_mm_store_ps(dest, ms);
src += 4; dest += 4;
}
}
###32bit浮動小数点で3チャネルの場合
チャネル数が増えるとメモリのレイアウトを変える必要が出てきます.通常,画像を保存する場合,構造体配列AoS(Array of Structure)形式で保存されている場合が多く,カラーの場合,RGBの画素が並ぶRGBRGBRGBのような構造でデータが保持されています.しかし,チャネル毎に処理を変える場合,SIMD演算には都合がわるくそれを配列構造体SoA(Structure of Array),カラーの場合なら,全部R,全部G,全部Bの配列に変換するほうが効率がよくなります.
注意)この例である,RGBの3チャネルの場合が最も複雑です.2の倍数である2チャンネル,4チャンネルの場合は,より簡単にSIMDベクトル化できます.なお,この記事も参考になります.
void func_32FC3(float* src, float* dest, int size, float* offset, int channels)
{
int sse_floatc3_size = size / 12;
const __m128 moffset0 = _mm_set1_ps(offset[0]);
const __m128 moffset1 = _mm_set1_ps(offset[1]);
const __m128 moffset2 = _mm_set1_ps(offset[2]);
for (int i = 0; i < sse_floatc3_size; ++i)
{
//AoS 2 SoA
__m128 a = _mm_load_ps(src);
__m128 b = _mm_load_ps(src + 4);
__m128 c = _mm_load_ps(src + 8);
__m128 rrrr, gggg, bbbb;
__m128 aa = _mm_shuffle_ps(a, a, _MM_SHUFFLE(1, 2, 3, 0));
aa = _mm_blend_ps(aa, b, 4);
__m128 cc = _mm_shuffle_ps(c, c, _MM_SHUFFLE(1, 3, 2, 0));
aa = _mm_blend_ps(aa, cc, 8);
bbbb = aa;
aa = _mm_shuffle_ps(a, a, _MM_SHUFFLE(3, 2, 0, 1));
__m128 bb = _mm_shuffle_ps(b, b, _MM_SHUFFLE(2, 3, 0, 1));
bb = _mm_blend_ps(bb, aa, 1);
cc = _mm_shuffle_ps(c, c, _MM_SHUFFLE(2, 3, 1, 0));
bb = _mm_blend_ps(bb, cc, 8);
gggg = bb;
aa = _mm_shuffle_ps(a, a, _MM_SHUFFLE(3, 1, 0, 2));
bb = _mm_blend_ps(aa, b, 2);
cc = _mm_shuffle_ps(c, c, _MM_SHUFFLE(3, 0, 1, 2));
cc = _mm_blend_ps(bb, cc, 12);
rrrr = cc;
//add operation
bbbb = _mm_add_ps(rrrr, moffset0);
gggg = _mm_add_ps(rrrr, moffset1);
rrrr = _mm_add_ps(rrrr, moffset2);
////SoS 2 AoS
a = _mm_shuffle_ps(bbbb, bbbb, _MM_SHUFFLE(2, 3, 0, 1));
b = _mm_shuffle_ps(gggg, gggg, _MM_SHUFFLE(1, 2, 3, 0));
c = _mm_shuffle_ps(rrrr, rrrr, _MM_SHUFFLE(3, 0, 1, 2));
_mm_stream_ps((dest), _mm_blend_ps(_mm_blend_ps(b, a, 4), c, 2));
_mm_stream_ps((dest + 4), _mm_blend_ps(_mm_blend_ps(c, b, 4), a, 2));
_mm_stream_ps((dest + 8), _mm_blend_ps(_mm_blend_ps(a, c, 4), b, 2));
src += 12; dest += 12;
}
}
#理想と現実
上記のようにして,OpenCVの関数は,ビット深度やチャンネル数,inplaceかどうかに応じて関数を切り替えて最適な関数を選んでいます.
四則演算やsin, cos, exp, powなどの基礎的な演算関数は丁寧に作られていることが多いですが,現実問題として全ての関数をこのように実装すると恐ろしく実装時間がかかってしまいます.そこで,最適化された関数がない場合は,CV_ErrorやCV_Assertが呼ばれることになります(@dandelion1124 さんのこの記事とかあの記事とかを参照).もしくは,良く使うチャネル数やビット深度に応じた関数だけを用意して,後はconvertやキャストなどの型変換で対応する場合もあります.たとえば,最速で処理したい8Uの場合と任意の関数を動かすためにfloatの関数だけを最適化し,それ以外はキャストして使うように実装するのもひとつの手です.
OpenCVの関数でも,CV_8Uしか対応していない関数(ラベリングなど多数)やカラーしか受け付けない関数(cvtColorなど)などがほとんどであり,これらは,不適切な入力データが入ってくるとエラーを出力するように作られています.このような非対応のチャネル数やビット深度のものが必要となったときに,自身で実装してOpenCVへコントリビュートすれば大変喜ばれるでしょう.
#まとめ
この記事では,OpenCVでは,チャネル数,ビット深度に応じて高度に最適化された関数が呼ばれていることを説明しました.
SIMDベクトル化は20年以上の歴史があるにもかかわらず,これらの最適化を自動的にやってくれるほどコンパイラの進化は追いついておらず,まだまだ人の手でやる必要があります.
クヌース先生の高速化の原則で,「早すぎる最適化は諸悪の根源だ」という言葉は良く知られている格言だと思います.しかし,その前後に続く本当の言葉を知っているでしょうか?実際はこのように述べています.
"We should forget about small efficiencies, say about 97% of the time:
premature optimization is the root of all evil. Yet we should not pass up
our opportunities in that critical 3%”
Donald Knuth
"細かな効率のことなんかはさっぱり忘れて,時間の97%について考えよう.つまり,早すぎる最適化は諸悪の根源だ.それでも残り3%の機会を決して逃すべきではない."
そして,OpenCVのチームはさらにこう続けています.
… especially when you are doing computer vision on a cell phone
OpenCV team
...特に,モバイルでコンピュータビジョンを実現しようとするならね.
読めば分かるとおり,最適化は非常にめんどくさいものです.そのため,OpenCVのコードもぜんぜん最適化されていません.しかし,手間を惜しまなければ,OpenCVよりも速いコードは簡単にかけます.
皆様も残り**3%**の最適化に励みましょう!!
次回は,高速化のためのモジュールであるHALモジュールやその他の高速化ライブラリについて書きます.