LoginSignup
13
12

More than 3 years have passed since last update.

高速で汎用的なスライスの操作を求めて

Last updated at Posted at 2017-12-03

高速に、汎用的に、スライスのフィルタリングをしたい。
仮に邪悪なことに手を染めることになっても。

この投稿では…

この投稿は、以前投稿した sort.Slice に学ぶ高速化のヒント をスライスのフィルタリングの実装に適用してみた、という内容になっています。
(この投稿はチュートリアル的な内容となっています)

汎用的な処理を実現するにはリフレクションの力が必要ですが、それだけでは低速になってしまいます。

この投稿では、リフレクションでの実装から始まり、段々と高速な実装にしていきます。
その経緯で少しだけ道を踏み外します。

注意書き

一部、unsafe な機能を使っています。

本来は、言語のメモリモデルにそって注意深く実装すべきものです。
が、今回はネタということで、無鉄砲に実装していきたいと思います。

リフレクションで解決してみる

まず最初に、どのようなスライスでもフィルタリングできるような関数を、リフレクションを使って実装してみます。

*[]int などのスライスへのポインタを渡したら、そのスライスの中身をフィルタリングすると言うものです。

func FilterSimpleWithReflection(ptrSlice interface{}, funcs ...FilterFunc) {
    rv := reflect.ValueOf(ptrSlice)

    // 型チェック
    if rv.Kind() != reflect.Ptr || rv.Elem().Kind() != reflect.Slice {
        panic("not a pointer to a slice")
    }

    // スライス
    sv := rv.Elem()

    // 長さの取得と、処理不要なら return
    length := rv.Elem().Len() // len(*ptrSlice)
    if length == 0 {
        return
    }

    // フィルタ関数を通過しなかったものを削除する。
    // 末尾の添字から削除していく。
    for i := length - 1; i >= 0; i-- {
        allok := true
        for _, f := range funcs {
            if !f(i) {
                allok = false
            }
        }
        if allok {
            continue
        }
        // ここまでは通過の判断

        // リフレクションを使って、スライスの要素を削除する。
        sv.Set(reflect.AppendSlice(sv.Slice(0, i), sv.Slice(i+1, sv.Len())))
    }
}

このように使います。

slice := []int{1, 2, 3, 4, 5}
FilterSimpleWithReflection(&slice, func(i int) bool { return slice[i]%2 == 0 }) // 偶数のものに絞り込みたい

実行速度は、特定の型に特化した実装には惨敗します。

>go test -bench . -benchmem
goos: windows
goarch: amd64
BenchmarkManual-4                 100000             18264 ns/op            8192 B/op          1 allocs/op
BenchmarkReflection-4              10000            128765 ns/op           72224 B/op       2002 allocs/op

およそ 1/7 の速度になっています。
アロケーションの回数も、めちゃくちゃに多いです。

ちなみに、型に特化した実装は以下の通りです。
リフレクションの使用と型チェックがなくなっています。

func FilterSimpleInt(slice *[]int, funcs ...FilterFunc) {
    length := len(*slice)
    if length == 0 {
        return
    }

    for i := length - 1; i >= 0; i-- {
        allok := true
        for _, f := range funcs {
            if !f(i) {
                allok = false
            }
        }
        if allok {
            continue
        }

        *slice = append((*slice)[0:i], (*slice)[i+1:]...)
    }
}

Type switches を使ってみる

ちょっと苦し紛れですが、速い実装があるならば部分的にそちらを使うようにしてみます。

つまり、Type switches を使い、ある程度のバリエーションの型に対応してみます。

実装としては、既存の関数を使いまわします。

func FilterSimpleWithTypeSwitches(slice interface{}, funcs ...FilterFunc) {
    switch v := slice.(type) {
    case *[]int:
        FilterSimpleInt(v, funcs...)
    default:
        FilterSimpleWithReflection(slice, funcs...)
    }
}

これで、型に特化した実装と遜色の無い速度になります。

↓先頭が今回実装したもの。

BenchmarkTypeSwitches-4           100000             18491 ns/op            8224 B/op          2 allocs/op

BenchmarkManual-4                 100000             18350 ns/op            8192 B/op          1 allocs/op
BenchmarkReflection-4              10000            128916 ns/op           72224 B/op       2002 allocs/op

ですが、コードにある通り、*[]int 以外を渡すとリフレクションを使ってしまいます。

対象の型が分かっている場合には上記の Type switches の case を増やすことで対応可能ですが、
それ以外の型を渡されると、リフレクションの実装に切り替わるため、途端にパフォーマンスが低下します。

今回は []int 向けの FilterSimpleInt を定義していますが、
これを型ごとに FilterSimpleString のような関数を定義していくのは、ちょっとやってられませんね。

何より、関数を呼び出す側の独自型を使われてしまうと、もはや対応のしようがありません。

どんな型の要素でも操作できるような仕組みがほしくなります。

邪悪な力を借りる

そんな時、私の中の悪魔が何やら囁きました。

:smiling_imp:「…要素の型が不明でも、要素のサイズが分かれば、スライスの操作はできるんじゃないかな」

つまり、こういうこと↓ではなくて…

switch v := slice.(type) {
case *[]int:
    FilterSimpleInt(v, funcs...)

代わりに、こんなことをしてみたくなったのです…

// こんな書き方はできないのですが。
switch slice.elem.(size) {
case 16: // 偶然 sizeof(struct{...}) == 16 な型も扱いたい!
    FilterSimple16(to16(slice), funcs...)
case 32: // 偶然 sizeof(struct{...}) == 32 な型も扱いたい!
    FilterSimple32(to32(slice), funcs...)

もちろん、上のような書き方をしたとしても、どんな型の要素でも対応できるわけではありません。
ですが、サイズが 16 の型には int16 以外にもありますし、独自型でサイズが 16 というものも扱えるということになります。
サイズに対するバリエーションを増やせば、劇的にカバーできる型が増えたことになります。

ま、そんなことができれば、ですが。

こんなことを考えてみる

ところで、Go では unsafe なことをすれば、強制的に型変換ができます。

func TestTryConv(t *testing.T) {
    moto := "abc"

    my := &moto // *string
    cnv := *(**int)(unsafe.Pointer(&my)) // *string -> **string -> **int (!?) -> *int

    if unsafe.Pointer(my) != unsafe.Pointer(cnv) {
        t.Errorf("%p, %v", my, cnv)
    }
}

上の例では、*string*int に変換しています。

…ということは、スライスも別のスライスの型に変換できるのではないでしょうか。

:smiling_imp:「つまり、要素のサイズさえわかってしまえば、同じサイズの要素からなるスライスに変換して操作ができるんじゃないかな」

:smiling_imp: じゃあ、こんなことをしてみましょう。

邪悪な力を使ってみる

今回行う一番の邪悪はこちらです。

func TestTrySwap(t *testing.T) {

    my := []struct{ buf string }{{"a"}, {"b"}}

    fmt.Printf("%v\n", my)  //=> [{a} {b}] 当然こうなります


    fmt.Printf("%d\n", unsafe.Sizeof(my[0])) // 16 (on 64bit)

    if unsafe.Sizeof(my[0]) == 16 {
        // 強制的に型変換
        s16 := *(*[][16]byte)(unsafe.Pointer(&my))

        // 本来とは異なる型のスライス上で、要素を交換
        s16[0], s16[1] = s16[1], s16[0]

    } else if unsafe.Sizeof(my[0]) == 8 {
        s8 := *(*[]int64)(unsafe.Pointer(&my))
        s8[0], s8[1] = s8[1], s8[0]
    }


    fmt.Printf("%v\n", my) //=> [{b} {a}]  なんと
}

なんということでしょうか、元と全く異なる型のスライス上で要素を交換できてしまいました。

※どのような環境でもこのようなことができる、もしくは今後の Go でこのようなことができる、という保証はありません。

単に要素が 8 バイトもしくは 16 バイトというだけで、同じ要素サイズを持つ別のビルトイン型のスライス型に変換しました。
そして、そのスライスを操作することで、元のスライスを操作できてしまいました。

スライス型として取得できたのですから、要素の削除はできるはずです。
要素の追加は、元の型が分からないとできませんけどね。

より具体的なシチュエーションでの邪悪な力

では、今度は「2番目の要素を削除する」というフィルタリングをやってみましょう。

func TestTryRemove(t *testing.T) {

    my := []struct{ buf string }{{"a"}, {"b"}, {"c"}}

    fmt.Printf("%v\n", my)  //=> [{a} {b} {c}]


    if unsafe.Sizeof(my[0]) == 16 {
        s16 := (*[][16]byte)(unsafe.Pointer(&my)) // ptr
        *s16 = append((*s16)[0:1], (*s16)[1+1:]...)

    } else if unsafe.Sizeof(my[0]) == 8 {
        s8 := (*[]int64)(unsafe.Pointer(&my)) // ptr
        //s8 := *(*[][8]byte)(unsafe.Pointer(&my))
        *s8 = append((*s8)[0:1], (*s8)[1+1:]...)
    }


    fmt.Printf("%v\n", my) //=> [{a} {c}]  なんとなんと
}

おー、できました。

これで、要素のサイズさえ分かれば、具体的な型が分からなくてもスライスの操作ができたことになります。

邪悪な力を使いやすくしてみる

要素サイズの場合分けがコード中にべた書きになっていますが、この部分を「スライスを受け取り、i 番目の要素を削除する関数」に括り出します。

汎用的にスライスを受け取りたいので、以下のように実装します。

  • 引数は interface{}
  • この引数にはスライスのアドレスを渡す(スライス自体を操作したいので)
  • リフレクションで要素のサイズを判断
  • もう一つの引数には、削除対象の添字 i を渡す
func EvilRemove(ptrSlice interface{}, i int) {
    v := reflect.ValueOf(ptrSlice)
    if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Slice {
        panic("want *[]xxx !!")
    }

    switch v.Type().Elem().Elem().Size() { // sizeof((*ptrSlice)[0]) == sizeof(slice[0])
    case 1:
        s := (*[]byte)(unsafe.Pointer(v.Pointer()))
        *s = append((*s)[0:i], (*s)[i+1:]...)
    case 2:
        s := (*[]int16)(unsafe.Pointer(v.Pointer()))
        *s = append((*s)[0:i], (*s)[i+1:]...)
    case 3:
        s := (*[][3]byte)(unsafe.Pointer(v.Pointer()))
        *s = append((*s)[0:i], (*s)[i+1:]...)
    case 4:
        s := (*[]int32)(unsafe.Pointer(v.Pointer()))
        *s = append((*s)[0:i], (*s)[i+1:]...)
    case 8:
        s := (*[]int64)(unsafe.Pointer(v.Pointer()))
        *s = append((*s)[0:i], (*s)[i+1:]...)
    case 16:
        s := (*[][16]byte)(unsafe.Pointer(v.Pointer()))
        *s = append((*s)[0:i], (*s)[i+1:]...)
    // :
    }
}

以下のように使います。

func TestEvilRemove(t *testing.T) {
    s1 := []int{1, 2, 3}
    EvilRemove(&s1, 1)
    if len(s1) != 2 || s1[0] != 1 || s1[1] != 3 {
        t.Errorf("%#v", s1)
    }

    s2 := []string{"1", "2", "3"}
    EvilRemove(&s2, 1)
    if len(s2) != 2 || s2[0] != "1" || s2[1] != "3" {
        t.Errorf("%#v", s2)
    }
}

邪悪な結果1

では、上で実装した EvilRemove を使って、フィルタリングを実装してみます。

func FilterEvil(ptrSlice interface{}, funcs ...FilterFunc) {
    v := reflect.ValueOf(ptrSlice)
    if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Slice {
        panic("want *[]xxx !!")
    }

    length := v.Elem().Len() // len(*ptrSlice)
    if length == 0 {
        return
    }

    for i := length - 1; i >= 0; i-- {
        allok := true
        for _, f := range funcs {
            if !f(i) {
                allok = false
            }
        }
        if allok {
            continue
        }

        EvilRemove(ptrSlice, i)  // *** ここ ***
    }
}

ベンチマークも取ります。

BenchmarkEvil-4                    50000             30903 ns/op            8224 B/op          2 allocs/op

BenchmarkManual-4                 100000             18289 ns/op            8192 B/op          1 allocs/op
BenchmarkReflection-4              10000            128106 ns/op           72224 B/op       2002 allocs/op
BenchmarkTypeSwitches-4           100000             18365 ns/op            8224 B/op          2 allocs/op

やはり、特定の型に特化した実装には敵いません。
ですが、リフレクションをフルに使う場合と比べると、かなり速くなりました。

邪悪な汎用化

上の FilterEvil では、要素を削除する度にリフレクションを使っていました。
ループ中は、どうせ同じ型のスライスを対象としているのですから、何度も EvilRemove を呼び出さないようにしてみましょう。

まずはそのために「スライスを受け取り、{そのスライスの i 番目の要素を削除する関数} を返す関数」を定義します。

// これは補助関数
func EvilRemover(ptrSlice interface{}) func(i int) {
    v := reflect.ValueOf(ptrSlice)
    if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Slice {
        panic("want *[]xxx !!")
    }

    switch v.Type().Elem().Elem().Size() { // sizeof((*ptrSlice)[0]) == sizeof(slice[0])
    case 1:
        s := (*[]byte)(unsafe.Pointer(v.Pointer()))
        return func(i int) {
            *s = append((*s)[0:i], (*s)[i+1:]...)
        }
    case 2:
        s := (*[]int16)(unsafe.Pointer(v.Pointer()))
        return func(i int) {
            *s = append((*s)[0:i], (*s)[i+1:]...)
        }
    case 3:
        s := (*[][3]byte)(unsafe.Pointer(v.Pointer()))
        return func(i int) {
            *s = append((*s)[0:i], (*s)[i+1:]...)
        }
    case 4:
        s := (*[]int32)(unsafe.Pointer(v.Pointer()))
        return func(i int) {
            *s = append((*s)[0:i], (*s)[i+1:]...)
        }
    case 8:
        s := (*[]int64)(unsafe.Pointer(v.Pointer()))
        return func(i int) {
            *s = append((*s)[0:i], (*s)[i+1:]...)
        }
    case 16:
        s := (*[][16]byte)(unsafe.Pointer(v.Pointer()))
        return func(i int) {
            *s = append((*s)[0:i], (*s)[i+1:]...)
        }
    // :
    }

    return nil
}

これを使うと、FilterEvil が次のようになります。

func FilterEvil2(ptrSlice interface{}, funcs ...FilterFunc) {
    v := reflect.ValueOf(ptrSlice)
    if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Slice {
        panic("want *[]xxx !!")
    }

    length := v.Elem().Len() // len(*ptrSlice)
    if length == 0 {
        return
    }

    remove := EvilRemover(ptrSlice)
    // ↑で ptrSlice の強制的な型変換が完了している。

    for i := length - 1; i >= 0; i-- {
        allok := true
        for _, f := range funcs {
            if !f(i) {
                allok = false
            }
        }
        if allok {
            continue
        }

        remove(i)
        // ↑は *s = append((*s)[0:i], (*s)[i+1:]...) をしているだけ。
        // s の型は、ptrSlice に応じて、既に決まっている。
        // s は ptrSlice を参照している。
    }
}

邪悪な結果2

上で実装した FilterEvil2 の力を試してみましょう。(ベンチマークをとります)

BenchmarkEvil2-4                  100000             19258 ns/op            8240 B/op          3 allocs/op

BenchmarkManual-4                 100000             18340 ns/op            8192 B/op          1 allocs/op
BenchmarkEvil-4                    50000             29220 ns/op            8224 B/op          2 allocs/op
BenchmarkReflection-4              10000            129093 ns/op           72224 B/op       2002 allocs/op
BenchmarkTypeSwitches-4           100000             18547 ns/op            8224 B/op          2 allocs/op

やりました。
5% ほどの速度差はありますが、特定の型に特化した実装に迫る速度になりました。

そこそこ汎用的に扱えるのであれば、このくらいで十分ではないでしょうか。(unsafe だけど)

あとは EvilRemover の case を充実していけば、殆どの型をカバーできたことになります。
最後に保険として、default ではリフレクションの実装になるようにしておけば最悪の場合でも動かないということはないでしょう。

まとめ

おさらい

最初はリフレクションで実装してみましたが、速度があまり出ませんでした。

次に、Type switches を試してみました。
しかし、この方法では速度は出るものの個別の型を網羅することは不可能という事実に直面しました。

そして、型そのものではなく要素のサイズに着目することで、妥協案ではありますが、より多くの型に対応する方法を試しました。
その際、unsafe の機能を使い、要素のサイズが同じビルトイン型からなるスライスに強制的に型変換する方法を採りました。

高速で汎用的な実装をするために

今回、高速で汎用的な実装をするにあたり、以下の工夫をしました。

  • リフレクションの利用を局所に抑えました。
  • 実際のスライスの操作にはリフレクションを使わず、具体的な型のスライスを使って行うようにしました。
  • その際、unsafe の機能を使い、要素のサイズごとに仮のスライス型に変換するようにしました。

正義(標準ライブラリ)の力を借りる場合

reflect.Swapper を使いましょう。

Go 標準ライブラリですし unsafe じゃないし。(Swapper の実装は unsafe を使ってはいます)
それに、(おそらく)全ての型に対応しています。

reflect.Swapper がどういうものかは、以前投稿した sort.Slice に学ぶ高速化のヒント でも触れています。よかったら参照してみて下さい。

ただ、これは本来2つの要素を交換するためのものです。
フィルタリングのように要素を削除する用途で使うには、工夫しないといけません。
単純に実装すると、どうしても上の実装よりは遅くなりがちだと思います。

機能の括り出し&サンプルパッケージ

上記の EvilXxx の部分を括り出した clise というパッケージを作ってみました。

shu-go/clise

このパッケージには、in-place でフィルタリングをする関数や、フィルタリング後の内容を新たなスライスとして作成する関数、
また、任意の型のスライスを削除・コピーする関数も含まれています。

※あくまでも実験的なものです。

簡単に紹介すると、以下の様なものです。

  • FilterSimple
    • 今回の FilterEvil2 に相当する関数。
  • FilterSimpleFast
    • 結果的に消されるものが多い場合にはこちらが速い。
  • Filter
    • reflect.Swapper を使ったサンプル。
    • 汎用的で、標準パッケージを使っているので安心。しかも速い(少し工夫をしているので)
  • CopyFiltered
    • 新たなスライスにフィルタリング後の要素をコピーする版。
  • MakeRemover
    • 今回の EvilRemover に相当する関数。添字の範囲を指定。
  • MakeCopier
    • 添字指定で「2スライス間で要素をコピーする関数」を返す関数。

フィルタリングに関しては reflect.Swapper を使った実装のほうが早くなりました。:sweat:

最後に

邪悪はいいものですよ。^^

13
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
12