最初に
- この記事は、個人的メモのレベルだが、巷では
rxRust
の情報が少ないため、何かのヒントになればレベルで投稿することにしました。(またはコメントいただけると嬉しいです) -
ReactiveExtensions
はRxJs(TypeScript)
とRxCpp
をすべてのプロダクトで使用していますが、Rust
は初学者レベルです。 - 徐々に追記していく予定。
インストール
tomlファイル
[dependencies]
rxrust = { git = "https://github.com/rxRust/rxRust", tag = "v1.0.0-beta.1" }
インポート
use rxrust::prelude::*;
use std::convert::Infallible; // 必要があれば
今まで、Observable
がエラーしない場合Err
を()
とすることができたが、あるバージョンから Infallible
とすることになった。
subscribe と actual_subscribe
subscribe()
はObservable
がエラーを発行しない場合のみ使用できる。
それ以外は actual_subscribe()
を使用しなければならない。
actual_subscribe()
はObserver<>
で観測することになるので、下記の SubscribeOption
のような簡易オブジェクトを作ると便利かも。
use rxrust::prelude::Observer;
pub struct SubscribeOption<'a, T, E> {
pub on_next: Option<&'a dyn Fn(T)>,
pub on_error: Option<&'a dyn Fn(E)>,
pub on_complete: Option<&'a dyn Fn()>,
pub on_is_finish: Option<&'a dyn Fn() -> bool>,
}
impl<'a, T, E> Observer<T, E> for SubscribeOption<'a, T, E> {
fn next(&mut self, value: T) {
if let Some(f) = self.on_next {
f(value);
}
}
fn error(self, err: E) {
if let Some(f) = self.on_error {
f(err);
}
}
fn complete(self) {
if let Some(f) = self.on_complete {
f();
}
}
fn is_finished(&self) -> bool {
if let Some(f) = self.on_is_finish {
f()
} else {
false
}
}
}
observable の生成
基本形
fn main() {
let o32 = observable::create(|mut subscriber: Subscriber<_>| {
subscriber.next(1i32);
subscriber.next(2i32);
subscriber.next(3i32);
subscriber.next(4i32);
subscriber.complete();
});
o32.subscribe(|x| println!("{}", x));
}
// 出力
// 1
// 2
// 3
// 4
observableを返却する関数
fn main() {
fn func_number<O>() -> impl Observable<i32, Infallible, O> + ObservableExt<i32, Infallible>
where
O: Observer<i32, Infallible>,
{
observable::create(|mut subscriber: Subscriber<_>| {
subscriber.next(1i32);
subscriber.next(2i32);
subscriber.next(3i32);
subscriber.next(4i32);
subscriber.complete()
})
}
fn func_string<O>() -> impl Observable<String, Infallible, O> + ObservableExt<String, Infallible>
where
O: Observer<String, Infallible> + 'static,
{
observable::create(|mut subscriber: Subscriber<_>| {
subscriber.next(1i32);
subscriber.next(2i32);
subscriber.next(3i32);
subscriber.next(4i32);
subscriber.complete()
})
.flat_map(|x| if x == 1 { of(x) } else { of(x + 2) })
.flat_map(|x| of(format!("string {}", x)))
}
func_number()
.map(|x| x * 10)
.actual_subscribe(rxutil::SubscribeOption {
on_next: Some(&|value| {
println!("next: {:}", value);
}),
on_error: Some(&|_| {
println!("error");
}),
on_complete: Some(&|| {
println!("complete");
}),
on_is_finish: None,
});
func_string()
.map(|x| format!("map {}", x))
.subscribe(|x| println!("{}", x)); // Err が Infallible なら subscribeが使える
}
// next: 10
// next: 20
// next: 30
// next: 40
// complete
// map string 1
// map string 4
// map string 5
// map string 6
エラーの取り扱い
エラー復旧オペレータ Catch
Retry
が存在しない
-
Catch
が必要なのであれば、Result<>
を伝搬するのが良いかと。 -
Retry
が必要なのであれば、Observable
を作って内部でループするのが良いかと。 - エラー型は概ね
Infallible
が使用されている。(つまりエラーが発生しない)
エラー型を定義するとsubscribe()
は使用できない
エラー型が定義される(Infallible
じゃなくてResult<>
)ということは、エラー送出される可能性があことを意味している。
subscribe()
は next
のみ観測できるが、error
を観測することができないため、エラーが送出される可能性がある場合にはsubscribe()
を使用することができない。
この場合、next
、error
、complete
のフルセットで観測することができるactual_subscribe()
を使用する。
エラーの発行は Result<>
を使用する
rust
には例外機構がないため、エラー送出はResult<(), Err>
を使用します。
値は使用しないので、明示的に()
とするのが良さそう。
fn main() {
type TyItem = i32;
type TyMyError = Result<(), &'static str>;
struct MyObservable;
impl<O> Observable<TyItem, TyMyError, O> for MyObservable
where
O: Observer<TyItem, TyMyError>,
{
type Unsub = ();
fn actual_subscribe(self, mut observer: O) -> Self::Unsub {
observer.next(1);
observer.next(2);
observer.next(3);
observer.error(TyMyError::Err("observer.error"));
// observer.complete();
}
}
impl ObservableExt<TyItem, TyMyError> for MyObservable {}
MyObservable {}
.map(|x| format!("{}", x))
.actual_subscribe(rxutil::SubscribeOption {
on_next: Some(&|value| {
println!("next: {:}", value);
}),
on_error: Some(&|err: Result<_, &str>| {
if let Err(x) = err {
println!("error: {:?}", x);
}
}),
on_complete: Some(&|| {
println!("complete");
}),
on_is_finish: None,
});
}
// next: 1
// next: 2
// next: 3
// error: Err("observer.error")
絶望的な問題
flat_map()
返却するobservable
の制約が厳しすぎる。
具体的には、下記のように flat_map()
内で of()
と never()
を合成することができない。
rx
ってobservable
をつなぎまくって実装するのが醍醐味だと思うんだけどなぁ。。。
回避方法を模索したけど、現段階では見つけられなかった。
(浅い知識ではあるけど、rxRust
の実装方針をみると解なしのような気がする)
fn main() {
observable::from_iter(1..3)
.flat_map(|x| match x {
1 => of(1),
2 => never(),
})
.actual_subscribe(rxutil::SubscribeOption {
on_next: Some(&|value| {
println!("next: {:}", value);
}),
on_error: Some(&|err| {
if let Err(x) = err {
println!("error: {:?}", x);
}
}),
on_complete: Some(&|| {
println!("complete");
}),
on_is_finish: None,
});
}
追記:2023/03/31 rxRust
さん、ごめんなさい。。。
上記コードはnever
を使わない前提であれば、下記のように書ける。
fn main() {
observable::from_iter(1..3)
.flat_map(|x| -> BoxOp<'_, i32, _> {
match x {
1 => of(x).box_it(),
2 => empty().box_it(),
3 => of(x).map(|x| x * x).box_it(),
_ => of(x).box_it(),
}
})
.actual_subscribe(rxutil::SubscribeOption {
on_next: Some(&|value| {
println!("next: {:}", value);
}),
on_error: Some(&|err| {
if let Err(x) = err {
println!("error: {:?}", x);
}
}),
on_complete: Some(&|| {
println!("complete");
}),
on_is_finish: None,
});
}
rxcpp
のas_dynamic()
に相当するものがbox_it()
らしい。
ただ、この場合、型推論が効かないので、明示的に返却型を指定することになる。
う〜〜ん、rxRust
を自作したおかげで、上の答えを見つけるというオチか。。。
で、自作してみる
回避方法の模索を諦めて、rxRust
とは異なる方針で自作可能か検証してみる。
ザックリとして方針は下記のとおり。
- エラーは
anyhow::Error
で表現し、Arc
で引きずり回す。 - 上流ソースを関数に閉じ込めて隠蔽する。
- メモリや速度の効率よりも柔軟性を重視する。
う〜ん。。。 another-rxcpp
に続いて another-rxrust
か。。。
んで、RxRust
では表現できない、こんな事ができるようになる。
use crate::prelude::*;
use anyhow::anyhow;
use std::{thread, time};
fn basic() {
fn ob() -> Observable<'static, i32> {
Observable::create(|s| {
s.next(1);
s.next(2);
s.next(3);
s.next(4);
s.complete();
})
}
ob()
.observe_on(schedulers::new_thread())
.flat_map(|x| match x {
1 => observables::empty(),
2 => observables::just(x),
3 => ob().map(|x| (x + 100)),
4 => observables::error(RxError::new(anyhow!("err"))),
_ => observables::never(),
})
.map(|x| format!("{}", x))
.on_error_resume_next(|e| ob().map(move |x| format!("resume {:} {}", e.error, x)))
.subscribe(
|x| {
println!("next {}", x);
},
|e| {
println!("error {:}", e.error);
},
|| {
println!("complete");
},
);
thread::sleep(time::Duration::from_millis(500));
}
// next 2
// next 101
// next 102
// next 103
// next 104
// next resume err 1
// next resume err 2
// next resume err 3
// next resume err 4
// complete