はじめに
新年、あけましておめでとうございます!
初投稿になります!皆様はじめまして、Rafkaと言います。
軽く自己紹介しますと、私は現在大学院生で、数値シミュレーションをやる研究室に所属しております。初めて触った言語はC#で、好きな言語もC#です。まぁ研究では使っていないんですけどね。
最近LAPACKを触る機会があり、その時にその計算速度に感動したので、C#向けのパッケージとか無いかなと探したんですけど、LAPACKのコール名(getrfやgbtrs等)で呼び出せるものが見つからなかったので、自分でラッパーライブラリを作成しました。内部で使用しているライブラリは見つけたんですけどね。できれば同じような事をやりたい人に使ってもらいたいので、この記事では紹介も兼ねて、ライブラリの作成方法と作ったライブラリの使用例を書いていきます。
この記事の概要
この記事を纏めると以下の通りです。
- C++/CLIでラッパー書いたけどLinux(Mono)で呼べないんだけど......
- P/Invokeで呼び出すやつなら上手くいくだろうか?→上手く行った。
- どれぐらい早くなるか、私が書いたC#のコードと速度比較してみた。
ラッピングの対象としたBLASやLAPACKの実装は、Intel® Math Kernel Library(Intel MKL)にしました。
環境
実行環境である私のマシンは以下の様な構成です。
- CPU:Intel Corei7-4790K 4.00GHz
- メモリ:DDR3-1600 16G
- マザーボード:Asus H97-PRO
- GPU:MSI GTX980
- OS:Windows10
使用した環境は
- .NET Framework 4.7.1 (C++/CLIによるラッパー向け)
- .NET Core 2.0.0 (P/Invokeを用いたMultiPlatformラッパー向け)
- Intel® Math Kernel Library 2018.1
IDEはC++/CLI版を書く時はVisualStudio2017を、P/Invoke版を書く時はJetBrainsのRider2017.3を使用しました。
Intel® Math Kernel Libraryとは
Intelが自社製CPU向けに最適化したBLASやLAPACKの実装を含めた様々な数学ルーチンを含んだライブラリです。自分のとこのCPU向けに自前で最適化しているので、IntelのCPUで動かすならOpenBLASなどと比べて速いらしいです。どうせなら一番早くしたいので、今回のラッピングの対象にはMKLを選びました。
MKLをインストールするには
MKLは以前は有料だったようですが、現在は無料で使用することが出来ます。一先ずココに飛んでもらえたら、ページの中央にFree Downloadっていうボタンが有ると思うのでそれを押してもらって、そこから先はページの案内通りにアカウントを登録した後にインストーラがダウンロード出来たと思います。
その後私の場合はPATHを通す必要がありました。システム環境変数に追記したのは以下の3つです。
- ~IntelSWTools\compilers_and_libraries\windows\redist\intel64_win\compiler
- ~IntelSWTools\compilers_and_libraries\windows\redist\intel64_win\mkl
- ~IntelSWTools\compilers_and_libraries\windows\redist\intel64_win\tbb\vc_mt
"~IntelSWTools"の部分は各々のインストールディレクトリに読み替えてください。
ライブラリの実装
MKLに限らず、C#からCやC++で作成されたネイティブコードを呼び出す方法は幾つかありますが、調べてパッとでてくるのは大体次の2つみたいです。
- C++/CLIを用いてネイティブとマネージドの橋渡し的なライブラリを書く。
- 利点:P/Invokeより速い。
- 欠点:C++/CLIを学習する必要がある・.NET Framework Only?
- C#からP/Invokeを用いて呼び出す。
- 利点:C#で完結する。
- 欠点:C++/CLIでラップするより遅い。
せっかく呼び出し先のライブラリが高速なので、呼び出しのオーバーヘッドもなるべく少なくなるようにと思い、C++/CLIを用いてラッパーライブラリの作成を行いました。何しろ書いている時はMonoでも動くと思っていたので......
C++/CLIを用いたラッピングの方針
基本的には、
- 配列は
pin_ptr<Type>
を使って固定してからネイティブに渡す。 -
CBlasRowMajor
などの定数は使いやすいように列挙体にする。 - 事前に宣言する必要のない引数(ピボット用の
ipiv
等)は修飾子out
を指定できるようにする。
という方針でラッピングしていきました。
例として一般行列を倍精度でLU分解するdgetrf
関数のラッピングを説明していきます。
まず、MKLではlapack_int LAPACKE_dgetrf(int matrix_layout , lapack_int m , lapack_int n , double* a , lapack_int lda , lapack_int* ipiv);
と宣言されています。引数はそれぞれ以下の様な意味です。
引数 | 意味 |
---|---|
int matrix_layout |
行列の格納方式。行優先ならLAPACK_ROW_MAJOR 、列優先ならLAPACK_COL_MAJOR が与えられる。 |
lapack_int m |
行列a の行数 |
lapack_int n |
行列a の列数 |
double* a |
行列が格納されている1次元配列a の先頭ポインタ |
lapack_int lda |
行列a のLeading Dimension。行優先ならn 。 |
lapack_int* ipiv |
ピボット選択を行った場合の結果を格納するための配列。 |
int matrix_layout
はLAPACK_ROW_MAJOR
かLAPACK_COL_MAJOR
しか取り得ないので、こういうのは列挙体にした方がいいですね。以下の様な感じでラッピングしました。
#pragma once
#include <mkl.h>
namespace MKLSharp {
public enum class LapackLayout {
RowMajor = LAPACK_ROW_MAJOR,
ColMajor = LAPACK_COL_MAJOR
};
}
lapack_int m
、lapack_int n
やlapack_int lda
はそのままint
で受け渡して大丈夫でしょう。因みにlapack_int
は__int64
で定義されており、__int64
はlong long
となっているので、適切にラッピングするならlapack_int
はlong
が正しいと思います。
double* a
は配列ですが、C#で普通に宣言するとdouble[] a
(C++/CLIに渡すとarray<double> a
)ですが、そのままでは渡せないので、ここでpin_ptr<Type>
の出番です。pin_ptr<double> ptr_a = &a[0]
とすれば、配列a
のGCによる回収を阻止して、ネイティブのポインタとして扱えるようになります。
最後にlapack_int* ipiv
ですが、これはdgetrf
の処理で初めて必要になる配列なので、できれば事前に宣言したりしないで使えたほうがいいと思いました。なぜなら(特にC#7.0以降なら)、
var ipiv = new long[Math.Max(1, Math.Min(m, n))];
Lapack.dgetrf(LapackLayout.RowMajor, m, n, a, lda, ipiv);
と書くよりも、
Lapack.dgetrf(LapackLayout.RowMajor, m, n, a, lda, out var ipiv);
と書けたほうが便利じゃないですか?
ということでココまでを纏めた結果、ラッピングコードは以下のようになりました。
#pragma once
using namespace System;
using namespace System::Runtime::InteropServices;
namespace MKLSharp {
public ref class Lapack {
public:
static __int64 dgetrf(LapackLayout Layout, int m, int n,
array<double>^ a, int lda, [Out]array<__int64>^% ipiv);
}
}
#include "stdafx.h"
#include "Lapack.h"
namespace MKLSharp {
__int64 Lapack::dgetrf(LapackLayout Layout, int m, int n,
array<double>^ a, int lda, [Out]array<__int64>^% ipiv) {
pin_ptr<double> ptr_a = &a[0];
ipiv = gcnew array<__int64>(Math::Max(1, Math::Min(m, n)));
pin_ptr<__int64> ptr_i = &ipiv[0];
auto res = LAPACKE_dgetrf((int)Layout, m, n, ptr_a, lda, ptr_i);
ptr_a = nullptr;
ptr_i = nullptr;
return res;
}
}
こんな感じでラッピングを進めて、このC++/CLI版に関してはfloat
とdouble
に限りますが、Level1~3のBLASとこのページのLAPACKの関数はラッピングが済んでおります。使用サンプルはココに書いてあります。ところで、上に書いたようにこのライブラリは、研究室のPCを使ってMonoで動かそうとした結果動かなかったので、P/Invokeを利用する方向でもラッピングをすることにしました。
P/Invokeを用いたラッピング
せっかく公式でオープンソース化、マルチプラットフォーム化が進んでいるC#なので、Windowsでしか使えないライブラリよりもLinuxやmacOSでも使えるライブラリのほうが良いともいました。色々方法を調べた結果、P/Invokeならどのプラットフォームでも大丈夫そうだったので、これを用いたライブラリも作ることにしました。こっちはLevel1~3のBLASとLAPACKの対称正定値帯行列向けのところまでのラッピングが済んでおります。それ以外にC++/CLI版と若干仕様が変わっており、float
とdouble
の使い分けをメソッド名ではなく、メソッドのオーバーロードで解決するようになっています。
例として、C++/CLI版でも示したgetrf
は以下のようにラッピングしています。
using System.Runtime.InteropServices;
using static System.Math;
namespace SharpMKLStd {
public static class Lapack {
private const string LibPath = "mkl_rt.dll";
[DllImport(LibPath, CallingConvention = CallingConvention.Cdecl, EntryPoint = "LAPACKE_sgetrf")]
public static extern int getrf(LapackLayout Layout, int m, int n, float[] a, int lda, int[] ipiv);
public static int getrf(LapackLayout Layout, int m, int n, float[] a, int lda, out int[] ipiv) {
ipiv = new int[Max(1, Min(m, n))];
return getrf(Layout, m, n, a, lda, ipiv);
}
[DllImport(LibPath, CallingConvention = CallingConvention.Cdecl, EntryPoint = "LAPACKE_dgetrf")]
public static extern int getrf(LapackLayout Layout, int m, int n, double[] a, int lda, int[] ipiv);
public static int getrf(LapackLayout Layout, int m, int n, double[] a, int lda, out int[] ipiv) {
ipiv = new int[Max(1, Min(m, n))];
return getrf(Layout, m, n, a, lda, ipiv);
}
}
}
雑な説明になってしまい恐縮ですが、P/Invokeを利用するこちらのバージョンでは、研究室のLinux環境でも、我が家のWindowsの環境でも動作することが確認できました。無事にLinuxでも動作することが確認できたので、今後はこっちをメインに更新していこうと考えています。次の章で話す使用例も、P/Invoke版をメインに書いていきます。
使用例
この章では実際に作成したライブラリの使用例と、BLASやLAPACKを利用することでどれぐらい速くなるかを、ベクトルの内積とポアソン方程式の求解を題材に調べたので、それらについて書いていきます。実行はRiderのデフォルトのReleaseの設定で行っています。
内積
私のラッパーでは、ddotは以下の文で呼び出せます。
Blas1.dot(n, x, incX, y, incY);
ここでn
はベクトルの要素数で、x
とy
はベクトルを表す配列、incX
とincY
はそれぞれx
とy
のインクリメント幅を指定します。普通は1にします。これの比較対象となるC#のコードは以下のように書きました。
private static double Dot(double[] x, double[] y) {
var res = 0.0;
for (var i = 0; i < x.Length; i++)
res += x[i] * y[i];
return res;
}
これらを用いてそれぞれで計算速度を測定するプログラムを以下のように作成しました。
void CompareTimeDot(int size) {
const int LoopDot = 10000;
var sw = new Stopwatch();
(var x, var y) = GenerateVector();
WriteLine($"Calc dot product by raw C# : size = {size}");
sw.Reset();
var res = 0.0;
for (var i = 0; i < LoopDot; i++) {
sw.Start();
res = Dot(x, y);
sw.Stop();
}
WriteLine($"Result : {res}\tTime : {sw.Elapsed / (double) LoopDot}");
WriteLine($"Calc dot product by BLAS : size = {size}");
sw.Reset();
for (var i = 0; i < LoopDot; i++) {
sw.Start();
res = dot(size, x, 1, y, 1);
sw.Stop();
}
WriteLine($"Result : {res}\tTime : {sw.Elapsed / (double) LoopDot}\n");
(double[] x, double[] y) GenerateVector() {
x = new double[size];
y = new double[size];
for (var i = 0; i < size; i++) {
x[i] = 1.0;
y[i] = 1.0;
}
return (x, y);
}
}
このプログラムを、要素数を10・100・100000と変化させて実行した時の出力を以下に示します。
Calc dot product by raw C# : size = 10
Result : 10 Time : 00:00:00.0000001
Calc dot product by BLAS : size = 10
Result : 10 Time : 00:00:00.0000008
Calc dot product by raw C# : size = 100
Result : 100 Time : 00:00:00.0000001
Calc dot product by BLAS : size = 100
Result : 100 Time : 00:00:00.0000001
Calc dot product by raw C# : size = 100000
Result : 100000 Time : 00:00:00.0000796
Calc dot product by BLAS : size = 100000
Result : 100000 Time : 00:00:00.0000135
表に纏めると以下の様な感じです。
要素数 | C# ($\mu$s) | BLAS ($\mu$s) |
---|---|---|
10 | 0.1 | 0.8 |
100 | 0.1 | 0.1 |
100000 | 79.6 | 13.5 |
要素数が小さい内はラッピングのオーバーヘッドでBLASの方が遅くなっていますが、要素数が十分大きくなると効果がしっかり出ているようです。
ポアソン方程式
ポアソン方程式の詳細については省略させていただきます。LAPACKの速度を知るきっかけになった講義での課題だったので、ただ流用しやすかったというだけなんですけどね。ですのでこれから説明するプログラムには、ベースにしたプログラムがあります。
さて、何においてもLAPACKの呼び出しと比較するためのC#のコードが必要です。ですのでLU分解を行うC#のコードを以下のように書きました。
private static void Decomp(int m, int n, double[] a, int[] ipiv) {
for (var i = 0; i < ipiv.Length; i++) ipiv[i] = i;
for (var i = 0; i < m; i++) {
if (!IsZero(a[i * n + i])) continue;
var t = i;
for (var j = i + 1; j < m; j++)
if (a[t * n + i] < a[j * n + i])
t = j;
for (var j = 0; j < n; j++) Swap(ref a[i], ref a[t]);
ipiv[i] = t;
}
for (var k = 0; k < m; k++) {
for (var i = k + 1; i < m; i++) {
var p = a[i * n + k] / a[k * n + k];
for (var j = k + 1; j < m; j++)
a[i * n + j] -= p * a[k * n + j];
a[i * n + k] = p;
}
}
bool IsZero(double p) => -double.Epsilon * 1e10 < p && p < double.Epsilon * 1e10;
}
そして分解された行列を用いて方程式を解くプログラムを以下のように書きました。
private static void Solve(int m, int n, double[] a, double[] b, int[] ipiv) {
for(var i = 0; i < ipiv.Length; i++)
if (ipiv[i] != i) Swap(ref b[i], ref b[ipiv[i]]);
for (var i = 1; i < b.Length; i++) {
var sum = 0.0;
for (var j = 0; j <= i - 1; j++) sum += a[i * n + j] * b[j];
b[i] -= sum;
}
for (var i = b.Length - 1; i >= 0; i--) {
var sum = 0.0;
for (var j = i + 1; j < b.Length; j++) sum += a[i * n + j] * b[j];
b[i] = (b[i] - sum) / a[i * n + i];
}
}
アルゴリズムを素直に実装しただけなので、あまり速度が出る書き方にはなっていないと思います。すみません。
一方LAPACKの呼び出しは、分解が
Lapack.dgetrf(Layout, m, n, a, lda, ipiv);
で、解くのが
Lapack.dgetrs(Layout, Trans, n, nrhs, a, lda, ipiv, b, ldb);
です。これらは一般行列で利用できる関数なので、この他にも帯行列向けの解法と、対称正定値帯行列向けの解法も試しました。
これらを用いてそれぞれの時間を測定するプログラムを以下のように作成しました。
using System.Diagnostics;
using SharpMKL;
using static ShaprMKL.Blas1;
using static SharpMKL.Lapack;
using static System.Console;
void CompareTimeLU() {
const int LoopLU = 100;
const int M = 49;
const int N = M * M;
const double h = 1.0 / (M + 1);
const double Heat = 4.0;
var aBase = new double[N * N];
var bBase = new double[N];
var ipiv = new int[N];
int generalIndex(int i, int j) => i * N + j;
for (var i = 1; i <= M; i++) {
for (var j = 1; j <= M; j++) {
var k = (i - 1) * M + j - 1;
aBase[generalIndex(k, k)] = 4.0 / (h * h);
if (i > 1) {
var kDown = k - M;
aBase[generalIndex(k, kDown)] = -1.0 / (h * h);
}
if (i < M) {
var kUp = k + M;
aBase[generalIndex(k, kUp)] = -1.0 / (h * h);
}
if (j > 1) {
var kLeft = k - 1;
aBase[generalIndex(k, kLeft)] = -1.0 / (h * h);
}
if (j < M) {
var kRight = k + 1;
aBase[generalIndex(k, kRight)] = -1.0 / (h * h);
}
bBase[k] = Heat;
}
}
var sw = new Stopwatch();
WriteLine("Calc Poisson eq by raw C#.");
sw.Reset();
var res = new double[bBase.Length];
for (var i = 0; i < LoopLU; i++) {
copy(aBase.Length, aBase, 1, out var a, 1);
copy(bBase.Length, bBase, 1, out var b, 1);
sw.Start();
Decomp(N, N, a, ipiv);
Solve(N, N, a, b, ipiv);
sw.Stop();
if (i == LoopLU - 1) copy(b.Length, b, 1, res, 1);
}
WriteLine($"Result : {res[((M + 1) / 2 - 1) * M + M + 1]}\tTime : {sw.Elapsed / (double) LoopLU}");
WriteLine("Calc Poisson eq by LAPACK calls for general matrix.");
sw.Reset();
for (var i = 0; i < LoopLU; i++) {
copy(aBase.Length, aBase, 1, out var a, 1);
copy(bBase.Length, bBase, 1, out var b, 1);
sw.Start();
getrf(LapackLayout.RowMajor, N, N, a, N, ipiv);
getrs(LapackLayout.RowMajor, LapackTranspose.NoTrans, N, 1, a, N, ipiv, b, 1);
sw.Stop();
if (i == LoopLU - 1) copy(b.Length, b, 1, res, 1);
}
WriteLine($"Result : {res[((M + 1) / 2 - 1) * M + M + 1]}\tTime : {sw.Elapsed / (double) LoopLU}");
const int bu = M;
const int bl = M;
const int ldab = 2 * bl + bu + 1;
int bandIndex(int i, int j) => j * ldab + bl + bu + (i - j);
var abBase = new double[ldab * N];
for (var i = 1; i <= M; i++) {
for (var j = 1; j <= M; j++) {
var k = (i - 1) * M + j - 1;
abBase[bandIndex(k, k)] = 4.0 / (h * h);
if (i > 1) {
var kDown = k - M;
abBase[bandIndex(k, kDown)] = -1.0 / (h * h);
}
if (i < M) {
var kUp = k + M;
abBase[bandIndex(k, kUp)] = -1.0 / (h * h);
}
if (j > 1) {
var kLeft = k - 1;
abBase[bandIndex(k, kLeft)] = -1.0 / (h * h);
}
if (j < M) {
var kRight = k + 1;
abBase[bandIndex(k, kRight)] = -1.0 / (h * h);
}
bBase[k] = Heat;
}
}
WriteLine("Calc Poisson eq by LAPACK calls for band matrix.");
sw.Reset();
for (var i = 0; i < LoopLU; i++) {
copy(abBase.Length, abBase, 1, out var ab, 1);
copy(bBase.Length, bBase, 1, out var b, 1);
sw.Start();
gbtrf(LapackLayout.ColumnMajor, N, N, bl, bu, ab, ldab, ipiv);
gbtrs(LapackLayout.ColumnMajor, LapackTranspose.NoTrans, N, bl, bu, 1, ab, ldab, ipiv, b, N);
sw.Stop();
if (i == LoopLU - 1) copy(b.Length, b, 1, res, 1);
}
WriteLine($"Result : {res[((M + 1) / 2 - 1) * M + M + 1]}\tTime : {sw.Elapsed / (double) LoopLU}");
const int ldapb = bl + 1;
int packedIndex(int i, int j) => j * ldapb + i - j;
var apbBase = new double[ldapb * N];
for (var i = 1; i <= M; i++) {
for (var j = 1; j <= M; j++) {
var k = (i - 1) * M + j - 1;
apbBase[packedIndex(k, k)] = 4.0 / (h * h);
if (i > 1) {
var kDown = k - M;
apbBase[packedIndex(k, kDown)] = -1.0 / (h * h);
}
if (j > 1) {
var kLeft = k - 1;
apbBase[packedIndex(k, kLeft)] = -1.0 / (h * h);
}
bBase[k] = Heat;
}
}
WriteLine("Calc Poisson eq by LAPACK calls for positive difinite band matrix.");
sw.Reset();
for (var i = 0; i < LoopLU; i++) {
copy(apbBase.Length, apbBase, 1, out var apb, 1);
copy(bBase.Length, bBase, 1, out var b, 1);
sw.Start();
pbtrf(LapackLayout.ColumnMajor, LapackUpLo.Lower, N, bl, apb, ldapb);
pbtrs(LapackLayout.ColumnMajor, LapackUpLo.Lower, N, bl, 1, apb, ldapb, b, N);
sw.Stop();
if (i == LoopLU - 1) copy(b.Length, b, 1, res, 1);
}
WriteLine($"Result : {res[((M + 1) / 2 - 1) * M + M + 1]}\tTime : {sw.Elapsed / (double) LoopLU}");
}
これを実際に実行した時の出力を以下に示します。
Calc Poisson eq by raw C#.
Result : 0.0508289330017333 Time : 00:00:05.7561797
Calc Poisson eq by LAPACK calls for general matrix.
Result : 0.0508289330017335 Time : 00:00:00.1399642
Calc Poisson eq by LAPACK calls for band matrix.
Result : 0.0508289330017333 Time : 00:00:00.0095349
Calc Poisson eq by LAPACK calls for positive difinite band matrix.
Result : 0.0508289330017334 Time : 00:00:00.0014153
表に纏めると以下の様な感じです。
解法 | 実行時間 (ms) |
---|---|
自作のC#プログラム | 5756.1797 |
一般行列用LAPACK | 139.9642 |
帯行列用LAPACK | 9.5349 |
対称正定値帯行列用LAPACK | 1.4153 |
一応期待通りの速度がでているようで、満足です。
このベンチマークプログラムの全体はココにあります。
おわりに
いかがでしたでしょうか。思っていたより長い記事になってしまって、ここまで読んでいただいてありがとうございます。確かにC#から利用できるようになっていて、要素数が大きい行列やベクトルに対してはパフォーマンスが発揮できていると思います。
ところでこういうライブラリってどの程度需要があるんでしょうかね。今後の開発の予定としては、
- nugetパッケージで公開する
- FFTなどのMKLにある他の機能もラッピングする
というのを考えています。
他に現在の実装で気に入らない部分が幾つかありまして、
- MKLのファイルパスを"mkl_rt.dll"と直打ちしている。
- 複素数バージョンをラッピングできていない。
1つ目については、例えばLinuxなら"libmkl_rt.so"なので、私が試した時は"mkl_rt.dll"の名前でシンボリックリンクを作成しました。後から調べてわかったのですが、DllImportのファイルパスはかなり柔軟に探索してもらえるらしいので、もしかしたらリンクを作成しなくてもうまくいくかもしれません?正月明けたら試してみようと思います。
2つ目については、一筋縄ではいかない気がしたので後回しにしているというのが現状です。できればC#に標準であるComplex
構造体を利用したいのですが、メンバがfloat
のものとdouble
の複素数構造体を用意して、それぞれにSystem.Numerics.Complex
構造体へのキャスト演算子のオーバーロードを実装すれば行けるでしょうか。これもとりあえず試してみた方が良さそうですね。
何にせよ自分にも需要があるので開発は続けていきます。もし「こういうの探してたんだよ~」って方が居ましたら是非使っていただいて、納得行くものでしたら嬉しいです。他にも「こういう書き方もあるぜ!」とか「ここはこう書いたほうが行儀が良い」等あれば指摘していただけると助かります。非常に勉強になるので。GitHubへのプルリクエストも大歓迎です!
参考にしたところ
ネイティブコードのラッピングに際しては、以下のサイトを参考にさせていただきました。