この記事について
rxcpp
を使っていて、悩んだ時に見るメモ的なものを適宜更新していこうと思います。
本家はこちら
で、久々に、本家を見てみたら、サンプルコードがメソッドチェーンじゃなくて operator |
で繋げる形式になっていました。
随分前から rxjs
は observable
が肥大化しないために pipe()
を使っていましたが、同じような流れですかね(後で調べよう)。(独自のオペレータを作る時なんか良いかもなので、時期を見て置き換えていこうかなぁ。。。
サンプル的なもの
この記事の検証で書いた、すぐ動くサンプルコード的なものは、こちらに置いておきます。
また、適宜、記事の途中でサンプルコード
で関数名を記載しています。
void
的な値を発行したい
きっかけ作りのために値を発行したいことがあります。
その際に、発行する値は void
にしたいところです。
しかし、void
は型ですが、cpp
の場合は単体で値としては存在できないので observable<void>
はできません。
int
とか bool
とかで適当な値を発行しても良いのですが、後からコードを見ると、その値に意味があるのか無いのかがパッとみて分からなくなります。
そんな時は、空の型を定義してあげましょう。
struct unit{};
デバッガがクラッシュしたり挙動不審になる
rxcpp
の多くの機能が template
によって実装されています。
そして、テンプレートパラメータのデフォルト引数が定義されているケースが多く、例えば just
を使って observable<T>
を期待していても、テンプレートパラメータがくっついてきます。
auto x = rxcpp::observable<>::just(1);
// x = rxcpp::observable<int, rxcpp::sources::detail::iterate<std::__1::array<int, 1UL>, rxcpp::identity_one_worker>>
上記のように単発であれば、まだマシですが、map
や flat_map
などのオペレータを連結していくと、かなり大変な事になります。
auto x = rxcpp::observable<>::just(1)
.flat_map([=](auto){
return rxcpp::observable<>::just(true);
});
// rxcpp::observable<bool, rxcpp::operators::detail::flat_map<rxcpp::observable<int, rxcpp::sources::detail::iterate<std::__1::array<int, 1UL>, rxcpp::identity_one_worker>>, type, rxcpp::util::detail::take_at<1>, rxcpp::identity_one_worker>>
この時点で読む気が失せる感じですよね。まぁ、読む気が失せるだけなら良いのですが、実際、コンパイラが吐き出すデバッグシンボルが肥大化する原因になり、場合によってはデバッグシンボルを読み込んでいる最中にデバッガが死にます。
全てのテンプレートパラメータは内部的には必要なものなのですが、(多くの場合)第1パラメータ以外は外部で使用することはありません。
そこで、不要なテンプレートパラメータを消去するというものが as_dynamic()
です。
auto x = rxcpp::observable<>::just(1).as_dynamic();
// y = rxcpp::observable<int>
auto x = rxcpp::observable<>::just(1)
.flat_map([=](auto){
return rxcpp::observable<>::just(true);
}).as_dynamic();
// x = rxcpp::observable<bool>
ちなみに、実装はいたって単純で、 observable<T>
に変換しているだけです。
template<class... AN>
observable<T> as_dynamic(AN**...) const {
return *this;
}
ならば、逐一、 as_dynamic()
を呼べば良いかというと、なかなかそうも行かず、この as_dynamic()
は overvable<T>
のコピーコンストラクタが動作するため、キャストによる型変換とは異なりゼロコストでは無い点に注意が必要です。(コピー元の source
を引き受けて、スケジューラをデフォルトで初期化するだけなので、言うほど高コストではないですが)
as_dynamic()
が必須なケース
下記のように、関数の返却値で型を推論してもう場合、前述のとおり、型が異なるためエラーとなります。
.flat_map([=](bool b) {
if(b) return rxcpp::observable<>::just(unit{}); // rxcpp::observable<unit, rxcpp::sources::detail::iterate<std::__1::array<unit, 1UL>, rxcpp::identity_one_worker>>
else return rxcpp::observable<>::never<unit>(); // rxcpp::observable<unit, rxcpp::sources::detail::never<unit>>
})
この場合、下記のように明示的に返却値の型を指定するか、as_dynamic()
を使用します。
.flat_map([=](bool b) -> rxcpp::observable<unit> {
if(b) return rxcpp::observable<>::just(unit{});
else return rxcpp::observable<>::never<unit>();
})
.flat_map([=](bool b) {
if(b) return rxcpp::observable<>::just(unit{}).as_dynamic();
else return rxcpp::observable<>::never<unit>().as_dynamic();
})
どちらもコスト面は変わらないので、個人的には as_dynamic()
を使うことが多いです。
データを取りこぼす
例えば、下記の some_state
のように、 start()
を実行することで、値を発行し始めるような hot observable
があったとします。
struct some_state {
private:
rxcpp::subjects::subject<int> sbj_;
rxcpp::subscription sbs_;
public:
some_state() = default;
~some_state() = default;
auto observable() { return sbj_.get_observable(); }
void start(){
sbj_.get_subscriber().on_next(0);
sbs_ = rxcpp::observable<>::interval(std::chrono::milliseconds(200), rxcpp::observe_on_new_thread())
.tap([=](int n){
sbj_.get_subscriber().on_next(n);
})
.subscribe();
}
void end() {
sbj_.get_subscriber().on_completed();
sbs_.unsubscribe();
}
};
こうなると、「start()
と observable()
を何とかまとめられないか」と考えるのが世の常です。
すると、こんなコードを書くと見事に罠にハマります。(サンプルコード test_3_1()
)
auto fn = [](std::shared_ptr<some_state> state){
state->start();
return state->observable();
};
これだと、start()
を呼び出した際に、既に値が発行されてしまうため、俗に云うデータの取りこぼしが発生します。
このようなケースでは、仕掛けて(subscribeして)から引き金を引く(startする) という流れは常に意識する必要があります。
そこで、下記のような関数を用意しておくと便利です。
template <typename T> rxcpp::observable<T> ready_set_go(std::function<void()> f, rxcpp::observable<T> o) {
return rxcpp::observable<>::create<T>([f, o](rxcpp::subscriber<T> s){
o.subscribe([s](const T& v){
s.on_next(v);
}, [s](std::exception_ptr err){
s.on_error(err);
}, [s](){
s.on_completed();
});
try{
f();
}
catch(std::exception& err){
s.on_error(std::make_exception_ptr(err));
}
});
}
そうすると、先程のコードは下記のようになります。(サンプルコード test_3_2()
)
auto fn = [](std::shared_ptr<some_state> state){
return ready_set_go([state](){
state->start();
}, state->observable());
};
特定条件まで処理を繰り返したい
例えば、非同期の API 的なものがあって、応答が observable
になっているものがあったとします。
そして、 call()
は cold observable
で、api
の結果を値として1個発行して complete
するものとします。
enum class result { success, failure };
struct some_api {
auto call() -> rxcpp::observable<result>;
};
この some_api::call()
が result::success
となるまで待つというような場合の実装例と注意点を挙げておきます。
注意点としては、 call()
は cold observable
なので、リトライ時でも改めて call()
を呼び出す必要があります。つまり、先頭を call()
から始めると、call()
は一度しか呼ばれず、その次のメソッドチェーンからリトライ(ループ)されてしまい、意図した動作になりません。
retry()
を使う
retry()
を誘発するエラーなのか、subscribe()
に透過させるエラーなのかを適切に処理する必要がありますが、このコードでは割愛しています。(つまり、このままだとヨロシクナイ状態です。)
auto api = some_api{};
api->call()
.map([=](result x){
if(x == result::failure) throw 0;
return unit{};
}).as_dynamic()
.retry()
.take(1)
.subscribe(...)
never()
を使う
never()
を使う場合、値発行だけではなく complete
さえもしないことに注意が必要です。
auto api = std::make_shared<some_api>(5);
auto sbj = rxcpp::subjects::subject<unit>();
ready_set_go([=](){
sbj.get_subscriber().on_next(unit{});
}, sbj.get_observable())
.flat_map([=](unit){
return api->call();
}).as_dynamic()
.flat_map([=](result x){
scope_counter sc;
if(x == result::failure){
sbj.get_subscriber().on_next(unit{});
return rxcpp::observable<>::never<unit>().as_dynamic();
}
else{
/**
* ここで sbj を complete させるべく
* sbj.get_subscriber().on_completed();
* を実行しても、この flat_map が never を返却していると、
* もはや、全ての observable を complete することが「絶対に不可能」なので、
* 適宜、後段で take() することで、前段の購読完了の意思表示をしなければ
* subscribe で complete が呼び出されないので注意。
**/
return rxcpp::observable<>::just(unit{}).as_dynamic();
}
}).as_dynamic()
.take(1) /** 明示的な購読終了の意思表示をしないと、全てが complete しないので注意 */
.subscribe(...)
/**
* ちなみに、 take(1) が無くても、この関数は終了するが
* その場合、 subscription は終了していない状態で、この関数を抜けるため
* subscription のデストラクタで unsubscribe されているだけということに注意が必要
**/
skip_while
を使う
このケースでは、似たような判定が重複して登場するため、無理矢理感が否めないですが。
auto api = std::make_shared<some_api>(5);
auto sbj = rxcpp::subjects::subject<unit>();
ready_set_go([=](){
sbj.get_subscriber().on_next(unit{});
}, sbj.get_observable())
.flat_map([=](unit){
return api->call();
}).as_dynamic()
.tap([=](result x){
if(x == result::failure){
sbj.get_subscriber().on_next(unit{});
}
else{
sbj.get_subscriber().on_completed();
}
})
.skip_while([=](result x){
return x == result::failure;
}).as_dynamic()
.take(1)
.subscribe(...)
observable
作成する
observable::create
内で result::failure
時に自身の completed
を忘れると、 never
と同じことになり、全体の subscribe
が complete
しないので注意が必要です。
auto api = some_api{};
auto sbj = rxcpp::subjects::subject<unit>();
ready_set_go([=](){
sbj.get_subscriber().on_next(unit{});
}, sbj.get_observable())
.flat_map([=](unit){
return api->call();
}).as_dynamic()
.flat_map([=](result x){
return rxcpp::observable<>::create<unit>([=](rxcpp::subscriber<unit> s){
if(x == result::failure){
sbj.get_subscriber().on_next(unit{});
s.on_completed(); /** 忘れがち! */
}
else {
s.on_next(unit{});
s.on_completed();
sbj.get_subscriber().on_completed();
}
});
}).as_dynamic()
.subscribe(...)
as_blocking
を使ってループする
as_blocking()
で堰き止めて、while
で期待する値が得られるまでループする方法です。
この方法はスタックが深くならないし、コードがシンプルというメリットはあります。
しかし、このループは期待する値が得られるまで誰も止めることができません。
全体を unsubscribe
してもループは動き続けてしまうので、何らかの条件でループを抜ける仕組みを入れてあげる必要があります。
この辺をケアすると複雑になるかもです。
また、ループがメインスレッドで実行されると、メインスレッドが call()
による値の発行を待ってしまうので、実行するならワーカスレッドを使いましょう。
あと、call()
が値を発行せず complete
するケースがあるなら、first()
みたいに手を抜かず、subscribe
しましょう。
rxcpp::observable<>::create<unit>([](rxcpp::subscriber<unit> s){
try{
some_api api(5);
auto results = api.call().as_blocking();
while(results.first() == result::failure) {}
s.on_next(unit{});
s.on_completed();
}
catch(std::exception& e){
s.on_error(std::make_exception_ptr(e));
}
})
.subscribe(...)
注意点
-
call()
が、何らかのシステムコール的なAPIを使用して、そのcompletion handler
的な関数から値が発行されている場合、call()
以降のメソッドチェーンは、そのcompletion handler
の中で実行されることになります。つまり、そのまま再度call()
を呼び出すと、システムコールが再入することになるためエラーやクラッシュする可能性がありあます。そこで、call()
の後段で適宜observe_on
を使用して後段処理を別スレッドに逃してcompletion handler
の処理を戻してあげましょう。 - 仮に非同期 API だけど、特定のエラーなどで同期的に(同一スレッドで)値を発行した場合、どの手段を使ってもループ処理は再帰的に実行されます。数回なら許容されるかもですが、数十回とかになるとスタックオーバフローが発生しクラッシュします。そこで、ループを発生させるコードに
observe_on
を入れてスタックを消費しないよう心がけましょう。(サンプルコードtest_2_1()
とtest_2_2()
)ただし、実行速度とのトレードオフがあるため、やたらめったらobserve_on
を入れるのはやめましょう。(この点「retry はどこから retry するのか?」も参考にしてください) -
test_1_1()
からtest_1_4()
はどれもsubscribe
で 1回だけnext
が呼び出され、complete
するのですが、どの例も値の発行タイミングや判断処理のスコープが微妙に異なります。実際に動作させてログを見てみるとわかりやすいと思います。
スケジューラを作りたい
std::async
を使った簡単な scheduler
を例示しておきます。
スレッドプールや、特定スレッドで実行させたい場合なんかに std::async
を適宜置き換えると良いかと思います。
例えば、std::async
を boost::asio::io_context
に post()
するとか、 std::this_thread::sleep_until()
を boost::asio::deadline_timer
にする感じで使っています。
class async_scheduler : public rxcpp::schedulers::scheduler_interface {
private:
class async_worker : public rxcpp::schedulers::worker_interface {
private:
rxcpp::composite_subscription m_lifetime;
public:
explicit async_worker(rxcpp::composite_subscription cs) : m_lifetime(cs) {}
virtual ~async_worker() { m_lifetime.unsubscribe(); }
virtual clock_type::time_point now() const override { return clock_type::now(); }
virtual void schedule(const rxcpp::schedulers::schedulable& scbl) const override {
schedule(now(), scbl);
}
virtual void schedule(rxcpp::schedulers::scheduler_interface::clock_type::time_point when, const rxcpp::schedulers::schedulable& scbl) const override {
if (scbl.is_subscribed()) {
auto THIS = shared_from_this();
std::async(std::launch::async, [THIS, scbl, when](){
std::this_thread::sleep_until(when);
scbl(rxcpp::schedulers::recursion(true).get_recurse());
});
}
}
};
public:
async_scheduler() = default;
virtual ~async_scheduler() = default;
virtual rxcpp::schedulers::scheduler_interface::clock_type::time_point now() const override {
return rxcpp::schedulers::scheduler_interface::clock_type::now();
}
virtual rxcpp::schedulers::worker create_worker(rxcpp::composite_subscription cs) const override {
return rxcpp::schedulers::worker(cs, std::make_shared<async_worker>(cs));
}
};
rxcpp::observe_on_one_worker observe_on_async() {
return rxcpp::observe_on_one_worker(rxcpp::schedulers::make_scheduler<async_scheduler>());
}
retry はどこから retry するのか?
下記のコードで確認しましょう。
void test_4_1() {
/** 関数の入退出を確認 */
struct scope {
std::string name_;
scope(const std::string& name) : name_(name) {
std::cout << "enter:" << name_ << std::endl;
}
scope(const std::string& name, int n) {
std::stringstream ss;
ss << name << " (" << n << ")";
name_ = ss.str();
std::cout << "enter:" << name_ << std::endl;
}
~scope() {
std::cout << "leave:" << name_ << std::endl;
}
};
int counter = 0; /** fn() 内の observable で活躍するカウンタ */
/**
* subscribe 毎にインクリメントされた値を1個だけ発行する
* cold observable を返却する関数
**/
auto fn = [&counter]() {
scope fns("fn()");
return rxcpp::observable<>::create<int>([&counter](rxcpp::subscriber<int> s){
scope fnos("observable 1st", counter);
s.on_next(counter++); /** counter を加算して発行 */
s.on_completed();
})
.flat_map([](int n){
scope fns("flat_map");
return rxcpp::observable<>::create<int>([n](rxcpp::subscriber<int> s){
scope fnos("observable 2nd", n);
s.on_next(n); /** 受け取った数値(counter) をそのまま発行 */
s.on_completed();
});
});
};
fn()
.map([=](int x){
std::cout << x << std::endl;
if(x == 2) return unit{}; /** その後 take(1) で終了 */
std::cout << "throw" << std::endl;
throw 0; /** retry を誘発 */
})
.retry()
.take(1)
.subscribe([=](unit){
std::cout << "on next" << std::endl;
}, [=](std::exception_ptr){
std::cout << "on error" << std::endl;
}, [=](){
std::cout << "on complete" << std::endl;
});
}
enter:fn()
leave:fn()
enter:observable 1st (0)
enter:flat_map
leave:flat_map
enter:observable 2nd (0)
0
throw
enter:observable 1st (1)
enter:flat_map
leave:flat_map
enter:observable 2nd (1)
1
throw
enter:observable 1st (2)
enter:flat_map
leave:flat_map
enter:observable 2nd (2)
2
on next
on complete
leave:observable 2nd (2)
leave:observable 1st (2)
leave:observable 2nd (1)
leave:observable 1st (1)
leave:observable 2nd (0)
leave:observable 1st (0)
源流であるfn()
が返却するobservable
を再度観測するという流れになります。
retry()
を契機にfn()
が呼び出される訳じゃない点に注意です。
なお、fn()
内の observable
で実行されているコードが complete
後に関数から抜けている点にも注目してください。(これがスタックオーバーフローの原因になり得ます)
ちなみに、サンプルコード test_4_2()
では、fn()
の直後に observe_on
でスレッドを切り替えてみました。
その結果はこちらです。
enter:fn()
leave:fn()
enter:observable 1st (0)
enter:flat_map
leave:flat_map
enter:observable 2nd (0)
leave:observable 2nd (0)
leave:observable 1st (0)
0
throw
enter:observable 1st (1)
enter:flat_map
leave:flat_map
enter:observable 2nd (1)
leave:observable 2nd (1)
leave:observable 1st (1)
1
throw
enter:observable 1st (2)
enter:flat_map
leave:flat_map
enter:observable 2nd (2)
leave:observable 2nd (2)
2
on next
leave:observable 1st (2)
on complete
マルチスレッドでオペレータの挙動が怪しい(take(1)
なのに復数の値が発行されてしまう症状など)
下記のコードは、マルチスレッドで値の発行とエラーをほぼ同時に発行するためのテストプログラムです。
void test_5_3()
{
for(auto i = 1;; i++){
std::cout << std::endl << "========== " << i << "==========" << std::endl;
auto count = std::make_shared<std::atomic_int>(0);
auto mtx = std::make_shared<sem<1>>();
auto sbsc = rxcpp::observable<>::interval(std::chrono::milliseconds(100), rxcpp::observe_on_new_thread())
.observe_on(rxcpp::observe_on_new_thread())
.flat_map([=](int x){
std::cout << std::this_thread::get_id() << " : " << x << " : enter" << std::endl;
if(x == 1){
mtx->lock();
return rxcpp::observable<>::just(unit{})
.observe_on(rxcpp::observe_on_new_thread())
.flat_map([=](unit){
std::cout << std::this_thread::get_id() << " : " << x << " : wait for get semaphore" << std::endl;
mtx->lock();
std::cout << std::this_thread::get_id() << " : " << x << " : throw" << std::endl;
return rxcpp::observable<>::error<int>(std::make_exception_ptr(std::exception()));
}).as_dynamic();
}
std::cout << std::this_thread::get_id() << " : " << x << " : unlock semaphore" << std::endl;
mtx->unlock();
std::this_thread::yield();
std::cout << std::this_thread::get_id() << " : " << x << " : emit" << std::endl;
return rxcpp::observable<>::just(x).as_dynamic();
})
.take(1)
.observe_on(rxcpp::observe_on_new_thread())
.on_error_resume_next([=](std::exception_ptr e){
std::cout << std::this_thread::get_id() << " : on_error_resume_next" << std::endl;
return rxcpp::observable<>::just(unit{})
.observe_on(rxcpp::observe_on_new_thread())
.flat_map([=](unit){
std::cout << std::this_thread::get_id() << " : on_error_resume_next: emit" << std::endl;
return rxcpp::observable<>::just(1);
});
})
.subscribe([=](int x){
(*count)++;
std::cout << std::this_thread::get_id() << " : on next " << x << std::endl;
}, [=](std::exception_ptr){
std::cout << std::this_thread::get_id() << " : on error" << std::endl;
}, [=](){
std::cout << std::this_thread::get_id() << " : on complete" << std::endl;
});
while(sbsc.is_subscribed()) {}
std::cout << "----------> next count = " << *count << std::endl;
assert(*count == 1);
}
}
同期オブジェクトの sem<1>
は自作の semaphore
で共有資源が1個の所謂 binary semaphore
です。(lock
した同期オブジェクトが別スレッドでunlock
しているためmutex
は使えません)
で、なんだかゴチャゴチャ書いていますが、ざっと説明するとこんな感じです。
-
interval
を使って 100ms 毎に値を発行 - 初回の値で
semaphore
を取得した後、再度semaphore
取得するため解放されるまで待つ。(...A) - 2回目の値で
semaphore
を解放(...B)しつつ、値を発行(...C) - (A) の解放待ちが、(B)により終了したので、エラーを発行する(...D)
- (C)と(D)がほぼ同時で
take()
オペレータに突入する - エラーの場合、そのエラーは揉み消して、下流には値(1という数値)を発行する
-
subscribe
で観測された値の個数を記録する(正常なら1になる筈です)
take()
に限らず、どんなオペレータも基本は下記のルールになります。
- 上流を
subscribe
する - 下流に
next
、error
、complete
を発行する - 下流に
error
やcomplete
を発行した場合は、上流をunsubscribe
して、自身は終了状態になる
従って、「値が先行」「エラーが先行」した場合、下記のようなログになります。
0x70000eb4a000 : 1 : enter
0x70000ec50000 : 1 : wait for get semaphore
0x70000eb4a000 : 2 : enter
0x70000eb4a000 : 2 : unlock semaphore
0x70000eb4a000 : 2 : emit
0x70000ec50000 : 1 : throw
0x70000eac7000 : on next 2
0x70000eac7000 : on complete
----------> next count = 1
0x70000eb4a000 : 1 : enter
0x70000ec50000 : 1 : wait for get semaphore
0x70000eb4a000 : 2 : enter
0x70000eb4a000 : 2 : unlock semaphore
0x70000ec50000 : 1 : throw
0x70000eac7000 : on_error_resume_next
0x70000eb4a000 : 2 : emit
0x70000ebcd000 : on_error_resume_next: emit
0x70000ebcd000 : on next 1
0x70000ebcd000 : on complete
----------> next count = 1
なるほど、ちゃんと動いてる的な、とても安心感があるログですね。
ところが、マルチスレッド下においてtake()
オペレータはブロッキングしません。
つまり、上流からの何らかの発行を受け下流に何かを発行している最中でも、自身が終了状態でなければ、上流からの発行を受け付けてしまいます。
そのような自体が発生した際のログは下記のようになります。
0x70000eb4a000 : 1 : enter
0x70000ec50000 : 1 : wait for get semaphore
0x70000eb4a000 : 2 : enter
0x70000eb4a000 : 2 : unlock semaphore
0x70000ec50000 : 1 : throw
0x70000eb4a000 : 2 : emit
0x70000eac7000 : on next 2
0x70000eac7000 : on_error_resume_next
0x70000eb4a000 : on_error_resume_next: emit
0x70000eb4a000 : on next 1
0x70000eb4a000 : on complete
----------> next count = 2
マルチコアのCPUだと、結構な頻度でこの手の「抜け」が発生します。
この場合、map
などのオペレータとsemaphore
を使ってブロックすることも可能ですが、もっと簡単な方法があります。
それは、このtake()
オペレータを同一のスレッドで待ち合わせる方法です。
具体的には take()
オペレータの手前で下記のコードを追加するだけです。
.observe_on(rxcpp::observe_on_new_thread()) /** <-- 追加 */
.take(1)
殆どのオペレータは、エラーが発生した場合、オペレータの機能が無くなり、エラーを下流へ透過します。
例えば delay()
オペレータの上流でエラーが発生しても、そのエラーの発行を遅延させることなく下流へエラーを発行します。(同時に上流はunsubscribe
します)
しかし、observe_on()
は特殊です1。observe_on()
はエラーの場合でも、そのまま下流にエラーを発行することなく、本来の機能を維持します。つまり、observe_on()
で指定したスケジューラでエラーが発行されます。(もちろん同時に上流はunsubscribe
されます)
従って、上記のようにobserve_on()
を使用して、待ち合わせ用のスケジューラを指定することで問題は回避されます。
当たり前ですが、observe_on()
で指定するスケジューラにスレッドプールを使わないでください2。スレッドプールを指定したら、結果並列処理になる可能性があるため、待ち合わせになりません。(スレッドを1個だけ抱えているスレッドプールなら別ですが)
サンプルコード test_5_3()
は値とエラーの例ですが、 test_5_4()
は値を2つ同時に発行する例になります。そして、値を2つ同時に発行する場合は take(1)
なのに値が2個発行される事態になります。
スケジューラの実行タイミングがよく分からなくなった
observe_on
は分かりやすいかと思うのですが、 subscribe_on
が分からなくて、結果グチャグチャになるケースがあるので、スケジューラが切り替わる状況を可視化して具体的に纏めてみようと思います。
準備
下記のような簡単な機能を実装しました。
デバッグ用のスケジューラ
デバッグ用のスケジューラを作りました。
スケジューラと言っても、ログを記録するだけで、内部は同期処理にしています。
ログに [[schedule]]
と入っている箇所はスケジューラが切り替わったことを示しています。
非同期の場合は、どこで指定されたスケジューラで実行されているのかを把握するには良いかと思います。
また、 {
と }
を使って期間を示すようにしました。
class debug_worker : public rxcpp::schedulers::worker_interface {
virtual void schedule(rxcpp::schedulers::scheduler_interface::clock_type::time_point when, const
rxcpp::schedulers::schedulable& scbl) const override {
if (scbl.is_subscribed()) {
std::cout << "[[schedule]] " << m_name << " {" << std::endl;
scbl(rxcpp::schedulers::recursion(true).get_recurse());
std::cout << "[[schedule]] " << m_name << " }" << std::endl;
}
}
};
値を1個だけ発行する observable
next
と complete
を実行している期間を{
と }
を使って表現しました。
rxcpp::observable<unit> obs(std::string name = std::string()) {
return rxcpp::observable<>::create<unit>([name](rxcpp::subscriber<unit> s){
std::cout << name << "emit next" << " {" << std::endl;
s.on_next(unit{});
std::cout << name << "emit next" << " }" << std::endl;
std::cout << name << "emit complete" << " {" << std::endl;
s.on_completed();
std::cout << name << "emit complete" << " }" << std::endl;
});
}
6-1. スケジューラ指定なし
スケジューラを使用しないケースです。
void test_6_1()
{
obs()
.subscribe(
[](unit){ std::cout << "on next" << std::endl; },
[](std::exception_ptr){ std::cout << "on error" << std::endl; },
[](){ std::cout << "on complete" << std::endl; }
);
}
emit next {
on next
emit next }
emit complete {
on complete
emit complete }
サンプルコードではインデントされていませんが、分かりやすいように手動で加工しました。
なお、以降は subscribe(...)
と省略します。
6-2. observe_on
void test_6_2(){
obs()
.observe_on(observe_on_debug("observe_on"))
.subscribe(...);
}
emit next {
[[schedule]] observe_on {
on next
[[schedule]] observe_on }
emit next }
emit complete {
[[schedule]] observe_on {
on complete
[[schedule]] observe_on }
emit complete }
observe_on
の効果は、指定子たスケジューラで下流を実行することです。
この例では、obs()
の下流に当たるsubscribe()
内の on next
と on complete
が observe_on
で指定したスケジューラで実行されていることが分かります。
あ、あと、次の 6-3
と比較ですが、6-2
のobs()
はメインスレッド(正確に言うとsubscribe
を実行したスレッド)で実行されます。これは何を意味するかというと、obs()
がメインスレッドで実行されるため、仮にobserve_on
で非同期スケジューラを使用しスレッド切替が発生したとしても、obs()
がcomplete
するまで、メインスレッドはブロックされます。
6-3. subscribe_on
void test_6_3(){
obs()
.subscribe_on(observe_on_debug("subscribe_on"))
.subscribe(...);
}
[[schedule]] subscribe_on {
emit next {
on next
emit next }
emit complete {
on complete
[[schedule]] subscribe_on {
[[schedule]] subscribe_on }
emit complete }
[[schedule]] subscribe_on }
subscribe_on
もスケジューラを要求するので observe_on_debug
を設定しています。
subscribe_on
の効果は、 subscribe
した時に先頭の observable
について subscribe_on
で指定したスケジューラで実行します。
6-2
とは対象的で、仮に subscribe_on
で非同期スケジューラを使用した場合、メインスレッドはブロッキングされません。
6-4. observe_on → observe_on
多くのオペレータにはスケジューラを設定することができます。
よく使う手として、just
オペレータにスケジューラを指定して頭出しをするなんてことをやります。
void test_6_4(){
rxcpp::observable<>::just(unit{}, observe_on_debug("just"))
.flat_map([](unit){
return obs();
})
.observe_on(observe_on_debug("observe_on"))
.subscribe(...);
}
[[schedule]] just {
emit next {
[[schedule]] observe_on {
on next
[[schedule]] observe_on }
emit next }
emit complete {
emit complete }
[[schedule]] observe_on {
on complete
[[schedule]] observe_on }
[[schedule]] just }
just
が指定したスケジューラで実行されるため、test_6_3
と同様に、メインスレッドをブロックすることはありません。(test_6_2
とも比較しましょう)
あ、あと emit complete
と on complete
がずれている点ですが、これは emit complete
は flat_map
オペレータに対して発行され、flat_map
は上流のcomplete
を受けて(この時点でemit complete
の役目は終わり制御を戻します)、下流にcomplete
を発行します。最後に、observe_on
オペレータが上流の complete
を受けて、下流に complete
を発行します。そして、下流に complete
を発行する時は指定されたスケジューラで実行する流れになります。
6-5. observe_on → subscribe_on
void test_6_5(){
rxcpp::observable<>::just(unit{}, observe_on_debug("just"))
.flat_map([](unit){
return obs();
})
.subscribe_on(observe_on_debug("subscribe_on"))
.subscribe(...);
}
[[schedule]] subscribe_on {
[[schedule]] just {
emit next {
on next
emit next }
emit complete {
emit complete }
on complete
[[schedule]] subscribe_on {
[[schedule]] subscribe_on }
[[schedule]] just }
[[schedule]] subscribe_on }
ちょっとややこしくなってきました。
しかし、 subscribe_on
と observe_on
はこれまでの説明通りの動作になっています。
注意点は subscribe_on
は subscribe
のスケジューラを指定するのではありません。
subscribe()
を実行する時の先頭の observable
の実行スケジューラを指定するものです。
6-6. observe_on → observe_on → observe_on
void test_6_6(){
rxcpp::observable<>::just(unit{}, observe_on_debug("just"))
.observe_on(observe_on_debug("observe_on #1"))
.flat_map([](unit){
return obs();
})
.observe_on(observe_on_debug("observe_on #2"))
.subscribe(...);
}
[[schedule]] just {
[[schedule]] observe_on #1 {
emit next {
[[schedule]] observe_on #2 {
on next
[[schedule]] observe_on #2 }
emit next }
emit complete {
emit complete }
[[schedule]] observe_on #1 }
[[schedule]] observe_on #1 {
[[schedule]] observe_on #2 {
on complete
[[schedule]] observe_on #2 }
[[schedule]] observe_on #1 }
[[schedule]] just }
インデントが深くなっていますが、そんなに難しくは無いですね。
6-7. observe_on → observe_on → subscribe_on
void test_6_7(){
rxcpp::observable<>::just(unit{}, observe_on_debug("just"))
.observe_on(observe_on_debug("observe_on"))
.flat_map([](unit){
return obs();
})
.subscribe_on(observe_on_debug("subscribe_on"))
.subscribe(...);
}
[[schedule]] subscribe_on {
[[schedule]] just {
[[schedule]] observe_on {
emit next {
on next
emit next }
emit complete {
emit complete }
[[schedule]] observe_on }
[[schedule]] observe_on {
on complete
[[schedule]] subscribe_on {
[[schedule]] subscribe_on }
[[schedule]] observe_on }
[[schedule]] just }
[[schedule]] subscribe_on }
subscribe_on
が登場してますが、もう大丈夫ですよね。
6-8. subscribe_on → observe_on → subscribe_on
では、subscribe_on
が復数登場した場合、どうなるでしょうか?
void test_6_8(){
obs()
.subscribe_on(observe_on_debug("subscribe_on #1"))
.observe_on(observe_on_debug("observe_on"))
.subscribe_on(observe_on_debug("subscribe_on #2"))
.subscribe(
[=](unit){ std::cout << "on next" << std::endl; },
[=](std::exception_ptr){ std::cout << "on error" << std::endl; },
[=](){ std::cout << "on complete" << std::endl; }
);
}
[[schedule]] subscribe_on #2 {
[[schedule]] subscribe_on #1 {
emit next {
[[schedule]] observe_on {
on next
[[schedule]] observe_on }
emit next }
emit complete {
[[schedule]] observe_on {
on complete
[[schedule]] subscribe_on #2 {
[[schedule]] subscribe_on #2 }
[[schedule]] subscribe_on #1 {
[[schedule]] subscribe_on #1 }
[[schedule]] observe_on }
emit complete }
[[schedule]] subscribe_on #1 }
[[schedule]] subscribe_on #2 }
こうなってしまいます。
これで幸せになる方法が分からないのですが。。。
使うケースあるのかな??
6-9. subscribe_on → subscribe_on
では、subscribe_on
が復数存在しても意味が無いかというと、そういう訳ではありません。
void test_6_9(){
obs("(A) ")
.flat_map([](unit){
return obs("(B) ")
.subscribe_on(observe_on_debug("subscribe_on #1"));
})
.subscribe_on(observe_on_debug("subscribe_on #2"))
.subscribe(
[](unit){ std::cout << "on next" << std::endl; },
[](std::exception_ptr){ std::cout << "on error" << std::endl; },
[](){ std::cout << "on complete" << std::endl; }
);
}
[[schedule]] subscribe_on #2 {
(A) emit next {
[[schedule]] subscribe_on #1 {
(B) emit next {
on next
(B) emit next }
(B) emit complete {
[[schedule]] subscribe_on #1 {
[[schedule]] subscribe_on #1 }
(B) emit complete }
[[schedule]] subscribe_on #1 }
(A) emit next }
(A) emit complete {
on complete
[[schedule]] subscribe_on #2 {
[[schedule]] subscribe_on #2 }
(A) emit complete }
[[schedule]] subscribe_on #2 }
observable
がカスケードされている場合、subscribe_on
はそのスコープ内の先頭のobservable
の実行スケジューラを指定することになります。
subscribe()
内の next
と complete
の実行スケジューラが指定されている訳ではありません。(上記の例では、on next
はsubscribe_on #1
、on complete
はsubscribe_on #2
で実行されています)
もし、subscribe()
内の実行スケジューラを指定したい場合、subscribe()
の手前で observe_on
を使用する必要があります。
6-10. observe_on → observe_on
6-10
のように先頭の observable
の実行スケジューラを指定したいのであれば、 just
で頭出しする方法もあります。
void test_6_10(){
rxcpp::observable<>::just(unit{}, observe_on_debug("observe_on #2"))
.flat_map([](unit){
return obs("(A) ");
})
.flat_map([](unit){
return rxcpp::observable<>::just(unit{}, observe_on_debug("observe_on #1"))
.flat_map([](unit){
return obs("(B) ");
});
})
.subscribe(
[](unit){ std::cout << "on next" << std::endl; },
[](std::exception_ptr){ std::cout << "on error" << std::endl; },
[](){ std::cout << "on complete" << std::endl; }
);
}
[[schedule]] observe_on #2 {
(A) emit next {
[[schedule]] observe_on #1 {
(B) emit next {
on next
(B) emit next }
(B) emit complete {
(B) emit complete }
[[schedule]] observe_on #1 }
(A) emit next }
(A) emit complete {
(A) emit complete }
on complete
[[schedule]] observe_on #2 }
subscribe_on
より、just
で (または直前にobserve_on
で指定)頭出ししてスケジューリングする方が、個人的には分かりやすいし、スケジューラの切替が少ないので、こちらを使用するケースが多いです。
retry_when
的なことがしたい
現時点で rxcpp
には retry_when
なるオペレータは存在しません。
繰り返し処理には retry
オペレータは便利なのですが、再試行すべきエラーか下流に発行すべきエラーかを問わず、このretry
オペレータはエラーとなれば再試行します。
そこで、値と下流に発行すべきエラーをラップする方法があります。
something
クラス
この something<T>
クラスは、jsut<T>
またはerror<T>
を保有するクラスです。
そして、この something
を下流に値にして発行することで、 retry
をパスしようと考えています。
template<typename T = void> class something;
template <> class something<void> {
private:
class retry_trigger : public std::exception {};
public:
[[noreturn]] static void retry() {
throw retry_trigger{};
}
template<typename T, typename TT = typename std::remove_reference<T>::type>
static something<TT> success(T&& v) {
return something<TT>(
rxcpp::observable<>::just(std::forward<T>(v))
);
}
template<typename T> static something<T> error(std::exception_ptr err) {
return something<T>(
rxcpp::observable<>::error<T>(err)
);
}
};
template<typename T> class something {
friend class something<>;
private:
rxcpp::observable<T> o_;
something(rxcpp::observable<T> o) : o_(o) {}
public:
rxcpp::observable<T> proceed() { return o_; }
};
使用方法
使用方法といっても、そんなに大それたものでもないので、コメントを参照してください。
void test_7_1() {
some_api api(5);
auto sbsc = api.call()
.map([=](result r){
if(r == result::success) {
std::cout << "success" << std::endl;
return something<>::success(r); /* 正常 */
}
std::cout << "failure -> retry" << std::endl;
something<>::retry(); /* retryオペレータを反応させます */
})
.on_error_resume_next([](std::exception_ptr err){
/**
* このサンプルコードでは発生することはないのですが、
* 例えば、再試行させたくないエラーを、ここでsomethingとして値を発行します。
* そして、retryの後段で flat_map して下流にエラーを発行します。
*/
return rxcpp::observable<>::just(something<>::error<result>(err));
})
.retry()
.flat_map([](something<result> o){
/** ここで just<result> から error<result> に接続されます */
return o.proceed();
})
.subscribe(...);
}
例えば、再試行回数が2回の場合こんな感じになります。
void test_7_2() {
some_api api(5);
auto error_count = std::make_shared<int>(0);
auto sbsc = api.call()
.map([=](result r){
if(r == result::failure) {
*error_count = *error_count + 1;
std::cout << "error count " << *error_count << std::endl;
if(*error_count == 2){
std::cout << "pass the error" << std::endl;
return something<>::error<result>(std::make_exception_ptr(std::exception()));
}
}
std::cout << "failure -> retry" << std::endl;
something<>::retry();
})
.retry()
.flat_map([](something<result> o){
return o.proceed();
})
.subscribe(...);
}
手軽な感じではないですが、retry_when
より細かい制御が入る場合、案外取り回しが楽だったりします。
並列実行してるのにパフォーマンスが出ていないような気がする
並列実行はパフォーマンスを上げたり、CPUリソースを積極的に解放したりするための重要なテクニックですが、あまり過度に並列数を増やすと弊害 3 も出てきます。
例えば、100個のURLがあって、各URLの情報をWebClient的な非同期関数を使ってGETして、HTTPのレスポンスを取得する場合などです。
rxcpp::observable<>::iterate(urls) /* urls ... url のリスト */
.flat_map([=](const URL& url){
return http_get(url);
})
.subscribe(...)
この場合、urls
から1個ずつ url
を取り出して http_get()
を呼び出すのですが、http_get()
が非同期APIだとすると http_get()
を呼び出した直後に次の url
が発行されて、対向サーバーからの応答待ちの http_get()
が大量に貯留する可能性があります。
I/O待ちなのでCPUリソースは消費していないのですが、ネットワークドライバがてんやわんやな状態(輻輳状態)になります。
まぁまぁ、これはこれで動いたりするのですが、往々にして1個ずつ url
をhttp_get()
で呼び出した方が、復数のurl
をガツンと http_get()
で呼び出すよりもパフォーマンスが上がる場合があります。
そこで、http_get()
を並列実行する個数を制限するために inflow_restriction
クラスを用意しました。
template <int N>
class inflow_restriction {
private:
sem<N> m_sem;
public:
inflow_restriction() = default;
inflow_restriction(const inflow_restriction&) = delete;
~inflow_restriction() = default;
template <typename T>
rxcpp::observable<T> enter(rxcpp::observable<T> o){
return rxcpp::observable<>::create<T>([=](rxcpp::subscriber<T> s){
m_sem.lock();
o.subscribe([=](T v){
s.on_next(v);
}, [=](std::exception_ptr e){
s.on_error(e);
m_sem.unlock();
}, [=](){
s.on_completed();
m_sem.unlock();
});
});
}
};
同期オブジェクト sem<N>
で並列数を N
個として、N
個の共有資源を奪い合うことで並列数の制限を行います。
なお、そのうち使うことになるであろう std::counting_semaphore
を想定して、並列数をテンプレートパラメータにしました。
auto ifr = std::make_shared<inflow_restriction<4>>();
rxcpp::observable<>::iterate(urls) /* urls ... url のリスト */
.flat_map([=](const URL& url){
return ifr->enter(http_get(url)); /* 同時に4つまでしから subscribe しない */
})
.subscribe(...)
サンプルコードは test_8_1()
と test_8_2()
になります。