(この記事は、「fukuoka.ex(その2) Elixir Advent Calendar 2017」の17日目、
  および「Webパフォーマンス Advent Calendar 2017」の6日目です)
昨日は、@tuchiroさんのElixirでSI開発入門 #8 Railsからのモデルの移行1(FitGap分析)でした
この連載の、前回までの記事は、以下になります
ElixirのGenStageに入門する #1
|> ElixirのGenStageに入門する#2 バックプレッシャーを理解する
|> Elixir並列処理「Flow」の2段ステージ構造を理解する
|>Elixirから簡単にRustを呼び出せるRustler #1 準備編
|> Elixirから簡単にRustを呼び出せるRustler #2 クレートを使ってみる
|> Elixirから簡単にRustを呼び出せるRustler #3 いろいろな型を呼び出す
|> Elixirから簡単にRustを呼び出せるRustler #4 SHIFT-JIS変換を行う



 お礼:各種ランキングに83回のランクインを達成しました
 お礼:各種ランキングに83回のランクインを達成しました 


 
4/27から、37日間に渡り、毎日お届けしている「季節外れのfukuoka.ex Elixir Advent Calendar」と「季節外れのfukuoka.ex(その2) Elixir Advent Calender」ですが、Qiitaトップページトレンドランキングに12回入賞、Elixirウィークリーランキングでは6週連続で1/2/3フィニッシュを飾り、各種ランキング通算で、トータル83回ものランクインを達成しています
みなさまの暖かい応援に励まされ、合計552件ものQiita「いいね」もいただき、fukuoka.exアドバイザーズとfukuoka.exキャストの一同、ますます季節外れのfukuoka.ex Advent Calendar、頑張っていきます
はじめに
Elixirはマルチコアの能力を簡単に引き出せる言語ですが、数値計算などはそれほど早くないという特性があります。この連載はNIFという外部言語インターフェースを使って、C言語並のスピードとモダンな文法を持ち合わせたRustのプログラムを呼び出すシリーズです。
RustlerのNIF対応バリュエーション
さて、ここまで基本的なNIFインターフェースについて学んできました。
この他にも、Rustlerには重要なNIF対応の機能が用意されています。
- 
Resource対応 プロセスに依存しない領域に、リソースを確保。 
- 
Thread対応 NIFでマルチスレッドを実行。 
- 
Dirty CPU Scheduler 関数毎の指定 
- 
Dirty IO Scheduler 関数毎の指定(CPU Schedulerとは排他) 
- 
Threadを使って、呼び出しプロセスにメッセージを返す。 
今回は、**「Threadを使って、呼び出しプロセスにメッセージを返す」**にフォーカスします。
NIFからThreadを起動してメッセージを返す
NIFの結果を「メッセージで受け取る」ことができると、何が嬉しいかというと、「非同期でNIFの実行が出来る」ということです。どのような実装で行うのでしょうか?
前回までと同様に、RustlerのTESTコードを解読して行きます。
実行結果
今回は先に実行例と結果を見てみます。
引数のキーワードリストの、空リストを含む全てのタプルの組み合わせを、リストとして返します。このように、NIFからメッセージ送信を使っての「入れ子のリスト」の受け取りが可能であることがおわかり頂けると思います。
iex(1)> NifExample.test_message
[
  [],
  [rope: 4],
  [ring_of_power: 75],
  [ring_of_power: 75, rope: 4],
  [spaniel: 34],
  [spaniel: 34, rope: 4],
  [spaniel: 34, ring_of_power: 75],
  [spaniel: 34, ring_of_power: 75, rope: 4]
]
iex(2)>
Elixir側
呼び出し側を見ていきます。
Key-Valueタプルのリスト、いわゆるKeyword Listを、NIF関数のsublistsに送っています。続くreceive doは、Elixirのアクターモデルであるメッセージ(メール)を受け取るまで待機するコードです。 受取ったメッセージをそのまま返す。という単純な構造です。
 def test_message do
    shop_menu = [
        {:spaniel, 34},
        {:ring_of_power, 75},
        {:rope, 4}
      ]
      sublists(shop_menu)
      receive do
        message ->
            message # 受取ったメッセージをそのまま返す
      end
 end
Rust側
テストコードを通常を、通常のRutlerプロジェクトでコンパイルが通るようにしたコードが以下です。 テストコードのコメントを訳しているので、ポイントのみを解説します。
- 
スレッドをspawnして、メッセージを返す 
 どうやって非同期を実現しているのかが、一番興味のある点だとは思います。Rustlerでは新たにスレッドを生成してNIFの関数呼び出しより長い生存期間を作り出し、そこからErlang VMのプロセスIDにメッセージを送信することで実現しています。スレッドはRustが提供する標準スレッドで、OSのNativeスレッドを使用します。
- 
スレッドのオーナーはErlang VM ? NIF? 
 スレッドのオーナーはNIFになります。Rustのもつスレッドライブラリを利用して、スレッドを生成しています
- 
ワーカースレッドの「環境」 
 ワーカースレッドには「スレッド環境」が必用ですが、VMのプロセスに依存しない場所に環境が保持される必用があります。Rustlerでは、**OwedEnv**というErlang OTPのAPIenif_alloc_envを抽象化したトレイトで実現しています。
- 
メッセージセンドバックの方法 
 スレッドの中からErlang OTPのAPIenif_send関数を実行することにより、メッセージが返されます。my_env.send_and_clear関数の中に実装されています。
- 
ライフタイム変換 
 詳しくは扱いませんが、Rustにはライフタイムという概念があります。<'a>という「護符」のような表現で現しています。NifTerm型は関数内のライフタイムが定義されているのに、スレッドの中で関数を超えるライフタイムが発生するので、ライフタイム変換が必用になっています。
- 
リスト処理 
 主題からはオマケ的なポジションですが、my_env.send_and_clear内の無名関数内では、NifTerm型の空のリストの生成等、NIFでリストを扱う際の有益なサンプルとなっています。
use rustler::env::{OwnedEnv, SavedTerm};
use rustler::types::list::NifListIterator;
fn sublists<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>> {
    //"thread NIF"です:メッセージを返すスレッドを生成します
    //後で呼び出し側のスレッドに渡します。
    let pid = env.pid();
    // ワーカースレッドには環境が必要です。`env`を他のスレッドへ移行することはできません。
    // NIFから戻ったとたんに、Erlang VMが破棄してしまうからです。
    //なのでここでは、 `OwnedEnv`を使います。
    let mut my_env = OwnedEnv::new();
    //(リスト必須の)引数をとり、`my_env`にコピーし
    // 反転させます。`my_env.save()`を使って
    // ライフタイムパラメーターを持たない形式のtermsに記憶することができます。    
    let saved_reversed_list = my_env.run(|env| -> NifResult<SavedTerm> {
        let list_arg = args[0].in_env(env);
        Ok(my_env.save(list_arg.list_reverse()?))
    })?;
    //ワーカースレッドを開始します。この `move`クロージャは、
    // `my_env`と` saved_reversed_list`です。    
    std::thread::spawn(move || {
        // `OwnedEnv`から`Env`を得るには、`.send()`を使います
        // Rustのコードを実行後、最後にpidをセンドバックします。
        my_env.send_and_clear(&pid, |env| {
            let result: NifResult<NifTerm> = (|| {
                let reversed_list = saved_reversed_list.load(env);
                let iter: NifListIterator = try!(reversed_list.decode());
                let empty_list = Vec::<NifTerm>::new().encode(env);
                let mut all_sublists = vec![empty_list];
                for element in iter {
                    for i in 0 .. all_sublists.len() {
                        let new_list = all_sublists[i].list_prepend(element);
                        all_sublists.push(new_list);
                    }
                }
                Ok(all_sublists.encode(env))
            })();
            match result {
                Err(_err) => env.error_tuple("test failed".encode(env)),
                Ok(term) => term
            }
        });
    });
    Ok(atoms::ok().to_term(env))
}
パフォーマンス
NIFからのスレッド起動、簡単とは言えないまでも悪くない記述性に見えます。
長期間稼働するワーカースレッドを起動したりと、夢が膨らむのですが、パフォーマンスはどうなのでしょうか?
今回は、単純な足し算をするセンドバックNIFを書いてみます。
関数マッピングは略してます。
Rust側 (パフォーマンス測定用)
Rust側は整数に1を足して、メッセージを返すだけのコード。
use rustler::env::{OwnedEnv, SavedTerm};
fn sendback_test<'a>(env: NifEnv<'a>, args: &[NifTerm<'a>]) -> NifResult<NifTerm<'a>> {
    let pid = env.pid();
    let mut my_env = OwnedEnv::new();
    let saved_reversed_int = my_env.run(|env| -> NifResult<SavedTerm> {
        let int_arg = args[0].in_env(env);
        Ok(my_env.save(int_arg))
    })?;
    std::thread::spawn(move || {
        my_env.send_and_clear(&pid, |env| {
            let result: NifResult<NifTerm> = (|| {
                let res_term = saved_reversed_int.load(env);
                let res_int : i64  = try!(res_term.decode());
                Ok((res_int + 1).encode(env))  // Plus 1 !
            })();
            match result {
                Err(_err) => env.error_tuple("test failed".encode(env)),
                Ok(term) => term
            }
        });
    });
    Ok(atoms::ok().to_term(env))
}
Elixir側(パフォーマンス測定用)
Elixir側の呼び出し部分です。以下2種類のテストコードを書いてます。
- メッセージ受信まで行う同期呼び出し
- NIF呼び出しのみ行う非同期呼び出し
  # NIF起動からメッセージ受信までの処理(同期)
  def test_sendback(n \\ 10) do
      sendback_test(n)
      receive do
        message ->
           # IO.inspect(message)
           message
      end
  end
  # NIF起動のスピードのみを測定(非同期)
  def test_sendback2(loop \\ 1) do
    IO.inspect :timer.tc( fn ->
        Enum.reduce(1..loop, 0, fn i,_-> sendback_test(i) end)
    end)
    receive do
      message ->
         # IO.inspect(message)
         message
    end
  def sendback_test(_a), do: exit(:nif_not_loaded)
end
以下対話環境で実行します。
- 同期実行
iex(1)> :timer.tc(fn -> 
    Enum.reduce(1..10000,0,fn i,_-> NifExample.test_sendback(i) end)
  end)
- 非同期実行
 ファンクションの中で時間計測をしています。
 測定後にflush()でメールボックスを空にします
iex(1)> :timer.tc(fn -> NifExample.test_message2(10000) end)
測定結果
以下の結果が出ました。
各5回のテストを行い平均を取りました。
| テスト | タイプ | 測定結果avg(μ秒) | 
|---|---|---|
| test_sendback | 同期 | 1381657.8 | 
| test_message2 | 非同期 | 1058134.2 | 
(測定環境)
Docker on Windows Home Edition(use Virtual box)
Elixir 1.6.5 OTP 20.3
Rustc  1.22.1
一回当たりの呼び出しコストが平均105.8μ sec、メール受信のコストが平均32.4μ secかかっています。 通常のNIF呼び出しが1桁μ secであることを考えると、かなり遅い結果といえます。
RustのスレッドはOSのネイティブスレッドであり、Rustlerでは新規スレッドを毎回生成しているために、これだけの時間がかかっているものと思われます。Elxirの軽量プロセスの素晴らしさが改めて身に沁みますね!
 
高頻度でこのNIFを呼び出すのには向いてないということがわかりましたが、NIFスレッドで長期間起動してElixir側のプロセスにメッセージを返すという作りには向いてる気がします。(現時点では使用例を知らないので、やんわりとした表現に留めておきます ) 外部言語とElixirでデータを受け渡しを行う一つの技として、頭に入れておいて損のない技術ですね。
) 外部言語とElixirでデータを受け渡しを行う一つの技として、頭に入れておいて損のない技術ですね。
Rustlerはまだ紹介出来てない機能があるのですが、本連載はここで終了します。Rustlerの活動は活発なので、またご報告の機会はあると思います。
次回は、「Google DatalabコンテナにElixirを入れてTensorflexを動かそう」です。お楽しみに!
明日は @zacky1972 さんの「ZEAM開発ログv0.1.6.1 Elixir / Rustler 小ネタ集〜 Rustler でリストからバイナリに変換」になります。
★★★ 満員御礼!Elixir MeetUpを6月末に開催します ★★★
※応募多数により、増枠しました
「fukuoka.ex#11:DB/データサイエンスにコネクトするElixir」を6/22(金)19時に開催します
私もMnesia/ETSを使ったスモールアナリティクスで登壇予定です。


