3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

thread+SIMD命令でnumpyを超えることができるか?(3)

Last updated at Posted at 2019-04-04

pybind11 + SIMD命令でnumpyを超えることができるか?(2)の続き
勉強不足の為か、PyBind11ではcomplex型をC++に渡すことができなかったので、諦めて普通に書きました。

変更点

gitのソースはこちら(SHA 744b00c2f81aca7f8a7aad51204775f676f8493c
複素数の内積に対応してみました。
メモリのアラインメントのチェックとかしてないので、環境によってはうまく動かないかもしれません。
また多次元配列の内積は計算方法が良く分からなかったので、対応していません。
どなたか計算方法をご存知でしたら、コメント頂けると幸いです。

速度計測

numpytest.pyの実行結果の一部
time ratioが速くなった割合になります。
だいたい速くなりました。特にfloatは1.3倍ぐらいに。
ただ、繰り返し回数が多い為か、numpyでもfloatでは誤差が大きいです。

array size =  134217728
double array inner
numpy.inner 2.665707(sec) result= 33549279.989558447
mylib.inner 2.639468(sec) result= 33549279.98955811
time ratio 1.009941
float array inner
numpy.inner 1.814914(sec) result= 33485106.0
mylib.inner 1.285761(sec) result= 33484994.0
time ratio 1.411549
complex double array inner
numpy.inner 6.509132(sec) result= (-3135.1374241635203+67107660.49349004j)
mylib.inner 5.030735(sec) result= (-3135.137423543725+67107660.493492335j)
time ratio 1.293873
complex float array inner
numpy.inner 3.350025(sec) result= (-3396+66661340j)
mylib.inner 2.547152(sec) result= (-3043.25+66661356j)
time ratio 1.315204

一応、ソース貼っておきます。
ソース貼るだけだと寂しいので、ソースの説明を追加しました。

mylib.cpp

全ソースはgithubを参照してください。

T,Uの組み合わせが間違ってないことを確認するマクロ。

//T,Uの組み合わせの確認マクロ
# define type_check() \
 static_assert(((is_same<double,U>::value) && (is_same<__m256d,T>::value)) || \
    ((is_same<float,U>::value) && (is_same<__m256,T>::value)) ||\
    ((is_same<complex<float>,U>::value) && (is_same<__m256,T>::value)) ||\
    ((is_same<complex<double>,U>::value) && (is_same<__m256d,T>::value))\
    ,"type mismatch")

YMMレジスタにメモリから読み込む。

オーバーロードで型によって、違う関数が呼ばれます。

static inline auto m256_load(const double* d1){
        return _mm256_load_pd(d1);
}
static inline auto m256_load(const complex<double>* d1){
        return _mm256_load_pd((const double*)d1);
}
static inline auto m256_load(const float* d1){
        return _mm256_load_ps(d1);
}
static inline auto m256_load(const complex<float>* d1){
        return _mm256_load_ps((const float*)d1);
}

積和計算

これもオーバロードのためにある関数。

static inline __m256d m256_fmadd(__m256d d1, __m256d d2, __m256d d3){
    return _mm256_fmadd_pd(d1,d2,d3);
}
static inline __m256 m256_fmadd(__m256 d1, __m256 d2, __m256 d3){
    return _mm256_fmadd_ps(d1,d2,d3);
}

水平加算、水平減算

これもオーバロードのためにある関数。

static inline __m256d m256_hsub(__m256d d1, __m256d d2){
    return _mm256_hsub_pd(d1,d2);
}
static inline __m256 m256_hsub(__m256 d1, __m256 d2){
    return _mm256_hsub_ps(d1,d2);
}
static inline __m256d m256_hadd(__m256d d1, __m256d d2){
    return _mm256_hadd_pd(d1,d2);
}
static inline __m256 m256_hadd(__m256 d1, __m256 d2){
    return _mm256_hadd_ps(d1,d2);
}

実部と虚部を揃える為の関数。

complex<double> の場合にし使わない。
complex<float>ではうまく組み立てられなかった。

static inline __m256d m256_blend(__m256d d1, __m256d d2){
    return _mm256_blend_pd(d1,d2,0xa);
}

YMMレジスタに0を入れる為の関数。

template <typename T>
static inline T m256_setzerp(){
    if(is_same<__m256d,T>::value){
        return (T)_mm256_setzero_pd();
    }else{
        return (T)_mm256_setzero_ps();
    }
}

SIMD命令で計算できない終端部分の積和計算を行う。

普通に積和する。

//simd無しの内積を求める関数
template <typename U>
static U calc_inner_nosimd(const U* v1, const U* v2, ssize_t size){
    U result = 0;
    //cout << "calc_inner_nosimd " << size <<"\n";
    for(ssize_t i = 0; i < size; i++){
        //cout << i << " " << v1[i] << "*" << v2[i] << "=" << r << "\n";
        result += v1[i] * v2[i];
    }
    return result;
}

実部と虚部を別々にSIMDで積和計算する。

虚部はpermuteで順番を入れ替えて積和。
複素数の掛け算は下記のようになる。
(a+bj)(c+dJ) = ac-bd + (ad+bd)j
速度向上のため、sum_realにはabとcdを、sum_imagにはadとbdを別々に求めて足していき、最後に全部を足すことにした。

例 complex<double>の場合

SIMD計算1回分のメモリには2個の複素数が下のように格納されている。

  • v1
real[0] imag[0] real[1] imag[1]
a[1] b[1] a[0] b[0]
  • v2
real[0] imag[0] real[1] imag[1]
c[1] d[1] c[0] d[0]

で、これを1回積和計算すると、sum_real/sum_imagには下のような値が入る。
途中で足したりすると計算量が増えるので、このまま四個の値を別々に足していき、最後に四個の値を入れ替えたり、足したりして答えを得る。

sum_real[3] sum_real[2] sum_real[1] sum_real[0]
a[1]*c[1] b[1]*d[1] a[0]*c[0] b[0]*d[0]
sum_imag[3] sum_imag[2] sum_imag[1] sum_imag[0]
a[1]*d[1] c[1]*d[1] a[0]*d[0] c[0]*d[0]
//実数部の積和計算
static auto muladd(const __m256d d1, const __m256d d2, __m256d sum){
    return m256_fmadd(d1,d2,sum);//sum=d1*d2+sum
}
static auto muladd(const __m256 d1, const __m256 d2, __m256 sum){
    return m256_fmadd(d1,d2,sum);//sum=d1*d2+sum
}
//虚数部の積和計算
//配列の順序を入れ替えて積和計算を行う。
//dummyは型を決定するための引数。
//doubleとfloatは虚数部がないので、そのままの値を返す。最適化で消えて無くなる。
static inline auto muladd_imag(const __m256d d1, const __m256d d2, __m256d sum, const complex<double>* dummy){
    auto d = _mm256_permute_pd(d1, 0x5);
    return m256_fmadd(d,d2,sum);
}
static inline auto muladd_imag(const __m256 d1, const __m256 d2, __m256 sum, const complex<float>* dummy){
    auto d = _mm256_permute_ps(d1, 0xb1);
    return m256_fmadd(d,d2,sum);
}
static inline auto muladd_imag(const __m256d d1, const __m256d d2, __m256d sum, const double* dummy){
    return sum;
}
static inline auto muladd_imag(const __m256 d1, const __m256 d2, __m256 sum, const float* dummy){
    return sum;
}

最後に足して答えを得る。

SIMDレジスタで別になった実部と虚部を合わせる。型によって合わせ方と足し方が違う。

//YMM regから答え取り出す。型によって結果の入り方が違うので、別関数になる。
static inline auto getResult(const double sum, const __m256d sum_real, const __m256d sum_imag){
    const double* results = (const double*)&sum_real;
    auto result = sum + results[0] + results[1] + results[2] + results[3];
    return result;
}

static inline auto getResult(const complex<double> sum, const __m256d sum_real, const __m256d sum_imag){
    auto real = m256_hsub(sum_real, sum_real);
    auto imag = m256_hadd(sum_imag, sum_imag);
    auto result_complex = m256_blend(real, imag);
    const complex<double>* results = (const complex<double>*)&result_complex;
    return sum + results[0] + results[1];
}

static inline auto getResult(const float sum, const __m256 sum_real, const __m256 sum_imag){
    const float* results = (const float*)&sum_real;
    auto result = sum + results[0] + results[1] + results[2] + results[3] 
        + results[4] + results[5] + results[6] + results[7];
    return result;
}
static inline auto getResult(const complex<float> sum, const __m256 sum_real, const __m256 sum_imag){
    auto result_real = m256_hsub(sum_real, sum_real);
    auto result_imag = m256_hadd(sum_imag, sum_imag);
    const float* real = (const float*)&result_real;
    const float* imag = (const float*)&result_imag;
    return sum +  complex<float>(real[0] + real[1] + real[4] + real[5],
                    imag[0] + imag[1] + imag[4] + imag[5]);
}

SIMD積和計算のメインループ。

//simd命令を使って、内積を求める。
template <typename T,typename U>
static U calc_inner(const U* v1, const U* v2, ssize_t size){
    type_check();
    //cout << "calc_inner size " << size << "\n";
    constexpr int step = sizeof(T) / sizeof(U);
    static_assert(step > 0,"illeagal type");
    //cout << "step " << typeid(U).name() << step << "\n";
    //simdレジスタ1個に入るデータの数
    if( size < step ){
        return calc_inner_nosimd<U>(v1,v2,size);//配列が短すぎる場合には普通に計算
    }
    auto remain = size % step;//simdに入りきらないあまり分。後で別に計算する。
    size -= remain;
    auto sum_real = m256_setzerp<T>();
    T sum_imag = sum_real;//虚数部。double/floatの計算の際にはこの変数は使われない為、最適化で消える。
    for(ssize_t i = 0; i < size; i+= step){
        //cout << " i " << i << "\n";
        auto d1 = m256_load(v1);
        auto d2 = m256_load(v2);
        sum_real = muladd(d1,d2,sum_real);
        sum_imag = muladd_imag(d1,d2,sum_imag, v1);
        v1 += step;
        v2 += step;
    }
    U result = calc_inner_nosimd<U>(v1,v2,remain);//4個または8個のあまりの計算を行う。
    result = getResult(result, sum_real, sum_imag);
    return result;
}

親スレッド関数。

NUM_THREAD-1の数のスレッドを起動して、計算を割り振りスレッドの終了を待ってから結果を得る。
ここの32のマジックナンバーは調整が必要。わざわざスレッドを起こすべきかどうかの判定のループ回数。

    if( size1 < 32 ){//配列が短い場合にはスレッドを起こさない。この数は環境によって要調整
//NUM_THREADSの数のスレッドに計算を割り当てて計算する関数。
//future/asyncを使用
template <typename T, typename U, unsigned NUM_THREADS>
static U calc_inner_multithread(const U* d1, const U* d2, ssize_t size1) {
    //cout << "calc_inner_multithread size " << size1 << "\n";
    if( size1 < 32 ){//配列が短い場合にはスレッドを起こさない。この数は環境によって要調整
        return calc_inner<T,U>(d1, d2, size1);
    }
    auto remain = size1 % NUM_THREADS;//スレッドに割り振れないあまり。
    auto size = size1 / NUM_THREADS;//スレッド1個あたりの計算数
    vector<future<U>> futures;
    if( size > 0 ){
        for(unsigned i = 0; i < NUM_THREADS-1; i++){//自分を除いた数のスレッドを実行
            ssize_t start = i * size;
            auto d1start = d1 + start;
            auto d2start = d2 + start;
            futures.push_back(async(launch::async,[d1start,d2start,size](){//別スレッドでcalc_innerを実行
                return calc_inner<T,U>(d1start, d2start, size);
            }));
        }
    }
    ssize_t start = (NUM_THREADS-1) * size;
    U result = calc_inner<T,U>(d1 + start, d2 + start, size + remain);//自スレッド分の計算
    for(auto&& f : futures){//他のスレッドの計算結果を集計
        result += f.get();
    }
    return result;
}

innerの本体。
渡されてくる配列の種類によって、積和計算の関数を呼び分け、結果をpythonに返す。
何故か答えをcomplex64つまり"F"で返すと、エラーとなった。
こんな結果に


SystemError: bad format char passed to Py_BuildValue

(どなたか理由がわかる方いらっしゃいませんか?)
仕方ないので、complex128="D"でpythonに答えを返している。
この行です。

            return Py_BuildValue("D", &result);//"F"ではエラーとなる。

したがって、complex64配列の内積は、complex128で帰ってくる。

static PyObject* inner(PyObject *self, PyObject *args)
{
    PyArrayObject *array1,*array2;
    if (!PyArg_ParseTuple(args, "OO", &array1, &array2 )){
        PyErr_SetString(PyExc_ValueError, "not array");
        return NULL;
    }
    auto size1 = array1->dimensions[0];
    auto size2 = array2->dimensions[0];
    if( array1->nd != 1 || array2->nd != 1 || size1 != size2 ){
        PyErr_SetString(PyExc_IndexError, "shapes not aligned");
        return NULL;
    }
    if( array1->descr->type != array2->descr->type ){
        PyErr_SetString(PyExc_IndexError, "type miss match");
        return NULL;
    }
    
    //cout << (array1->descr->type) << "\n";
    switch(array1->descr->type ){
        case 'd':{
            const double* data1 = (const double*)array1->data;
            const double* data2 = (const double*)array2->data;
            auto result = calc_inner_multithread<__m256d,double,4>(data1, data2, size1);
            //cout << "result=" << result << "\n";
            return Py_BuildValue("d", result);
        }
        case 'D':{
            const complex<double>* data1 = (const complex<double>*)array1->data;
            const complex<double>* data2 = (const complex<double>*)array2->data;
            auto result = calc_inner_multithread<__m256d,complex<double>,4>(data1, data2, size1);
            //cout << "result=" << result << "\n";
            return Py_BuildValue("D", &result);
        }
        case 'f':{
            const float* data1 = (const float*)array1->data;
            const float* data2 = (const float*)array2->data;
            auto result = calc_inner_multithread<__m256,float,4>(data1, data2, size1);
            //cout << "result=" << result << "\n";
            return Py_BuildValue("f", result);
        }
        case 'F':{
            const complex<float>* data1 = (const complex<float>*)array1->data;
            const complex<float>* data2 = (const complex<float>*)array2->data;
            complex<double> result = calc_inner_multithread<__m256,complex<float>,4>(data1, data2, size1);
            //cout << "result=" << result << "\n";
            return Py_BuildValue("D", &result);//"F"ではエラーとなる。
        }
        default:{
            PyErr_SetString(PyExc_TypeError, "type miss match");
            return NULL;
        }
    }
}

以下はpythonへの登録


static PyMethodDef methods[] = {
    {"inner", inner, METH_VARARGS},
    {NULL, NULL}
};

PyDoc_STRVAR(api_doc, "Python3 API sample.\n");

static struct PyModuleDef cmodule = {
   PyModuleDef_HEAD_INIT,
   "mylib",   /* name of module */
   api_doc, /* module documentation, may be NULL */
   -1,       /* size of per-interpreter state of the module,
                or -1 if the module keeps state in global variables. */
   methods
};

EXTERNC void* PyInit_mylib(void)
{
    return PyModule_Create(&cmodule);
}

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?