2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

rxcppで悩んだ時見るメモ書き的なもの

Last updated at Posted at 2021-12-14

この記事について

rxcpp を使っていて、悩んだ時に見るメモ的なものを適宜更新していこうと思います。
本家はこちら

で、久々に、本家を見てみたら、サンプルコードがメソッドチェーンじゃなくて operator | で繋げる形式になっていました。
随分前から rxjsobservable が肥大化しないために pipe() を使っていましたが、同じような流れですかね(後で調べよう)。(独自のオペレータを作る時なんか良いかもなので、時期を見て置き換えていこうかなぁ。。。

サンプル的なもの

この記事の検証で書いた、すぐ動くサンプルコード的なものは、こちらに置いておきます。

また、適宜、記事の途中でサンプルコードで関数名を記載しています。

void 的な値を発行したい

きっかけ作りのために値を発行したいことがあります。
その際に、発行する値は void にしたいところです。
しかし、void は型ですが、cpp の場合は単体で値としては存在できないので observable<void> はできません。
int とか bool とかで適当な値を発行しても良いのですが、後からコードを見ると、その値に意味があるのか無いのかがパッとみて分からなくなります。
そんな時は、空の型を定義してあげましょう。

unit型みたいなの
struct unit{};

デバッガがクラッシュしたり挙動不審になる

rxcpp の多くの機能が template によって実装されています。
そして、テンプレートパラメータのデフォルト引数が定義されているケースが多く、例えば just を使って observable<T> を期待していても、テンプレートパラメータがくっついてきます。

auto x = rxcpp::observable<>::just(1);
// x = rxcpp::observable<int, rxcpp::sources::detail::iterate<std::__1::array<int, 1UL>, rxcpp::identity_one_worker>>

上記のように単発であれば、まだマシですが、mapflat_map などのオペレータを連結していくと、かなり大変な事になります。

auto x = rxcpp::observable<>::just(1)
.flat_map([=](auto){
  return rxcpp::observable<>::just(true);
});
// rxcpp::observable<bool, rxcpp::operators::detail::flat_map<rxcpp::observable<int, rxcpp::sources::detail::iterate<std::__1::array<int, 1UL>, rxcpp::identity_one_worker>>, type, rxcpp::util::detail::take_at<1>, rxcpp::identity_one_worker>>

この時点で読む気が失せる感じですよね。まぁ、読む気が失せるだけなら良いのですが、実際、コンパイラが吐き出すデバッグシンボルが肥大化する原因になり、場合によってはデバッグシンボルを読み込んでいる最中にデバッガが死にます
全てのテンプレートパラメータは内部的には必要なものなのですが、(多くの場合)第1パラメータ以外は外部で使用することはありません。
そこで、不要なテンプレートパラメータを消去するというものが as_dynamic() です。

auto x = rxcpp::observable<>::just(1).as_dynamic();
// y = rxcpp::observable<int>
auto x = rxcpp::observable<>::just(1)
.flat_map([=](auto){
  return rxcpp::observable<>::just(true);
}).as_dynamic();
// x = rxcpp::observable<bool>

ちなみに、実装はいたって単純で、 observable<T> に変換しているだけです。

as_dynamic()の実装
template<class... AN>
observable<T> as_dynamic(AN**...) const {
  return *this;
}

ならば、逐一、 as_dynamic() を呼べば良いかというと、なかなかそうも行かず、この as_dynamic()overvable<T> のコピーコンストラクタが動作するため、キャストによる型変換とは異なりゼロコストでは無い点に注意が必要です。(コピー元の source を引き受けて、スケジューラをデフォルトで初期化するだけなので、言うほど高コストではないですが)

as_dynamic() が必須なケース

下記のように、関数の返却値で型を推論してもう場合、前述のとおり、型が異なるためエラーとなります。

.flat_map([=](bool b) {
  if(b) return rxcpp::observable<>::just(unit{});  // rxcpp::observable<unit, rxcpp::sources::detail::iterate<std::__1::array<unit, 1UL>, rxcpp::identity_one_worker>>
  else  return rxcpp::observable<>::never<unit>(); // rxcpp::observable<unit, rxcpp::sources::detail::never<unit>>
})

この場合、下記のように明示的に返却値の型を指定するか、as_dynamic() を使用します。

返却型を明示する
.flat_map([=](bool b) -> rxcpp::observable<unit> {
  if(b) return rxcpp::observable<>::just(unit{});
  else  return rxcpp::observable<>::never<unit>();
})
as_dynamic()で型を整える
.flat_map([=](bool b) {
  if(b) return rxcpp::observable<>::just(unit{}).as_dynamic();
  else  return rxcpp::observable<>::never<unit>().as_dynamic();
})

どちらもコスト面は変わらないので、個人的には as_dynamic() を使うことが多いです。

データを取りこぼす

例えば、下記の some_state のように、 start() を実行することで、値を発行し始めるような hot observable があったとします。

struct some_state {
private:
  rxcpp::subjects::subject<int> sbj_;
  rxcpp::subscription sbs_;
public:
  some_state() = default;
  ~some_state() = default;
  auto observable() { return sbj_.get_observable(); }
  void start(){
    sbj_.get_subscriber().on_next(0);
    sbs_ = rxcpp::observable<>::interval(std::chrono::milliseconds(200), rxcpp::observe_on_new_thread())
    .tap([=](int n){
      sbj_.get_subscriber().on_next(n);
    })
    .subscribe();
  }
  void end() {
    sbj_.get_subscriber().on_completed();
    sbs_.unsubscribe();
  }
};

こうなると、「start()observable() を何とかまとめられないか」と考えるのが世の常です。
すると、こんなコードを書くと見事に罠にハマります。(サンプルコード test_3_1()

取りこぼす可能性がある
auto fn = [](std::shared_ptr<some_state> state){
  state->start();
  return state->observable();
};

これだと、start() を呼び出した際に、既に値が発行されてしまうため、俗に云うデータの取りこぼしが発生します。
このようなケースでは、仕掛けて(subscribeして)から引き金を引く(startする) という流れは常に意識する必要があります。
そこで、下記のような関数を用意しておくと便利です。

:ready_set_go
template <typename T> rxcpp::observable<T> ready_set_go(std::function<void()> f, rxcpp::observable<T> o) {
  return rxcpp::observable<>::create<T>([f, o](rxcpp::subscriber<T> s){
    o.subscribe([s](const T& v){
      s.on_next(v);
    }, [s](std::exception_ptr err){
      s.on_error(err);
    }, [s](){
      s.on_completed();
    });
    try{
      f();
    }
    catch(std::exception& err){
      s.on_error(std::make_exception_ptr(err));
    }
  });
}

そうすると、先程のコードは下記のようになります。(サンプルコード test_3_2()

取りこぼさない
auto fn = [](std::shared_ptr<some_state> state){
  return ready_set_go([state](){
    state->start();
  }, state->observable());
};

特定条件まで処理を繰り返したい

例えば、非同期の API 的なものがあって、応答が observable になっているものがあったとします。
そして、 call()cold observable で、api の結果を値として1個発行して complete するものとします。

enum class result { success, failure };
struct some_api {
  auto call() -> rxcpp::observable<result>;
};

この some_api::call()result::success となるまで待つというような場合の実装例と注意点を挙げておきます。
注意点としては、 call()cold observable なので、リトライ時でも改めて call() を呼び出す必要があります。つまり、先頭を call() から始めると、call() は一度しか呼ばれず、その次のメソッドチェーンからリトライ(ループ)されてしまい、意図した動作になりません。

retry() を使う

retry() を誘発するエラーなのか、subscribe() に透過させるエラーなのかを適切に処理する必要がありますが、このコードでは割愛しています。(つまり、このままだとヨロシクナイ状態です。)

test_1_1抜粋
auto api = some_api{};

api->call()
.map([=](result x){
  if(x == result::failure) throw 0;
  return unit{};
}).as_dynamic()
.retry()
.take(1)
.subscribe(...)

never() を使う

never() を使う場合、値発行だけではなく complete さえもしないことに注意が必要です。

test_1_2抜粋
auto api = std::make_shared<some_api>(5);
auto sbj = rxcpp::subjects::subject<unit>();

ready_set_go([=](){
  sbj.get_subscriber().on_next(unit{});
}, sbj.get_observable())
.flat_map([=](unit){
  return api->call();
}).as_dynamic()
.flat_map([=](result x){
  scope_counter sc;
  if(x == result::failure){
    sbj.get_subscriber().on_next(unit{});
    return rxcpp::observable<>::never<unit>().as_dynamic();
  }
  else{
    /**
     * ここで sbj を complete させるべく
     * sbj.get_subscriber().on_completed();
     * を実行しても、この flat_map が never を返却していると、
     * もはや、全ての observable を complete することが「絶対に不可能」なので、
     * 適宜、後段で take() することで、前段の購読完了の意思表示をしなければ
     * subscribe で complete が呼び出されないので注意。
     **/
    return rxcpp::observable<>::just(unit{}).as_dynamic();
  }
}).as_dynamic()
.take(1) /** 明示的な購読終了の意思表示をしないと、全てが complete しないので注意 */
.subscribe(...)
/**
 * ちなみに、 take(1) が無くても、この関数は終了するが
 * その場合、 subscription は終了していない状態で、この関数を抜けるため
 * subscription のデストラクタで unsubscribe されているだけということに注意が必要
 **/

skip_while を使う

このケースでは、似たような判定が重複して登場するため、無理矢理感が否めないですが。

test_1_3抜粋
auto api = std::make_shared<some_api>(5);
auto sbj = rxcpp::subjects::subject<unit>();

ready_set_go([=](){
  sbj.get_subscriber().on_next(unit{});
}, sbj.get_observable())
.flat_map([=](unit){
  return api->call();
}).as_dynamic()
.tap([=](result x){
  if(x == result::failure){
    sbj.get_subscriber().on_next(unit{});
  }
  else{
    sbj.get_subscriber().on_completed();
  }
})
.skip_while([=](result x){
  return x == result::failure;
}).as_dynamic()
.take(1)
.subscribe(...)

observable 作成する

observable::create 内で result::failure 時に自身の completed を忘れると、 never と同じことになり、全体の subscribecomplete しないので注意が必要です。

test_1_4抜粋
auto api = some_api{};
auto sbj = rxcpp::subjects::subject<unit>();
ready_set_go([=](){
  sbj.get_subscriber().on_next(unit{});
}, sbj.get_observable())
.flat_map([=](unit){
  return api->call();
}).as_dynamic()
.flat_map([=](result x){
  return rxcpp::observable<>::create<unit>([=](rxcpp::subscriber<unit> s){
    if(x == result::failure){
      sbj.get_subscriber().on_next(unit{});
      s.on_completed(); /** 忘れがち! */
    }
    else {
      s.on_next(unit{});
      s.on_completed();
      sbj.get_subscriber().on_completed();
    }
  });
}).as_dynamic()
.subscribe(...)

as_blocking を使ってループする

as_blocking() で堰き止めて、while で期待する値が得られるまでループする方法です。
この方法はスタックが深くならないし、コードがシンプルというメリットはあります。
しかし、このループは期待する値が得られるまで誰も止めることができません。
全体を unsubscribe してもループは動き続けてしまうので、何らかの条件でループを抜ける仕組みを入れてあげる必要があります。
この辺をケアすると複雑になるかもです。
また、ループがメインスレッドで実行されると、メインスレッドが call() による値の発行を待ってしまうので、実行するならワーカスレッドを使いましょう。
あと、call()が値を発行せず complete するケースがあるなら、first() みたいに手を抜かず、subscribeしましょう。

test_1_5
rxcpp::observable<>::create<unit>([](rxcpp::subscriber<unit> s){
  try{
    some_api api(5);
    auto results = api.call().as_blocking();
    while(results.first() == result::failure) {}
    s.on_next(unit{});
    s.on_completed();
  }
  catch(std::exception& e){
    s.on_error(std::make_exception_ptr(e));
  }
})
.subscribe(...)

注意点

  1. call() が、何らかのシステムコール的なAPIを使用して、その completion handler 的な関数から値が発行されている場合、call()以降のメソッドチェーンは、その completion handler の中で実行されることになります。つまり、そのまま再度 call() を呼び出すと、システムコールが再入することになるためエラーやクラッシュする可能性がありあます。そこで、call() の後段で適宜 observe_on を使用して後段処理を別スレッドに逃して completion handler の処理を戻してあげましょう。
  2. 仮に非同期 API だけど、特定のエラーなどで同期的に(同一スレッドで)値を発行した場合、どの手段を使ってもループ処理は再帰的に実行されます。数回なら許容されるかもですが、数十回とかになるとスタックオーバフローが発生しクラッシュします。そこで、ループを発生させるコードに observe_on を入れてスタックを消費しないよう心がけましょう。(サンプルコード test_2_1()test_2_2())ただし、実行速度とのトレードオフがあるため、やたらめったらobserve_onを入れるのはやめましょう。(この点「retry はどこから retry するのか?」も参考にしてください)
  3. test_1_1() から test_1_4() はどれも subscribe で 1回だけ next が呼び出され、complete するのですが、どの例も値の発行タイミングや判断処理のスコープが微妙に異なります。実際に動作させてログを見てみるとわかりやすいと思います。

スケジューラを作りたい

std::async を使った簡単な scheduler を例示しておきます。
スレッドプールや、特定スレッドで実行させたい場合なんかに std::async を適宜置き換えると良いかと思います。
例えば、std::asyncboost::asio::io_contextpost() するとか、 std::this_thread::sleep_until()boost::asio::deadline_timer にする感じで使っています。

class async_scheduler : public rxcpp::schedulers::scheduler_interface {
private:
  class async_worker : public rxcpp::schedulers::worker_interface {
  private:
    rxcpp::composite_subscription m_lifetime;
  public:
    explicit async_worker(rxcpp::composite_subscription cs) : m_lifetime(cs) {}
    virtual ~async_worker() { m_lifetime.unsubscribe(); }
    virtual clock_type::time_point now() const override { return clock_type::now(); }
    virtual void schedule(const rxcpp::schedulers::schedulable& scbl) const override {
      schedule(now(), scbl);
    }
    virtual void schedule(rxcpp::schedulers::scheduler_interface::clock_type::time_point when, const rxcpp::schedulers::schedulable& scbl) const override {
      if (scbl.is_subscribed()) {
        auto THIS = shared_from_this();
        std::async(std::launch::async, [THIS, scbl, when](){
          std::this_thread::sleep_until(when);
          scbl(rxcpp::schedulers::recursion(true).get_recurse());
        });
      }
    }
  };
public:
  async_scheduler() = default;
  virtual ~async_scheduler() = default;
  virtual rxcpp::schedulers::scheduler_interface::clock_type::time_point now() const override {
    return rxcpp::schedulers::scheduler_interface::clock_type::now();
  }
  virtual rxcpp::schedulers::worker create_worker(rxcpp::composite_subscription cs) const override {
    return rxcpp::schedulers::worker(cs, std::make_shared<async_worker>(cs));
  }
};

rxcpp::observe_on_one_worker observe_on_async() {
  return rxcpp::observe_on_one_worker(rxcpp::schedulers::make_scheduler<async_scheduler>());
}

retry はどこから retry するのか?

下記のコードで確認しましょう。

test_4_1
void test_4_1() {
  /** 関数の入退出を確認 */
  struct scope {
    std::string name_;
    scope(const std::string& name) : name_(name) {
      std::cout << "enter:" << name_ << std::endl;
    }
    scope(const std::string& name, int n) {
      std::stringstream ss;
      ss << name << " (" << n << ")";
      name_ = ss.str();
      std::cout << "enter:" << name_ << std::endl;
    }
    ~scope() {
      std::cout << "leave:" << name_ << std::endl;
    }
  };

  int counter = 0;  /** fn() 内の observable で活躍するカウンタ */

  /**
   * subscribe 毎にインクリメントされた値を1個だけ発行する
   * cold observable を返却する関数
   **/
  auto fn = [&counter]() {
    scope fns("fn()");
    return rxcpp::observable<>::create<int>([&counter](rxcpp::subscriber<int> s){
      scope fnos("observable 1st", counter);
      s.on_next(counter++); /** counter を加算して発行 */
      s.on_completed();
    })
    .flat_map([](int n){
      scope fns("flat_map");
      return rxcpp::observable<>::create<int>([n](rxcpp::subscriber<int> s){
        scope fnos("observable 2nd", n);
        s.on_next(n); /** 受け取った数値(counter) をそのまま発行 */
        s.on_completed();
      });
    });
  };

  fn()
  .map([=](int x){
    std::cout << x << std::endl;
    if(x == 2) return unit{}; /** その後 take(1) で終了 */
    std::cout << "throw" << std::endl;
    throw 0;  /** retry を誘発 */
  })
  .retry()
  .take(1)
  .subscribe([=](unit){
    std::cout << "on next" << std::endl;
  }, [=](std::exception_ptr){
    std::cout << "on error" << std::endl;
  }, [=](){
    std::cout << "on complete" << std::endl;
  });
}
結果
enter:fn()
leave:fn()
enter:observable 1st (0)
enter:flat_map
leave:flat_map
enter:observable 2nd (0)
0
throw
enter:observable 1st (1)
enter:flat_map
leave:flat_map
enter:observable 2nd (1)
1
throw
enter:observable 1st (2)
enter:flat_map
leave:flat_map
enter:observable 2nd (2)
2
on next
on complete
leave:observable 2nd (2)
leave:observable 1st (2)
leave:observable 2nd (1)
leave:observable 1st (1)
leave:observable 2nd (0)
leave:observable 1st (0)

源流であるfn() が返却するobservableを再度観測するという流れになります。
retry() を契機にfn() が呼び出される訳じゃない点に注意です。
なお、fn() 内の observable で実行されているコードが complete 後に関数から抜けている点にも注目してください。(これがスタックオーバーフローの原因になり得ます)

ちなみに、サンプルコード test_4_2() では、fn() の直後に observe_on でスレッドを切り替えてみました。
その結果はこちらです。

fn()直後にスレッドを切り替える
enter:fn()
leave:fn()
enter:observable 1st (0)
enter:flat_map
leave:flat_map
enter:observable 2nd (0)
leave:observable 2nd (0)
leave:observable 1st (0)
0
throw
enter:observable 1st (1)
enter:flat_map
leave:flat_map
enter:observable 2nd (1)
leave:observable 2nd (1)
leave:observable 1st (1)
1
throw
enter:observable 1st (2)
enter:flat_map
leave:flat_map
enter:observable 2nd (2)
leave:observable 2nd (2)
2
on next
leave:observable 1st (2)
on complete

マルチスレッドでオペレータの挙動が怪しい(take(1)なのに復数の値が発行されてしまう症状など)

下記のコードは、マルチスレッドで値の発行とエラーをほぼ同時に発行するためのテストプログラムです。

test_5_3
void test_5_3()
{
  for(auto i = 1;; i++){
    std::cout << std::endl << "========== " << i << "==========" << std::endl; 
    auto count = std::make_shared<std::atomic_int>(0);
    auto mtx = std::make_shared<sem<1>>();
    auto sbsc = rxcpp::observable<>::interval(std::chrono::milliseconds(100), rxcpp::observe_on_new_thread())
    .observe_on(rxcpp::observe_on_new_thread())
    .flat_map([=](int x){
      std::cout << std::this_thread::get_id() << " : " << x << " : enter" << std::endl;
      if(x == 1){ 
        mtx->lock();
        return rxcpp::observable<>::just(unit{})
        .observe_on(rxcpp::observe_on_new_thread())
        .flat_map([=](unit){
          std::cout << std::this_thread::get_id() << " : " << x << " : wait for get semaphore" << std::endl;
          mtx->lock();
          std::cout << std::this_thread::get_id() << " : " << x << " : throw" << std::endl;
          return rxcpp::observable<>::error<int>(std::make_exception_ptr(std::exception()));
        }).as_dynamic();
      }
      std::cout << std::this_thread::get_id() << " : " << x << " : unlock semaphore" << std::endl;
      mtx->unlock();
      std::this_thread::yield();
      std::cout << std::this_thread::get_id() << " : " << x << " : emit" << std::endl;
      return rxcpp::observable<>::just(x).as_dynamic();
    })
    .take(1)
    .observe_on(rxcpp::observe_on_new_thread())
    .on_error_resume_next([=](std::exception_ptr e){
      std::cout << std::this_thread::get_id() << " : on_error_resume_next" << std::endl;
      return rxcpp::observable<>::just(unit{})
      .observe_on(rxcpp::observe_on_new_thread())
      .flat_map([=](unit){
        std::cout << std::this_thread::get_id() << " : on_error_resume_next: emit" << std::endl;
        return rxcpp::observable<>::just(1);
      });
    })
    .subscribe([=](int x){
      (*count)++;
      std::cout << std::this_thread::get_id() << " : on next " << x << std::endl;
    }, [=](std::exception_ptr){
      std::cout << std::this_thread::get_id() << " : on error" << std::endl;
    }, [=](){
      std::cout << std::this_thread::get_id() << " : on complete" << std::endl;
    });
    
    while(sbsc.is_subscribed()) {}

    std::cout << "----------> next count = " << *count << std::endl;
    assert(*count == 1);
  }
}

同期オブジェクトの sem<1> は自作の semaphore で共有資源が1個の所謂 binary semaphore です。(lockした同期オブジェクトが別スレッドでunlockしているためmutexは使えません)
で、なんだかゴチャゴチャ書いていますが、ざっと説明するとこんな感じです。

  1. interval を使って 100ms 毎に値を発行
  2. 初回の値でsemaphoreを取得した後、再度semaphore取得するため解放されるまで待つ。(...A)
  3. 2回目の値でsemaphoreを解放(...B)しつつ、値を発行(...C)
  4. (A) の解放待ちが、(B)により終了したので、エラーを発行する(...D)
  5. (C)と(D)がほぼ同時で take() オペレータに突入する
  6. エラーの場合、そのエラーは揉み消して、下流には値(1という数値)を発行する
  7. subscribe で観測された値の個数を記録する(正常なら1になる筈です)

take() に限らず、どんなオペレータも基本は下記のルールになります。

  • 上流をsubscribeする
  • 下流にnexterrorcomplete を発行する
  • 下流にerrorcomplete を発行した場合は、上流を unsubscribe して、自身は終了状態になる

従って、「値が先行」「エラーが先行」した場合、下記のようなログになります。

値が先行した場合
0x70000eb4a000 : 1 : enter
0x70000ec50000 : 1 : wait for get semaphore
0x70000eb4a000 : 2 : enter
0x70000eb4a000 : 2 : unlock semaphore
0x70000eb4a000 : 2 : emit
0x70000ec50000 : 1 : throw
0x70000eac7000 : on next 2
0x70000eac7000 : on complete
----------> next count = 1
エラーが先行した場合
0x70000eb4a000 : 1 : enter
0x70000ec50000 : 1 : wait for get semaphore
0x70000eb4a000 : 2 : enter
0x70000eb4a000 : 2 : unlock semaphore
0x70000ec50000 : 1 : throw
0x70000eac7000 : on_error_resume_next
0x70000eb4a000 : 2 : emit
0x70000ebcd000 : on_error_resume_next: emit
0x70000ebcd000 : on next 1
0x70000ebcd000 : on complete
----------> next count = 1

なるほど、ちゃんと動いてる的な、とても安心感があるログですね。
ところが、マルチスレッド下においてtake()オペレータはブロッキングしません。
つまり、上流からの何らかの発行を受け下流に何かを発行している最中でも、自身が終了状態でなければ、上流からの発行を受け付けてしまいます。
そのような自体が発生した際のログは下記のようになります。

同時に発生した場合
0x70000eb4a000 : 1 : enter
0x70000ec50000 : 1 : wait for get semaphore
0x70000eb4a000 : 2 : enter
0x70000eb4a000 : 2 : unlock semaphore
0x70000ec50000 : 1 : throw
0x70000eb4a000 : 2 : emit
0x70000eac7000 : on next 2
0x70000eac7000 : on_error_resume_next
0x70000eb4a000 : on_error_resume_next: emit
0x70000eb4a000 : on next 1
0x70000eb4a000 : on complete
----------> next count = 2

マルチコアのCPUだと、結構な頻度でこの手の「抜け」が発生します。
この場合、mapなどのオペレータとsemaphoreを使ってブロックすることも可能ですが、もっと簡単な方法があります。
それは、このtake()オペレータを同一のスレッドで待ち合わせる方法です。
具体的には take() オペレータの手前で下記のコードを追加するだけです。

.observe_on(rxcpp::observe_on_new_thread())  /** <-- 追加 */
.take(1)

殆どのオペレータは、エラーが発生した場合、オペレータの機能が無くなり、エラーを下流へ透過します。
例えば delay() オペレータの上流でエラーが発生しても、そのエラーの発行を遅延させることなく下流へエラーを発行します。(同時に上流はunsubscribeします)
しかし、observe_on()は特殊です1observe_on()はエラーの場合でも、そのまま下流にエラーを発行することなく、本来の機能を維持します。つまり、observe_on()で指定したスケジューラでエラーが発行されます。(もちろん同時に上流はunsubscribeされます)

従って、上記のようにobserve_on()を使用して、待ち合わせ用のスケジューラを指定することで問題は回避されます。

当たり前ですが、observe_on()で指定するスケジューラにスレッドプールを使わないでください2。スレッドプールを指定したら、結果並列処理になる可能性があるため、待ち合わせになりません。(スレッドを1個だけ抱えているスレッドプールなら別ですが)

サンプルコード test_5_3() は値とエラーの例ですが、 test_5_4() は値を2つ同時に発行する例になります。そして、値を2つ同時に発行する場合は take(1) なのに値が2個発行される事態になります。

スケジューラの実行タイミングがよく分からなくなった

observe_on は分かりやすいかと思うのですが、 subscribe_on が分からなくて、結果グチャグチャになるケースがあるので、スケジューラが切り替わる状況を可視化して具体的に纏めてみようと思います。

準備

下記のような簡単な機能を実装しました。

デバッグ用のスケジューラ

デバッグ用のスケジューラを作りました。
スケジューラと言っても、ログを記録するだけで、内部は同期処理にしています。
ログに [[schedule]] と入っている箇所はスケジューラが切り替わったことを示しています。
非同期の場合は、どこで指定されたスケジューラで実行されているのかを把握するには良いかと思います。
また、 {} を使って期間を示すようにしました。

debug_scheduler抜粋
class debug_worker : public rxcpp::schedulers::worker_interface {
  virtual void schedule(rxcpp::schedulers::scheduler_interface::clock_type::time_point when, const 
  rxcpp::schedulers::schedulable& scbl) const override {
    if (scbl.is_subscribed()) {
      std::cout << "[[schedule]] " << m_name << " {" << std::endl;
      scbl(rxcpp::schedulers::recursion(true).get_recurse());
      std::cout << "[[schedule]] " << m_name << " }" << std::endl;
    }
  }
};

値を1個だけ発行する observable

nextcomplete を実行している期間を{} を使って表現しました。

rxcpp::observable<unit> obs(std::string name = std::string()) {
  return rxcpp::observable<>::create<unit>([name](rxcpp::subscriber<unit> s){
    std::cout << name << "emit next" << " {" << std::endl;
    s.on_next(unit{});
    std::cout << name << "emit next" << " }" << std::endl;
    std::cout << name << "emit complete" << " {" << std::endl;
    s.on_completed();
    std::cout << name << "emit complete" << " }" << std::endl;
  });
}

6-1. スケジューラ指定なし

スケジューラを使用しないケースです。

test_6_1
void test_6_1()
{
  obs()
  .subscribe(
    [](unit){ std::cout << "on next" << std::endl; },
    [](std::exception_ptr){ std::cout << "on error" << std::endl; },
    [](){ std::cout << "on complete" << std::endl; }
  );
}
結果
emit next {
  on next
emit next }
emit complete {
  on complete
emit complete }

サンプルコードではインデントされていませんが、分かりやすいように手動で加工しました。
なお、以降は subscribe(...) と省略します。

6-2. observe_on

test_6_2
void test_6_2(){
  obs()
  .observe_on(observe_on_debug("observe_on"))
  .subscribe(...);
}
結果
emit next {
  [[schedule]] observe_on {
    on next
  [[schedule]] observe_on }
emit next }
emit complete {
  [[schedule]] observe_on {
    on complete
  [[schedule]] observe_on }
emit complete }

observe_on の効果は、指定子たスケジューラで下流を実行することです。
この例では、obs() の下流に当たるsubscribe()内の on nexton completeobserve_on で指定したスケジューラで実行されていることが分かります。

あ、あと、次の 6-3 と比較ですが、6-2obs()はメインスレッド(正確に言うとsubscribeを実行したスレッド)で実行されます。これは何を意味するかというと、obs()がメインスレッドで実行されるため、仮にobserve_onで非同期スケジューラを使用しスレッド切替が発生したとしても、obs()completeするまで、メインスレッドはブロックされます。

6-3. subscribe_on

test_6_3
void test_6_3(){
  obs()
  .subscribe_on(observe_on_debug("subscribe_on"))
  .subscribe(...);
}
結果
[[schedule]] subscribe_on {
  emit next {
    on next
  emit next }
  emit complete {
    on complete
    [[schedule]] subscribe_on {
    [[schedule]] subscribe_on }
  emit complete }
[[schedule]] subscribe_on }

subscribe_on もスケジューラを要求するので observe_on_debug を設定しています。
subscribe_on の効果は、 subscribe した時に先頭の observable について subscribe_on で指定したスケジューラで実行します。

6-2とは対象的で、仮に subscribe_on で非同期スケジューラを使用した場合、メインスレッドはブロッキングされません。

6-4. observe_on → observe_on

多くのオペレータにはスケジューラを設定することができます。
よく使う手として、justオペレータにスケジューラを指定して頭出しをするなんてことをやります。

test_6_4
void test_6_4(){
  rxcpp::observable<>::just(unit{}, observe_on_debug("just"))
  .flat_map([](unit){
    return obs();
  })
  .observe_on(observe_on_debug("observe_on"))
  .subscribe(...);
}
結果
[[schedule]] just {
  emit next {
    [[schedule]] observe_on {
      on next
    [[schedule]] observe_on }
  emit next }
  emit complete {
  emit complete }
  [[schedule]] observe_on {
    on complete
  [[schedule]] observe_on }
[[schedule]] just }

justが指定したスケジューラで実行されるため、test_6_3と同様に、メインスレッドをブロックすることはありません。(test_6_2とも比較しましょう)
あ、あと emit completeon complete がずれている点ですが、これは emit completeflat_map オペレータに対して発行され、flat_mapは上流のcompleteを受けて(この時点でemit completeの役目は終わり制御を戻します)、下流にcompleteを発行します。最後に、observe_on オペレータが上流の complete を受けて、下流に complete を発行します。そして、下流に complete を発行する時は指定されたスケジューラで実行する流れになります。

6-5. observe_on → subscribe_on

test_6_5
void test_6_5(){
  rxcpp::observable<>::just(unit{}, observe_on_debug("just"))
  .flat_map([](unit){
    return obs();
  })
  .subscribe_on(observe_on_debug("subscribe_on"))
  .subscribe(...);
}
結果
[[schedule]] subscribe_on {
  [[schedule]] just {
    emit next {
      on next
    emit next }
    emit complete {
    emit complete }
    on complete
    [[schedule]] subscribe_on {
    [[schedule]] subscribe_on }
  [[schedule]] just }
[[schedule]] subscribe_on }

ちょっとややこしくなってきました。
しかし、 subscribe_onobserve_on はこれまでの説明通りの動作になっています。
注意点は subscribe_onsubscribe のスケジューラを指定するのではありません。
subscribe()を実行する時の先頭の observable の実行スケジューラを指定するものです。

6-6. observe_on → observe_on → observe_on

test_6_6
void test_6_6(){
  rxcpp::observable<>::just(unit{}, observe_on_debug("just"))
  .observe_on(observe_on_debug("observe_on #1"))
  .flat_map([](unit){
    return obs();
  })
  .observe_on(observe_on_debug("observe_on #2"))
  .subscribe(...);
}
結果
[[schedule]] just {
  [[schedule]] observe_on #1 {
    emit next {
      [[schedule]] observe_on #2 {
        on next
      [[schedule]] observe_on #2 }
    emit next }
    emit complete {
    emit complete }
  [[schedule]] observe_on #1 }
  [[schedule]] observe_on #1 {
    [[schedule]] observe_on #2 {
      on complete
    [[schedule]] observe_on #2 }
  [[schedule]] observe_on #1 }
[[schedule]] just }

インデントが深くなっていますが、そんなに難しくは無いですね。

6-7. observe_on → observe_on → subscribe_on

test_6_7
void test_6_7(){
  rxcpp::observable<>::just(unit{}, observe_on_debug("just"))
  .observe_on(observe_on_debug("observe_on"))
  .flat_map([](unit){
    return obs();
  })
  .subscribe_on(observe_on_debug("subscribe_on"))
  .subscribe(...);
}
結果
[[schedule]] subscribe_on {
  [[schedule]] just {
    [[schedule]] observe_on {
      emit next {
        on next
      emit next }
      emit complete {
      emit complete }
    [[schedule]] observe_on }
    [[schedule]] observe_on {
      on complete
      [[schedule]] subscribe_on {
      [[schedule]] subscribe_on }
    [[schedule]] observe_on }
  [[schedule]] just }
[[schedule]] subscribe_on }

subscribe_on が登場してますが、もう大丈夫ですよね。

6-8. subscribe_on → observe_on → subscribe_on

では、subscribe_on が復数登場した場合、どうなるでしょうか?

test_6_8
void test_6_8(){
  obs()
  .subscribe_on(observe_on_debug("subscribe_on #1"))
  .observe_on(observe_on_debug("observe_on"))
  .subscribe_on(observe_on_debug("subscribe_on #2"))
  .subscribe(
    [=](unit){ std::cout << "on next" << std::endl; },
    [=](std::exception_ptr){ std::cout << "on error" << std::endl; },
    [=](){ std::cout << "on complete" << std::endl; }
  );
}
結果
[[schedule]] subscribe_on #2 {
  [[schedule]] subscribe_on #1 {
    emit next {
      [[schedule]] observe_on {
        on next
      [[schedule]] observe_on }
    emit next }
    emit complete {
      [[schedule]] observe_on {
        on complete
        [[schedule]] subscribe_on #2 {
        [[schedule]] subscribe_on #2 }
        [[schedule]] subscribe_on #1 {
        [[schedule]] subscribe_on #1 }
      [[schedule]] observe_on }
    emit complete }
  [[schedule]] subscribe_on #1 }
[[schedule]] subscribe_on #2 }

こうなってしまいます。
これで幸せになる方法が分からないのですが。。。
使うケースあるのかな??

6-9. subscribe_on → subscribe_on

では、subscribe_onが復数存在しても意味が無いかというと、そういう訳ではありません。

test_6_9
void test_6_9(){
  obs("(A) ")
  .flat_map([](unit){
    return obs("(B) ")
    .subscribe_on(observe_on_debug("subscribe_on #1"));
  })
  .subscribe_on(observe_on_debug("subscribe_on #2"))
  .subscribe(
    [](unit){ std::cout << "on next" << std::endl; },
    [](std::exception_ptr){ std::cout << "on error" << std::endl; },
    [](){ std::cout << "on complete" << std::endl; }
  );
}
結果
[[schedule]] subscribe_on #2 {
  (A) emit next {
    [[schedule]] subscribe_on #1 {
      (B) emit next {
        on next
      (B) emit next }
      (B) emit complete {
        [[schedule]] subscribe_on #1 {
        [[schedule]] subscribe_on #1 }
      (B) emit complete }
    [[schedule]] subscribe_on #1 }
  (A) emit next }
    (A) emit complete {
      on complete
      [[schedule]] subscribe_on #2 {
      [[schedule]] subscribe_on #2 }
    (A) emit complete }
[[schedule]] subscribe_on #2 }

observableがカスケードされている場合、subscribe_on はそのスコープ内の先頭のobservableの実行スケジューラを指定することになります。
subscribe() 内の nextcomplete の実行スケジューラが指定されている訳ではありません。(上記の例では、on nextsubscribe_on #1on completesubscribe_on #2で実行されています)
もし、subscribe() 内の実行スケジューラを指定したい場合、subscribe() の手前で observe_on を使用する必要があります。

6-10. observe_on → observe_on

6-10 のように先頭の observable の実行スケジューラを指定したいのであれば、 just で頭出しする方法もあります。

test_6_10
void test_6_10(){
  rxcpp::observable<>::just(unit{}, observe_on_debug("observe_on #2"))
  .flat_map([](unit){
    return obs("(A) ");
  })
  .flat_map([](unit){
    return rxcpp::observable<>::just(unit{}, observe_on_debug("observe_on #1"))
    .flat_map([](unit){
      return obs("(B) ");
    });
  })
  .subscribe(
    [](unit){ std::cout << "on next" << std::endl; },
    [](std::exception_ptr){ std::cout << "on error" << std::endl; },
    [](){ std::cout << "on complete" << std::endl; }
  );
}
結果
[[schedule]] observe_on #2 {
  (A) emit next {
    [[schedule]] observe_on #1 {
      (B) emit next {
        on next
      (B) emit next }
      (B) emit complete {
      (B) emit complete }
    [[schedule]] observe_on #1 }
  (A) emit next }
  (A) emit complete {
  (A) emit complete }
  on complete
[[schedule]] observe_on #2 }

subscribe_on より、justで (または直前にobserve_onで指定)頭出ししてスケジューリングする方が、個人的には分かりやすいし、スケジューラの切替が少ないので、こちらを使用するケースが多いです。

retry_when 的なことがしたい

現時点で rxcpp には retry_when なるオペレータは存在しません。
繰り返し処理には retry オペレータは便利なのですが、再試行すべきエラーか下流に発行すべきエラーかを問わず、このretryオペレータはエラーとなれば再試行します。
そこで、値と下流に発行すべきエラーをラップする方法があります。

something クラス

この something<T> クラスは、jsut<T>またはerror<T>を保有するクラスです。
そして、この something を下流に値にして発行することで、 retry をパスしようと考えています。

something
template<typename T = void> class something;

template <> class something<void> {
private:
  class retry_trigger : public std::exception {};
public:
  [[noreturn]] static void retry() {
    throw retry_trigger{};
  }
  template<typename T, typename TT = typename std::remove_reference<T>::type>
  static something<TT> success(T&& v) {
    return something<TT>(
      rxcpp::observable<>::just(std::forward<T>(v))
    );
  }
  template<typename T> static something<T> error(std::exception_ptr err) {
    return something<T>(
      rxcpp::observable<>::error<T>(err)
    );
  }
};

template<typename T> class something {
friend class something<>;
private:
  rxcpp::observable<T> o_;
  something(rxcpp::observable<T> o) : o_(o) {}
public:
  rxcpp::observable<T> proceed() { return o_; }
};

使用方法

使用方法といっても、そんなに大それたものでもないので、コメントを参照してください。

test_7_1
void test_7_1() {
  some_api api(5);

  auto sbsc = api.call()
  .map([=](result r){
    if(r == result::success) {
      std::cout << "success" << std::endl;
      return something<>::success(r); /* 正常 */
    }
    std::cout << "failure -> retry" << std::endl;
    something<>::retry(); /* retryオペレータを反応させます */
  })
  .on_error_resume_next([](std::exception_ptr err){
    /**
     * このサンプルコードでは発生することはないのですが、
     * 例えば、再試行させたくないエラーを、ここでsomethingとして値を発行します。
     * そして、retryの後段で flat_map して下流にエラーを発行します。
     */
    return rxcpp::observable<>::just(something<>::error<result>(err));    
  })
  .retry()
  .flat_map([](something<result> o){
    /** ここで just<result> から error<result> に接続されます */
    return o.proceed();
  })
  .subscribe(...);
}

例えば、再試行回数が2回の場合こんな感じになります。

test_7_2
void test_7_2() {
  some_api api(5);
  auto error_count = std::make_shared<int>(0);

  auto sbsc = api.call()
  .map([=](result r){
    if(r == result::failure) {
      *error_count = *error_count + 1;
      std::cout << "error count " << *error_count << std::endl;
      if(*error_count == 2){
        std::cout << "pass the error" << std::endl;
        return something<>::error<result>(std::make_exception_ptr(std::exception()));
      }
    }
    std::cout << "failure -> retry" << std::endl;
    something<>::retry();
  })
  .retry()
  .flat_map([](something<result> o){
    return o.proceed();
  })
  .subscribe(...);
}

手軽な感じではないですが、retry_when より細かい制御が入る場合、案外取り回しが楽だったりします。

並列実行してるのにパフォーマンスが出ていないような気がする

並列実行はパフォーマンスを上げたり、CPUリソースを積極的に解放したりするための重要なテクニックですが、あまり過度に並列数を増やすと弊害 3 も出てきます。
例えば、100個のURLがあって、各URLの情報をWebClient的な非同期関数を使ってGETして、HTTPのレスポンスを取得する場合などです。

こんなケース
rxcpp::observable<>::iterate(urls) /* urls ... url のリスト */
.flat_map([=](const URL& url){
  return http_get(url);  
})
.subscribe(...)

この場合、urls から1個ずつ url を取り出して http_get() を呼び出すのですが、http_get() が非同期APIだとすると http_get() を呼び出した直後に次の url が発行されて、対向サーバーからの応答待ちの http_get() が大量に貯留する可能性があります。
I/O待ちなのでCPUリソースは消費していないのですが、ネットワークドライバがてんやわんやな状態(輻輳状態)になります。
まぁまぁ、これはこれで動いたりするのですが、往々にして1個ずつ urlhttp_get() で呼び出した方が、復数のurlをガツンと http_get() で呼び出すよりもパフォーマンスが上がる場合があります。

そこで、http_get()を並列実行する個数を制限するために inflow_restriction クラスを用意しました。

inflow_restriction
template <int N>
class inflow_restriction {
private:
  sem<N> m_sem;

public:
  inflow_restriction() = default;
  inflow_restriction(const inflow_restriction&) = delete;
  ~inflow_restriction() = default;

  template <typename T>
  rxcpp::observable<T> enter(rxcpp::observable<T> o){
    return rxcpp::observable<>::create<T>([=](rxcpp::subscriber<T> s){
      m_sem.lock();
      o.subscribe([=](T v){
        s.on_next(v);
      }, [=](std::exception_ptr e){
        s.on_error(e);
        m_sem.unlock();
      }, [=](){
        s.on_completed();
        m_sem.unlock();
      });
    });
  }
};

同期オブジェクト sem<N> で並列数を N 個として、N個の共有資源を奪い合うことで並列数の制限を行います。
なお、そのうち使うことになるであろう std::counting_semaphore を想定して、並列数をテンプレートパラメータにしました。

並列数を4とした場合
auto ifr = std::make_shared<inflow_restriction<4>>();
rxcpp::observable<>::iterate(urls) /* urls ... url のリスト */
.flat_map([=](const URL& url){
  return ifr->enter(http_get(url)); /* 同時に4つまでしから subscribe しない */
})
.subscribe(...)

サンプルコードは test_8_1()test_8_2() になります。

  1. observe_on() は下流の実行するスケジューラを指定するので、特殊というより当たり前で、ある意味何もやっていないとも言えます。

  2. 具体的にはスケジューラに実行依頼が入った際に毎回スレッドが切り替わるようなケースです。

  3. 並列数を増やし過ぎるとオーバーヘッドが大きくなりパフォーマンスが低下します。(並列スローダウン

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?