はじめに
オンメモリで, それなりに実用になるインプレイスの安定ソートが欲しかったので書きました.
C#のようなまともな安定ソートがない環境に移植しやすく, 実用的な速度で動くことが理想です. 最速でなくていい.
型の特性を考慮していないですが, 複雑でないものばかりなので, 必要になったときに作ります.
Intro sort
Insertion sort
内側のループでswap
を使う実装を見かけるのですが, どこが出所なのでしょうか. オブジェクトのコピーより整数のコピーが非常に遅い環境でない限り, selection sortより速いです, 整数のコピーが減るからです.
Insertion sort
template<class T, class U>
void insertionsort(u32 n, T* v, U func)
{
for(u32 i = 1; i < n; ++i) {
T x = move(v[i]);
s64 j;
for(j = i-1; 0 <= j && func(x, v[j]); --j) {
v[j + 1] = move(v[j]);
}
v[j + 1] = move(x);
}
}
Heap sort
木に対する選択ソートなので, swap
を使うと遅くなります. これも, swap
を使う実装を見かけるので, どこかいけてない出所があるのでしょう. どのような環境でも高速に動作するので, 安定なソートより遅い場合, 実装がどこか間違っている可能性が高いです.
Heap sort
template<class T, class U>
void heapsort(u32 n, T* v, U func)
{
--v; //インデックスを1からにする
u32 i, j;
T x;
//n/2からnのノードは子を持たないのでスキップ
for(u32 k = n >> 1; 1<=k; --k) {
i = k;
x = move(v[k]);
//ノードkより下は, 子<=親が成立
//ノードkの値を, サブツリーを子<=親を満たすように挿入
//木の降下による選択ソート
while((j = i << 1) <= n) {
if(j < n && func(v[j], v[j + 1])) {
++j;
}
if(!func(x, v[j])) {
break;
}
v[i] = move(v[j]);
i = j;
}
v[i] = move(x);
}
while(n > 1) {
//v[1]が最大なので, v[n]を交換
//子<=親が崩れるため, 木の降下による選択ソート
x = move(v[n]);
v[n] = move(v[1]);
--n;
i = 1;
while((j = i << 1) <= n) {
if(j < n && func(v[j], v[j + 1])) {
++j;
}
if(!func(x, v[j])) {
break;
}
v[i] = move(v[j]);
i = j;
}
v[i] = move(x);
}
}
Intro sort
C/C++の場合, 再帰でも大して遅くないですが.
Intro sort
template<class T, class U>
void introsort(u32 n, T* v, U func)
{
u32 depth = 0;
u32 t = n;
while(1 < t) {
++depth;
t >>= 1;
}
depth = SortMaxDepth < depth ? SortMaxDepth : depth;
struct Range
{
u32 offset_;
u32 num_;
};
Range ranges[SortMaxDepth * 2];
u8 levels[SortMaxDepth * 2];
u32 count = 1;
ranges[0] = {0, n};
levels[0] = 0;
while(0 < count) {
--count;
const Range& range = ranges[count];
T* current = v + range.offset_;
if(range.num_ < SortSwitchN) {
insertionsort(range.num_, current, func);
continue;
}
if(depth <= levels[count]) {
heapsort(range.num_, current, func);
continue;
}
u32 i0 = 0;
u32 i1 = range.num_ - 1;
T pivot = current[i0 + (i1 >> 1)];
for(;;) {
while(func(current[i0], pivot)) {
++i0;
}
while(func(pivot, current[i1])) {
assert(0 < i1);
--i1;
}
if(i1 <= i0) {
break;
}
swap(current[i0], current[i1]);
++i0;
--i1;
}
u32 offset = range.offset_;
u8 level = levels[count] + 1;
++i1;
assert(i1 <= range.num_);
u32 num = range.num_ - i1;
if(1 < i0) {
assert(count <= (SortMaxDepth * 2));
ranges[count] = {offset, i0};
levels[count] = level;
++count;
}
if(1 < num) {
assert(count <= (SortMaxDepth * 2));
ranges[count] = {offset+i1, num};
levels[count] = level;
++count;
}
}
}
Merge sort
このくらいの分量なら移植もしやすいと思います.
mergesort_lower/mergesort_upper
がほとんど同じなので, ひとつにしたいところです.
Merge sort
template<class T, class U>
u32 mergesort_lower(const T* v, u32 start, u32 end, u32 pivot, U func)
{
assert(start<end);
u32 l = end - start;
while(0 < l) {
u32 half = l >> 1;
u32 mid = start + half;
if(func(v[mid], v[pivot])) {
start = mid + 1;
l = l - half - 1;
} else {
l = half;
}
}
return start;
}
template<class T, class U>
u32 mergesort_upper(const T* v, u32 start, u32 end, u32 pivot, U func)
{
assert(start<end);
u32 l = end - start;
while(0 < l) {
u32 half = l >> 1;
u32 mid = start + half;
if(func(v[pivot], v[mid])) {
l = half;
} else {
start = mid + 1;
l = l - half - 1;
}
}
return start;
}
template<class T>
void mergesort_rotate(T* v, u32 start, u32 mid, u32 end)
{
assert(start<end);
while(start < mid && mid < end) {
u32 m = mid;
while(start < mid && m < end) {
swap(v[start], v[m]);
++m;
++start;
}
if(mid <= start) {
mid = m;
}
}
}
template<class T, class U>
void merge(T* v, u32 start, u32 pivot, u32 end, u32 nl0, u32 nr0, u32 level, U func)
{
if(nl0 <= 0 || nr0 <= 0) {
return;
}
if((nl0 + nr0) < SortSwitchN || SortMaxDepth <= level) {
insertionsort(nl0 + nr0, v + start, func);
return;
}
u32 nl1, nr1;
u32 cut0, cut1;
if(nl0 <= nr0) {
nr1 = nr0 >> 1;
cut1 = pivot + nr1;
cut0 = mergesort_upper(v, start, pivot, cut1, func);
nl1 = cut0 - start;
} else {
nl1 = nl0 >> 1;
cut0 = start + nl1;
cut1 = mergesort_lower(v, pivot, end, cut0, func);
nr1 = cut1 - pivot;
}
mergesort_rotate(v, cut0, pivot, cut1);
++level;
u32 mid = cut0 + nr1;
merge(v, start, cut0, mid, nl1, nr1, level, func);
merge(v, mid, cut1, end, nl0 - nl1, nr0 - nr1, level, func);
}
template<class T, class U>
void mergesort(u32 n, T* v, u32 left, U func)
{
if(n < SortSwitchN) {
insertionsort(n, v + left, func);
return;
}
u32 nl = n >> 1;
mergesort(nl, v, left, func);
mergesort(n - nl, v, left + nl, func);
merge(v, left, left + nl, left + n, nl, n - nl, 1, func);
}
Bitonic sort
内側のループが並列実行できます.
Bitonic sort
template<class T, class U>
void bitonicsort_parallel(u32 n, T* v, U func)
{
assert(0 <= n);
// clang-format off
static const u32 shifttable[] =
{
0, 0, 1, 1, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3,
};
// clang-format on
u32 ret = 0;
u32 x = n;
if(x & 0xFFFF0000U) {
ret += 16;
x >>= 16;
}
if(x & 0xFF00U) {
ret += 8;
x >>= 8;
}
if(x & 0xF0U) {
ret += 4;
x >>= 4;
}
u32 log2n = ret + shifttable[x];
u32 sequences = (log2n*(log2n+1))>>1;
for(u32 seq=0; seq<sequences; ++seq){
u32 step = seq;
u32 size = 0;
for(;size<step; ++size){
step -= size+1;
}
u32 offsetShift = size-step;
u32 offset = 1<<offsetShift;
//u32 stage = offset<<1;
//u32 stepno = 1<<(size+1);
u32 stepShift = size+1;
u32 stageMask = (offset<<1) - 1;
for(u32 i=0; i<n; ++i){
u32 csign = (i & stageMask) < offset? 1 : 0;
u32 idx = i + (csign<<offsetShift);
if(idx <= i) {
continue;
}
u32 cdir = ((i >> stepShift) & 1U) == 0 ? 1 : 0;
if(csign == cdir){
if(func(v[idx], v[i])) {
swap(v[i], v[idx]);
}
} else {
if(func(v[i], v[idx])) {
swap(v[i], v[idx]);
}
}
}//for(u32 i)
}//for(u32 seq)
}
テスト
テストデータは, 1,048,576個の整数と文字列です.
Test
struct Data
{
u32 no_;
s32 value_;
};
Data* generate(u32 size)
{
std::random_device seed_gen;
std::mt19937 engine(seed_gen());
std::uniform_int_distribution<> dist(0, size/2);
Data* data = reinterpret_cast<Data*>(malloc(sizeof(Data) * size));
for(u32 i = 0; i < size; ++i) {
data[i].no_ = i;
data[i].value_ = dist(engine);
}
return data;
}
template<class T>
T* duplicate(u32 size, T* src)
{
T* data = reinterpret_cast<T*>(malloc(sizeof(T) * size));
for(u32 i = 0; i < size; ++i) {
data[i] = src[i];
}
return data;
}
typedef bool (*SortCompFuncInt)(const Data&, const Data&);
bool compInt(const Data& x0, const Data& x1)
{
return x0.value_ < x1.value_;
}
typedef void (*SortFuncInt)(u32, Data*);
void insertionsort(u32 n, Data* data)
{
insertionsort(n, data, compInt);
}
void heapsort(u32 n, Data* data)
{
heapsort(n, data, compInt);
}
void introsort(u32 n, Data* data)
{
introsort(n, data, compInt);
}
void mergesort(u32 n, Data* data)
{
mergesort(n, data, compInt);
}
void oddeven_merge_sort(u32 n, Data* data)
{
oddeven_merge_sort(n, data, compInt);
}
void bitonicsort(u32 n, Data* data)
{
bitonicsort(n, data, compInt);
}
void bitonicsort_parallel(u32 n, Data* data)
{
bitonicsort_parallel(n, data, compInt);
}
void bitonicsort_arbitrary_size(u32 n, Data* data)
{
bitonicsort_arbitrary_size(n, data, compInt);
}
void timsort(u32 n, Data* data)
{
Timsort<Data, SortCompFuncInt> timsort(compInt);
timsort.sort(data, n);
}
template<class T>
void sort(u32 size, Data* data, T func, const char* name, bool checkStable)
{
Data* data0 = duplicate(size, data);
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
func(size, data0);
std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
std::chrono::milliseconds duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << name << ": " << duration.count() << " milliseconds" << std::endl;
for(u32 i = 1; i < size; ++i) {
assert(data0[i - 1].value_ <= data0[i].value_);
if(checkStable && data0[i-1].value_ == data0[i].value_){
assert(data0[i-1].no_<data0[i].no_);
}
for(s64 j=i-1; 0 <= j; --j){
assert(data[i].no_ != data[j].no_);
}
}
free(data0);
}
struct DataString
{
u32 no_;
std::string value_;
};
DataString* generateString(u32 size)
{
std::random_device seed_gen;
std::mt19937 engine(seed_gen());
std::uniform_int_distribution<> dist(0, size / 2);
std::stringstream ss;
DataString* data = new DataString[size];
for(u32 i = 0; i < size; ++i) {
data[i].no_ = i;
ss.str("");
ss << std::setfill('0') << std::right << std::setw(7) << dist(engine);
data[i].value_ = ss.str();
}
return data;
}
template<>
DataString* duplicate<DataString>(u32 size, DataString* src)
{
DataString* data = new DataString[size];
for(u32 i = 0; i < size; ++i) {
data[i] = src[i];
}
return data;
}
typedef bool (*SortCompFuncString)(const DataString&, const DataString&);
bool compString(const DataString& x0, const DataString& x1)
{
return x0.value_ < x1.value_;
}
typedef void (*SortFuncString)(u32, DataString*);
void insertionsort(u32 n, DataString* data)
{
insertionsort(n, data, compString);
}
void heapsort(u32 n, DataString* data)
{
heapsort(n, data, compString);
}
void introsort(u32 n, DataString* data)
{
introsort(n, data, compString);
}
void mergesort(u32 n, DataString* data)
{
mergesort(n, data, compString);
}
void oddeven_merge_sort(u32 n, DataString* data)
{
oddeven_merge_sort(n, data, compString);
}
void bitonicsort(u32 n, DataString* data)
{
bitonicsort(n, data, compString);
}
void bitonicsort_parallel(u32 n, DataString* data)
{
bitonicsort_parallel(n, data, compString);
}
void bitonicsort_arbitrary_size(u32 n, DataString* data)
{
bitonicsort_arbitrary_size(n, data, compString);
}
template<class T>
void sort(u32 size, DataString* data, T func, const char* name, bool checkStable)
{
DataString* data0 = duplicate(size, data);
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
func(size, data0);
std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
std::chrono::milliseconds duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << name << ": " << duration.count() << " milliseconds" << std::endl;
for(u32 i = 1; i < size; ++i) {
assert(data0[i - 1].value_ <= data0[i].value_);
if(checkStable && data0[i - 1].value_ == data0[i].value_) {
assert(data0[i - 1].no_ < data0[i].no_);
}
for(s64 j=i-1; 0 <= j; --j){
assert(data[i].no_ != data[j].no_);
}
}
delete[] data0;
}
int main(void)
{
static const u32 N = 1024 * 1024;
{
Data* data = generate(N);
sort<SortFuncInt>(N, data, heapsort, "heap", false);
sort<SortFuncInt>(N, data, introsort, "intro", false);
sort<SortFuncInt>(N, data, mergesort, "merge", true);
sort<SortFuncInt>(N, data, bitonicsort_parallel, "bitonicp", false);
sort<SortFuncInt>(N, data, timsort, "tim", true);
free(data);
}
{
DataString* data = generateString(N);
sort<SortFuncString>(N, data, heapsort, "heap", false);
sort<SortFuncString>(N, data, introsort, "intro", false);
sort<SortFuncString>(N, data, mergesort, "merge", true);
sort<SortFuncString>(N, data, bitonicsort_parallel, "bitonicp", false);
delete[] data;
}
return 0;
}
環境 | |
---|---|
CPU | Core i7-8700 |
RAM | DDR4-2666 32 GB |
コンパイラ | Visual Studio 2019 Version 16.8.5 |
整数
実行時間(ミリ秒) | |
---|---|
heap sort | 105 |
intro sort | 53 |
merge sort | 170 |
bitonic sort | 405 |
tim sort | 108 |
merge sortとtim sortだけ安定ソートです. tim sortは速いけれど, メモリ確保が発生する場合もあるし, 何より複雑. 元々メモリ確保が前提のような, Pythonでは実用的ではないでしょうか.
文字列
実行時間(ミリ秒) | |
---|---|
heap sort | 775 |
intro sort | 319 |
merge sort | 1283 |
bitonic sort | 1527 |
tim sortは対応できていなくてクラッシュするが, 対応する気もないのでなしです.
bitonicが例外の結果をしている以外, 比率は変わらないです. まじめに実装すれば文字列のようなデータもできるよ, ぐらいです. 文字列をソートしようなんて奇妙な人はいないと思いますが. bitonicはほぼGPU用なので, 文字列を比較することもないですし.
まとめ
まったく同じデータを複製してしまっている可能性もあるため, テストが不十分で間違っている可能性もあります. 修正する気も起きないtim sortが悪いんだよ.
これくらいなら, 対象数しだいで十分です. 標準ライブラリが, どのような入力にも対応でき, いかにまともかと改めて思います.
追記
計算機科学に関わるものとして, 一般的感覚に基づいて追記しました. 現在の汎用計算機で追記部分と異なる結果になった場合, 実装が間違っている可能性が高いです.
- Insertion sort
- Selection sortより速い
- Heap sort
- C/C++でオンメモリなら, マージソートより速い
- メモリ確保と数値計算の速度差が大きく影響します
- C/C++でオンメモリなら, マージソートより速い