5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

D言語で素数を求めるプログラムを書いた その2(アトキンの篩編)

Last updated at Posted at 2023-12-26

追記するか悩みましたが、カレンダーはまだたっぷり空いてるので別の記事にしました。

前の記事を書いた後、改めて素数を調べていたのですが
どうもアトキンの篩(Sieve of Atkin)というのを使うと効率的に計算できるらしいことがわかりました。

ところがこのふるい、日本語の解説があまりないばかりか
原著?英語版Wikiの疑似コードを見てもすごく数学数学していてチンプンカンプンでした。
そこでWikiの各言語版全部見たところポルトガル語イタリア語版
ロシア語版のC版がいい感じに読みやすいコードで書かれていたのでこれをもとに書き直してみました。
(アラビア語にもありますが、ロシア語版と同じもののようです)

とりあえず3つ全部Dで書き直しました。
なお基本的な挙動は同じはずですが、前回と揃えるため標準出力に改行させるように出力は統一してます。

ポルトガル語版ベース版

atkin_pt.d
import std;

alias N = ulong;

class SieveOfAtkin{
    N limit;
    N[] prime;
    bool[] sieve;

    this(N limit){
        this.limit = limit;
        sieve.length = limit+1;
    }

    void flip(size_t prime){
        try sieve[prime] = !sieve[prime];
        catch(Exception e){}
    }
    void invalidate(size_t prime){
        try sieve[prime] = false;
        catch(Exception e){}
    }
    bool isPrime(size_t prime){
        try return sieve[prime];
        catch(Exception e){
            return false;
        }
    }
    N[] getPrimes(){
        auto testingLimit = cast(N)(ceil(limit^^0.5));
        for(auto i=0; i<testingLimit; i++){
            for(auto j=0; j<testingLimit; j++){
                auto n = 4 * i^^2 + j^^2;
                if(n <= limit && (n % 12 == 1 || n % 12 == 5)){
                    flip(n);
                }
                n = 3 * i^^2 + j^^2;
                if(n <= limit && n % 12 == 7){
                    flip(n);
                }
                n = 3 * i^^2 - j^^2;
                if(n <= limit && i > j && n % 12 == 11){
                    flip(n);
                }
            }
        }
        for(auto i=5; i<testingLimit; i++){
            if(isPrime(i)){
                auto k = i^^2;
                foreach(j; iota(k, limit+1, k)){
                    invalidate(j);
                }
            }
        }
        // using limit+1 (type is N) instead of sieve.length (type is size_t)
        prime = [cast(N)2, 3] ~ iota(5, limit+1).filter!(a=>isPrime(a)).array;
        return prime;
    }
}

void main(string[] args){
    auto limit = args.length >= 2 ? args[1].to!N : 100;
    auto soa = new SieveOfAtkin(limit);
    soa.getPrimes.each!writeln;
}

Pythonで書かれていて読みやすいです。
クラスを利用して実装されています。
ちなみに中のiを二乗してる箇所は原文の
4*int(pow(i, 2)) + int(pow(j,2))
のように書くなら
4 * pow(i, 2) + pow(j,2)
でしょうか。
(pow関数、Cとかだと小数になるんですがDだと整数のまま扱ってくれるようですね)

# rdmd --eval='typeid(typeof(4*pow(3,2))).writeln;'
int

後述しますがこのコードには間違いがあります。

イタリア語版ベース版

atkin_it.d
import std;

alias N = ulong;

void main(string[] args){
    auto limit = args.length >= 2 ? args[1].to!N : 100;
    bool[] isPrime;
    isPrime.length = limit+1;
    for(auto x=1; x<=cast(N)(limit^^0.5); x++){
        for(auto y=1; y<=cast(N)(limit^^0.5); y++){
            auto n = 4*x^^2 + y^^2; if(n <= limit && (n%12 == 1 || n%12 == 5)){ isPrime[n] = !isPrime[n]; }
                 n = 3*x^^2 + y^^2; if(n <= limit &&  n%12 == 7              ){ isPrime[n] = !isPrime[n]; }
                 n = 3*x^^2 - y^^2; if(n <= limit && x > y &&      n%12 == 11){ isPrime[n] = !isPrime[n]; }
        }
    }
    for(auto n=5; n<=cast(N)(limit^^0.5); n++){
        if(isPrime[n]){
            foreach(k; iota(n^^2, limit+1, n^^2)){
                isPrime[k] = false;
            }
        }
    }
    writeln(2);
    writeln(3);
    foreach(n; 5..isPrime.length){
        if(isPrime[n]){
            writeln(n);
        }
    }
}

かなりコンパクトにまとまっています。
が、これ厳密にはなんの言語かわからないので困りました。

for n:=1:5 do ...

みたいな場合、これは[1,5): 1,2,3,4なのか[1,5]: 1,2,3,4,5なのでしょうか?

ロシア語Cベース版

atkin_ru_c.d
import std;

alias N = ulong;
//enum fmt = "%llu\n"; // or "%u\n" for uint

void main(string[] args){
    auto limit = args.length >= 2 ? args[1].to!N : 100;
    N sqr_lim;
    bool[] is_prime;
    is_prime.length = limit + 1;
    N x2, y2;
    N i, j;
    N n;

    // ru:グリッドの初期化
    sqr_lim = cast(N)sqrt(cast(real)limit);
    // Dは暗黙の初期化を行うので不要
    //for (i = 0; i <= limit; ++i)
    //    is_prime[i] = false;
    is_prime[2] = true;
    is_prime[3] = true;

    // ru:おそらく素数は奇数の整数です
    // ru:これらの正方形の形での表現
    // ru:x2 と y2 は i と j の 2 乗です (最適化)。
    x2 = 0;
    for (i = 1; i <= sqr_lim; ++i) {
        x2 += 2 * i - 1;
        y2 = 0;
        for (j = 1; j <= sqr_lim; ++j) {
            y2 += 2 * j - 1;

            n = 4 * x2 + y2;
            if ((n <= limit) && (n % 12 == 1 || n % 12 == 5))
                is_prime[n] = !is_prime[n];

            // ru:n = 3 * x2 + y2;
            n -= x2; // ru:最適化
            if ((n <= limit) && (n % 12 == 7))
                is_prime[n] = !is_prime[n];

            // ru:n = 3 * x2 - y2;
            n -= 2 * y2; // ru:最適化
            if ((i > j) && (n <= limit) && (n % 12 == 11))
                is_prime[n] = !is_prime[n];
        }
    }

    // ru:区間 [5, sqrt(limit)] 内の二乗素数の倍数をフィルターで除外します。
    // ru:(メインステージでは排除できません)
    for (i = 5; i <= sqr_lim; ++i) {
        if (is_prime[i]) {
            n = i * i;
            for (j = n; j <= limit; j += n)
                is_prime[j] = false;
        }
    }

    // ru:素数のリストをコンソールに出力します。
    // printfとwritelnの速度差を抑えるため統一
    //printf("2\n3\n5\n");
    writeln("2\n3\n5");
    for (i = 6; i <= limit; ++i) {  // ru:3 と 5 で割り切れるかどうかのチェックが追加されました。アルゴリズムの元のバージョンでは、その必要はありません。
        if ((is_prime[i]) && (i % 3 != 0) && (i % 5 !=  0))
            //printf(fmt, i);
            writeln(i);
    }
}

ほとんど手を加えていません。
キャスト等直すだけでそのままつかえます。
ここのコードを読んで初めて[1,5]であると気づいたわけです。
(すなわちポルトガル語版の元コードも間違っていて、例えばlimitを25にすると25は素数と言ってきます)

他のと比べるとかなり早くて驚いたのですが
foreachがforより遅い、
writelnがprintfよりかなり遅いことに気づきました。
なので原文より遅くなってしまいますがwritelnで統一しています。
アルゴリズムのベンチマークは難しいですね。

ぼくがかんがえたさいきょうの版

atkin_my.d
import std;

alias N = ulong;

void main(string[] args){
    N limit = args.length >= 2 ? args[1].to!N : 100;
    bool[] isPrime;
    isPrime.length = limit+1;
    N i, j, k, n;
    auto sqLimit = cast(N)(limit^^0.5);
    for(i=1; i<=sqLimit; i++){
        for(j=1; j<=sqLimit; j++){
            n = 4 * i * i + j * j;
            if(n > limit){
                break;
            }
            if((n % 12 == 1 || n % 12 == 5)){
                isPrime[n] = !isPrime[n];
            }
        }
        for(j=1; j<=sqLimit; j++){
            n = 3 * i * i + j * j;
            if(n > limit){
                break;
            }
            if(n % 12 == 7){
                isPrime[n] = !isPrime[n];
            }
        }
        for(j=i; j>0; j--){
            n = 3 * i * i - j * j;
            if(n > limit){
                break;
            }
            if(n % 12 == 11){
                isPrime[n] = !isPrime[n];
            }
        }
    }
    for(i=5; i<=sqLimit; i++){
        if(isPrime[i]){
            k = i * i;
            for(j=k; j<=limit; j+=k){
                isPrime[j] = false;
            }
        }
    }

    if(limit >= 2){ writeln(2); }
    if(limit >= 3){ writeln(3); }
    for(i=5; i<isPrime.length; i+=2){
        if(isPrime[i]){
            writeln(i);
        }
    }
}

ここまで出てきたコードを融合させ、かつ書いてて気づいた最適化を入れてみました。

前半の二重ループ、三種類のifをWikipedia各版は一括でループさせてますが分割して
前者二種類はそれぞれ溢れた時点でループを抜けています。
また最後の一種類はi>jの制約があるので、そもそも逆順にループさせることで比較演算を省略しています。

速度比較

$ time ./atkin_my 100000000 | wc   $ time ./atkin_ru_c 100000000 | wc   $ time ./prime3 100000000 | wc   $ time ./atkin_it 100000000 | wc  $ time ./atkin_pt 100000000 | wc
5761455 5761455 51099000           5761455 5761455 51099000             5761455 5761455 51099000         5761455 5761455 51099000          5761455 5761455 51099000

real    0m1.560s                   real    0m1.720s                     real    0m1.672s                 real    0m3.830s                  real    0m4.067s
user    0m1.511s                   user    0m1.805s                     user    0m1.751s                 user    0m3.903s                  user    0m4.106s
sys     0m0.096s                   sys     0m0.021s                     sys     0m0.026s                 sys     0m0.031s                  sys     0m0.060s

$ time ./atkin_my 1000000000 | wc  $ time ./atkin_ru_c 1000000000 | wc  $ time ./prime3 1000000000 | wc
50847534 50847534 501959790        50847534 50847534 501959790          50847534 50847534 501959790

real    0m15.891s                  real    0m18.637s                    real    0m18.456s
user    0m16.553s                  user    0m19.381s                    user    0m19.233s
sys     0m0.343s                   sys     0m0.239s                     sys     0m0.275s

$ time ./atkin_my 10000000000 | wc $ time ./atkin_ru_c 10000000000 | wc $ time ./prime3 10000000000 | wc
455052511 455052511 4948214537     455052511 455052511 4948214537       455052511 455052511 4948214537

real    2m40.030s                  real    3m10.031s                    real    3m13.885s
user    2m47.288s                  user    3m10.231s                    user    3m20.189s
sys     0m2.900s                   sys     0m5.107s                     sys     0m2.686s

前回含め、コンパイルはdmdで追加の最適化オプションなしで行ってます。
prime3.dは前回の公式エラトステネスの篩版です。
既存の参考実装より早くできたのでよかったです。

Appendix

forとforeachそしてiotaの速度

組んでて気づきましたが、どうもforeachよりforのほうが早い気がしてて
コンパイラや最適化によって違うのかと思って調べてみました。
これについては環境依存といっていいと思います。
ただref foreachは若干早いのか?
また、iotaでもforeachに渡せるのですが、これについてはかなり重くなってしまうようです。
(ところでldmdさん最適化強烈杉でしょ・・・)

foreach_speed.d
import std;
import std.datetime.stopwatch;

alias N = ulong;

void main(string[] args){
    auto limit = args.length >= 2 ? args[1].to!N : 100;
    N dummy;
    void simpleWhile(){
        N i=0;
        while(i<limit){ dummy+=i; i++; }
    }
    void simpleFor(){
        for(N i=0; i<limit; i++){ dummy+=i; }
    }
    void simpleForeach(){
        foreach(i; 0..limit){ dummy+=i; }
    }
    void refForeach(){
        foreach(ref i; 0..limit){ dummy+=i; }
    }
    void simpleForeachIota(){
        foreach(i; iota(limit)){ dummy+=i; }
    }
    void refForeachIota(){
        foreach(ref i; iota(limit)){ dummy+=i; }
    }
    auto r = benchmark!(
        simpleWhile,
        simpleFor,
        simpleForeach,
        refForeach,
        simpleForeachIota,
        refForeachIota,
    )(1);
    writeln("sW :", r[0].total!"nsecs", " or ", r[0]);
    writeln("sF :", r[1].total!"nsecs", " or ", r[1]);
    writeln("sFe:", r[2].total!"nsecs", " or ", r[2]);
    writeln("rFe:", r[3].total!"nsecs", " or ", r[3]);
    writeln("sFI:", r[4].total!"nsecs", " or ", r[4]);
    writeln("rFI:", r[5].total!"nsecs", " or ", r[5]);
    writeln(dummy);
}
$ dmd foreach_speed.d && time ./foreach_speed 1000000000
sW :633263100 or 633 ms, 263 μs, and 1 hnsec
sF :633337200 or 633 ms, 337 μs, and 2 hnsecs
sFe:707088000 or 707 ms and 88 μs
rFe:706992400 or 706 ms, 992 μs, and 4 hnsecs
sFI:4328329400 or 4 secs, 328 ms, 329 μs, and 4 hnsecs
rFI:4328019300 or 4 secs, 328 ms, 19 μs, and 3 hnsecs
2999999997000000000

real    0m11.339s
user    0m11.331s
sys     0m0.000s

$ ldmd foreach_speed.d && time ./foreach_speed 1000000000
sW :1324027600 or 1 sec, 324 ms, 27 μs, and 6 hnsecs
sF :1323619700 or 1 sec, 323 ms, 619 μs, and 7 hnsecs
sFe:1346572500 or 1 sec, 346 ms, 572 μs, and 5 hnsecs
rFe:1279434800 or 1 sec, 279 ms, 434 μs, and 8 hnsecs
sFI:3802089600 or 3 secs, 802 ms, 89 μs, and 6 hnsecs
rFI:3805484200 or 3 secs, 805 ms, 484 μs, and 2 hnsecs
2999999997000000000

real    0m12.884s
user    0m12.878s
sys     0m0.000s

$ dmd -O foreach_speed.d && time ./foreach_speed 1000000000
sW :201166600 or 201 ms, 166 μs, and 6 hnsecs
sF :201147100 or 201 ms, 147 μs, and 1 hnsec
sFe:200170100 or 200 ms, 170 μs, and 1 hnsec
rFe:180278100 or 180 ms, 278 μs, and 1 hnsec
sFI:4507250100 or 4 secs, 507 ms, 250 μs, and 1 hnsec
rFI:4507202400 or 4 secs, 507 ms, 202 μs, and 4 hnsecs
2999999997000000000

real    0m9.799s
user    0m9.794s
sys     0m0.000s

$ ldmd -O foreach_speed.d && time ./foreach_speed 1000000000
sW :400 or 4 hnsecs
sF :0 or 0 hnsecs
sFe:0 or 0 hnsecs
rFe:0 or 0 hnsecs
sFI:0 or 0 hnsecs
rFI:0 or 0 hnsecs
2999999997000000000

real    0m0.003s
user    0m0.000s
sys     0m0.003s

writelnとprintfの速度

機能の差だと思いますがprintfのほうが早いです。
計算アルゴリズムが十分早くなってくると、IOの速度も馬鹿にできません。

import std;
import std.datetime.stopwatch;

alias N = ulong;

void main(string[] args){
    auto limit = args.length >= 2 ? args[1].to!N : 100;
    void byWriteln(){
        for(N i=0; i<limit; i++){ writeln(i); }
    }
    void byPrintf(){
        for(N i=0; i<limit; i++){ printf("%llu\n", i); }
    }
    auto r = benchmark!(
        byWriteln,
        byPrintf,
    )(1);
    stderr.writeln("W:", r[0].total!"nsecs", " or ", r[0]);
    stderr.writeln("P:", r[1].total!"nsecs", " or ", r[1]);
}
$ dmd writeln_speed.d && ./writeln_speed 100000000 | wc
W:11567236100 or 11 secs, 567 ms, 236 μs, and 1 hnsec
P:2546832100 or 2 secs, 546 ms, 832 μs, and 1 hnsec
200000000 200000000 1777777780

$ ldmd writeln_speed.d && ./writeln_speed 100000000 | wc
W:4898919800 or 4 secs, 898 ms, 919 μs, and 8 hnsecs
P:2537749600 or 2 secs, 537 ms, 749 μs, and 6 hnsecs
200000000 200000000 1777777780

$ dmd writeln_speed.d && ./writeln_speed 100000000 | wc
W:11279892100 or 11 secs, 279 ms, 892 μs, and 1 hnsec
P:2558629700 or 2 secs, 558 ms, 629 μs, and 7 hnsecs
200000000 200000000 1777777780

$ ldmd writeln_speed.d && ./writeln_speed 100000000 | wc
W:4790392700 or 4 secs, 790 ms, 392 μs, and 7 hnsecs
P:2571432900 or 2 secs, 571 ms, 432 μs, and 9 hnsecs
200000000 200000000 1777777780

-covによるコードカバレッジ解析

コンパイル時に-covオプションを渡すと多少の速度低下と引き換えに
各行が何回通ったかを計測することができます。
実行後、ソース名.lstを見るとソース各行と対応する実行回数が得られます。
デッドコードの発見やコードのどこが重いのかを測るのにも有効です。

image.png

5
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?