※コードには誤りがありましたので、コメント欄で指摘いただいたパッチも併せてご覧ください(チューニングによって大きく性能が変わるという論旨には影響ないと思います)。
はじめに
どうも、オリィ研究所(http://orylab.com/) の ryo_grid こと神林です。
今回は、自宅で大学院生時代に授業で書いたレポート(2008年だったはず)が出てきて、割と面白いかなと思ったので記事にしてみます。
手抜きだったので読みにくいですが、チューニングで大きく性能が変わるということだけ伝われば幸いです。
授業は筑波大学計算科学研究センターのHPCセミナーというやつです。
HPCはハイパフォーマンスコンピューティングの略で、スーパーコンピュータ等に代表されるような大規模計算機や並列・分散システムなどを対象に、ハードウェアやソフトウェアなどの基盤技術を研究開発したり、それらを利用した数値計算ソフト(アプリケーションソフトウェア含む)の研究開発なんかをやっている分野です(すごくざっくり説明すると)。
上記の授業は、毎年開講していて、学外の方も受講可能なので、興味のある方は行ってみるとよいでしょう。
http://www2.ccs.tsukuba.ac.jp/workshop/HPCseminar/2016/
資料はリンクが見当たらなかったので、似たようなセミナー資料の以下を見てみるとよいかと思います。
http://www.hpc.cmc.osaka-u.ac.jp/wp-content/uploads/2014/11/20140617_02_parallelcom_siryo.pdf
http://www.cc.u-tokyo.ac.jp/support/kosyu/X01/shiryou-1.pdf
以下は行列積をループアンローリング、キャッシュブロッキング、ベクトル化などを行ってチューニングした課題のお話です。
内容は間違ってる可能性もあるので鵜呑みにはしないでください(笑)
結果としては、プロセッサの論理ピーク性能の23.1%しか引き出せなかった(チューニング無しだとたったの3%強)わけですが、BLASと呼ばれるようなライブラリを使うと、めっちゃチューニングされてるので90%超えとかでたりするようです。すごいですね(一応HPCが専門だったのに他人事・・・)。
(参考:http://www.cms-initiative.jp/ja/events/0530nakata.pdf)
チューニングには以下で行っているループアンローリング、キャッシュブロッキング、ベクトル化などの他にも、レジスタブロッキングだとか、インストラクションレベルでの最適化とかいろいろあって、奥が深いです。
そこらへんに興味のある方は以下の書籍など読んでみるとよいでしょう。
https://www.amazon.co.jp/RISC%E8%B6%85%E9%AB%98%E9%80%9F%E5%8C%96%E3%83%97%E3%83%AD%E3%82%B0%E3%83%A9%E3%83%9F%E3%83%B3%E3%82%B0%E6%8A%80%E6%B3%95-%E5%AF%92%E5%B7%9D-%E5%85%89/dp/4320027507
あと、似たような記事で以下がありました。
こっちの方がちゃんと書かれています(笑)
http://int.main.jp/txt/matmul/
利用計算機
Machine: Thinkpad X61
Processor:
Intel(R) Core(TM)2 Duo CPU T7500 2.20GHz
L1 Cache unknown
L2 Cache 4MB
Memory: 2GB
OS: Ubuntu Linux 8.04 Hardy Haron
Compiler: gcc バージョン 4.2.3 (Ubuntu 4.2.3-2ubuntu7)
理論ピークFlops: 2.2GHz * 4 = 8.8GFlops
(プロセッサ全体としては2コアなので17.6GFlops)
#コンパイルオプション
icc -O3 -o matrix_tuning.out matrix_tuning.c -msse -DDEBUG
(以降の評価において、最適化オプションはO3でない場合がある。その場合は明示する)
(gccではSSEを用いたコードのコンパイルができなかったため、iccを利用した)
(DEBUGオプションを有効にすると実行時に計算結果が正しいかのチェックを行う)
必要とする浮動小数点演算数(Flops)
最適化以前のコードでは
最内ループの計算は浮動小数点演算2回で、1000回の3重ループなので
210^9(ギガ)回
最内ループ終了時の、積の配列へ加算が浮動小数点演算1回で、1000回の2重ループなので
210^6(メガ)回
従って、全体としては、210^9+210^6回の浮動小数点演算が必要となる。
最適化を行っても、総演算回数のうちの大部分を占める最内ループでの計算回数は変化しないため、大局的には総演算回数は変化しない。
計算時間の推移
以下にコードに最適化を施していった過程でどのように高速化が行われたかを示す。
以下の記述においては、字下げでネストが行われており
例えば
・foo
・bar
とあれば、fooという最適化を施した上に、barという最適化を施したことを意味する.
・課題の中で提供されていたコード(何の最適化も施していない行列積のコード)
→6.916857 sec (最適化オプションO3でコンパイル)
・ブロックサイズ50でキャッシュブロッキング
→1.441473 sec
・kkのループを5ループ分展開(ループアンローリング)
・iiのループを2ループ分展開 ----------コード1
→0.980417 sec
・ブロックサイズ50でキャッシュブロッキング(ループアンローリングなしに戻した)
→1.441473 sec
・上のコードでSSEスカラ命令を利用するようにした ----------コード2
→1.473655 sec
・配列を全て__m128dで持つようにしてSSEスカラ演算を利用 -----------コード3
→1.694964 sec
・課題の中で提供されていたコード(何の最適化も施していない行列積のコード)
→12.056244 sec (最適化オプションO0でコンパイル)
(SSEを用いたコードが、最適化オプションがO1以上の場合にSegmentation Faultになるため、以下はO0でコンパイルして測定(コード3はO1でも動くが、比較のためO0))
・SSEベクトル演算を用い、最も基本的な行列の計算をkのループでベクトル化(並列度2) ----------コード4
行列aとbは_m128d型の配列とし、行列c、つまり計算結果の配列は通常のdoubleの配列とした。
→6.136489 sec
・ブロックサイズ50でキャッシュブロッキング ---------コード5
O0で比較(O1でもSegmentation Faultになる)
→2.836396 sec
(一方、SSE無しで最後まで最適化したもの(コード1)をO0で動かすと 7.082314 sec)
実行結果
複数のコードを作成したが、出力結果は皆同様なので、一つだけ示す。
$./matrix_tuning.out
elapsed 2.844164 sec
diff is 0.000000
結論
いくつかのパターンで最適化を行った。
それらはSSEを使わないで最適化を施す方法(1)、SSEスカラ命令を使って最適化を施す方法(2)、SSEベクトル命令を使って最適化を施す方法(3)の3つに分けられる。
まず(1)ではキャッシュブロッキングとループアンローリングを用いた最適化(コード1)により7.05倍の高速化が達成できた(6.916857secから0.980417sec)
次に(2)を行った。まずは、(1)の上にさらにコード2の最適化を施したが速度は低下してしまった(1.441473secから1.173655sec)、そこで速度の低下は、演算時に、必要な行列要素を_m128d型に変換する処理が原因であると考え、行列データを全てdouble型ではなく_m128d型で保持するように修正した(コード3)。しかしながら、速度はさらに低下した。
この原因としては,_m128d型のデータ長は128bitであり、double型より2倍もサイズが大きいことで、メモリアクセスのレイテンシが増加したため、もしくは、同様の理由でキャッシュとの親和性が下がったためであると考えられる。
最後に(3)を行った。
(3)においては、最適化オプションO0の場合のみSSEベクトル命令を用いたコード全てが正しく動作したため、(1)と(2)の場合と異なり、最適化オプションO0でコンパイルして比較を行った。
まず、もっとも単純な行列計算プログラムをSSEベクトル命令を用いてベクトル化した(コード4)。
結果、1.96倍の高速化が達成できた(12.056244secから6.136489 sec)。
続いて、コード4の最適化に加え、キャッシュブロッキングを行った(コード5)。
結果、速度向上率は4.25倍に伸びた(12.056244secから2.836396sec)。
ここで(1)で最速であったコード1を最適化オプションO0で実行したところ7.082314secであった。
これはつまり、最適化オプションO0で比較すればコード1よりコード5の方が速いということを意味する。
まとめると、実際に実行可能なものとしては、コード1を最適化オプションO3でコンパイルしたものが最速であるが、(2)でのSSEベクトル命令を用いた最適化は2倍弱の高速化を実現した事実から考えれば、コード1はSSEベクトル命令によるベクトル化を施すことで、さらなる高速化が達成できると考えられる。
もしくは、コード5を最適化オプションO3で正しく動作させることができれば、コード1を最適化オプションO3で動作させた場合より速い結果が得られる可能性がある(最適化の効き具合によってはそうとも限らないが)。
最速の結果(0.980417sec)について、論理ピーク値の何パーセントを達成したか計算してみると
(2.002/8.8) * (1/0.980417) = 0.2319282
♯ 論理ピーク性能が出たとしてかかる時間÷かかった時間
で、論理ピーク値の約23.1%を達成したことになる。
#ソースコード
コード1:キャッシュブロッキング + ループアンローリング
#include <stdio.h>
#include <sys/time.h>
#include <math.h>
#define N 1000
#define BLOCK_SIZE 50
double correct_matrix[N][N];
double second(void){
struct timeval tv;
gettimeofday(&tv, NULL);
return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6;
}
void create_model_answer(double a[N][N],double b[N][N]){
int i,j,k;
double s;
/* creating model answer */
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
s = 0.0;
for (k = 0; k < N; k++) {
s += a[i][k] * b[k][j];
}
correct_matrix[i][j] += s;
}
}
}
double compare(double a[N][N],double b[N][N],double arr[N][N]){
int i,j;
double tmp_diff;
double diff=0.0;
create_model_answer(a,b);
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
tmp_diff = correct_matrix[i][j] - arr[i][j];
diff += fabs(correct_matrix[i][j] - arr[i][j]);
}
}
return diff;
}
int main(void)
{
static double a[N][N], b[N][N], c[N][N];
int i, ii,j,jj,k,kk;
double start,elapsed_time;
double s0,s1,s2,s3,s4;
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
/* a[i][j] = rand(); b[i][j] = rand(); c[i][j] = rand(); */
a[i][j] = (unsigned int)(rand(999)) >> 10;
b[i][j] = (unsigned int)(rand(999)) >> 10;
c[i][j] = (unsigned int)(rand(999)) >> 10;
#ifdef DEBUG
/* for compare */
correct_matrix[i][j] = c[i][j];
#endif
}
}
/* intended calculation */
start = second();
for (i = 0; i < N; i+=BLOCK_SIZE) {
for (j = 0; j < N; j+=BLOCK_SIZE) {
for (k = 0; k < N; k+=BLOCK_SIZE) {
for(ii=i;ii < (i + BLOCK_SIZE);ii+=2){
for(jj=j;jj < (j + BLOCK_SIZE);jj++){
s0 = 0.0;
s1 = 0.0;
for (kk = k; kk < (k + BLOCK_SIZE); kk+=5) {
s0 += a[ii][kk] * b[kk][jj];
s0 += a[ii][kk+1] * b[kk+1][jj];
s0 += a[ii][kk+2] * b[kk+2][jj];
s0 += a[ii][kk+3] * b[kk+3][jj];
s0 += a[ii][kk+4] * b[kk+4][jj];
s1 += a[ii+1][kk] * b[kk][jj];
s1 += a[ii+1][kk+1] * b[kk+1][jj];
s1 += a[ii+1][kk+2] * b[kk+2][jj];
s1 += a[ii+1][kk+3] * b[kk+3][jj];
s1 += a[ii+1][kk+4] * b[kk+4][jj];
}
c[ii][jj] += s0;
c[ii+1][jj] += s1;
}
}
}
}
}
elapsed_time = second() - start;
printf("elapsed %lf sec\n",elapsed_time);
#ifdef DEBUG
printf("diff is %lf\n",compare(a,b,c));
#endif
return 0;
}
コード2:キャッシュブロッキング + SSEスカラ演算
#include <stdio.h>
#include <sys/time.h>
#include <math.h>
#include <xmmintrin.h> /* SSE operations */
#define N 1000
#define BLOCK_SIZE 50
double correct_matrix[N][N];
double second(void){
struct timeval tv;
gettimeofday(&tv, NULL);
return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6;
}
void create_model_answer(double a[N][N],double b[N][N]){
int i,j,k;
double s;
/* creating model answer */
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
s = 0.0;
for (k = 0; k < N; k++) {
s += a[i][k] * b[k][j];
}
correct_matrix[i][j] += s;
}
}
}
double compare(double a[N][N],double b[N][N],double arr[N][N]){
int i,j;
double tmp_diff;
double diff=0.0;
create_model_answer(a,b);
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
tmp_diff = correct_matrix[i][j] - arr[i][j];
diff += fabs(correct_matrix[i][j] - arr[i][j]);
}
}
return diff;
}
int main(void)
{
static double a[N][N], b[N][N], c[N][N];
int i, ii,j,jj,k,kk;
double start,elapsed_time;
double s0,s1,s2,s3,s4;
__m128d vec_a,vec_b,vec_s;
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
a[i][j] = (unsigned int)(rand(999)) >> 10;
b[i][j] = (unsigned int)(rand(999)) >> 10;
c[i][j] = (unsigned int)(rand(999)) >> 10;
#ifdef DEBUG
/* for compare */
correct_matrix[i][j] = c[i][j];
#endif
}
}
/* intended calculation */
start = second();
for (i = 0; i < N; i+=BLOCK_SIZE) {
for (j = 0; j < N; j+=BLOCK_SIZE) {
for (k = 0; k < N; k+=BLOCK_SIZE) {
for(ii=i;ii < (i + BLOCK_SIZE);ii++){
for(jj=j;jj < (j + BLOCK_SIZE);jj++){
vec_s = _mm_set_sd(0.0);
for (kk = k; kk < (k + BLOCK_SIZE); kk++) {
vec_a = _mm_set_sd(a[ii][kk]);
vec_b = _mm_set_sd(b[kk][jj]);
vec_s = _mm_add_sd(vec_s,_mm_mul_sd(vec_a,vec_b));
}
_mm_store_sd(&s0,vec_s);
c[ii][jj] += s0;
}
}
}
}
}
elapsed_time = second() - start;
printf("elapsed %lf sec\n",elapsed_time);
#ifdef DEBUG
printf("diff is %lf\n",compare(a,b,c));
#endif
return 0;
}
コード3:キャッシュブロッキング + SSEスカラ演算(配列データを_m128d型で保持)
#include <stdio.h>
#include <sys/time.h>
#include <math.h>
#include <xmmintrin.h> /* SSE operations */
#define N 1000
#define BLOCK_SIZE 50
double correct_matrix[N][N];
double second(void){
struct timeval tv;
gettimeofday(&tv, NULL);
return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6;
}
void create_model_answer(double a[N][N],double b[N][N]){
int i,j,k;
double s;
/* creating model answer */
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
s = 0.0;
for (k = 0; k < N; k++) {
s += a[i][k] * b[k][j];
}
correct_matrix[i][j] += s;
}
}
}
double compare(double a[N][N],double b[N][N],double arr[N][N]){
int i,j;
double tmp_diff;
double diff=0.0;
create_model_answer(a,b);
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
tmp_diff = correct_matrix[i][j] - arr[i][j];
diff += fabs(correct_matrix[i][j] - arr[i][j]);
}
}
return diff;
}
int main(void)
{
static __m128d a[N][N], b[N][N], c[N][N];
int i, ii,j,jj,k,kk;
double start,elapsed_time;
double s0,s1,s2,s3,s4;
__m128d vec_a,vec_b,vec_s;
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
a[i][j] = _mm_set_sd(rand());
b[i][j] = _mm_set_sd(rand());
c[i][j] = _mm_set_sd(rand());
#ifdef DEBUG
/* for compare */
correct_matrix[i][j] = c[i][j];
#endif
}
}
/* intended calculation */
start = second();
for (i = 0; i < N; i+=BLOCK_SIZE) {
for (j = 0; j < N; j+=BLOCK_SIZE) {
for (k = 0; k < N; k+=BLOCK_SIZE) {
for(ii=i;ii < (i + BLOCK_SIZE);ii++){
for(jj=j;jj < (j + BLOCK_SIZE);jj++){
vec_s = _mm_set_sd(0.0);
for (kk = k; kk < (k + BLOCK_SIZE); kk++) {
vec_s = _mm_add_sd(vec_s,_mm_mul_sd(a[ii][kk],a[kk][jj]));
}
c[ii][jj] = _mm_add_sd(c[ii][jj],vec_s);
}
}
}
}
}
elapsed_time = second() - start;
printf("elapsed %lf sec\n",elapsed_time);
#ifdef DEBUG
printf("diff is %lf\n",compare(a,b,c));
#endif
return 0;
}
コード4:SSEベクトル演算
#include <stdio.h>
#include <sys/time.h>
#include <math.h>
#include <xmmintrin.h> /* SSE operations */
#define N 1000
#define HALF_N 500
#define BLOCK_SIZE 50
#define HALF_BLOCK_SIZE 25
double correct_matrix[N][N];
double second(void){
struct timeval tv;
gettimeofday(&tv, NULL);
return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6;
}
void create_model_answer(double a[N][N],double b[N][N]){
int i,j,k;
double s;
/* creating model answer */
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
s = 0.0;
for (k = 0; k < N; k++) {
s += a[i][k] * b[k][j];
}
correct_matrix[i][j] += s;
}
}
}
double compare(double a[N][N],double b[N][N],double arr[N][N]){
int i,j;
double tmp_diff;
double diff=0.0;
create_model_answer(a,b);
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
tmp_diff = correct_matrix[i][j] - arr[i][j];
diff += fabs(correct_matrix[i][j] - arr[i][j]);
}
}
return diff;
}
int main(void)
{
static double a_[N][N], b_[N][N]; /* for compare */
static __m128d a[N][N/2], b[N/2][N];
double c[N][N];
int i, ii,j,jj,k,kk;
double start,elapsed_time;
double s0,s1,s2,s3,s4;
double vec_s_arr[2];
__m128d vec_a,vec_b,vec_s;
for (i = 0; i < N; i++) {
for (j = 0; j < N/2; j++) {
a_[i][j*2] = (unsigned int)(rand()) >> 10;
a_[i][j*2+1] = (unsigned int)(rand()) >> 10;
a[i][j] = _mm_set_pd(a_[i][j*2],a_[i][j*2+1]);
b_[j*2][i] = (unsigned int)(rand()) >> 10;
b_[j*2+1][i] = (unsigned int)(rand()) >> 10;
b[j][i] = _mm_set_pd(b_[j*2][i],b_[j*2+1][i]);
}
for (j = 0; j < N; j++) {
c[i][j] = (unsigned int)(rand()) >> 10;
#ifdef DEBUG
/* for compare */
correct_matrix[i][j] = c[i][j];
#endif
}
}
start = second();
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
vec_s = _mm_set_pd(0.0,0.0);
for (k = 0; k < N/2; k++) {
vec_s = _mm_add_pd(vec_s,_mm_mul_pd(a[i][k],b[k][j]));
}
_mm_store_pd(vec_s_arr,vec_s);
c[i][j] += vec_s_arr[0]; /* lower element of packed values */
c[i][j] += vec_s_arr[1]; /* upper element of packed values */
}
}
elapsed_time = second() - start;
printf("elapsed %lf sec\n",elapsed_time);
#ifdef DEBUG
printf("diff is %lf\n",compare(a_,b_,c));
#endif
return 0;
}
コード5:SSEベクトル演算 + キャッシュブロッキング
#include <stdio.h>
#include <sys/time.h>
#include <math.h>
#include <xmmintrin.h> /* SSE operations */
#define N 1000
#define HALF_N 500
#define BLOCK_SIZE 50
#define HALF_BLOCK_SIZE 25
double correct_matrix[N][N];
double second(void){
struct timeval tv;
gettimeofday(&tv, NULL);
return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6;
}
void create_model_answer(double a[N][N],double b[N][N]){
int i,j,k;
double s;
/* creating model answer */
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
s = 0.0;
for (k = 0; k < N; k++) {
s += a[i][k] * b[k][j];
}
correct_matrix[i][j] += s;
}
}
}
double compare(double a[N][N],double b[N][N],double arr[N][N]){
int i,j;
double tmp_diff;
double diff=0.0;
create_model_answer(a,b);
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
tmp_diff = correct_matrix[i][j] - arr[i][j];
diff += fabs(correct_matrix[i][j] - arr[i][j]);
}
}
return diff;
}
int main(void)
{
static double a_[N][N], b_[N][N]; /* for compare */
static __m128d a[N][N/2], b[N/2][N];
double c[N][N];
int i, ii,j,jj,k,kk;
double start,elapsed_time;
double s0,s1,s2,s3,s4;
double vec_s_arr[2];
__m128d vec_a,vec_b,vec_s;
for (i = 0; i < N; i++) {
for (j = 0; j < N/2; j++) {
a_[i][j*2] = (unsigned int)(rand()) >> 10;
a_[i][j*2+1] = (unsigned int)(rand()) >> 10;
a[i][j] = _mm_set_pd(a_[i][j*2],a_[i][j*2+1]);
b_[j*2][i] = (unsigned int)(rand()) >> 10;
b_[j*2+1][i] = (unsigned int)(rand()) >> 10;
b[j][i] = _mm_set_pd(b_[j*2][i],b_[j*2+1][i]);
}
for (j = 0; j < N; j++) {
c[i][j] = (unsigned int)(rand()) >> 10;
#ifdef DEBUG
/* for compare */
correct_matrix[i][j] = c[i][j];
#endif
}
}
/* intended calculation */
start = second();
for (i = 0; i < N; i+=BLOCK_SIZE) {
for (j = 0; j < N; j+=BLOCK_SIZE) {
for (k = 0; k < HALF_N; k+=(HALF_BLOCK_SIZE)) {
for(ii=i;ii < (i + BLOCK_SIZE);ii++){
for(jj=j;jj < (j + BLOCK_SIZE);jj++){
vec_s = _mm_set_pd(0.0,0.0);
for (kk = k; kk < (k + HALF_BLOCK_SIZE); kk++) {
vec_s = _mm_add_pd(vec_s,_mm_mul_pd(a[ii][kk],b[kk][jj]));
}
_mm_store_pd(vec_s_arr,vec_s);
c[ii][jj] += vec_s_arr[0]; /* lower element of packed values*/
c[ii][jj] += vec_s_arr[1]; /* upper element of packed values*/
}
}
}
}
}
elapsed_time = second() - start;
printf("elapsed %lf sec\n",elapsed_time);
#ifdef DEBUG
printf("diff is %lf\n",compare(a_,b_,c));
#endif
return 0;
}