SwiftでNDArray書く

  • 2
    いいね
  • 0
    コメント

先月のTry! Swift 2017のハッカソンで数値計算ライブラリnumswの開発に参加しました。
当日は行列演算までを対象とし、numpyライクなものを目指しているということなので後日NDArrayを実装したのですが、ナイーブな実装をしたせいでとにかく遅い!
というわけで高速なNDArray実装を考えるため実験的にライブラリNDArrayを作ってみました。
この記事では仕組みの説明と、numpyの計算速度にどの程度迫れたかを見てみます。
なお私の理解+実装力の範囲での実装なので低速な実装になっている部分もあるかと思います。指摘歓迎。

NDArrayの構造

numsw

まずnumswで書いたNDArrayですが、これはshapeelementsを持ち、elementsは常にRow major orderで並んでいるという構造になっています。
かなり分かりやすい構造なのですが、一方で部分配列をsubscriptする時やtrasnposedで次元の並びを変える際は、常にelementsを切り出したり並び替えたりする必要があります。NDArrayはたいてい要素数が多いのでこの計算効率の悪さは結構響いてきます。

NDArrayのNDArray

numpyのソース読もうとしたら読解力が足りなくて全然読めなかったので想像で作りました。
型定義部分のソース

public struct NDArray {
    public internal(set) var shape: [Int]
    var data: [Float]

    var strides: [Int]
    var baseOffset: Int

elementsの代わりにdata, strides, baseOffserを持っています。これらを使って、elementsのN次元のインデックスからdata中のインデックスを計算できます。

例:NDArray(shape: [2, 2], strides: [2, 1], baseOffsets: 4, data: [0, 1, 2, 3, 4, 5, 6, 7])
[[4, 5], [6, 7]]です。elementsdata中でのインデックスはdot(NDIndex, strides) + baseOffsetsで計算できます(dotは内積)。
例えば[1, 1]要素はdot([1, 1], [2, 1]) + 4 = 7で、data[7] = 7になります。
一般にNDArrayで大きいのはdataなので、data以外だけを書き換えることで配列操作を記述できればdataからelementsを集めるという時間のかかる処理を排除できます。

機能と仕組み

  • Flaotのみ
  • Accelerate使用、Linuxは考えない
  • 値型として働く

transposed

一番簡単なtransposedを見てみます。

このへん

extension NDArray {
    public func transposed() -> NDArray {
        let axes = [Int]((0..<shape.count).reversed())
        return transposed(axes: axes)
    }

    public func transposed(axes: [Int]) -> NDArray {
        precondition(axes.sorted() == [Int](0..<self.ndim))
        var x = self
        for (i, ax) in axes.enumerated() {
            x.strides[i] = self.strides[ax]
            x.shape[i] = self.shape[ax]
        }
        return x
    }

見ての通りshapestridesを並び替えるだけです。
CoWのおかげでdataの実体はコピーされないので計算コストはかなり安くなります。
他の機能もこんな感じで実装することを目指します。

subscriptget

このへん

func get(ndarray: NDArray, indexWithHole: [Int?]) -> NDArray {

    precondition(indexWithHole.count <= ndarray.ndim)

    // fill rest dimensions
    let expandedIndex = indexWithHole + [Int?](repeating: nil, count: ndarray.ndim - indexWithHole.count)

    let newShape = zip(ndarray.shape, expandedIndex).filter { $0.1 == nil }.map { $0.0 }
    let newStrides = zip(ndarray.strides, expandedIndex).filter { $0.1 == nil }.map { $0.0 }

    let startIndex = normalizeIndex(shape: ndarray.shape, ndIndex: expandedIndex.map { $0 ?? 0 })

    let newOffset = indexOffset(strides: ndarray.strides, ndIndex: startIndex) + ndarray.baseOffset
    return NDArray(shape: newShape, strides: newStrides, baseOffset: newOffset, data: ndarray.data)
}

subscriptはとりあえずnumpyでいうarray[0, 1]array[:, 1]のようなのができるのを実装しました。:は使えないのでnilで代用してます。
インデックスがnilの次元は0~の値を取るため、それらを0で埋めたインデックスを作ればそこがNDArrayの開始地点になるので、newOffsetをそこに指定します。あとはnilだったところを走査してやるだけでelementsがでてくるので、それらだけ残したnewShapenewStridesを作ってgetの完成です。

実装していないですがarray[1:3]のようなのやarray[1:5:2]のようなのも同様に実装できるはずです。インターフェースどうしようか迷って保留しました。

broadcast

このへん

numswには導入していない機能でBroadcastingがあります。
要するに特定軸方向に要素を繰り返してshapeを指定形へ変形してくれる機能なのですが、特定軸のストライドを0にすることで簡単に繰り返しが表現できます。
例えばNDArray(shape: [2, 1], strides: [1, 1], baseOffset: 0, data: [0, 1])shape=[2, 2]にブロードキャストする場合、単純にshape[2, 2]にしつつ変更された軸のストライドを0にして[1, 0]とします。
N次元インデックスからのdataインデックスの計算を行えば結果が[[0, 0], [1, 1]]になっていることが分かります。

elementsを集める

ここまではdataを並び替えずにNDArrayに操作を加えていくものでした。しかし実際にはRow major orderで取り出さないと扱いにくいような場合もあります。
elementsを高速に集めるためにgatherElementsを用意しています。

例1:NDArray(shape: [3, 3], strides: [3, 1], baseOffset: 0, data: [0, 1, 2, 3, 4, 5, 6, 7, 8]])
これはdata == elementsなのでそのまま取り出すだけでいいです。
その他subscript等によってelementsがそのままdataの一部分になってる場合がありますが、それはArraySliceを使って取り出します。

例2:NDArray(shape: [3, 3], strides: [1, 3], baseOffset: 0, data: [0, 1, 2, 3, 4, 5, 6, 7, 8]])
例1のtransposedですが、elements[0, 3, 6, 1, 4, 7, 2, 5, 8]となります。
全体としてはdata内に散らばっていますが、elementsを3分割してみると、[0, 3, 6], [1, 4, 7], [2, 5, 8]とそれぞれが一定間隔で並んでいることがわかります。
一定間隔で並んでいるものはcblas_scopyで高速にコピーできるので、一定間隔で並んでいないところについて走査しながら一定間隔の部分を一気にコピーしていきます。

例2の場合の実行コストが特に重いので、gatherElementsをできるだけ呼ばないように実装できると嬉しいです。

sqrt

このへん
ここからはdataを保持できないような操作になります。elementsそれぞれの平方根を取るsqrtを見てみます。
dataはもちろん持ち越せないので、基本はgatherElementsしてからvvsqrtfします。
ただし、elementsdata中で一つのブロックにまとまっている場合、stridesをうまく持ち越すことによってelements順に並び替える操作が不要になる場合があるので、そこだけ分岐しています。

add

このへん
足し算です。ブロードキャストで両辺のshapeは揃うので、あとはストライドの違いをどうやって処理するかになります。
今回の実装はcblas_saxpyを使うので、まず右辺をgatherElementsしてyとし、右辺についてはgatherElementsと同じ方式で走査しながらcblas_saxpyを呼び出していきます。
gatherElementsを挟まずvDSP_vaddをつかったほうが速いかもしれませんが未検証です。

その他実装したもの

negatesqrtとだいたい同じ。
subscriptsetaddとだいたい同じ。
その他いろいろ。

パフォーマンステスト

途中から説明がめんどくさい感じになっていましたがnumpyとの比較です。
sqrtaddについてnumpyの同じ計算と比較してみました。

環境

  • MacBook Pro (Retina, 13-inch, Early 2015)
  • 2.9 GHz Intel Core i5
  • 8 GB 1867 MHz DDR3
  • python3.6.0

sqrt1

func testSqrtPerformance1() {
    let shape = [10, 10, 10, 10, 10, 10]
    let a = NDArray.range(shape.reduce(1, *)).reshaped(shape)
    measure {
        // In [2]: a = np.arange(10**6).reshape([10]*6)
        // In [3]: timeit np.sqrt(a)
        // 100 loops, best of 3: 4.86 ms per loop
        _ = sqrt(a)
    }
}

0.002sec
いきなりnumpyの倍の速度が出ていますが、これはsqrt内のgatherElementsのコストがほぼ0で、実質Accelerateになげるだけだからです。今回は実装してないですがvecLibの各種関数は同設定だと同程度の効果が見込めるのではないでしょうか。

sqrt2

func testSqrtPerformance2() {
    let shape = [10, 10, 10, 10, 10, 10]
    let a = NDArray.range(shape.reduce(1, *)).reshaped(shape).transposed()[1]
    measure {
        // In [4]: a = np.arange(10**6).reshape([10]*6).transpose()[1]
        // In [5]: timeit np.sqrt(a)
        // 1000 loops, best of 3: 824 µs per loop
        _ = sqrt(a)
    }
}

0.012sec
elementsdata内に散在しているのでgatherElementsの計算時間が加わり、要素数は1/10に減っているのに計算時間がかなり伸びてしましました。単純にtransposedで次元をひっくりかえした場合、cblas_scopyで一度にさばけるのが最下の一次元のみになるので、(多分)最も効率が悪くなります。
一方のnumpyですが、要素が減った分ちゃんと短くなっています。
メモリ効率を無視していいならgatherElementsしないでstridesをそのまま使えばsqrt1と同じ速度は得られるのですが……。

add1

func testAddPerformance1() {
    let shape = [10, 10, 10, 10, 10, 10]
    let a = NDArray.range(shape.reduce(1, *)).reshaped(shape)
    measure {
        // In [12]: a = np.arange(10**6).reshape([10]*6).astype(float)
        // In [13]: timeit a+a
        // 100 loops, best of 3: 2.73 ms per loop
        _ = a + a
    }
}

0.003sec
sqrt1と同じくgatherElementsの時間がほぼ0なのでnumpyと同程度の速度が出ています。

add2

func testAddPerformance2() {
    let shape = [10, 10, 10, 10, 10, 10]
    let a = NDArray.range(shape.reduce(1, *)).reshaped(shape)
    let b = a.transposed()
    measure {
        // In [14]: a = np.arange(10**6).reshape([10]*6).astype(float)
        // In [15]: b = a.transpose()
        // In [16]: timeit a+b
        // 100 loops, best of 3: 13.4 ms per loop
        _ = a + b
    }
}

0.148sec
右辺にgatherElementsの時間がかかります。
numpyも少し伸びていますが10倍程の差がついてしましました。

add3

func testAddPerformance3() {
    let shape = [10, 10, 10, 10, 10, 10]
    let a = NDArray.range(shape.reduce(1, *)).reshaped(shape)
    let b = a.transposed()
    let c = a.transposed(axes: [1, 2, 3, 4, 5, 0])
    measure {
        // In [8]: a = np.arange(10**6).reshape([10]*6).astype(float)
        // In [9]: b = a.transpose()
        // In [11]: c = a.transpose([1,2,3,4,5,0])
        // In [12]: timeit b+c
        // 100 loops, best of 3: 5.95 ms per loop
        _ = b + c
    }
}

0.284sec
左辺の走査時間が更に乗ります。
numpyの計算結果は片方のストライドに寄せられるようです。私の実装では両方Row major orderに寄せているのでその辺を改善すればadd2くらいの速度にはなると思います。

まとめ

Accelerateにそのまま渡せばいいところはもちろんとして、Swiftで走査している分を入れてもそんなに悪くない印象でした。
基本的に全ての計算はgatherElementsしてから処理すれば簡単なのですが、高速化のためにそれをどこまで削れるかが勝負です。実際のところRow major orderまで並べ替えなくてももっと半端な並びで計算できることもあるので、そのへんでも抑えられそうです。
ケースごとの最適化はいくつか思いついていますが、実装は複雑だしそれを説明するのはもっと複雑なので今回はパスしました。記事の最後に書いておきます。みなさんも思いついて言語化できたらぜひ教えてください。

numswのNDArrayはLinux対応があったりジェネリックになってたりでAccelerateが使いづらいというのがあるんですが、transposedsubscriptの分は少なくとも速くなるのでそのうち持ってこうと思ってます。

あとnpy, npzファイルを扱うライブラリも作ってますので、NDArrayを実装しようという人は使ってやって下さい。
SwiftNpy

参考

The N-dimensional array (ndarray) — NumPy v1.12 Manual
2.2. Advanced NumPy — Scipy lecture notes

その他アイディア

  • gatherElementsaddの走査は排他な部分への書き込みなので容易に並列化できそう。
  • gatherElementsの取り出し先にもストライドを入れることができる(今回は1固定してる部分)。
  • gatherElementsじゃなくて使わないdataを削ぎ落とすだけの関数用意したらsqrtが早くなりそう。
  • negvDSP_negでストライド付きなのでgatherElements相当のことと同時にできるけど多重実装っぽくなるのが嫌。

04/22
並列化をはじめ幾つかの効率化を入れたところnumpyと遜色ない速度まで到達(最大2倍くらい)。
ついでに最急降下法のサンプル記述。
04/25
実用的な範囲だと並列化のオーバーヘッドが足かせになっていたようなので排除。
現在最大6倍程度。