RxCppを使うにあたっての自分向けのメモ
observable::create.filter.map.subscribe
#include "rxcpp/rx.hpp"
namespace rx=rxcpp;
namespace rxsub=rxcpp::subjects;
namespace rxu=rxcpp::util;
namespace rxs=rxcpp::sources;
int main()
{
std::cout << "start" << std::endl;
rx::observable<>::create<int>([](rx::subscriber<int> dest) {
dest.on_next(1);
dest.on_next(10);
dest.on_next(100);
}).filter([](int i) {
return i != 10;
}).map(/* Selector s */[](int i) {
std::cout << i << std::endl;
return i * 2;
}).subscribe([](int i) {
std::cout << i << std::endl;
});
std::cout << "end" << std::endl;
return 0;
}
stdout
start
1
2
100
200
end
observable::create.subscribe(on{next, error, complete})
rx::observable<>::create<int>([](rx::subscriber<int> dest) {
try {
dest.on_next(1);
dest.on_next(2);
throw std::make_exception_ptr(std::runtime_error("error"));
dest.on_next(3);
dest.on_completed();
} catch(std::exception_ptr e) {
dest.on_error(e);
}
}).subscribe(
// on next
[](const int v){ std::cout << v << std::endl; },
// on error
[](const std::exception_ptr& e){ std::cout << "error." << std::endl; },
// on completed
[](){ std::cout << " done." << std::endl; }
);
stdout
1
2
error.
return observable
rxcpp::observable<int> a() {
return rx::observable<>::from<int>(1, 2, 3);
}
int main()
{
a().subscribe([](int i) {
std::cout << i << std::endl;
});
return 1;
}
stdout
1
2
3
range.flat_map
rxs::range(1, 3)
.flat_map(
[](int i) {
return rxs::range(1, i)
.as_dynamic();
},
[](int i, int j) {
std::cout << "# " << i << ", " << j << std::endl;
return i;
}
)
.subscribe(
[](int i) {
std::cout << "subscribe: " << i << std::endl;
}
);
stdout
# 1, 1
subscribe: 1
# 2, 1
subscribe: 2
# 2, 2
subscribe: 2
# 3, 1
subscribe: 3
# 3, 2
subscribe: 3
# 3, 3
subscribe: 3
streamの中でstreamにつなげる
rxs::range(1, 3)
.flat_map(
/* CollectionSelector &&s */
[](int i) {
return rx::observable<>::create<int>([=](rx::subscriber<int> dest) {
dest.on_error(std::make_exception_ptr(std::runtime_error("error")));
});
},
/* ResultSelector &&rs */
[](int i, int j) {
std::cout << "# " << i << ", " << j << std::endl;
return i;
})
.subscribe(
[](int i) {
std::cout << i << std::endl;
},
[](const std::exception_ptr& e) {
std::cout << "error." << std::endl;
});
stdout
error.
実用例っぽいobservableの連結
ApiClient::getApiResponseA(int)
-
ApiClient::getApiResponseB(int)
があり、A
の結果を使いB
を叩く例
class ApiClient
{
public:
static rxcpp::observable<int> getApiResponseA(const int i) {
return rx::observable<>::create<int>([=](rx::subscriber<int> dest) {
try {
// なんらかのAPI処理
dest.on_next(i);
} catch (std::exception_ptr e) {
dest.on_error(e);
}
dest.on_completed();
});
}
static rxcpp::observable<std::string> getApiResponseB(const int i) {
//throw std::make_exception_ptr(std::runtime_error("error"));
return rx::observable<>::create<std::string>([=](rx::subscriber<std::string> dest) {
try {
// なんらかのAPI処理
// throw std::make_exception_ptr(std::runtime_error("error"));
auto str = std::string("result:") + std::to_string(i);
dest.on_next(str);
} catch (std::exception_ptr e) {
dest.on_error(e);
}
dest.on_completed();
});
}
};
int main()
{
ApiClient::getApiResponseA(1)
.flat_map([](int i) {
return ApiClient::getApiResponseB(i);
}, [](int i, std::string str) {
return "#" + str;
})
.subscribe([](std::string s) {
std::cout << s << std::endl;
},[](const std::exception_ptr& e) {
std::cout << "error." << std::endl;
});
return 0;
}
stdout
#result:1
finally
rx::observable<>::create<int>([](rx::subscriber<int> dest) {
dest.on_next(1);
// finallyを動かすには on_error か on_complete が実行される必要がある
dest.on_error(std::make_exception_ptr(std::runtime_error("a")));
dest.on_completed();
}).finally([]() {
std::cout << "finally" << std::endl;
}).subscribe(
[](int i) {std::cout << i << std::endl;},
[](const std::exception_ptr& e){ std::cout << "error." << std::endl; }
);
stdout
1
error.
finally
subject
#include "rxcpp/rx.hpp"
#include <iostream>
namespace rx = rxcpp;
namespace rxsub = rxcpp::subjects;
namespace rxu = rxcpp::util;
namespace rxs = rxcpp::sources;
int main()
{
std::cout << "start" << std::endl;
rxsub::subject<int> subject;
auto subscriber = subject.get_subscriber();
auto subscriber2 = subject.get_subscriber();
auto observable = subject.get_observable();
auto observable2 = subject.get_observable();
observable.subscribe(
[](const int v){ std::cout << "observable :" << v << std::endl; },// next
[](const std::exception_ptr& e){ std::cout << "error." << std::endl; }, // error
[](){ std::cout << "observable completed." << std::endl; });// complete
observable2.subscribe(
[](const int v){ std::cout << "observable2:" << v << std::endl; },
[](const std::exception_ptr& e){ std::cout << "error." << std::endl; },
[](){ std::cout << "observable2 completed." << std::endl; });
subscriber.on_next(1);
subscriber.on_next(2);
subscriber2.on_next(3);
subscriber.on_completed();
subscriber2.on_next(4);
std::cout << "end" << std::endl;
getchar();
return 0;
}
stdout
start
observable :1
observable2:1
observable :2
observable2:2
observable :3
observable2:3
observable completed.
observable2 completed.
end