13
9

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RxCpp memo

Last updated at Posted at 2015-04-21

RxCppを使うにあたっての自分向けのメモ

observable::create.filter.map.subscribe

#include "rxcpp/rx.hpp"

namespace rx=rxcpp;
namespace rxsub=rxcpp::subjects;
namespace rxu=rxcpp::util;
namespace rxs=rxcpp::sources;

int main()
{
    std::cout << "start" << std::endl;
    
    rx::observable<>::create<int>([](rx::subscriber<int> dest) {
        dest.on_next(1);
        dest.on_next(10);
        dest.on_next(100);
    }).filter([](int i) {
        return i != 10;
    }).map(/* Selector s */[](int i) {
        std::cout << i << std::endl;
        return i * 2;
    }).subscribe([](int i) {
        std::cout << i << std::endl;
    });
    
    std::cout << "end" << std::endl;

    return 0;
}
stdout
start
1
2
100
200
end

observable::create.subscribe(on_{next, error, completed})

    // Emits 1 and 2, then fails. The failure is thrown as a regular exception
    // object and converted to a std::exception_ptr with
    // std::current_exception(), so any exception type raised inside the try
    // block is forwarded to on_error rather than only a thrown
    // std::exception_ptr.
    rx::observable<>::create<int>([](rx::subscriber<int> dest) {
        try {
            dest.on_next(1);
            dest.on_next(2);
            throw std::runtime_error("error");
            // Intentionally unreachable: kept to show that nothing after the
            // failure is delivered.
            dest.on_next(3);
            dest.on_completed();
        } catch (...) {
            dest.on_error(std::current_exception());
        }
    }).subscribe(
        // on next
        [](const int v){ std::cout << v << std::endl; },
        // on error
        [](const std::exception_ptr& e){ std::cout << "error." << std::endl; },
        // on completed
        [](){ std::cout << " done." << std::endl; }
    );
stdout
1
2
error.

return observable


// Builds an observable over the fixed values 1, 2, 3 and hands it back
// through the rxcpp::observable<int> return type.
rxcpp::observable<int> a() {
    auto oneToThree = rx::observable<>::from<int>(1, 2, 3);
    return oneToThree;
}

// Subscribes to the observable returned by a() and prints every value.
int main()
{
    a().subscribe([](int i) {
        std::cout << i << std::endl;
    });
    // Exit status 0: the program ran successfully. A non-zero status (the
    // previous value was 1) is read as failure by shells and build tools.
    return 0;
}
stdout
1
2
3

range.flat_map

    rxs::range(1, 3)
    .flat_map(
              [](int i) {
                  return rxs::range(1, i)
                  .as_dynamic();
              },
              [](int i, int j) {
                  std::cout << "# " << i << ", " << j << std::endl;
                  return i;
              }
    )
    .subscribe(
               [](int i) {
                   std::cout << "subscribe: " << i << std::endl;
               }
    );
stdout
# 1, 1
subscribe: 1
# 2, 1
subscribe: 2
# 2, 2
subscribe: 2
# 3, 1
subscribe: 3
# 3, 2
subscribe: 3
# 3, 3
subscribe: 3

streamの中でstreamにつなげる

    rxs::range(1, 3)
    .flat_map(
        /* CollectionSelector &&s */
        [](int i) {
            return rx::observable<>::create<int>([=](rx::subscriber<int> dest) {
                dest.on_error(std::make_exception_ptr(std::runtime_error("error")));
            });
        },
        /* ResultSelector &&rs */
        [](int i, int j) {
            std::cout << "# " << i << ", " << j << std::endl;
            return i;
        })
    .subscribe(
        [](int i) {
            std::cout << i << std::endl;
        },
        [](const std::exception_ptr& e) {
            std::cout << "error." << std::endl;
        });
stdout
error.

実用例っぽいobservableの連結

  • ApiClient::getApiResponseA(int)
  • ApiClient::getApiResponseB(int)
    があり、A の結果を使い B を叩く例
/// Stand-in for an API client: each call returns an observable that emits a
/// single response value and then completes, or reports an error.
///
/// Any exception raised while producing the value is captured with
/// std::current_exception() and delivered through on_error. After an error
/// the sequence is finished, so on_completed is not emitted as well.
class ApiClient
{
public:
    /// Returns an observable that emits `i` once and then completes.
    /// @param i value to emit as the "response" of API A.
    static rxcpp::observable<int> getApiResponseA(const int i) {
        return rx::observable<>::create<int>([=](rx::subscriber<int> dest) {
            try {
                // Placeholder for the real API call.
                dest.on_next(i);
            } catch (...) {
                dest.on_error(std::current_exception());
                return; // on_error is terminal; do not also complete
            }
            dest.on_completed();
        });
    }

    /// Returns an observable that emits the string "result:<i>" once and
    /// then completes.
    /// @param i value embedded in the "response" of API B.
    static rxcpp::observable<std::string> getApiResponseB(const int i) {
        return rx::observable<>::create<std::string>([=](rx::subscriber<std::string> dest) {
            try {
                // Placeholder for the real API call. To try the error path,
                // throw here, e.g.: throw std::runtime_error("error");
                auto str = std::string("result:") + std::to_string(i);
                dest.on_next(str);
            } catch (...) {
                dest.on_error(std::current_exception());
                return; // on_error is terminal; do not also complete
            }
            dest.on_completed();
        });
    }
};

// Calls API A, feeds its result into API B, prefixes B's response with '#'
// and prints it; errors are reported as "error.".
int main()
{
    auto responseA = ApiClient::getApiResponseA(1);

    auto chained = responseA.flat_map(
        // Use A's result as the argument of B.
        [](int fromA) {
            return ApiClient::getApiResponseB(fromA);
        },
        // Only B's response is used for the final value.
        [](int, std::string fromB) {
            return "#" + fromB;
        });

    chained.subscribe(
        [](std::string line) {
            std::cout << line << std::endl;
        },
        [](const std::exception_ptr&) {
            std::cout << "error." << std::endl;
        });

    return 0;
}
stdout
#result:1

finally

// Emits one value and then fails, to show when the finally() callback runs.
rx::observable<>::create<int>([](rx::subscriber<int> dest) {
    dest.on_next(1);
    // Per the original note: finally() only fires once on_error or
    // on_completed has been delivered. on_error already ends the sequence,
    // so no on_completed is sent after it.
    dest.on_error(std::make_exception_ptr(std::runtime_error("a")));
}).finally([]() {
    std::cout << "finally" << std::endl;
}).subscribe(
    [](int i) {std::cout << i << std::endl;},
    [](const std::exception_ptr& e){ std::cout << "error." << std::endl; }
);
stdout
1
error.
finally

subject

#include "rxcpp/rx.hpp"
#include <iostream>

namespace rx = rxcpp;
namespace rxsub = rxcpp::subjects;
namespace rxu = rxcpp::util;
namespace rxs = rxcpp::sources;

// Demonstrates rxsub::subject: two subscriber handles feed the same subject,
// and two observers attached to it each receive every value.
int main()
{
	std::cout << "start" << std::endl;

	rxsub::subject<int> subject;

	// Input side: two handles that both push into the same subject.
	auto subscriber = subject.get_subscriber();
	auto subscriber2 = subject.get_subscriber();
	// Output side: two observables that both listen to the same subject.
	auto observable = subject.get_observable();
	auto observable2 = subject.get_observable();
	observable.subscribe(
		[](const int v){ std::cout << "observable :" << v << std::endl; },// next
		[](const std::exception_ptr& e){ std::cout << "error." << std::endl; }, // error
		[](){ std::cout << "observable  completed." << std::endl; });// complete
	observable2.subscribe(
		[](const int v){ std::cout << "observable2:" << v << std::endl; },
		[](const std::exception_ptr& e){ std::cout << "error." << std::endl; },
		[](){ std::cout << "observable2 completed." << std::endl; });
	subscriber.on_next(1);
	subscriber.on_next(2);
	subscriber2.on_next(3);
	subscriber.on_completed();
	// Sent after completion on purpose: the article's recorded output shows
	// nothing printed for this value.
	subscriber2.on_next(4);

	std::cout << "end" << std::endl;

	// Wait for input before exiting. std::cin.get() comes from <iostream>,
	// which this snippet includes; getchar() would need <cstdio>, which it
	// does not include.
	std::cin.get();

	return 0;
}
stdout
start
observable :1
observable2:1
observable :2
observable2:2
observable :3
observable2:3
observable  completed.
observable2 completed.
end
13
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
9

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?