ことの発端
ウチの会社の業務では非同期処理が多いのです。
OSやら各種ライブラリやらのAPIなどで採用されている非同期処理はノンブロッキングI/O
とか非同期I/O
になるのですが、如何せん、そのまま使うとコードが分散したり、ネストが深くなったり、そんでもってエラー処理どうするの?的な感じになるので、なんとかならんかと思いました。
そんな中、C#
では async/await
なるとびきり素敵なソリューションが登場した訳ですが、C++
ではどうしようと悩んで色々と非同期ライブラリを探したところ ReactiveX
に出会った訳です。
まぁ、そんなこんなで ReactiveX
とは永い付き合いになったのですが、ここ2〜3年で状況が変わってきました。
というのも、Observable
のメソッドチェーンが千個以上になってくると、コンパイル時間が長かったり(1行変更10分コース、フルビルドなら1時間コース)、デバッガがクラッシュするなど、ReactiveX
のデバッグって結構面倒なのに、更に開発環境が悪化することでストレスを抱えることになりました。もはや、as_dynamic()
だけでは改善することができなくなりました。そのうち、PCのスペックが上がれば改善されるかとたかをくくっていたのですが、PCスペックの向上よりもメソッドチェーンの増加が上回ってしまい、開発環境は悪化の一途です。
で、「元々何をしたいのか?」と自分に問うと、「非同期処理をメンテナンスしやすくしたい」ということであって、決して ReactiveX
を使いたいというモチベーションではないということです。(ReactiveX
は素晴らしいのですが)
そこで、もう一度、非同期処理をどうしようか考えたときに、そう言えば標準ライブラリの std::feature
や std::promise
があるなぁ〜と思ったのですが、どうにもしっくりこない。
せめて、Javascript
の Promise
的なのが無いのか調べたところ、GitHub
に何種類かあったのですが、「シングルスレッドで使ってね」的なものだったり、「そもそもスレッドセーフなのか?」みたいなのが多かったので、結局自作したというオチです。
JPromise
一応、Javascript
の Promise
に準拠しつつ、スレッドセーフに作りました。(作ったつもりになっているだけかも?)
template <typename T, typename TT = typename std::remove_const<typename std::remove_reference<T>::type>::type>
auto pvalue(T&& value, int delay = 0) -> typename Promise<TT>::sp {
auto _value = std::forward<T>(value);
return Promise<>::create<TT>([&_value, delay](auto resolver) {
if(delay == 0) resolver.resolve(_value);
else{
setTimeout([resolver, _value]() {
resolver.resolve(std::move(_value));
}, delay);
}
});
}
void test_7() {
const auto r = pvalue(1, 100)
->then([](const auto& x){
log() << x << std::endl;
return pvalue(x + 1, 100)
->then([](const auto& x){
log() << x << std::endl;
return x + 1;
})
->then([](const auto& x){
log() << x << std::endl;
return pvalue(x + 1, 100)
->then([](const auto& x){
log() << x << std::endl;
return pvalue(x + 1, 100);
});
});
})
->then([](const auto& x){
log() << x << std::endl;
std::stringstream ss;
ss << "result = " << x << std::endl;
return ss.str();
})
->wait();
log() << r << std::endl;
}
Promise
破棄することで、メソッドチェーンの中断処理なんかも実装しています。
あと、異なる型の Promise
を型安全にしたりしてます。
{
auto p1 = pvalue(1, 900);
auto p2 = pvalue<double>(1.23, 1200);
auto p3 = pvalue<std::string>("abc", 500);
auto p = Promise<>::all_any(p1, p2, p3)
->then([](const auto& x){ /* x = std::tuple<int, double, std::string> */
std::cout << std::get<0>(x) << std::endl; /** int 1 */
std::cout << std::get<1>(x) << std::endl; /** double 1.23 */
std::cout << std::get<2>(x) << std::endl; /** string "abc" */
assert(std::get<0>(x) == 1);
assert(std::get<1>(x) == 1.23);
assert(std::get<2>(x) == "abc");
});
log() << "wait 2 sec" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
}
と、まぁ、これはこれで運用で使い込んで行こうと思うのですが、Promise
は値が1個しか発行することができず、やはり、連続的にデータを捌くケースでは ReactiveX
的なのが欲しい。
ということで、恐れ多いのですが、この勢いで RxCpp
とは別の RxCpp
的なのを作ってみようと考えました。
another-rxcpp
RxCpp
で生成されるシンボルが巨大過ぎてデバッガが死ぬという原因は、間違いなくテンプレートパラメータの第2引数以降にある(主に上流のソース(observable)の型)と考え、上流の型を関数オブジェクトでくるんでしまう手法で実装してみました。つまり、多くは固定的なテンプレートパラメータになるため、テンプレートの具体化時のコストも減るし、シンボルもそう長くならないのではと夢見てます。(もしかすると関数オブジェクトの生成量が増えてしまい、結果、夢に終わるかも知れませんが。。。)
template <typename T, typename TT = typename std::remove_const<typename std::remove_reference<T>::type>::type>
auto ovalue(T&& value, int delay = 0) -> observable<TT> {
auto _value = std::forward<T>(value);
return observable<>::create<TT>([_value, delay](subscriber<TT> s) {
if(delay == 0){
s.on_next(std::move(_value));
s.on_completed();
}
else{
setTimeout([s, _value]() {
s.on_next(std::move(_value));
s.on_completed();
}, delay);
}
});
}
template <typename T> auto doSubscribe(T source) {
log() << "doSubscribe" << std::endl;
return source.subscribe({
.on_next = [](auto&& x) {
log() << " [on_next] " << x << std::endl;
},
.on_error = [](std::exception_ptr err) {
log() << " [on_error] " << std::endl;
},
.on_completed = []() {
log() << " [on_completed] " << std::endl;
}
});
}
void test_observable() {
log() << "test_observable -- begin" << std::endl;
{
log() << "#1" << std::endl;
auto ob = ovalue(123)
| flat_map([](int&& x){
log() << x << std::endl;
return ovalue(std::string("abc"))
| map([](std::string&& x){
log() << x << std::endl;
return 456;
});
});
doSubscribe(ob);
}
{
auto ob = ovalue(1)
| flat_map([](int&& x){
log() << x << std::endl;
return ovalue(std::string("abc"), 500);
})
| flat_map([](std::string&& x){
log() << x << std::endl;
return ovalue(5);
})
| flat_map([](int&& x){
log() << x << std::endl;
return ovalue(x + 1, 500);
})
| flat_map([](int&& x){
log() << x << std::endl;
return ovalue(x + 1);
});
{
log() << "#2 wait with notify_on_unsubscribe()" << std::endl;
auto x = doSubscribe(ob);
std::mutex mtx;
std::unique_lock<std::mutex> lock(mtx);
x.unsubscribe_notice()->wait(lock, [x](){ return x.is_subscribed(); });
}
{
log() << "#3 wait until is_subscribed() == true" << std::endl;
auto x = doSubscribe(ob);
while(x.is_subscribed()) {}
}
}
}
void test_subject() {
log() << "test_subject -- begin" << std::endl;
auto sbj = std::make_shared<subjects::subject<int>>();
std::weak_ptr<subjects::subject<int>> weak_sbj = sbj;
std::thread([weak_sbj]() mutable {
for(int i = 0; i < 100; i++){
std::this_thread::sleep_for(std::chrono::seconds(1));
auto p = weak_sbj.lock();
if(p) p->as_subscriber().on_next(i);
}
auto p = weak_sbj.lock();
if(p) p->as_subscriber().on_completed();
}).detach();
wait(2500);
auto s1 = doSubscribe(sbj->as_observable());
wait(1000);
auto s2 = doSubscribe(sbj->as_observable());
wait(1000);
auto s3 = doSubscribe(sbj->as_observable());
wait(500);
s1.unsubscribe();
wait(500);
s2.unsubscribe();
wait(500);
s3.unsubscribe();
wait(1000);
auto s4 = doSubscribe(sbj->as_observable());
wait(2000);
sbj.reset();
log() << "test_subject -- end" << std::endl << std::endl;
}
ヨチヨチですが、まぁ動いています。(苦笑)
こちらは、もう少しオペレータを追加しないと業務では使い物にならないので、絶賛コーディング中になりますが、ある程度実装が終わったら現状の RxCpp
と置き換えてみてコンパイル速度やデバッガの負荷などみてみたいと思います。
ただ、本家の RxCpp
のデバッガに優しい感じになってきたら、元サヤに戻ろうかと。
2020/01/15追記
その後、 another-rxcpp
にオペレータを追加したりバグ修正を行いました。
また、RxCpp
を使用しているプロジェクトでコード修正しなくて済むよう互換オプションも追加しました。
そして、弊社で開発している RxCpp
を使用している中規模アプリで置き換えてみたところ以下のような結果となりました。
- コンパイル時間が約1/3以下に短縮
- デバッグシンボルが1/10程度に減少
- デバッガの起動時間は1/10程度に短縮
唯一、スケジューラの解放タイミングがRxCpp
よりも遅いため、一時的にスレッド数が増える傾向がありますが、それ以外は概ね問題なさそうです。(メモリリークはValgrind
で確認しましたが問題ないです)
まぁ、RxCpp
を採用しているプロジェクトは金融系なので、おいそれと基本ライブラリを変更する訳にはいかない(テストフェーズがハンパない)のですが、リリースタイミングを見計らって全てanother-rxcpp
に置き換えていこうかと思います。
年末・年始は、この関数型プログラミングの悪夢で何度も「はっ!!」と起きてコーディングする日々が続いてましたが、そろそろゆっくり眠れそうです。