pybind11 + SIMD命令でnumpyを超えることができるか?(2)の続き
勉強不足の為か、PyBind11ではcomplex型をC++に渡すことができなかったので、諦めて普通に書きました。
変更点
gitのソースはこちら(SHA 744b00c2f81aca7f8a7aad51204775f676f8493c
複素数の内積に対応してみました。
メモリのアラインメントのチェックとかしてないので、環境によってはうまく動かないかもしれません。
また多次元配列の内積は計算方法が良く分からなかったので、対応していません。
どなたか計算方法をご存知でしたら、コメント頂けると幸いです。
速度計測
numpytest.pyの実行結果の一部
time ratioが速くなった割合になります。
だいたい速くなりました。特にfloatは1.3倍ぐらいに。
ただ、繰り返し回数が多い為か、numpyでもfloatでは誤差が大きいです。
array size = 134217728
double array inner
numpy.inner 2.665707(sec) result= 33549279.989558447
mylib.inner 2.639468(sec) result= 33549279.98955811
time ratio 1.009941
float array inner
numpy.inner 1.814914(sec) result= 33485106.0
mylib.inner 1.285761(sec) result= 33484994.0
time ratio 1.411549
complex double array inner
numpy.inner 6.509132(sec) result= (-3135.1374241635203+67107660.49349004j)
mylib.inner 5.030735(sec) result= (-3135.137423543725+67107660.493492335j)
time ratio 1.293873
complex float array inner
numpy.inner 3.350025(sec) result= (-3396+66661340j)
mylib.inner 2.547152(sec) result= (-3043.25+66661356j)
time ratio 1.315204
一応、ソース貼っておきます。
ソース貼るだけだと寂しいので、ソースの説明を追加しました。
mylib.cpp
全ソースはgithubを参照してください。
T,Uの組み合わせが間違ってないことを確認するマクロ。
//T,Uの組み合わせの確認マクロ
# define type_check() \
static_assert(((is_same<double,U>::value) && (is_same<__m256d,T>::value)) || \
((is_same<float,U>::value) && (is_same<__m256,T>::value)) ||\
((is_same<complex<float>,U>::value) && (is_same<__m256,T>::value)) ||\
((is_same<complex<double>,U>::value) && (is_same<__m256d,T>::value))\
,"type mismatch")
YMMレジスタにメモリから読み込む。
オーバーロードで型によって、違う関数が呼ばれます。
static inline auto m256_load(const double* d1){
return _mm256_load_pd(d1);
}
static inline auto m256_load(const complex<double>* d1){
return _mm256_load_pd((const double*)d1);
}
static inline auto m256_load(const float* d1){
return _mm256_load_ps(d1);
}
static inline auto m256_load(const complex<float>* d1){
return _mm256_load_ps((const float*)d1);
}
積和計算
これもオーバロードのためにある関数。
static inline __m256d m256_fmadd(__m256d d1, __m256d d2, __m256d d3){
return _mm256_fmadd_pd(d1,d2,d3);
}
static inline __m256 m256_fmadd(__m256 d1, __m256 d2, __m256 d3){
return _mm256_fmadd_ps(d1,d2,d3);
}
水平加算、水平減算
これもオーバロードのためにある関数。
static inline __m256d m256_hsub(__m256d d1, __m256d d2){
return _mm256_hsub_pd(d1,d2);
}
static inline __m256 m256_hsub(__m256 d1, __m256 d2){
return _mm256_hsub_ps(d1,d2);
}
static inline __m256d m256_hadd(__m256d d1, __m256d d2){
return _mm256_hadd_pd(d1,d2);
}
static inline __m256 m256_hadd(__m256 d1, __m256 d2){
return _mm256_hadd_ps(d1,d2);
}
実部と虚部を揃える為の関数。
complex<double> の場合にし使わない。
complex<float>ではうまく組み立てられなかった。
static inline __m256d m256_blend(__m256d d1, __m256d d2){
return _mm256_blend_pd(d1,d2,0xa);
}
YMMレジスタに0を入れる為の関数。
template <typename T>
static inline T m256_setzerp(){
if(is_same<__m256d,T>::value){
return (T)_mm256_setzero_pd();
}else{
return (T)_mm256_setzero_ps();
}
}
SIMD命令で計算できない終端部分の積和計算を行う。
普通に積和する。
//simd無しの内積を求める関数
template <typename U>
static U calc_inner_nosimd(const U* v1, const U* v2, ssize_t size){
U result = 0;
//cout << "calc_inner_nosimd " << size <<"\n";
for(ssize_t i = 0; i < size; i++){
//cout << i << " " << v1[i] << "*" << v2[i] << "=" << r << "\n";
result += v1[i] * v2[i];
}
return result;
}
実部と虚部を別々にSIMDで積和計算する。
虚部はpermuteで順番を入れ替えて積和。
複素数の掛け算は下記のようになる。
(a+bj)(c+dJ) = ac-bd + (ad+bd)j
速度向上のため、sum_realにはabとcdを、sum_imagにはadとbdを別々に求めて足していき、最後に全部を足すことにした。
例 complex<double>の場合
SIMD計算1回分のメモリには2個の複素数が下のように格納されている。
- v1
real[0] | imag[0] | real[1] | imag[1] |
---|---|---|---|
a[1] | b[1] | a[0] | b[0] |
- v2
real[0] | imag[0] | real[1] | imag[1] |
---|---|---|---|
c[1] | d[1] | c[0] | d[0] |
で、これを1回積和計算すると、sum_real/sum_imagには下のような値が入る。
途中で足したりすると計算量が増えるので、このまま四個の値を別々に足していき、最後に四個の値を入れ替えたり、足したりして答えを得る。
sum_real[3] | sum_real[2] | sum_real[1] | sum_real[0] |
---|---|---|---|
a[1]*c[1] | b[1]*d[1] | a[0]*c[0] | b[0]*d[0] |
sum_imag[3] | sum_imag[2] | sum_imag[1] | sum_imag[0] |
---|---|---|---|
a[1]*d[1] | c[1]*d[1] | a[0]*d[0] | c[0]*d[0] |
//実数部の積和計算
static auto muladd(const __m256d d1, const __m256d d2, __m256d sum){
return m256_fmadd(d1,d2,sum);//sum=d1*d2+sum
}
static auto muladd(const __m256 d1, const __m256 d2, __m256 sum){
return m256_fmadd(d1,d2,sum);//sum=d1*d2+sum
}
//虚数部の積和計算
//配列の順序を入れ替えて積和計算を行う。
//dummyは型を決定するための引数。
//doubleとfloatは虚数部がないので、そのままの値を返す。最適化で消えて無くなる。
static inline auto muladd_imag(const __m256d d1, const __m256d d2, __m256d sum, const complex<double>* dummy){
auto d = _mm256_permute_pd(d1, 0x5);
return m256_fmadd(d,d2,sum);
}
static inline auto muladd_imag(const __m256 d1, const __m256 d2, __m256 sum, const complex<float>* dummy){
auto d = _mm256_permute_ps(d1, 0xb1);
return m256_fmadd(d,d2,sum);
}
static inline auto muladd_imag(const __m256d d1, const __m256d d2, __m256d sum, const double* dummy){
return sum;
}
static inline auto muladd_imag(const __m256 d1, const __m256 d2, __m256 sum, const float* dummy){
return sum;
}
最後に足して答えを得る。
SIMDレジスタで別になった実部と虚部を合わせる。型によって合わせ方と足し方が違う。
//YMM regから答え取り出す。型によって結果の入り方が違うので、別関数になる。
static inline auto getResult(const double sum, const __m256d sum_real, const __m256d sum_imag){
const double* results = (const double*)&sum_real;
auto result = sum + results[0] + results[1] + results[2] + results[3];
return result;
}
static inline auto getResult(const complex<double> sum, const __m256d sum_real, const __m256d sum_imag){
auto real = m256_hsub(sum_real, sum_real);
auto imag = m256_hadd(sum_imag, sum_imag);
auto result_complex = m256_blend(real, imag);
const complex<double>* results = (const complex<double>*)&result_complex;
return sum + results[0] + results[1];
}
static inline auto getResult(const float sum, const __m256 sum_real, const __m256 sum_imag){
const float* results = (const float*)&sum_real;
auto result = sum + results[0] + results[1] + results[2] + results[3]
+ results[4] + results[5] + results[6] + results[7];
return result;
}
static inline auto getResult(const complex<float> sum, const __m256 sum_real, const __m256 sum_imag){
auto result_real = m256_hsub(sum_real, sum_real);
auto result_imag = m256_hadd(sum_imag, sum_imag);
const float* real = (const float*)&result_real;
const float* imag = (const float*)&result_imag;
return sum + complex<float>(real[0] + real[1] + real[4] + real[5],
imag[0] + imag[1] + imag[4] + imag[5]);
}
SIMD積和計算のメインループ。
//simd命令を使って、内積を求める。
template <typename T,typename U>
static U calc_inner(const U* v1, const U* v2, ssize_t size){
type_check();
//cout << "calc_inner size " << size << "\n";
constexpr int step = sizeof(T) / sizeof(U);
static_assert(step > 0,"illeagal type");
//cout << "step " << typeid(U).name() << step << "\n";
//simdレジスタ1個に入るデータの数
if( size < step ){
return calc_inner_nosimd<U>(v1,v2,size);//配列が短すぎる場合には普通に計算
}
auto remain = size % step;//simdに入りきらないあまり分。後で別に計算する。
size -= remain;
auto sum_real = m256_setzerp<T>();
T sum_imag = sum_real;//虚数部。double/floatの計算の際にはこの変数は使われない為、最適化で消える。
for(ssize_t i = 0; i < size; i+= step){
//cout << " i " << i << "\n";
auto d1 = m256_load(v1);
auto d2 = m256_load(v2);
sum_real = muladd(d1,d2,sum_real);
sum_imag = muladd_imag(d1,d2,sum_imag, v1);
v1 += step;
v2 += step;
}
U result = calc_inner_nosimd<U>(v1,v2,remain);//4個または8個のあまりの計算を行う。
result = getResult(result, sum_real, sum_imag);
return result;
}
親スレッド関数。
NUM_THREAD-1の数のスレッドを起動して、計算を割り振りスレッドの終了を待ってから結果を得る。
ここの32のマジックナンバーは調整が必要。わざわざスレッドを起こすべきかどうかの判定のループ回数。
if( size1 < 32 ){//配列が短い場合にはスレッドを起こさない。この数は環境によって要調整
//NUM_THREADSの数のスレッドに計算を割り当てて計算する関数。
//future/asyncを使用
template <typename T, typename U, unsigned NUM_THREADS>
static U calc_inner_multithread(const U* d1, const U* d2, ssize_t size1) {
//cout << "calc_inner_multithread size " << size1 << "\n";
if( size1 < 32 ){//配列が短い場合にはスレッドを起こさない。この数は環境によって要調整
return calc_inner<T,U>(d1, d2, size1);
}
auto remain = size1 % NUM_THREADS;//スレッドに割り振れないあまり。
auto size = size1 / NUM_THREADS;//スレッド1個あたりの計算数
vector<future<U>> futures;
if( size > 0 ){
for(unsigned i = 0; i < NUM_THREADS-1; i++){//自分を除いた数のスレッドを実行
ssize_t start = i * size;
auto d1start = d1 + start;
auto d2start = d2 + start;
futures.push_back(async(launch::async,[d1start,d2start,size](){//別スレッドでcalc_innerを実行
return calc_inner<T,U>(d1start, d2start, size);
}));
}
}
ssize_t start = (NUM_THREADS-1) * size;
U result = calc_inner<T,U>(d1 + start, d2 + start, size + remain);//自スレッド分の計算
for(auto&& f : futures){//他のスレッドの計算結果を集計
result += f.get();
}
return result;
}
innerの本体。
渡されてくる配列の種類によって、積和計算の関数を呼び分け、結果をpythonに返す。
何故か答えをcomplex64つまり"F"で返すと、エラーとなった。
こんな結果に
SystemError: bad format char passed to Py_BuildValue
(どなたか理由がわかる方いらっしゃいませんか?)
仕方ないので、complex128="D"でpythonに答えを返している。
この行です。
return Py_BuildValue("D", &result);//"F"ではエラーとなる。
したがって、complex64配列の内積は、complex128で帰ってくる。
static PyObject* inner(PyObject *self, PyObject *args)
{
PyArrayObject *array1,*array2;
if (!PyArg_ParseTuple(args, "OO", &array1, &array2 )){
PyErr_SetString(PyExc_ValueError, "not array");
return NULL;
}
auto size1 = array1->dimensions[0];
auto size2 = array2->dimensions[0];
if( array1->nd != 1 || array2->nd != 1 || size1 != size2 ){
PyErr_SetString(PyExc_IndexError, "shapes not aligned");
return NULL;
}
if( array1->descr->type != array2->descr->type ){
PyErr_SetString(PyExc_IndexError, "type miss match");
return NULL;
}
//cout << (array1->descr->type) << "\n";
switch(array1->descr->type ){
case 'd':{
const double* data1 = (const double*)array1->data;
const double* data2 = (const double*)array2->data;
auto result = calc_inner_multithread<__m256d,double,4>(data1, data2, size1);
//cout << "result=" << result << "\n";
return Py_BuildValue("d", result);
}
case 'D':{
const complex<double>* data1 = (const complex<double>*)array1->data;
const complex<double>* data2 = (const complex<double>*)array2->data;
auto result = calc_inner_multithread<__m256d,complex<double>,4>(data1, data2, size1);
//cout << "result=" << result << "\n";
return Py_BuildValue("D", &result);
}
case 'f':{
const float* data1 = (const float*)array1->data;
const float* data2 = (const float*)array2->data;
auto result = calc_inner_multithread<__m256,float,4>(data1, data2, size1);
//cout << "result=" << result << "\n";
return Py_BuildValue("f", result);
}
case 'F':{
const complex<float>* data1 = (const complex<float>*)array1->data;
const complex<float>* data2 = (const complex<float>*)array2->data;
complex<double> result = calc_inner_multithread<__m256,complex<float>,4>(data1, data2, size1);
//cout << "result=" << result << "\n";
return Py_BuildValue("D", &result);//"F"ではエラーとなる。
}
default:{
PyErr_SetString(PyExc_TypeError, "type miss match");
return NULL;
}
}
}
以下はpythonへの登録
static PyMethodDef methods[] = {
{"inner", inner, METH_VARARGS},
{NULL, NULL}
};
PyDoc_STRVAR(api_doc, "Python3 API sample.\n");
static struct PyModuleDef cmodule = {
PyModuleDef_HEAD_INIT,
"mylib", /* name of module */
api_doc, /* module documentation, may be NULL */
-1, /* size of per-interpreter state of the module,
or -1 if the module keeps state in global variables. */
methods
};
EXTERNC void* PyInit_mylib(void)
{
return PyModule_Create(&cmodule);
}