4
3

More than 3 years have passed since last update.

安定ソートアルゴリズム

Last updated at Posted at 2021-02-17

はじめに

オンメモリで, それなりに実用になるインプレイスの安定ソートが欲しかったので書きました.
C#のようなまともな安定ソートがない環境に移植しやすく, 実用的な速度で動くことが理想です. 最速でなくていい.

型の特性を考慮していないですが, 複雑でないものばかりなので, 必要になったときに作ります.

Intro sort

Insertion sort

内側のループでswapを使う実装を見かけるのですが, どこが出所なのでしょうか. オブジェクトのコピーより整数のコピーが非常に遅い環境でない限り, selection sortより速いです, 整数のコピーが減るからです.

Insertion sort
template<class T, class U>
void insertionsort(u32 n, T* v, U func)
{
    for(u32 i = 1; i < n; ++i) {
        T x = move(v[i]);
        s64 j;
        for(j = i-1; 0 <= j && func(x, v[j]); --j) {
            v[j + 1] = move(v[j]);
        }
        v[j + 1] = move(x);
    }
}

Heap sort

木に対する選択ソートなので, swapを使うと遅くなります. これも, swapを使う実装を見かけるので, どこかいけてない出所があるのでしょう. どのような環境でも高速に動作するので, 安定なソートより遅い場合, 実装がどこか間違っている可能性が高いです.

Heap sort
template<class T, class U>
void heapsort(u32 n, T* v, U func)
{
    --v; //インデックスを1からにする
    u32 i, j;
    T x;
    //n/2からnのノードは子を持たないのでスキップ
    for(u32 k = n >> 1; 1<=k; --k) {
        i = k;
        x = move(v[k]);
        //ノードkより下は, 子<=親が成立
        //ノードkの値を, サブツリーを子<=親を満たすように挿入
        //木の降下による選択ソート
        while((j = i << 1) <= n) {
            if(j < n && func(v[j], v[j + 1])) {
                ++j;
            }

            if(!func(x, v[j])) {
                break;
            }
            v[i] = move(v[j]);
            i = j;
        }
        v[i] = move(x);
    }

    while(n > 1) {
        //v[1]が最大なので, v[n]を交換
        //子<=親が崩れるため, 木の降下による選択ソート
        x = move(v[n]);
        v[n] = move(v[1]);
        --n;
        i = 1;
        while((j = i << 1) <= n) {
            if(j < n && func(v[j], v[j + 1])) {
                ++j;
            }

            if(!func(x, v[j])) {
                break;
            }
            v[i] = move(v[j]);
            i = j;
        }
        v[i] = move(x);
    }
}


Intro sort

C/C++の場合, 再帰でも大して遅くないですが.

Intro sort
template<class T, class U>
void introsort(u32 n, T* v, U func)
{
    u32 depth = 0;
    u32 t = n;
    while(1 < t) {
        ++depth;
        t >>= 1;
    }
    depth = SortMaxDepth < depth ? SortMaxDepth : depth;
    struct Range
    {
        u32 offset_;
        u32 num_;
    };

    Range ranges[SortMaxDepth * 2];
    u8 levels[SortMaxDepth * 2];
    u32 count = 1;
    ranges[0] = {0, n};
    levels[0] = 0;

    while(0 < count) {
        --count;
        const Range& range = ranges[count];
        T* current = v + range.offset_;

        if(range.num_ < SortSwitchN) {
            insertionsort(range.num_, current, func);
            continue;
        }
        if(depth <= levels[count]) {
            heapsort(range.num_, current, func);
            continue;
        }

        u32 i0 = 0;
        u32 i1 = range.num_ - 1;

        T pivot = current[i0 + (i1 >> 1)];

        for(;;) {
            while(func(current[i0], pivot)) {
                ++i0;
            }

            while(func(pivot, current[i1])) {
                assert(0 < i1);
                --i1;
            }

            if(i1 <= i0) {
                break;
            }
            swap(current[i0], current[i1]);
            ++i0;
            --i1;
        }

        u32 offset = range.offset_;
        u8 level = levels[count] + 1;
        ++i1;
        assert(i1 <= range.num_);
        u32 num = range.num_ - i1;

        if(1 < i0) {
            assert(count <= (SortMaxDepth * 2));
            ranges[count] = {offset, i0};
            levels[count] = level;
            ++count;
        }

        if(1 < num) {
            assert(count <= (SortMaxDepth * 2));
            ranges[count] = {offset+i1, num};
            levels[count] = level;
            ++count;
        }
    }
}

Merge sort

このくらいの分量なら移植もしやすいと思います.
mergesort_lower/mergesort_upperがほとんど同じなので, ひとつにしたいところです.

Merge sort
template<class T, class U>
u32 mergesort_lower(const T* v, u32 start, u32 end, u32 pivot, U func)
{
    assert(start<end);
    u32 l = end - start;
    while(0 < l) {
        u32 half = l >> 1;
        u32 mid = start + half;
        if(func(v[mid], v[pivot])) {
            start = mid + 1;
            l = l - half - 1;
        } else {
            l = half;
        }
    }
    return start;
}

template<class T, class U>
u32 mergesort_upper(const T* v, u32 start, u32 end, u32 pivot, U func)
{
    assert(start<end);
    u32 l = end - start;
    while(0 < l) {
        u32 half = l >> 1;
        u32 mid = start + half;
        if(func(v[pivot], v[mid])) {
            l = half;
        } else {
            start = mid + 1;
            l = l - half - 1;
        }
    }
    return start;
}

template<class T>
void mergesort_rotate(T* v, u32 start, u32 mid, u32 end)
{
    assert(start<end);

    while(start < mid && mid < end) {
        u32 m = mid;
        while(start < mid && m < end) {
            swap(v[start], v[m]);
            ++m;
            ++start;
        }
        if(mid <= start) {
            mid = m;
        }
    }
}

template<class T, class U>
void merge(T* v, u32 start, u32 pivot, u32 end, u32 nl0, u32 nr0, u32 level, U func)
{
    if(nl0 <= 0 || nr0 <= 0) {
        return;
    }
    if((nl0 + nr0) < SortSwitchN || SortMaxDepth <= level) {
        insertionsort(nl0 + nr0, v + start, func);
        return;
    }

    u32 nl1, nr1;
    u32 cut0, cut1;
    if(nl0 <= nr0) {
        nr1 = nr0 >> 1;
        cut1 = pivot + nr1;
        cut0 = mergesort_upper(v, start, pivot, cut1, func);
        nl1 = cut0 - start;
    } else {
        nl1 = nl0 >> 1;
        cut0 = start + nl1;
        cut1 = mergesort_lower(v, pivot, end, cut0, func);
        nr1 = cut1 - pivot;
    }
    mergesort_rotate(v, cut0, pivot, cut1);
    ++level;
    u32 mid = cut0 + nr1;
    merge(v, start, cut0, mid, nl1, nr1, level, func);
    merge(v, mid, cut1, end, nl0 - nl1, nr0 - nr1, level, func);
}

template<class T, class U>
void mergesort(u32 n, T* v, u32 left, U func)
{
    if(n < SortSwitchN) {
        insertionsort(n, v + left, func);
        return;
    }
    u32 nl = n >> 1;
    mergesort(nl, v, left, func);
    mergesort(n - nl, v, left + nl, func);
    merge(v, left, left + nl, left + n, nl, n - nl, 1, func);
}

Bitonic sort

内側のループが並列実行できます.

Bitonic sort
template<class T, class U>
void bitonicsort_parallel(u32 n, T* v, U func)
{
    assert(0 <= n);
    // clang-format off
    static const u32 shifttable[] =
    {
        0, 0, 1, 1, 2, 2, 2, 2,
        3, 3, 3, 3, 3, 3, 3, 3,
    };
    // clang-format on

    u32 ret = 0;
    u32 x = n;
    if(x & 0xFFFF0000U) {
        ret += 16;
        x >>= 16;
    }

    if(x & 0xFF00U) {
        ret += 8;
        x >>= 8;
    }

    if(x & 0xF0U) {
        ret += 4;
        x >>= 4;
    }
    u32 log2n = ret + shifttable[x];
    u32 sequences = (log2n*(log2n+1))>>1;

    for(u32 seq=0; seq<sequences; ++seq){
        u32 step = seq;
        u32 size = 0;
        for(;size<step; ++size){
            step -= size+1;
        }

        u32 offsetShift = size-step;
        u32 offset = 1<<offsetShift;
        //u32 stage = offset<<1;
        //u32 stepno = 1<<(size+1);

        u32 stepShift = size+1;
        u32 stageMask = (offset<<1) - 1;

        for(u32 i=0; i<n; ++i){
            u32 csign =  (i & stageMask) < offset? 1 : 0;
            u32 idx = i + (csign<<offsetShift);
            if(idx <= i) {
                continue;
            }
            u32 cdir = ((i >> stepShift) & 1U) == 0 ? 1 : 0;
            if(csign == cdir){
                if(func(v[idx], v[i])) {
                    swap(v[i], v[idx]);
                }
            } else {
                if(func(v[i], v[idx])) {
                    swap(v[i], v[idx]);
                }
            }
        }//for(u32 i)
    }//for(u32 seq)
}

テスト

テストデータは, 1,048,576個の整数と文字列です.

Test
struct Data
{
    u32 no_;
    s32 value_;
};

Data* generate(u32 size)
{
    std::random_device seed_gen;
    std::mt19937 engine(seed_gen());
    std::uniform_int_distribution<> dist(0, size/2);

    Data* data = reinterpret_cast<Data*>(malloc(sizeof(Data) * size));
    for(u32 i = 0; i < size; ++i) {
        data[i].no_ = i;
        data[i].value_ = dist(engine);
    }
    return data;
}

template<class T>
T* duplicate(u32 size, T* src)
{
    T* data = reinterpret_cast<T*>(malloc(sizeof(T) * size));
    for(u32 i = 0; i < size; ++i) {
        data[i] = src[i];
    }
    return data;
}

typedef bool (*SortCompFuncInt)(const Data&, const Data&);
bool compInt(const Data& x0, const Data& x1)
{
    return x0.value_ < x1.value_;
}

typedef void (*SortFuncInt)(u32, Data*);

void insertionsort(u32 n, Data* data)
{
    insertionsort(n, data, compInt);
}

void heapsort(u32 n, Data* data)
{
    heapsort(n, data, compInt);
}

void introsort(u32 n, Data* data)
{
    introsort(n, data, compInt);
}

void mergesort(u32 n, Data* data)
{
    mergesort(n, data, compInt);
}

void oddeven_merge_sort(u32 n, Data* data)
{
    oddeven_merge_sort(n, data, compInt);
}

void bitonicsort(u32 n, Data* data)
{
    bitonicsort(n, data, compInt);
}

void bitonicsort_parallel(u32 n, Data* data)
{
    bitonicsort_parallel(n, data, compInt);
}

void bitonicsort_arbitrary_size(u32 n, Data* data)
{
    bitonicsort_arbitrary_size(n, data, compInt);
}

void timsort(u32 n, Data* data)
{
    Timsort<Data, SortCompFuncInt> timsort(compInt);
    timsort.sort(data, n);
}

template<class T>
void sort(u32 size, Data* data, T func, const char* name, bool checkStable)
{
    Data* data0 = duplicate(size, data);
    std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
    func(size, data0);
    std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
    std::chrono::milliseconds duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    std::cout << name << ": " << duration.count() << " milliseconds" << std::endl;

    for(u32 i = 1; i < size; ++i) {
        assert(data0[i - 1].value_ <= data0[i].value_);
        if(checkStable && data0[i-1].value_ == data0[i].value_){
            assert(data0[i-1].no_<data0[i].no_);
        }
        for(s64 j=i-1; 0 <= j; --j){
            assert(data[i].no_ != data[j].no_);
        }
    }
    free(data0);
}

struct DataString
{
    u32 no_;
    std::string value_;
};

DataString* generateString(u32 size)
{
    std::random_device seed_gen;
    std::mt19937 engine(seed_gen());
    std::uniform_int_distribution<> dist(0, size / 2);

    std::stringstream ss;
    DataString* data = new DataString[size];
    for(u32 i = 0; i < size; ++i) {
        data[i].no_ = i;
        ss.str("");
        ss << std::setfill('0') << std::right << std::setw(7) << dist(engine);
        data[i].value_ = ss.str();
    }
    return data;
}

template<>
DataString* duplicate<DataString>(u32 size, DataString* src)
{
    DataString* data = new DataString[size];
    for(u32 i = 0; i < size; ++i) {
        data[i] = src[i];
    }
    return data;
}

typedef bool (*SortCompFuncString)(const DataString&, const DataString&);

bool compString(const DataString& x0, const DataString& x1)
{
    return x0.value_ < x1.value_;
}

typedef void (*SortFuncString)(u32, DataString*);

void insertionsort(u32 n, DataString* data)
{
    insertionsort(n, data, compString);
}

void heapsort(u32 n, DataString* data)
{
    heapsort(n, data, compString);
}

void introsort(u32 n, DataString* data)
{
    introsort(n, data, compString);
}

void mergesort(u32 n, DataString* data)
{
    mergesort(n, data, compString);
}

void oddeven_merge_sort(u32 n, DataString* data)
{
    oddeven_merge_sort(n, data, compString);
}

void bitonicsort(u32 n, DataString* data)
{
    bitonicsort(n, data, compString);
}

void bitonicsort_parallel(u32 n, DataString* data)
{
    bitonicsort_parallel(n, data, compString);
}

void bitonicsort_arbitrary_size(u32 n, DataString* data)
{
    bitonicsort_arbitrary_size(n, data, compString);
}

template<class T>
void sort(u32 size, DataString* data, T func, const char* name, bool checkStable)
{
    DataString* data0 = duplicate(size, data);
    std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
    func(size, data0);
    std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
    std::chrono::milliseconds duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    std::cout << name << ": " << duration.count() << " milliseconds" << std::endl;

    for(u32 i = 1; i < size; ++i) {
        assert(data0[i - 1].value_ <= data0[i].value_);
        if(checkStable && data0[i - 1].value_ == data0[i].value_) {
            assert(data0[i - 1].no_ < data0[i].no_);
        }
        for(s64 j=i-1; 0 <= j; --j){
            assert(data[i].no_ != data[j].no_);
        }
    }
    delete[] data0;
}

int main(void)
{
    static const u32 N = 1024 * 1024;

    {
        Data* data = generate(N);
        sort<SortFuncInt>(N, data, heapsort, "heap", false);
        sort<SortFuncInt>(N, data, introsort, "intro", false);
        sort<SortFuncInt>(N, data, mergesort, "merge", true);
        sort<SortFuncInt>(N, data, bitonicsort_parallel, "bitonicp", false);
        sort<SortFuncInt>(N, data, timsort, "tim", true);
        free(data);
    }
    {
        DataString* data = generateString(N);
        sort<SortFuncString>(N, data, heapsort, "heap", false);
        sort<SortFuncString>(N, data, introsort, "intro", false);
        sort<SortFuncString>(N, data, mergesort, "merge", true);
        sort<SortFuncString>(N, data, bitonicsort_parallel, "bitonicp", false);
        delete[] data;
    }
    return 0;
}

環境
CPU Core i7-8700
RAM DDR4-2666 32 GB
コンパイラ Visual Studio 2019 Version 16.8.5

整数

実行時間(ミリ秒)
heap sort 105
intro sort 53
merge sort 170
bitonic sort 405
tim sort 108

merge sortとtim sortだけ安定ソートです. tim sortは速いけれど, メモリ確保が発生する場合もあるし, 何より複雑. 元々メモリ確保が前提のような, Pythonでは実用的ではないでしょうか.

文字列

実行時間(ミリ秒)
heap sort 775
intro sort 319
merge sort 1283
bitonic sort 1527

tim sortは対応できていなくてクラッシュするが, 対応する気もないのでなしです.
bitonicが例外の結果をしている以外, 比率は変わらないです. まじめに実装すれば文字列のようなデータもできるよ, ぐらいです. 文字列をソートしようなんて奇妙な人はいないと思いますが. bitonicはほぼGPU用なので, 文字列を比較することもないですし.

まとめ

まったく同じデータを複製してしまっている可能性もあるため, テストが不十分で間違っている可能性もあります. 修正する気も起きないtim sortが悪いんだよ.
これくらいなら, 対象数しだいで十分です. 標準ライブラリが, どのような入力にも対応でき, いかにまともかと改めて思います.

追記

計算機科学に関わるものとして, 一般的感覚に基づいて追記しました. 現在の汎用計算機で追記部分と異なる結果になった場合, 実装が間違っている可能性が高いです.

  • Insertion sort
    • Selection sortより速い
  • Heap sort
    • C/C++でオンメモリなら, マージソートより速い
      • メモリ確保と数値計算の速度差が大きく影響します
4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3