pybind11 + SIMD命令でnumpyを超えることができるか?の続き
今度はnumpy.sumに挑戦
Pybind11 C++ のコード
(1)のC++にsumを追加しました。
全部の要素を足します。
相変わらず、対応するのは1次元配列のみ。
使うスレッドは4個に固定。
# include <immintrin.h>
# include <type_traits>
# include <thread>
# include <future>
# include <pybind11/pybind11.h>
# include <pybind11/numpy.h>
# include <iostream>
using namespace std;
namespace py = pybind11;
//T,Uの組み合わせの確認マクロ
# define type_check() static_assert(((is_same<double,U>::value) && (is_same<__m256d,T>::value)) || \
((is_same<float,U>::value) && (is_same<__m256,T>::value)),"type mismatch")
//
//simd組み込み関数をdouble/float両方で使えるようにするための、template関数
//
template <typename T,typename U>
static inline T m256_load(const U* d1){
type_check();
if(is_same<double,U>::value){
return (T)_mm256_load_pd((const double*)d1);
}else{
return (T)_mm256_load_ps((const float*)d1);
}
}
template <typename T,typename U>
static inline void m256_store(U* d1, T reg){
type_check();
if(is_same<double,U>::value){
_mm256_store_pd((double*)d1, (__m256d)reg);
}else{
_mm256_store_ps((float*)d1, (__m256)reg);
}
}
template <typename T>
static inline T m256_fmadd(T d1, T d2, T d3){
if(is_same<__m256d,T>::value){
return (T)_mm256_fmadd_pd((__m256d)d1,(__m256d)d2,(__m256d)d3);
}else{
return (T)_mm256_fmadd_ps((__m256)d1,(__m256)d2,(__m256)d3);
}
}
template <typename T>
static inline T m256_add(T d1, T d2){
if(is_same<__m256d,T>::value){
return (T)_mm256_add_pd((__m256d)d1,(__m256d)d2);
}else{
return (T)_mm256_add_ps((__m256)d1,(__m256)d2);
}
}
template <typename T>
static inline T m256_setzerp(){
if(is_same<__m256d,T>::value){
return (T)_mm256_setzero_pd();
}else{
return (T)_mm256_setzero_pd();
}
}
//simd無しの内積を求める関数
template <typename U>
static U calc_inner_nosimd(const U* v1, const U* v2, ssize_t size){
U result = 0;
for(ssize_t i = 0; i < size; i++){
result += v1[i] * v2[i];
}
return result;
}
//simd命令を使って、内積を求める。
template <typename T,typename U>
static U calc_inner(const U* v1, const U* v2, ssize_t size){
type_check();
constexpr int step = (is_same<float,U>::value) ? 8 : 4;//simdレジスタ1個に入るデータの数
if( size < step ){
return calc_inner_nosimd<U>(v1,v2,size);//配列が短すぎる場合には普通に計算
}
auto remain = size % step;//simdに入りきらないあまり分。後で別に計算する。
size -= remain;
auto sum = m256_setzerp<T>();
for(ssize_t i = 0; i < size; i+= step){
auto d1 = m256_load<T,U>(v1);
auto d2 = m256_load<T,U>(v2);
sum = m256_fmadd<T>(d1,d2,sum);//sum=d1*d2+sum
v1 += step;
v2 += step;
}
U result = calc_inner_nosimd<U>(v1,v2,remain);//4個または8個のあまりの計算を行う。
U results[step];
m256_store<T,U>(results, sum);//計算結果を配列に書き出す。
for(int i=0; i < step; i++){//計算結果の集計
result += results[i];
}
return result;
}
//実験用。使ってない。
template <typename T, typename U>
U inner_single_thread(py::array_t<U> arr1, py::array_t<U> arr2) {
auto size = arr1.size();
auto *d1 = arr1.data();
auto *d2 = arr2.data();
auto result = calc_inner<T,U>(d1,d2,size);
return result;
}
//NUM_THREADSの数のスレッドに計算を割り当てて計算する関数。
//future/asyncを使用
template <typename T, typename U, unsigned NUM_THREADS>
U inner(py::array_t<U> arr1, py::array_t<U> arr2) {
auto size1 = arr1.size();
if( size1 != arr2.size() ){
throw invalid_argument("shapes not aligned");//2個の配列の長さが違う。
}
if( size1 < 16 ){//配列が短い場合にはスレッドを起こさない。この数は環境によって要調整
return calc_inner<T,U>(arr1.data(), arr2.data(), size1);
}
auto remain = size1 % NUM_THREADS;//スレッドに割り振れないあまり。
auto size = size1 / NUM_THREADS;//スレッド1個あたりの計算数
auto *d1 = arr1.data();
auto *d2 = arr2.data();
vector<future<U>> futures;
if( size > 0 ){
for(unsigned i = 0; i < NUM_THREADS-1; i++){//自分を除いた数のスレッドを実行
ssize_t start = i * size;
auto d1start = d1 + start;
auto d2start = d2 + start;
futures.push_back(async(launch::async,[d1start,d2start,size](){//別スレッドでcalc_innerを実行
return calc_inner<T,U>(d1start, d2start, size);
}));
}
}
ssize_t start = (NUM_THREADS-1) * size;
U result = calc_inner<T,U>(d1 + start, d2 + start, size + remain);//自スレッド分の計算
for(auto&& f : futures){//他のスレッドの計算結果を集計
result += f.get();
}
return result;
}
//simd命令を使って、和を求める。
template <typename T, typename U>
static U calc_sum(const U* v1, ssize_t size){
type_check();
constexpr int step = (is_same<float,U>::value) ? 8 : 4;//simdレジスタ1個に入るデータの数
auto remain = size % step;//simdに入りきらないあまり分。後で別に計算する。
size -= remain;
auto sum = m256_setzerp<T>();
for(ssize_t i = 0; i < size; i+= step){
auto d1 = m256_load<T,U>(v1);
sum = m256_add<T>(d1,sum);//sum=d1+sum
v1 += step;
}
U result = 0;
for(int i = 0; i < remain; i++ ){
result += *v1;//4個または8個のあまりの計算を行う。
}
U results[step];
m256_store<T,U>(results, sum);//計算結果を配列に書き出す。
for(int i=0; i < step; i++){//計算結果の集計
result += results[i];
}
return result;
}
//NUM_THREADSの数のスレッドに計算を割り当てて計算する関数。
//future/asyncを使用
template <typename T, typename U, unsigned NUM_THREADS>
U sum(py::array_t<U> arr1) {
type_check();
auto size1 = arr1.size();
if( size1 < 16 ){//配列が短い場合にはスレッドを起こさない。この数は環境によって要調整
return calc_sum<T,U>(arr1.data(), size1);
}
auto remain = size1 % NUM_THREADS;//スレッドに割り振れないあまり。
auto size = size1 / NUM_THREADS;//スレッド1個あたりの計算数
auto *d1 = arr1.data();
vector<future<U>> futures;
if( size > 0 ){
for(unsigned i = 0; i < NUM_THREADS-1; i++){//自分を除いた数のスレッドを実行
ssize_t start = i * size;
auto d1start = d1 + start;
futures.push_back(async(launch::async,[d1start,size](){//別スレッドでcalc_innerを実行
return calc_sum<T,U>(d1start, size);
}));
}
}
ssize_t start = (NUM_THREADS-1) * size;
U result = calc_sum<T,U>(d1 + start, size + remain);//自スレッド分の計算
for(auto&& f : futures){//他のスレッドの計算結果を集計
result += f.get();
}
return result;
}
PYBIND11_MODULE(mylib, m) {
m.doc() = "pybind11 mylib";
m.def("inner", &inner<__m256d,double, 4>);//double配列の場合
m.def("inner", &inner<__m256,float, 4>);//float配列の場合
m.def("sum", &sum<__m256d,double, 4>);//float配列の場合
m.def("sum", &sum<__m256,float, 4>);//float配列の場合
}
計測プログラム
import numpy
import mylib
import time
import timeit
# 配列の長さ。1次元の配列です。
maxsize = 256 * 256 * 256 * 16
# 1回の計算にかかる時間を測る。
def laptime1(f,vec1):
t = time.time()
result = f(vec1)
t = time.time() - t
return t
def print_time_sum(v1):
time_numpy = 0
time_mylib = 0
#warming up
result_numpy = numpy.sum(v1)
result_mylib = mylib.sum(v1)
#できるだけ公平になるように、ループ内で交互に呼ぶ
for i in range(16):
time_mylib += laptime1(mylib.sum,v1)
time_numpy += laptime1(numpy.sum,v1)
#結果の表示
print("numpy.sum %5f(sec)" % time_numpy, "result=",result_numpy)
print("mylib.sum %5f(sec)" % time_mylib, "result=",result_mylib)
print("time ratio %5f" % (time_numpy/time_mylib))
print("double array sum size=",maxsize)
vector1 = numpy.random.rand(maxsize)
print_time_sum(vector1)
print("float array sum size=",maxsize)
vector1 = vector1.astype('float32')
print_time_sum(vector1)
計測結果
予想より速くなりました。
1.5倍
double array sum size= 268435456
numpy.sum 1.900917(sec) result= 134216471.97020514
mylib.sum 1.252315(sec) result= 134216471.97020647
time ratio 1.517922
2倍
float array sum size= 268435456
numpy.sum 1.368877(sec) result= 134216210.0
mylib.sum 0.651863(sec) result= 134217168.0
time ratio 2.099944
こんなに速くなるのは、ちょっと意外です。
何かの間違いじゃないか不安になってきました。