3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

RxRustのメモ

Last updated at Posted at 2023-03-14

最初に

  • この記事は、個人的メモのレベルだが、巷ではrxRustの情報が少ないため、何かのヒントになればレベルで投稿することにしました。(またはコメントいただけると嬉しいです)
  • ReactiveExtensionsRxJs(TypeScript)RxCppをすべてのプロダクトで使用していますが、Rustは初学者レベルです。
  • 徐々に追記していく予定。

インストール

tomlファイル

Cargo.toml
[dependencies]
rxrust = { git = "https://github.com/rxRust/rxRust", tag = "v1.0.0-beta.1" }

インポート

use rxrust::prelude::*;
use std::convert::Infallible; // 必要があれば

今まで、Observableがエラーしない場合Err()とすることができたが、あるバージョンから Infallible とすることになった。

subscribe と actual_subscribe

subscribe()Observableがエラーを発行しない場合のみ使用できる。
それ以外は actual_subscribe() を使用しなければならない。

actual_subscribe()Observer<>で観測することになるので、下記の SubscribeOption のような簡易オブジェクトを作ると便利かも。

rxutil.rs内の(SubscribeOption)
use rxrust::prelude::Observer;

pub struct SubscribeOption<'a, T, E> {
  pub on_next: Option<&'a dyn Fn(T)>,
  pub on_error: Option<&'a dyn Fn(E)>,
  pub on_complete: Option<&'a dyn Fn()>,
  pub on_is_finish: Option<&'a dyn Fn() -> bool>,
}

impl<'a, T, E> Observer<T, E> for SubscribeOption<'a, T, E> {
  fn next(&mut self, value: T) {
    if let Some(f) = self.on_next {
      f(value);
    }
  }
  fn error(self, err: E) {
    if let Some(f) = self.on_error {
      f(err);
    }
  }
  fn complete(self) {
    if let Some(f) = self.on_complete {
      f();
    }
  }
  fn is_finished(&self) -> bool {
    if let Some(f) = self.on_is_finish {
      f()
    } else {
      false
    }
  }
}

observable の生成

基本形

fn main() {
 let o32 = observable::create(|mut subscriber: Subscriber<_>| {
   subscriber.next(1i32);
   subscriber.next(2i32);
   subscriber.next(3i32);
   subscriber.next(4i32);
   subscriber.complete();
  });
  o32.subscribe(|x| println!("{}", x));
}
// 出力
// 1
// 2
// 3
// 4

observableを返却する関数

fn main() {
  fn func_number<O>() -> impl Observable<i32, Infallible, O> + ObservableExt<i32, Infallible>
  where
    O: Observer<i32, Infallible>,
  {
    observable::create(|mut subscriber: Subscriber<_>| {
      subscriber.next(1i32);
      subscriber.next(2i32);
      subscriber.next(3i32);
      subscriber.next(4i32);
      subscriber.complete()
    })
  }

  fn func_string<O>() -> impl Observable<String, Infallible, O> + ObservableExt<String, Infallible>
  where
    O: Observer<String, Infallible> + 'static,
  {
    observable::create(|mut subscriber: Subscriber<_>| {
      subscriber.next(1i32);
      subscriber.next(2i32);
      subscriber.next(3i32);
      subscriber.next(4i32);
      subscriber.complete()
    })
    .flat_map(|x| if x == 1 { of(x) } else { of(x + 2) })
    .flat_map(|x| of(format!("string {}", x)))
  }

  func_number()
    .map(|x| x * 10)
    .actual_subscribe(rxutil::SubscribeOption {
      on_next: Some(&|value| {
        println!("next: {:}", value);
      }),
      on_error: Some(&|_| {
        println!("error");
      }),
      on_complete: Some(&|| {
        println!("complete");
      }),
      on_is_finish: None,
    });

  func_string()
    .map(|x| format!("map {}", x))
    .subscribe(|x| println!("{}", x)); // Err が Infallible なら subscribeが使える
}
// next: 10
// next: 20
// next: 30
// next: 40
// complete
// map string 1
// map string 4
// map string 5
// map string 6

エラーの取り扱い

エラー復旧オペレータ Catch Retry が存在しない

  • Catchが必要なのであれば、Result<> を伝搬するのが良いかと。
  • Retry が必要なのであれば、Observableを作って内部でループするのが良いかと。
  • エラー型は概ね Infallible が使用されている。(つまりエラーが発生しない)

エラー型を定義するとsubscribe()は使用できない

エラー型が定義される(InfallibleじゃなくてResult<>)ということは、エラー送出される可能性があことを意味している。
subscribe()next のみ観測できるが、errorを観測することができないため、エラーが送出される可能性がある場合にはsubscribe()を使用することができない。
この場合、nexterrorcompleteのフルセットで観測することができるactual_subscribe() を使用する。

エラーの発行は Result<> を使用する

rustには例外機構がないため、エラー送出はResult<(), Err>を使用します。
値は使用しないので、明示的に()とするのが良さそう。

fn main() {
  type TyItem = i32;
  type TyMyError = Result<(), &'static str>;
  struct MyObservable;
  impl<O> Observable<TyItem, TyMyError, O> for MyObservable
  where
    O: Observer<TyItem, TyMyError>,
  {
    type Unsub = ();

    fn actual_subscribe(self, mut observer: O) -> Self::Unsub {
      observer.next(1);
      observer.next(2);
      observer.next(3);
      observer.error(TyMyError::Err("observer.error"));
      // observer.complete();
    }
  }
  impl ObservableExt<TyItem, TyMyError> for MyObservable {}

  MyObservable {}
    .map(|x| format!("{}", x))
    .actual_subscribe(rxutil::SubscribeOption {
      on_next: Some(&|value| {
        println!("next: {:}", value);
      }),
      on_error: Some(&|err: Result<_, &str>| {
        if let Err(x) = err {
          println!("error: {:?}", x);
        }
      }),
      on_complete: Some(&|| {
        println!("complete");
      }),
      on_is_finish: None,
    });
}
// next: 1
// next: 2
// next: 3
// error: Err("observer.error")

絶望的な問題

flat_map() 返却するobservable の制約が厳しすぎる。
具体的には、下記のように flat_map() 内で of()never() を合成することができない。
rxってobservableをつなぎまくって実装するのが醍醐味だと思うんだけどなぁ。。。
回避方法を模索したけど、現段階では見つけられなかった。
(浅い知識ではあるけど、rxRustの実装方針をみると解なしのような気がする)

fn main() {
  observable::from_iter(1..3)
    .flat_map(|x| match x {
      1 => of(1),
      2 => never(),
    })
    .actual_subscribe(rxutil::SubscribeOption {
      on_next: Some(&|value| {
        println!("next: {:}", value);
      }),
      on_error: Some(&|err| {
        if let Err(x) = err {
          println!("error: {:?}", x);
        }
      }),
      on_complete: Some(&|| {
        println!("complete");
      }),
      on_is_finish: None,
    });
}

追記:2023/03/31 rxRustさん、ごめんなさい。。。

上記コードはneverを使わない前提であれば、下記のように書ける。

fn main() {
  observable::from_iter(1..3)
    .flat_map(|x| -> BoxOp<'_, i32, _> {
      match x {
        1 => of(x).box_it(),
        2 => empty().box_it(),
        3 => of(x).map(|x| x * x).box_it(),
        _ => of(x).box_it(),
      }
    })
    .actual_subscribe(rxutil::SubscribeOption {
      on_next: Some(&|value| {
        println!("next: {:}", value);
      }),
      on_error: Some(&|err| {
        if let Err(x) = err {
          println!("error: {:?}", x);
        }
      }),
      on_complete: Some(&|| {
        println!("complete");
      }),
      on_is_finish: None,
    });
}

rxcppas_dynamic()に相当するものがbox_it()らしい。
ただ、この場合、型推論が効かないので、明示的に返却型を指定することになる。
う〜〜ん、rxRustを自作したおかげで、上の答えを見つけるというオチか。。。

で、自作してみる

回避方法の模索を諦めて、rxRustとは異なる方針で自作可能か検証してみる。
ザックリとして方針は下記のとおり。

  • エラーは anyhow::Error で表現し、Arc で引きずり回す。
  • 上流ソースを関数に閉じ込めて隠蔽する。
  • メモリや速度の効率よりも柔軟性を重視する。

う〜ん。。。 another-rxcpp に続いて another-rxrust か。。。

んで、RxRustでは表現できない、こんな事ができるようになる。

use crate::prelude::*;
use anyhow::anyhow;
use std::{thread, time};

fn basic() {
  fn ob() -> Observable<'static, i32> {
    Observable::create(|s| {
      s.next(1);
      s.next(2);
      s.next(3);
      s.next(4);
      s.complete();
    })
  }

  ob()
    .observe_on(schedulers::new_thread())
    .flat_map(|x| match x {
      1 => observables::empty(),
      2 => observables::just(x),
      3 => ob().map(|x| (x + 100)),
      4 => observables::error(RxError::new(anyhow!("err"))),
      _ => observables::never(),
    })
    .map(|x| format!("{}", x))
    .on_error_resume_next(|e| ob().map(move |x| format!("resume {:} {}", e.error, x)))
    .subscribe(
      |x| {
        println!("next {}", x);
      },
      |e| {
        println!("error {:}", e.error);
      },
      || {
        println!("complete");
      },
    );

  thread::sleep(time::Duration::from_millis(500));
}

// next 2
// next 101
// next 102
// next 103
// next 104
// next resume err 1
// next resume err 2
// next resume err 3
// next resume err 4
// complete
3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?