はじめに
筆者の下記の記事で Microsoft の uCRT の qsort
のソースを読んでみた。
しかしながら,一般的には Free Software Foundation の GNU C library(glibc)のほうがメジャーな存在かもしれない。
glibc の qsort をオンライン環境で実行してみる
わざわざ GCC をインストールしなくてもオンライン環境で検証できるのは助かる1。今回は下記のサイトを使ってみた。
テストプログラムを下記に示す。以前の記事で使ったものと同じである。
※日本語のコメントは削除した。
#include <stdio.h>
#include <stdlib.h>
#define countof(a) (sizeof(a)/sizeof(a[0]))
typedef int (*SORTFUNC)(const void *, const void *);
static int table[] = {9, 15, 16, 14, 11, 12, 7, 13, 10, 0, 8, 5, 6, 1, 4, 3, 2};
static int count = 0;
static int sortfunc(int *p, int *q) {
count++;
fprintf(stdout, "%d\t%d\t%d\n", *p, *q, *p - *q);
return *p - *q;
}
int main() {
qsort(table, countof(table), sizeof(table[0]), (SORTFUNC)sortfunc);
fprintf(stdout, "%d\n", count);
return 0;
}
実行結果を以下に示す。驚くべきことに比較関数の呼び出し回数が46回と uCRT の78回よりも少ない。最初の三回の比較要素を比べても,uCRT では先頭と末尾,中央の三点でピボットとなるべき基準値をサンプリングしていたが,glibc では明らかに様相が異なる。
比較関数の呼び出し回数を比べると,.NET Framework の106回の半分以下である。
ライブラリ | 回数 |
---|---|
Array.Sort (.NET Framework) | 106 |
qsort (Microsoft uCRT) | 78 |
qsort (GNU C Library) | 46 |
結論から言うと glibc の qsort
はマージソートのようだ2。
glibc の qsort の意訳版を作る
glibc の qsort.c のソースコード3を読むとはっきりする。とはいえ,全文を掲載しても理解を妨げるので,アルゴリズムを自分なりに咀嚼し,整数配列のソートとして新たに作り直したものを掲載する。大して長くは無いのでまずは全文を掲載しよう。
/* 001 */ #include <stdlib.h>
/* 002 */ typedef int (*COMPARE)(int, int);
/* 003 */ #define TEMP_SIZE 256
/* 004 */ static void sort(int *buf, int *a, int len, COMPARE compare) {
/* 005 */ if(len < 2) return;
/* 006 */ int n1 = len / 2;
/* 007 */ int n2 = len - n1;
/* 008 */ int *p1 = a;
/* 009 */ int *p2 = &a[n1];
/* 010 */ sort(buf, p1, n1, compare);
/* 011 */ sort(buf, p2, n2, compare);
/* 012 */ int *q = buf;
/* 013 */ for(;;) {
/* 014 */ if(*p1 <= *p2) {
/* 015 */ *q++ = *p1++;
/* 016 */ if(--n1 == 0) break;
/* 017 */ } else {
/* 018 */ *q++ = *p2++;
/* 019 */ if(--n2 == 0) break;
/* 020 */ }
/* 021 */ }
/* 022 */ while(n1 > 0) {
/* 023 */ *q++ = *p1++;
/* 024 */ n1--;
/* 025 */ }
/* 026 */ for(int i = 0; i < len - n2; i++)
/* 027 */ a[i] = buf[i];
/* 028 */ }
/* 029 */ void qsort_glibc(int *a, int len, COMPARE compare) {
/* 030 */ if(len < 2) return;
/* 031 */ if(len <= TEMP_SIZE) {
/* 032 */ int tmp[TEMP_SIZE];
/* 033 */ sort(tmp, a, len, compare);
/* 034 */ } else {
/* 035 */ int *buf = (int*)malloc(sizeof(int) * len);
/* 036 */ if(buf == (int*)NULL) return;
/* 037 */ sort(buf, a, len, compare);
/* 038 */ free(buf);
/* 039 */ }
/* 040 */ }
アルゴリズム解説
それでは意訳コードを頭から解説していきたい。
1. ソート関数本体の冒頭部分
ソート関数本体 qsort_glibc()
の冒頭部分である。引数は int
型の配列 a[]
と配列の要素数 len
,比較関数 compare
の三つのみである。比較関数の型は標準ライブラリのものと少し変えており,二つの int
型の整数を直接比較する。
マージソートはソート対象の配列の他に作業用の記憶領域を必要とする外部ソートである。配列の要素数 len
が所定の大きさ TEMP_SIZE
以下であれば作業用領域をスタック上に確保するが,TEMP_SIZE
より大きければヒープ上に確保する。
なお,本自作コードではメモリの確保に失敗すると何もしないでリターンするが,本物はヒープソート(ヒープ領域のヒープとは無関係で,二分ヒープ木を用いて並べ替えを行うソート)を用いてリトライするよう作られている。本実装では簡単のためヒープソートは省略した。
/* 001 */ #include <stdlib.h>
/* 002 */ typedef int (*COMPARE)(int, int);
/* 003 */ #define TEMP_SIZE 256
/* 029 */ void qsort_glibc(int *a, int len, COMPARE compare) {
/* 030 */ if(len < 2) return;
/* 031 */ if(len <= TEMP_SIZE) {
/* 032 */ int tmp[TEMP_SIZE];
/* 033 */ sort(tmp, a, len, compare);
/* 034 */ } else {
/* 035 */ int *buf = (int*)malloc(sizeof(int) * len);
/* 036 */ if(buf == (int*)NULL) return;
/* 037 */ sort(buf, a, len, compare);
/* 038 */ free(buf);
/* 039 */ }
/* 040 */ }
2. マージソートエンジン・冒頭部
次はマージソートエンジンの冒頭部である。引数は作業用領域 buf[]
とソート対象の配列 a[]
と配列の要素数 len
,比較関数 compare
の四つである。作業用領域は十分確保されているのでオーバーフローを気にしなくてよい。
配列の要素数 len
が 2 個未満のときは何もしないでリターンする。2 個以上の場合は,配列を二つに分けてそれぞれにマージソートを行う。すなわちマージソートエンジンは再帰関数として作られている。
/* 004 */ static void sort(int *buf, int *a, int len, COMPARE compare) {
/* 005 */ if(len < 2) return;
/* 006 */ int n1 = len / 2;
/* 007 */ int n2 = len - n1;
/* 008 */ int *p1 = a;
/* 009 */ int *p2 = &a[n1];
/* 010 */ sort(buf, p1, n1, compare);
/* 011 */ sort(buf, p2, n2, compare);
3. マージソートエンジン・マージ部
二つの配列を作業用領域 buf[]
上でマージした後,元の配列 a[]
に書き戻す部分である。ここの22~27行でちょっとした工夫が行われている。
/* 012 */ int *q = buf;
/* 013 */ for(;;) {
/* 014 */ if(*p1 <= *p2) {
/* 015 */ *q++ = *p1++;
/* 016 */ if(--n1 == 0) break;
/* 017 */ } else {
/* 018 */ *q++ = *p2++;
/* 019 */ if(--n2 == 0) break;
/* 020 */ }
/* 021 */ }
/* 022 */ while(n1 > 0) {
/* 023 */ *q++ = *p1++;
/* 024 */ n1--;
/* 025 */ }
/* 026 */ for(int i = 0; i < len - n2; i++)
/* 027 */ a[i] = buf[i];
/* 028 */ }
上記22~27行の部分を教科書通りに作れば下記のようになるはずだ。つまり,上記の22~27行は転送領域が不足しているように見えるのだ。
while(n1 > 0) {
*q++ = *p1++; n1--;
}
while(n2 > 0) {
*q++ = *p2++; n2--;
}
for(int i = 0; i < len; i++)
a[i] = buf[i];
これは簡単な図を描いてみると分かる。ソート対象の配列 a[]
を前半部と後半部に分け,マージしながら作業用領域 buf[]
に転送していく。21行目終了時点で転送済みの領域を■で示す。
21行目終了時点で n1 > 0
のときの場合を図1に示す。このとき残った領域もいったん作業用領域 buf[]
の残った領域に転送してから配列 a[]
に書き戻す必要がある。
一方,21行目終了時点で n2 > 0
のときの場合を図2に示す。このとき残った領域はそのまま残してもよいことが分かる。
ということで,転送量を少しでも減らせるよう細かい工夫が行われているようだ。
また,配列 a[]
を前半部と後半部に分割する際,配列の要素数が奇数だと等分割できない。非均等に分割するのであれば,後半部の要素数を多くした方が得だと分かるだろう。これは後半部の要素数のほうが多いと,後半部の要素が残ってしまう確率が高くなるが,その場合,残った要素を転送しなくても済むからだ。実際,配列の要素数 len
が奇数の場合,前半部の要素数 n1
よりも後半部の要素数 n2
がのほうが一つだけ多くなるよう分割される。
/* 006 */ int n1 = len / 2;
/* 007 */ int n2 = len - n1;
パフォーマンス比較
さて,前回の記事で紹介した uCRT を参考にして作成した qsort_ucrt()
と本記事で紹介した glibc を参考にして作成した qsort_glibc()
ではどっちが速いだろうか?
試験条件を以下に示す。
- 200,000,000 個の 32bit 整数を昇順ソートする
- データの順序は以下の4種類
- ゼロ
- 昇順:0 → 199,999,999
- 降順:199,999,999 → 0
- 乱数:0~199,999,999 の範囲で一様乱数(重複あり)
- 測定環境:Windows10 Pro 22H2,Core i7-13700,5.1GHz,32GB
- 開発環境:Visual Studio 2022 Community Edition,32bit版コンパイラ
- 比較関数は以下に示すように整数の昇順ソートである。
static int compare(int p, int q) { return p - q; }
ソート方式の比較結果を以下に示す。あれれ?期待に反してマージソートの qsort_glibc()
のほうが遅いではないか!!
順序 | 項目 | qsort_ucrt() |
qsort_glibc() |
---|---|---|---|
ゼロ | 時間 | 0.090[秒] | 2.660[秒] |
比較回数 | 399,999,999[回] | 2,728,894,208[回] | |
転送回数 | 0[回] | 5,457,788,416[回] | |
昇順 | 時間 | 1.433[秒] | 2.656[秒] |
比較回数 | 5,363,792,412[回] | 2,728,894,208[回] | |
転送回数 | 265,782,274[回] | 5,457,788,416[回] | |
降順 | 時間 | 1.475[秒] | 3.914[秒] |
比較回数 | 5,363,792,412[回] | 2,802,670,336[回] | |
転送回数 | 465,782,278[回] | 11,063,129,088[回] | |
乱数 | 時間 | 15.505[秒] | 17.996[秒] |
比較回数 | 6,400,427,154[回] | 5,265,836,886[回] | |
転送回数 | 2,736,238,624[回] | 10,752,119,342[回] |
■qsort_ucrt,■qsort_glibc
ということで比較回数・転送回数も比べてみた。なお qsort_ucrt()
では1回のスワップあたり,データ転送2回分とカウントしている。比較回数はマージソートである qsort_glibc()
のほうが安定して少ないのは流石であるが,転送回数は圧倒的に多い。これが基本的にマージソートの遅い理由ではある。しかし,同じ qsort_glibc()
の降順と乱数で転送回数は大差ないにも関わらず,計算時間は乱数のほうが4倍近く要していることに注目されたい。降順のほうが圧倒的に速いのは,おそらくメモリアクセスがシーケンシャルアクセスに近いせいでキャッシュメモリに乗り易いからだろうと考える。一方,乱数のほうはメモリアクセスがランダムアクセスに近いせいでキャッシュミスヒットが連続しているのだろう。
考察
qsort_ucrt()
に比べて qsort_glibc()
が振るわなかった理由を考察する。
-
qsort_glibc()
はマージソートであり,そもそもデータ転送量が多いため,キャッシュメモリに収まり切らない大規模な問題では性能が出ないのかもしれない。 -
今回のプログラムはいずれも比較関数を簡素な整数比較にしたことに加え,ソート関数と比較関数を一緒にコンパイルしたことにより,最適化によって比較関数がソート関数内にインライン展開され,比較関数を呼び出すオーバーヘッドが軽減された。このせいで比較関数の呼び出し回数の多いクイックソートである
qsort_ucrt()
に有利な条件になっている。 -
qsort_glibc()
は普通に再帰関数として作成しているが,qsort_ucrt()
は再帰関数ではなくループ処理で済ませており,再帰関数の呼び出しオーバーヘッドを減らすなど細かい工夫が施されている。 -
qsort_glibc()
は配列の要素数が 2 個になってもマージソートを行っているが,qsort_ucrt()
は分割された配列の要素数が 8 個以下になると選択ソートに切り替えるなど細かい工夫が施されている。
オリジナル作者の意見
なぜ,glibc の qsort()
ではマージソートを採用したのか?という疑問に対して,オリジナル作者の mikehaertel 氏が Reddit で次のように述べている4。
I am the (original) author of the merge sort code. Someone else wrote the quick sort code. The reason we made qsort() default to using merge sort was, indeed, that it always makes fewer compares than quick sort. We thought that in the common case, people would sort small objects (numbers, pointers) rather than large objects, and that therefore the performance would be dominated by comparisons rather than data movement.
私はマージソートのコードの(オリジナルの)作者です。クイックソートのコードは他の人が書きました。qsort() のデフォルトをマージソートにしたのはクイックソートよりも比較回数を少なくするためです。一般的なケースでは人々は大きなオブジェクトではなく,数値やポインタのような小さなオブジェクトをソートするでしょうから,データ転送よりも比較のほうが性能上重要だと考えたのです。
In 1988, the dominant cost was usually function call overhead for calling the compare function. Nowdays, due to out-of-order processors with multiported caches and call/return branch predictors, the function call overhead is less noticeable, and has probably been replaced by branch misprediction costs. Assuming random data, you'd expect the compare to go either way about half the time. Many compare functions have branches, and of course so does the sort code itself. So, for different reasons than in 1988, performance is still usually dominated by the number of compares.
1988年当時,比較関数を呼び出すための関数コールのオーバーヘッドが主要なコストでした。現在ではマルチポートキャッシュとコール/リターン分岐予測を備えたアウトオブオーダーのプロセッサにより関数コールのオーバーヘッドは目立たなくなっており,おそらくは分岐予測ミスのコストが代わりに支配的になっています。ランダムなデータを仮定すると,比較は約半分の確率でどちらに転ぶか分かりません。多くの比較関数には分岐があり,もちろんソートのコード自体にも分岐があります。そのため1988年当時とは異なった理由で今でも比較回数が性能上重要なのです。
1988年といえば,当時先端の EWS ですら CPU クロックはせいぜい 25~33MHz 程度。メインメモリアクセスに対するウェイトは小さく,データ転送の負荷は少なかったものと推察される。なので1988年当時はマージソートを採用するのが正しかったかもしれない。
とはいえ,Reddit で上記の回答が行われたのは今から18年前なので,上記の回答における「現在」や「今」とは2007年のことである。2007年当時の CPU は既にクロック3GHzに達しており,1988年に比べてデータ転送の負荷は飛躍的に増えているはずなのだ。おそらく2007年の時点では,もうマージソートの優位性は失われていたと思う。
付録~ソースコードおよびメイクファイル
ベンチ用メイン
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <time.h>
#define MACROOF(arg) #arg
#define STRINGOF(macro) MACROOF(macro)
typedef int (*COMPARE)(int, int);
extern void SORTFUNC(int *a, int len, COMPARE compare);
uint32_t xorshift( void ) {
static uint64_t x = 1;
x ^= x << 13;
x ^= x >> 7;
x ^= x << 17;
return (uint32_t)x;
}
#ifdef COUNT
uint64_t move_count = 0;
uint64_t comp_count = 0;
#endif
static int compare(int p, int q) {
#ifdef COUNT
comp_count++;
#endif
return p - q;
}
int main(int argc, char *argv[]) {
if(argc < 3) {
fprintf(stderr, "Usage: bench(.exe) [data] [size] (debug)\n");
fprintf(stderr, " data: zero, ascend, descend, random\n");
fprintf(stderr, " func: " STRINGOF(SORTFUNC) "\n");
return -1;
}
int size = atoi(argv[2]);
int *table = malloc(sizeof(int) * size);
if(table == (int*)NULL) {
fprintf(stderr, "cannot allocate size: %d\n", size);
return -1;
}
switch(argv[1][0]) {
case 'z': for(int i = 0; i < size; i++) table[i] = 0; break;
case 'a': for(int i = 0; i < size; i++) table[i] = i; break;
case 'd': for(int i = 0; i < size; i++) table[i] = size - i - 1; break;
case 'r': for(int i = 0; i < size; i++) table[i] = xorshift() % size; break;
default:
fprintf(stderr, "illegal data: %s\n", argv[1]);
return -1;
}
#ifdef COUNT
SORTFUNC(table, size, compare);
fprintf(stdout, "comp_count: %lld\n", comp_count);
fprintf(stdout, "move_count: %lld\n", move_count);
#else
clock_t t1 = clock();
SORTFUNC(table, size, compare);
clock_t t2 = clock();
fprintf(stdout, "time: %.3f\n", (double)(t2 - t1) / CLOCKS_PER_SEC);
#endif
if(argc >= 4 && argv[3][0] == 'd')
for(int i = 0; i < size; i++) fprintf(stdout, "%d\n", table[i]);
return 0;
}
uCRT 版 qsort の意訳版
#ifdef COUNT
#include <stdint.h>
extern uint64_t move_count;
#endif
typedef int (*COMPARE)(int, int);
#define STACK_SIZE 30
static void swap(int *p, int *q) {
#ifdef COUNT
move_count += 2;
#endif
int temp = *p;
*p = *q;
*q = temp;
}
void qsort_ucrt(int *a, int len, COMPARE compare) {
if(len < 2) return;
int top_stack[STACK_SIZE];
int end_stack[STACK_SIZE];
int stack = 0;
int top = 0;
int end = len - 1;
LOOP:
int n = end - top + 1;
if(n <= 8) {
for(int e = end; e > top; e--) {
int max = top;
for(int i = top + 1; i <= e; i++)
if(compare(a[i], a[max]) > 0) max = i;
swap(&a[max], &a[e]);
}
} else {
int mid = top + n / 2;
int t = top;
int e = end;
if(compare(a[top], a[mid]) > 0) swap(&a[top], &a[mid]);
if(compare(a[top], a[end]) > 0) swap(&a[top], &a[end]);
if(compare(a[mid], a[end]) > 0) swap(&a[mid], &a[end]);
for(;;) {
if(t < mid) while(++t < mid && compare(a[t], a[mid]) <= 0);
if(t >= mid) while(++t <= end && compare(a[t], a[mid]) <= 0);
while(--e > mid && compare(a[e], a[mid]) > 0);
if(t > e) break;
swap(&a[t], &a[e]);
if(e == mid) mid = t;
}
e++;
if(e > mid) while(--e > mid && compare(a[e], a[mid]) == 0);
if(e <= mid) while(--e > top && compare(a[e], a[mid]) == 0);
if(e - top >= end - t) {
if(top < e) {
top_stack[stack] = top;
end_stack[stack] = e;
stack++;
}
if(t < end) {
top = t;
goto LOOP;
}
} else {
if(t < end) {
top_stack[stack] = t;
end_stack[stack] = end;
stack++;
}
if(top < e) {
end = e;
goto LOOP;
}
}
}
if(--stack >= 0) {
top = top_stack[stack];
end = end_stack[stack];
goto LOOP;
}
}
glibc 版 qsort の意訳版
#ifdef COUNT
#include <stdint.h>
extern uint64_t move_count;
#endif
#include <stdlib.h>
typedef int (*COMPARE)(int, int);
#define TEMP_SIZE 256
static void sort(int *buf, int *a, int len, COMPARE compare) {
if(len < 2) return;
int n1 = len / 2;
int n2 = len - n1;
int *p1 = a;
int *p2 = &a[n1];
sort(buf, p1, n1, compare);
sort(buf, p2, n2, compare);
int *q = buf;
for(;;) {
#ifdef COUNT
move_count++;
#endif
if(compare(*p1, *p2) <= 0) {
*q++ = *p1++;
if(--n1 == 0) break;
} else {
*q++ = *p2++;
if(--n2 == 0) break;
}
}
#ifdef COUNT
move_count += n1 + len - n2;
#endif
while(n1 > 0) {
*q++ = *p1++;
n1--;
}
for(int i = 0; i < len - n2; i++)
a[i] = buf[i];
}
void qsort_glibc(int *a, int len, COMPARE compare) {
if(len < 2) return;
if(len <= TEMP_SIZE) {
int tmp[TEMP_SIZE];
sort(tmp, a, len, compare);
} else {
int *buf = (int*)malloc(sizeof(int) * len);
if(buf == (int*)NULL) return;
sort(buf, a, len, compare);
free(buf);
}
}
メイクファイル
CC=cl
CFLAGS=/c /O2 /W4 /arch:AVX2 /GL /GS-
LFLAGS=/GL
LIBS=
OUTDIRS=count count\ucrt count\glibc \
clock clock\ucrt clock\glibc
target: $(OUTDIRS) \
count\ucrt\bench.exe count\glibc\bench.exe \
clock\ucrt\bench.exe clock\glibc\bench.exe
$(OUTDIRS):
@if not exist $@ mkdir $@
count\ucrt\bench.exe: count\ucrt\bench.obj count\ucrt\qsort_ucrt.obj
count\glibc\bench.exe: count\glibc\bench.obj count\glibc\qsort_glibc.obj
clock\ucrt\bench.exe: clock\ucrt\bench.obj clock\ucrt\qsort_ucrt.obj
clock\glibc\bench.exe: clock\glibc\bench.obj clock\glibc\qsort_glibc.obj
count\ucrt\bench.exe \
count\glibc\bench.exe \
clock\ucrt\bench.exe \
clock\glibc\bench.exe:
$(CC) $** /Fe$* $(LFLAGS) $(LIBS)
.c{count\ucrt}.obj::
$(CC) $(CFLAGS) $< /Focount\ucrt\ /DSORTFUNC=qsort_ucrt /DCOUNT
.c{count\glibc}.obj::
$(CC) $(CFLAGS) $< /Focount\glibc\ /DSORTFUNC=qsort_glibc /DCOUNT
.c{clock\ucrt}.obj::
$(CC) $(CFLAGS) $< /Foclock\ucrt\ /DSORTFUNC=qsort_ucrt
.c{clock\glibc}.obj::
$(CC) $(CFLAGS) $< /Foclock\glibc\ /DSORTFUNC=qsort_glibc
clean:
if exist count rd /s /q count
if exist clock rd /s /q clock
上記4つのファイルを同じフォルダに保存して nmake
を実行すると,下記のプログラムが作られる。
uCRT 版 | glibc 版 | |
---|---|---|
比較・転送回数計測用 | clock\ucrt\bench.exe |
clock\glibc\bench.exe |
時間計測用 | count\ucrt\bench.exe |
count\glibc\bench.exe |
次回
次回に続く。
- 最強コンパイラを使えばマージソートは遅くない? - Qiita
- マージソートを非再帰型に書き直す~誰でも思いつくアイディアだが~ - Qiita
- マージソートを非再帰型に書き直す2~さらなる改善を試みる~ - Qiita
- マージソートを非再帰型に書き直す3~もう限界かもしれない~ - Qiita