本記事は「Elixir Advent Calendar 2022 カレンダー15の24日目」です。その16まであるので前後記事はカレンダーを参照ください。
今回はRustlerを使って、rustのライブラリをelixirから利用する際の知見を共有したいと思います。
移植しようとしているのは、ZenohというPub/Sub通信ライブラリですzenohの詳細については別記事を起こしたので以下を参照ください。
https://qiita.com/Shintaro_Hosoai/items/0bde489cde43a00d6f96
Rustlerの情報は少ないので、@twinbeeさんの一連の記事がとても参考になりました。ありがとうございます。
|> Elixirから簡単にRustを呼び出せるRustler #1 準備編
|> Elixirから簡単にRustを呼び出せるRustler #2 クレートを使ってみる
|> Elixirから簡単にRustを呼び出せるRustler #3 いろいろな型を呼び出す
|> Elixirから簡単にRustを呼び出せるRustler #4 SHIFT-JIS変換を行う
|> Elixirから簡単にRustを呼び出せるRustler #5 NIFからメッセージを返す
zenohを移植すると嬉しいこと
- ネット越しのpubsubが使える
他のpubsubライブラリと比べても十分高い通信性能を確保でき、利用方法も簡単です。 - 別のzenohを実装した言語と喋れる
他のクライアントライブラリとしてrust、python、cが用意されてるので、これらと相互にお話しできます。 - zenohのプラグインで繋がるものと喋れる
MQTT,rest,DDSとつながります。また組込み向けにも実装されてるので、これらのデバイスともお話しできそうです。
ちょっとワクワクしませんか?
Rustlerの基本
rastlerの利用方法は、Hex doc、docs.rs、githubを参照するのですが、いかんせん情報が少なすぎます。都度ソースや他の実装例を拾い読みすることになります。
プロジェクトの作成
$ mix new [project_name] # elixirプロジェクト作成
$ cd [project_name]
defp deps do
[
{:rustler, "~> 0.26.0"} # 追加
]
end
$ mix deps.get
$ mix rustler.new
This is the name of the Elixir module the NIF module will be registered to.
Module name > NifSample # これから作ろうとしているNIFのElixirのモジュール名を指定
This is the name used for the generated Rust crate. The default is most likely fine.
Library name (nifsample) > # rustのクレート名を指定。デフォルトでOK
これでelixirからrustを利用するための基本構造が整います。とても簡単ですね。
ディレクトリの構造は以下のようになります。
vscodeでelixir, rustを編集する人は、elixir用とは別途、native/rustlerフォルダをエディタで開くと、rustanalyzerが助けてくれるようになります。(一つだけでもrustanalyzerの設定でなんとかなりそうですが、、調べてないです。)
またRustとElixirの関数の対応関係は以下のようになります。
サンプルのコードにElixir側のNifモジュールを追加すれば利用できるようになります。
$ iex -S mix
iex> NifSample.add(4,5)
9
zenohの移植
今回やろうとしているのは、zenohのelixirクライアントライブラリの整備となります。主要なapi群を、取捨したりelixirっぽい味付けをしながら、順次移植していきます。
サンプルプログラムの呼び出し
とはいえ、本当にzenohを追加したものがほんとに動くの?ってことで、まずは移植し始める前にサンプルプログラムを呼び出す例を試してみます。コード全体はここに置いてます。以下はPub部分の抜粋です。
pub async fn pub_zenoh(key_expr: String, value: String) {
env_logger::init();
let session = zenoh::open(Config::default()).res().await.unwrap();
println!("Declaring Publisher on '{}'...", key_expr);
let publisher = session.declare_publisher(&key_expr).res().await.unwrap();
for idx in 0..u32::MAX {
sleep(Duration::from_secs(1)).await;
let buf = format!("[{:4}] {}", idx, value);
println!("Putting Data ('{}': '{}')...", &key_expr, buf);
publisher.put(buf).res().await.unwrap();
}
}
#[rustler::nif]
fn tester_pub(key_expr: String, value: String) {
block_on(pub_zenoh(key_expr, value));
}
zenohのAPIは殆ど非同期なのですがasyncなnifは作れないため、pub_zenoh関数はasync包み、nifの呼び出し口のtester_pubでblock_onで同期しています。key_exprは他のPub/Subで言うところのトピック名、valueはメッセージの内容となります。処理は大まかにはzenohのセッションを作成、Publisherを作成、ループ内で1秒待ってput(publish)しています。
同様にsub部分です。
pub async fn sub_zenoh(key_expr: String) {
env_logger::init();
println!("Opening session...");
let session = zenoh::open(Config::default()).res().await.unwrap();
println!("Declaring Subscriber on '{}'...", &key_expr);
let subscriber = session.declare_subscriber(&key_expr).res().await.unwrap();
println!("Enter 'q' to quit...");
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
loop {
select!(
sample = subscriber.recv_async() => {
let sample = sample.unwrap();
println!(">> [Subscriber] Received {} ('{}': '{}')",
sample.kind, sample.key_expr.as_str(), sample.value);
},
_ = stdin.read_exact(&mut input).fuse() => {
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
_ => (),
}
}
);
}
}
#[rustler::nif]
pub fn tester_sub(key_exprt: String) {
block_on(sub_zenoh(key_exprt));
}
ターミナルを二枚開き
iex(1)> NifZenoh.tester_sub("nifzenoh/sample")
Opening session...
Declaring Subscriber on 'nifzenoh/sample'...
Enter 'q' to quit...
>> [Subscriber] Received PUT ('nifzenoh/sample': '[ 0] hello zenoh!')
>> [Subscriber] Received PUT ('nifzenoh/sample': '[ 1] hello zenoh!')
iex(1)> NifZenoh.tester_pub("nifzenoh/sample", "hello zenoh!")
Declaring Publisher on 'nifzenoh/sample'...
Putting Data ('nifzenoh/sample': '[ 0] hello zenoh!')...
Putting Data ('nifzenoh/sample': '[ 1] hello zenoh!')..
一応通信できているようです。表示がおかしい(キャリッジリターンが効いていない)のは、nifで呼び出した先のRustでPrintlnするとこのようになってしまうようです。基本的にRustから標準出力を使うことはないはずなので一旦はスルーします。
通信ライブラリはテスト時に相手がいないと厄介なので、このようなものを作っとくと後々楽です。
ざっくり設計
とりあえず最低限動くことが分かったので、設計方針を決めていきます。
rust側は基本的にzenohのAPIをラップするような形で、できる限り同レベルで揃え、lib.rsに集約します。
elixir側はNifは一つのモジュールで繋ぎ、その後Zenohのモジュールと同等のElixirのモジュールに割り振ります。このレイヤは基本的にベタにzenoh apiを移植したものにします。基本的にユーザはnifの関数は直接触らず、このレイヤの関数を使って貰います。
Publisherの移植
さていよいよ移植開始ですが、tester_pubでも分かるように、ZenohではSessionを作り、Publisherを作り、そこからputという風に段階が分れています。これらを個別にNifに落とすと参照を持ち回す必要があります。
この時、実体はRust側にあり、そのポインタだけをElixir側に渡すような形態になるそうです。
以下がZenoh::openに対応するnif関数のzenoh_openです。
ResourceArcが先ほどのべたポインタだけ渡す入れ物と思ってください。単にResourceArcとできればよかったんですが、rustlerで扱えるのは自プロジェクト内のStructだけとのことで、一段入れ子にしつつMutexで包んでいます。
zenoh_open内では、サンプルと同様にzenoh::openをデフォルト設定で呼んで、Session::leakで使い捨ての参照を得ています。あとは上で作ったStructに詰めて返しています。またloadのところはResourceArcで利用するStructの登録を行っています。マクロでなんかいい感じに処理してくれるそうです。
struct SessionContainer {
session_mux: Mutex<&'static Session>,
}
#[rustler::nif]
fn zenoh_open<'a>() -> ResourceArc<SessionContainer> {
ResourceArc::new(SessionContainer {
session_mux: Mutex::new(Session::leak(
block_on(zenoh::open(Config::default()).res()).unwrap(),
)),
})
}
fn load<'a>(env: Env<'a>, _: Term<'a>) -> bool {
rustler::resource!(SessionContainer, env);
true
}
さて、同様にPublisherを保持するStructも作っていくのですが、こちらはPublisher<'a>を保持する必要がありますが、loadで登録するResourceArcで扱う構造体にはライフタイム制約を用いることができないため、<'static>を指定する必要があります。当初上記のSessionContainerではMutexで保持していたのですが、これだとdeclare_publisherを呼んだ際にstaticの制約を満たすことができず、苦肉の策でSession側も'staticを付与したという経緯があります。
struct PublisherContainer {
publisher_mux: Mutex<Publisher<'static>>,
}
#[rustler::nif]
fn session_declare_publisher<'a>(
env: Env<'a>,
resource_session: ResourceArc<SessionContainer>,
keyexpr: String,
) -> Term<'a> {
let session = resource_session.session_mux.lock().unwrap();
let publisher = session.declare_publisher(keyexpr);
let resource_publisher = ResourceArc::new(PublisherContainer {
publisher_mux: Mutex::new(block_on(publisher.res()).unwrap()),
});
(ok(), resource_publisher).encode(env)
}
残りはput部です。ここは使うだけなので平和ですね。
#[rustler::nif]
fn publisher_put<'a>(
env: Env<'a>,
resource_session: ResourceArc<PublisherContainer>,
value: String,
) -> Term<'a> {
let publisher = &resource_session.publisher_mux.lock().unwrap();
block_on(publisher.put(value).res()).unwrap();
(ok()).encode(env)
}
これでPublisher側は、APIを分けて実装することができ、以下のように動作させられました。
$ iex -S mix
iex> Tester.sub("topic")
$ iex -S mix
iex> session = Zenohex.open
iex> {:ok, publisher} = Session.declare_publisher(session, "topic")
iex> Publisher.put(publisher, "hello?")
Subscriberの移植
Pub/SubではSubscriberにメッセージが届くと登録してあるコールバック関数が呼び出されるのですが、この部分の実装がなかなかに面倒です。RustからElixirの関数をコールバックする方法は見つけられなかったため、RustからElixirの特定のプロセスにメッセージを送る方法で代用しています。
#[rustler::nif]
fn session_declare_subscriber<'a>(
env: Env<'a>,
resource_session: ResourceArc<SessionContainer>,
keyexpr: String,
pid: Pid,
) -> Term<'a> {
let mut subscriber_env = OwnedEnv::new();
let session = resource_session.session_mux.lock().unwrap();
let subscriber = block_on(session.declare_subscriber(keyexpr).res()).unwrap();
std::thread::spawn(move || loop { // メッセージを待ち続けるスレッドを作成
let sample = block_on(subscriber.recv_async()).unwrap();
subscriber_env.send_and_clear(&pid, |env| sample.value.to_string().encode(env)); // pidのプロセスにメッセージの内容を送信
});
ok().encode(env)
}
def session_declare_subscriber(_session, _keyexpr, _callbackpid), do: exit(:nif_not_loaded) # こっちがNif関数。第三引数にコールバックしたい関数を置く
def session_declare_subscriber_wrapper(session, keyexpr, callback) do # こちらはそのラッパー。呼び出し時にプロセスを作ってpidを得てから上記のNifを呼ぶ
pid =
spawn(fn ->
receive do
msg -> callback.(msg) # メッセージが届いたら、コールバック関数を呼ぶ
end
end)
session_declare_subscriber(session, keyexpr, pid)
end
今悩んでるところ
-
送受するデータ型の定義
現状の実装では送れるメッセージはテキスト単体です。zenohでは任意のデータを送ることができるのですが、この定義をどうしたものかと悩んでおります。将来的にはElixir側で適当なMapなりStructureを作って、ポンっと投げれば良きに図らってくれるようにしたいのですが、Zenohで送信データを加工する際には型情報が必要となります。マクロでゴリゴリ頑張れば、シリアライザ・デシリアライザも作れなくも無さそうですが・・さすがにやりたくないなぁというお気持ちです。
とりあえずは送信データの加工部分もNIFで引っ張り上げて、Elixir側で加工して投げるという形に落ち着きそうです。 -
設定情報の取り回し
ZenohのAPIではBuilderパターンが多用されておりRustで使う分にはとても楽なのですが、1設定項目毎にNif経由で渡すというのはあまり現実的では無いように思います。Elixir側で設定項目を纏めておいて一括で渡す方式にしようと思ってますが、さてどうしたものか・・。 -
非同期関数の扱い
ZenohのAPIの多くはasyncです。現状は全部block_onで潰すという野蛮なことをしてますが、できれば非同期を活かした実装にしたいところです。とはいえ、Rustlerがasyncに対応してないので、互いの間を全部メッセージ通信で繋ぐかというと・・それはそれでつらそうです。
終わりに
zenohのAPIをnifで叩くだけでしょ?よゆーよゆーとか思っていた時期が、私にもありました。全然わかんねぇよこんちくしょう。rustもelixirも詳しくないですが、手を出しちまったもんは仕方ないので頑張ります。
少しでも皆さんのRust-Elixirを繋ぐ架け橋に貢献できれば幸いです。
現状のgithubはこちらになります。
https://github.com/b5g-ex/zenohex
まだ文字を送るだけで精一杯ですが、少しずつ育てて行こうと思います。優しく見守ったりstar, issue, PRくれると嬉しいです。