(この記事は「fukuoka.ex x ザキ研 Advent Calendar 2017」の18日目です)
昨日は @zumin さんの「FlowのStage数を増やして実行してみた」でした。でした。
はじめに
前回のまとめはこちらでした。
- Rustler 0.17.1 へのアップグレードにより,
Nif
プレフィックスがほとんど不要になりました。NifResult
だけは依然として必要です。 - Flow 0.14.0 へのアップグレードにより,Window で指定する
:reset
とmap_state
が廃止されて,代わりにon_trigger
で調整するようになりました。 - バージョンアップによる速度向上はほとんどないか,あったとして気持ち速くなったかぐらいです。
今回は3期に渡って行ってきた「季節外れのアドベントカレンダー」最終回として,ベンチマーク高速化の試みをやってみます。なんと豪華3本立てです!
第1弾は非同期NIFの導入です。 @twinbee さんの「Elixirから簡単にRustを呼び出せるRustler #5 NIFからメッセージを返す」をGPU駆動に応用してみます。
NIFの1ms制約と非同期NIF呼出し
現行のErlang VMには,NIFの実行時間を1ms以内に収めないと並列処理のパフォーマンスが悪化するという制約があります。Dirty CPU Schedulerによりこの制約を外すことはできるのですが,さらにいろいろ制約があるようです。
非同期NIF呼出しによって,この制約を打破できます。ここでいう非同期NIF呼出しというのは,NIFの中でスレッドを起動し,このスレッドから結果をメッセージで送ってElixir側で受取ることです。こうすると引数を受取ってスレッドを起動し引数を受継ぐまでの処理を1ms以内に実行すれば良くなるので,1ms制約を満たすことが容易になります。
Elixir側のコード
ではコードを見ていきます。Elixir側のコードは割とシンプルです。
def map_calc_g1(x, p, mu, _stages) do
x
|> Enum.to_list
|> LogisticMapNif.call_ocl(p, mu)
end
def map_calc_g2(x, p, mu, _stages) do
x
|> Enum.to_list
|> LogisticMapNif.call_ocl2(p, mu)
receive do
l -> l
end
end
map_calc_g1
は同期NIF呼出し,map_calc_g2
は非同期NIF呼出しです。LogisticMapNif
モジュールの関数の呼出しはNIFのラッパーです。call_ocl
ではNIFの結果をそのまま戻り値として返しますが,call_ocl2
ではNIFを実行した時には実質的な処理はされず,非同期的に実質的な処理を実行して結果をメッセージで返します。非同期NIF呼出しではreceive
を用いてメッセージを受け取り,受取った値をそのまま式の値としています。
一応 LogisticMapNif
の該当部分のコードも載せておきます。いつものNIFの流儀に沿ってコードを書くだけです。
defmodule LogisticMapNif do
use Rustler, otp_app: :logistic_map, crate: :logistic_map
def call_ocl(_x, _p, _mu), do: :erlang.nif_error(:nif_not_loaded)
def call_ocl2(_x, _p, _mu), do: :erlang.nif_error(:nif_not_loaded)
Rust側のコード
Rust側のコードは次のような感じです。まず共通部分です。
#[macro_use] extern crate rustler;
#[macro_use] extern crate lazy_static;
extern crate ocl;
use rustler::{Env, Term, NifResult, Encoder, Error};
use rustler::env::{OwnedEnv, SavedTerm};
use rustler::types::list::ListIterator;
use rustler::types::tuple::make_tuple;
use ocl::{ProQue, Buffer, MemFlags};
mod atoms {
rustler_atoms! {
atom ok;
//atom error;
//atom __true__ = "true";
//atom __false__ = "false";
}
}
rustler_export_nifs! {
"Elixir.LogisticMapNif",
[("call_ocl", 3, call_ocl),
("call_ocl2", 3, call_ocl2)],
None
}
元のコードである call_ocl
は次の通りです。
fn trivial(x: Vec<i64>, p: i64, mu: i64) -> ocl::Result<(Vec<i64>)> {
let src = r#"
__kernel void calc(__global long* input, __global long* output, long p, long mu) {
size_t i = get_global_id(0);
long x = input[i];
x = mu * x * (x + 1) % p;
x = mu * x * (x + 1) % p;
x = mu * x * (x + 1) % p;
x = mu * x * (x + 1) % p;
x = mu * x * (x + 1) % p;
x = mu * x * (x + 1) % p;
x = mu * x * (x + 1) % p;
x = mu * x * (x + 1) % p;
x = mu * x * (x + 1) % p;
x = mu * x * (x + 1) % p;
output[i] = x;
}
"#;
let pro_que = ProQue::builder()
.src(src)
.dims(x.len())
.build().expect("Build ProQue");
let source_buffer = Buffer::builder()
.queue(pro_que.queue().clone())
.flags(MemFlags::new().read_write())
.len(x.len())
.copy_host_slice(&x)
.build()?;
let result_buffer: Buffer<i64> = Buffer::builder()
.queue(pro_que.queue().clone())
.flags(MemFlags::new().read_write())
.len(x.len())
.build()?;
let kernel = pro_que.kernel_builder("calc")
.arg(&source_buffer)
.arg(&result_buffer)
.arg(p)
.arg(mu)
.build()?;
unsafe { kernel.enq()?; }
let mut vec_result = vec![0; result_buffer.len()];
result_buffer.read(&mut vec_result).enq()?;
Ok(vec_result)
}
fn call_ocl<'a>(env: Env<'a>, args: &[Term<'a>]) -> NifResult<Term<'a>> {
let iter: ListIterator = try!(args[0].decode());
let p: i64 = try!(args[1].decode());
let mu: i64 = try!(args[2].decode());
let res: Result<Vec<i64>, Error> = iter
.map(|x| x.decode::<i64>())
.collect();
match res {
Ok(result) => {
let r1: ocl::Result<(Vec<i64>)> = trivial(result, p, mu);
match r1 {
Ok(r2) => Ok(r2.encode(env)),
Err(_) => Err(Error::BadArg),
}
},
Err(err) => Err(err),
}
}
非同期NIF呼出しをする call_ocl2
は次のようになります。
fn call_ocl2<'a>(env: Env<'a>, args: &[Term<'a>]) -> NifResult<Term<'a>> {
let pid = env.pid();
let mut my_env = OwnedEnv::new();
let saved_list = my_env.run(|env| -> NifResult<SavedTerm> {
let list_arg = args[0].in_env(env);
let p = args[1].in_env(env);
let mu = args[2].in_env(env);
Ok(my_env.save(make_tuple(env, &[list_arg, p, mu])))
})?;
std::thread::spawn(move || {
my_env.send_and_clear(&pid, |env| {
let result: NifResult<Term> = (|| {
let tuple = saved_list.load(env).decode::<(Term, i64, i64)>()?;
let iter: ListIterator = try!(tuple.0.decode());
let p = tuple.1;
let mu = tuple.2;
let res: Result<Vec<i64>, Error> = iter
.map(|x| x.decode::<i64>())
.collect();
match res {
Ok(result) => {
let r1: ocl::Result<(Vec<i64>)> = trivial(result, p, mu);
match r1 {
Ok(r2) => Ok(r2.encode(env)),
Err(_) => Err(Error::BadArg),
}
},
Err(err) => Err(err)
}
})();
match result {
Err(_err) => env.error_tuple("test failed".encode(env)),
Ok(term) => term
}
});
});
Ok(atoms::ok().to_term(env))
}
プログラムの解説をしていきます。
let pid = env.pid();
プロセスIDを取得します。この値は,後でスレッドを起動するときに使います。
let mut my_env = OwnedEnv::new();
let saved_list = my_env.run(|env| -> NifResult<SavedTerm> {
let list_arg = args[0].in_env(env);
let p = args[1].in_env(env);
let mu = args[2].in_env(env);
Ok(my_env.save(make_tuple(env, &[list_arg, p, mu])))
})?;
スレッドを起動したときにそのままだと引数の値を引き継げないので,実行時環境を新たに作成して引数の値を記録しておきます。実行時環境にはタプルの形で記録します。この段階では引数の値をそのまま記録するだけで,Rustの構造体にデコードしません。
std::thread::spawn(move || {
...
});
Ok(atoms::ok().to_term(env))
... で記述した処理をするスレッドを起動し,アトム :ok
を返して終了します。NIFとしての実行は以上までです。...の中は非同期的に実行します。
my_env.send_and_clear(&pid, |env| {
let result: NifResult<Term> = (|| {
let tuple = saved_list.load(env).decode::<(Term, i64, i64)>()?;
let iter: ListIterator = try!(tuple.0.decode());
let p = tuple.1;
let mu = tuple.2;
...
})();
});
引数の値を記録した実行時環境を読み込みます。タプルを読みだして,リストの値をデコードします。整数値はそのまま読まれるみたいです。
let res: Result<Vec<i64>, Error> = iter
.map(|x| x.decode::<i64>())
.collect();
.collect();
match res {
Ok(result) => {
...
},
Err(err) => Err(err)
}
リストの値をRustの可変長配列の構造体であるVec
に読み込みます。デーコドするときにエラーが出るかもしれないので,いったん Result<Vec<i64>, Error>
型で受けてから,match
で正常終了した場合とエラー処理を必要とする場合に分岐します。おきまりのパターンですね。
let r1: ocl::Result<(Vec<i64>)> = trivial(result, p, mu);
match r1 {
Ok(r2) => Ok(r2.encode(env)),
Err(_) => Err(Error::BadArg),
}
trivial
関数を呼出してGPU処理をします。この部分を書き換えれば任意の関数を呼出すことができます。これも一旦 Result
型で受けて,正常終了した場合とエラー処理を必要とする場合に分岐します。
正常終了した場合は,GPU処理の戻り値(リスト)をエンコードして正常終了を発行します。こうするだけで,Elixir側にメッセージが送信されます。
エラー処理を必要とする場合は,このブロックの内側は Result
型,外側はNifResult
型なので,エラーの型を変換してやる必要があります。今回は引数エラー(BadArgument
)を発行したいので,Error::BadArg
を渡します。
この辺りが,Rust プログラミングを難しくしている要因の1つだなと痛切に思います。
std::thread.spawn(move || {
my_env.send_and_clear(&pid, |env| {
let result: NifResult<Term> = (|| {
...
})();
match result {
Err(_err) => env.error_tuple("test failed".encode(env)),
Ok(term) => term
}
}
});
});
これはブロックの内側 ... から外側に戻り値とエラーを引き継ぐためのコードです。この辺りも,Rustプログラミングを難しくしていますよね〜
実行結果
実行結果はこんな感じです。
$ mix run -e "LogisticMapNif.init; LogisticMap.benchmarks_g1; LogisticMap.benchmarks_g2"
Compiling NIF crate :logistic_map (native/logistic_map)...
Compiling logistic_map v0.1.0 (file:///Users/zacky/github/logistic_map/native/logistic_map)
Finished release [optimized] target(s) in 10.90s
Compiling 2 files (.ex)
LogisticMapNif_map_calc_list: 3400
LogisticMapNif_map_calc_binary: 3400
LogisticMapNif_map_calc_binary_to_binary: 3400
LogisticMapNif_call_ocl: 1
stages: 1
9.394874
stages: 1
6.86028
今までのNIF呼出しだと,9.395秒,非同期NIF呼出しだと6.860秒となりました! 実に1.37倍の速度向上です! NIFの1ms制約を満たすと,これだけ性能が向上するんですね!
まとめ
- 非同期NIF呼出しを実装することで,NIFの1ms制約を満たすことができ,大幅に実行速度を改善することができます。
- この改良により,GPU駆動のベンチマークプログラムを1.37倍速度向上させることができました!
- 非同期NIF呼出しのためのRustプログラミングは,型を合わせるのが難しいです。本プログラムを参考に開発してください!
次は「ZEAM開発ログ v.0.3.2 rayon によるSIMD(SSE2)マルチコアCPUによりOpenCL + GPUを上回るパフォーマンスが出た件」です。お楽しみに!
p.s.「いいね」よろしくお願いします
よろしければ,ページ左上の や のクリックをお願いしますー
ここの数字が増えると,書き手としては「ウケている」という感覚が得られ,連載を更に進化させていくモチベーションになりますので,もっとElixirネタを見たいというあなた,私たちと一緒に盛り上げてください!