LoginSignup
1
2

More than 5 years have passed since last update.

pybind11 + SIMD命令でnumpyを超えることができるか?

Last updated at Posted at 2019-03-16

経緯

唐突に、「pybind11+SIMD命令+スレッドを使えば、numpyを使うより速く計算できるようになるんじゃね?」
と、思い立ち試してみました。

試したこと

まずは簡単そうなnumpy.innerから挑戦
とりあえず、1次元配列同士の内積を求めることしかできません。

使い方

パッケージ名はmylibという名前で、下記のようにコンパイルすると、mylib.cpython-37m-darwin.so という名前のファイルができます。このファイルをカレントに置いておけば、pyファイルからmylibとしてimport可能になります。1次元以外の配列を渡すとどうなるかは試してません。

clang++ -mavx2 -mfma -O3 -Wall -shared -std=c++11 -fPIC `python -m pybind11 --includes` -undefined dynamic_lookup mylib.cpp -o mylib`python3-config --extension-suffix`

Pybind11 C++ のコード

スレッドを4個使用し、SIMD命令を使って書いてあります。

#include <immintrin.h>
#include <type_traits>
#include <thread>
#include <future>
#include <pybind11/pybind11.h>
#include <pybind11/numpy.h>

using namespace std;
namespace py = pybind11;

//T,Uの組み合わせの確認マクロ
#define type_check() static_assert(((is_same<double,U>::value) && (is_same<__m256d,T>::value)) || \
    ((is_same<float,U>::value) && (is_same<__m256,T>::value)),"type mismatch")

//
//simd組み込み関数をdouble/float両方で使えるようにするための、template関数
//
template <typename T,typename U>
static inline T m256_load(const U* d1){
    type_check();
    if(is_same<double,U>::value){
        return (T)_mm256_load_pd((const double*)d1);
    }else{
        return (T)_mm256_load_ps((const float*)d1);
    }
}
template <typename T,typename U>
static inline void m256_store(U* d1, T reg){
    type_check();
    if(is_same<double,U>::value){
        _mm256_store_pd((double*)d1, (__m256d)reg);
    }else{
        _mm256_store_ps((float*)d1, (__m256)reg);
    }
}
template <typename T>
static inline T m256_fmadd(T d1, T d2, T d3){
    if(is_same<__m256d,T>::value){
        return (T)_mm256_fmadd_pd((__m256d)d1,(__m256d)d2,(__m256d)d3);
    }else{
        return (T)_mm256_fmadd_ps((__m256)d1,(__m256)d2,(__m256)d3);
    }
}
template <typename T>
static inline T m256_setzerp(){
    if(is_same<__m256d,T>::value){
        return (T)_mm256_setzero_pd();
    }else{
        return (T)_mm256_setzero_pd();
    }
}
//simd無しの内積を求める関数
template <typename U>
static U calc_inner_nosimd(const U* v1, const U* v2, ssize_t size){
    U result = 0;
    for(ssize_t i = 0; i < size; i++){
        result += v1[i] * v2[i];
    }
    return result;
}
//simd命令を使って、内積を求める。
template <typename T,typename U>
static U calc_inner(const U* v1, const U* v2, ssize_t size){
    type_check();
    constexpr int step = (is_same<float,U>::value) ? 8 : 4;//simdレジスタ1個に入るデータの数
    if( size < step ){
        return calc_inner_nosimd<U>(v1,v2,size);//配列が短すぎる場合には普通に計算
    }
    auto remain = size % step;//simdに入りきらないあまり分。後で別に計算する。
    size -= remain;
    auto sum = m256_setzerp<T>();
    for(ssize_t i = 0; i < size; i+= step){
        auto d1 = m256_load<T,U>(v1);
        auto d2 = m256_load<T,U>(v2);
        sum = m256_fmadd<T>(d1,d2,sum);//sum=d1*d2+sum
        v1 += step;
        v2 += step;
    }
    U result = calc_inner_nosimd<U>(v1,v2,remain);//4個または8個のあまりの計算を行う。

    U results[step];
    m256_store<T,U>(results, sum);//計算結果を配列に書き出す。
    for(int i=0; i < step; i++){//計算結果の集計
        result += results[i];
    }
    return result;
}
//実験用。使ってない。
template <typename T, typename U>
U inner_single_thread(py::array_t<U> arr1, py::array_t<U> arr2) {
    auto size = arr1.size();
    auto *d1 = arr1.data();
    auto *d2 = arr2.data();
    auto result = calc_inner<T,U>(d1,d2,size);
    return result;
}
//NUM_THREADSの数のスレッドに計算を割り当てて計算する関数。
//future/asyncを使用
template <typename T, typename U, unsigned NUM_THREADS>
U inner(py::array_t<U> arr1, py::array_t<U> arr2) {
    auto size1 = arr1.size();
    if( size1 != arr2.size() ){
        throw invalid_argument("shapes not aligned");//2個の配列の長さが違う。
    }
    if( size1 < 16 ){//配列が短い場合にはスレッドを起こさない。この数は環境によって要調整
        return calc_inner<T,U>(arr1.data(), arr2.data(), size1);
    }
    auto remain = size1 % NUM_THREADS;//スレッドに割り振れないあまり。
    auto size = size1 / NUM_THREADS;//スレッド1個あたりの計算数
    auto *d1 = arr1.data();
    auto *d2 = arr2.data();
    vector<future<U>> futures;
    if( size > 0 ){
        for(unsigned i = 0; i < NUM_THREADS-1; i++){//自分を除いた数のスレッドを実行
            ssize_t start = i * size;
            auto d1start = d1 + start;
            auto d2start = d2 + start;
            futures.push_back(async(launch::async,[d1start,d2start,size](){//別スレッドでcalc_innerを実行
                return calc_inner<T,U>(d1start, d2start, size);
            }));
        }
    }
    ssize_t start = (NUM_THREADS-1) * size;
    U result = calc_inner<T,U>(d1 + start, d2 + start, size + remain);//自スレッド分の計算
    for(auto&& f : futures){//他のスレッドの計算結果を集計
        result += f.get();
    }
    return result;
}

PYBIND11_MODULE(mylib, m) {
    m.doc() = "pybind11 mylib";
    m.def("inner", &inner<__m256d,double, 4>);//double配列の場合
    m.def("inner", &inner<__m256,float, 4>);//float配列の場合
}

計測プログラム

mylib.cpython-37m-darwin.so と同じ場所に置いて、実行すれば結果が表示されます。

import numpy
import mylib
import time

#配列の長さ。1次元の配列です。
maxsize = 256 * 256 * 256 * 16

#1回の計算にかかる時間を測る。
def laptime(f,vec1,vec2):
    t = time.time()
    result = f(vec1, vec2)
    t = time.time() - t
    return t

def print_time_inner(v1, v2):
    time_numpy_inner = 0
    time_mylib_inner = 0

    #warming up
    result_numpy_inner = numpy.inner(v1,v2)
    result_mylib_inner = mylib.inner(v1,v2)
    #できるだけ公平になるように、ループ内で交互に呼ぶ
    for i in range(16):
        time_mylib_inner += laptime(mylib.inner,v1,v2)
        time_numpy_inner += laptime(numpy.inner,v1,v2)
    #結果の表示
    print("numpy.inner %5f(sec)" % time_numpy_inner,      "result=",result_numpy_inner)
    print("mylib.inner %5f(sec)" % time_mylib_inner,      "result=",result_mylib_inner)

print("double array size=",maxsize)
vector1 = numpy.random.rand(maxsize)
vector2 = numpy.random.rand(maxsize)
print_time_inner(vector1,vector2)

print("float array size=",maxsize)
vector1 = vector1.astype('float32')
vector2 = vector2.astype('float32')
print_time_inner(vector1,vector2)

計測結果

1個目の数字が268435456(256 * 256 * 256 * 16)回を16回ループにかかった時間(秒)です。

double array size= 268435456
numpy.inner 2.596892(sec) result= 67115867.58791062
mylib.inner 2.569095(sec) result= 67115867.58790968
float array size= 268435456
numpy.inner 1.791725(sec) result= 66669584.0
mylib.inner 1.277165(sec) result= 66669976.0

多少はnumpyよりは、少しだけ速くなったようです。
floatで1.35倍
doubleで1.02倍

timeitを使ってない理由

かなり大きな配列を作っているためか、timeitを使うと実行順序によって時間が変化したため。

計測した環境について

numpy           1.16.1
Python 3.7.2

clang++
Apple LLVM version 10.0.0 (clang-1000.11.45.5)
Target: x86_64-apple-darwin18.2.0
Thread model: posix
InstalledDir: /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin

ハードの情報

  機種名:    MacBook Pro
  機種ID: MacBookPro13,2
  プロセッサ名:   Intel Core i5
  プロセッサ速度:    2.9 GHz
  プロセッサの個数: 1
  コアの総数:  2
  二次キャッシュ(コア単位):  256 KB
  三次キャッシュ:    4 MB
  メモリ:    16 GB
1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2