経緯
唐突に、「pybind11+SIMD命令+スレッドを使えば、numpyを使うより速く計算できるようになるんじゃね?」
と、思い立ち試してみました。
試したこと
まずは簡単そうなnumpy.innerから挑戦
とりあえず、1次元配列同士の内積を求めることしかできません。
使い方
パッケージ名はmylibという名前で、下記のようにコンパイルすると、mylib.cpython-37m-darwin.so という名前のファイルができます。このファイルをカレントに置いておけば、pyファイルからmylibとしてimport可能になります。1次元以外の配列を渡すとどうなるかは試してません。
clang++ -mavx2 -mfma -O3 -Wall -shared -std=c++11 -fPIC `python -m pybind11 --includes` -undefined dynamic_lookup mylib.cpp -o mylib`python3-config --extension-suffix`
Pybind11 C++ のコード
スレッドを4個使用し、SIMD命令を使って書いてあります。
# include <immintrin.h>
# include <type_traits>
# include <thread>
# include <future>
# include <pybind11/pybind11.h>
# include <pybind11/numpy.h>
using namespace std;
namespace py = pybind11;
//T,Uの組み合わせの確認マクロ
# define type_check() static_assert(((is_same<double,U>::value) && (is_same<__m256d,T>::value)) || \
((is_same<float,U>::value) && (is_same<__m256,T>::value)),"type mismatch")
//
//simd組み込み関数をdouble/float両方で使えるようにするための、template関数
//
template <typename T,typename U>
static inline T m256_load(const U* d1){
type_check();
if(is_same<double,U>::value){
return (T)_mm256_load_pd((const double*)d1);
}else{
return (T)_mm256_load_ps((const float*)d1);
}
}
template <typename T,typename U>
static inline void m256_store(U* d1, T reg){
type_check();
if(is_same<double,U>::value){
_mm256_store_pd((double*)d1, (__m256d)reg);
}else{
_mm256_store_ps((float*)d1, (__m256)reg);
}
}
template <typename T>
static inline T m256_fmadd(T d1, T d2, T d3){
if(is_same<__m256d,T>::value){
return (T)_mm256_fmadd_pd((__m256d)d1,(__m256d)d2,(__m256d)d3);
}else{
return (T)_mm256_fmadd_ps((__m256)d1,(__m256)d2,(__m256)d3);
}
}
template <typename T>
static inline T m256_setzerp(){
if(is_same<__m256d,T>::value){
return (T)_mm256_setzero_pd();
}else{
return (T)_mm256_setzero_pd();
}
}
//simd無しの内積を求める関数
template <typename U>
static U calc_inner_nosimd(const U* v1, const U* v2, ssize_t size){
U result = 0;
for(ssize_t i = 0; i < size; i++){
result += v1[i] * v2[i];
}
return result;
}
//simd命令を使って、内積を求める。
template <typename T,typename U>
static U calc_inner(const U* v1, const U* v2, ssize_t size){
type_check();
constexpr int step = (is_same<float,U>::value) ? 8 : 4;//simdレジスタ1個に入るデータの数
if( size < step ){
return calc_inner_nosimd<U>(v1,v2,size);//配列が短すぎる場合には普通に計算
}
auto remain = size % step;//simdに入りきらないあまり分。後で別に計算する。
size -= remain;
auto sum = m256_setzerp<T>();
for(ssize_t i = 0; i < size; i+= step){
auto d1 = m256_load<T,U>(v1);
auto d2 = m256_load<T,U>(v2);
sum = m256_fmadd<T>(d1,d2,sum);//sum=d1*d2+sum
v1 += step;
v2 += step;
}
U result = calc_inner_nosimd<U>(v1,v2,remain);//4個または8個のあまりの計算を行う。
U results[step];
m256_store<T,U>(results, sum);//計算結果を配列に書き出す。
for(int i=0; i < step; i++){//計算結果の集計
result += results[i];
}
return result;
}
//実験用。使ってない。
template <typename T, typename U>
U inner_single_thread(py::array_t<U> arr1, py::array_t<U> arr2) {
auto size = arr1.size();
auto *d1 = arr1.data();
auto *d2 = arr2.data();
auto result = calc_inner<T,U>(d1,d2,size);
return result;
}
//NUM_THREADSの数のスレッドに計算を割り当てて計算する関数。
//future/asyncを使用
template <typename T, typename U, unsigned NUM_THREADS>
U inner(py::array_t<U> arr1, py::array_t<U> arr2) {
auto size1 = arr1.size();
if( size1 != arr2.size() ){
throw invalid_argument("shapes not aligned");//2個の配列の長さが違う。
}
if( size1 < 16 ){//配列が短い場合にはスレッドを起こさない。この数は環境によって要調整
return calc_inner<T,U>(arr1.data(), arr2.data(), size1);
}
auto remain = size1 % NUM_THREADS;//スレッドに割り振れないあまり。
auto size = size1 / NUM_THREADS;//スレッド1個あたりの計算数
auto *d1 = arr1.data();
auto *d2 = arr2.data();
vector<future<U>> futures;
if( size > 0 ){
for(unsigned i = 0; i < NUM_THREADS-1; i++){//自分を除いた数のスレッドを実行
ssize_t start = i * size;
auto d1start = d1 + start;
auto d2start = d2 + start;
futures.push_back(async(launch::async,[d1start,d2start,size](){//別スレッドでcalc_innerを実行
return calc_inner<T,U>(d1start, d2start, size);
}));
}
}
ssize_t start = (NUM_THREADS-1) * size;
U result = calc_inner<T,U>(d1 + start, d2 + start, size + remain);//自スレッド分の計算
for(auto&& f : futures){//他のスレッドの計算結果を集計
result += f.get();
}
return result;
}
PYBIND11_MODULE(mylib, m) {
m.doc() = "pybind11 mylib";
m.def("inner", &inner<__m256d,double, 4>);//double配列の場合
m.def("inner", &inner<__m256,float, 4>);//float配列の場合
}
計測プログラム
mylib.cpython-37m-darwin.so と同じ場所に置いて、実行すれば結果が表示されます。
import numpy
import mylib
import time
# 配列の長さ。1次元の配列です。
maxsize = 256 * 256 * 256 * 16
# 1回の計算にかかる時間を測る。
def laptime(f,vec1,vec2):
t = time.time()
result = f(vec1, vec2)
t = time.time() - t
return t
def print_time_inner(v1, v2):
time_numpy_inner = 0
time_mylib_inner = 0
#warming up
result_numpy_inner = numpy.inner(v1,v2)
result_mylib_inner = mylib.inner(v1,v2)
#できるだけ公平になるように、ループ内で交互に呼ぶ
for i in range(16):
time_mylib_inner += laptime(mylib.inner,v1,v2)
time_numpy_inner += laptime(numpy.inner,v1,v2)
#結果の表示
print("numpy.inner %5f(sec)" % time_numpy_inner, "result=",result_numpy_inner)
print("mylib.inner %5f(sec)" % time_mylib_inner, "result=",result_mylib_inner)
print("double array size=",maxsize)
vector1 = numpy.random.rand(maxsize)
vector2 = numpy.random.rand(maxsize)
print_time_inner(vector1,vector2)
print("float array size=",maxsize)
vector1 = vector1.astype('float32')
vector2 = vector2.astype('float32')
print_time_inner(vector1,vector2)
計測結果
1個目の数字が268435456(256 * 256 * 256 * 16)回を16回ループにかかった時間(秒)です。
double array size= 268435456
numpy.inner 2.596892(sec) result= 67115867.58791062
mylib.inner 2.569095(sec) result= 67115867.58790968
float array size= 268435456
numpy.inner 1.791725(sec) result= 66669584.0
mylib.inner 1.277165(sec) result= 66669976.0
多少はnumpyよりは、少しだけ速くなったようです。
floatで1.35倍
doubleで1.02倍
timeitを使ってない理由
かなり大きな配列を作っているためか、timeitを使うと実行順序によって時間が変化したため。
計測した環境について
numpy 1.16.1
Python 3.7.2
clang++
Apple LLVM version 10.0.0 (clang-1000.11.45.5)
Target: x86_64-apple-darwin18.2.0
Thread model: posix
InstalledDir: /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin
ハードの情報
機種名: MacBook Pro
機種ID: MacBookPro13,2
プロセッサ名: Intel Core i5
プロセッサ速度: 2.9 GHz
プロセッサの個数: 1
コアの総数: 2
二次キャッシュ(コア単位): 256 KB
三次キャッシュ: 4 MB
メモリ: 16 GB