1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

C++の非同期ライブラリを作る(JavascriptライクなPromiseとRxCppの代替)

Last updated at Posted at 2021-12-30

ことの発端

ウチの会社の業務では非同期処理が多いのです。
OSやら各種ライブラリやらのAPIなどで採用されている非同期処理はノンブロッキングI/Oとか非同期I/Oになるのですが、如何せん、そのまま使うとコードが分散したり、ネストが深くなったり、そんでもってエラー処理どうするの?的な感じになるので、なんとかならんかと思いました。
そんな中、C# では async/await なるとびきり素敵なソリューションが登場した訳ですが、C++ ではどうしようと悩んで色々と非同期ライブラリを探したところ ReactiveX に出会った訳です。

まぁ、そんなこんなで ReactiveX とは永い付き合いになったのですが、ここ2〜3年で状況が変わってきました。

というのも、Observable のメソッドチェーンが千個以上になってくると、コンパイル時間が長かったり(1行変更10分コース、フルビルドなら1時間コース)、デバッガがクラッシュするなど、ReactiveX のデバッグって結構面倒なのに、更に開発環境が悪化することでストレスを抱えることになりました。もはや、as_dynamic()だけでは改善することができなくなりました。そのうち、PCのスペックが上がれば改善されるかとたかをくくっていたのですが、PCスペックの向上よりもメソッドチェーンの増加が上回ってしまい、開発環境は悪化の一途です。

で、「元々何をしたいのか?」と自分に問うと、「非同期処理をメンテナンスしやすくしたい」ということであって、決して ReactiveX を使いたいというモチベーションではないということです。(ReactiveXは素晴らしいのですが)

そこで、もう一度、非同期処理をどうしようか考えたときに、そう言えば標準ライブラリの std::featurestd::promise があるなぁ〜と思ったのですが、どうにもしっくりこない。

せめて、JavascriptPromise 的なのが無いのか調べたところ、GitHub に何種類かあったのですが、「シングルスレッドで使ってね」的なものだったり、「そもそもスレッドセーフなのか?」みたいなのが多かったので、結局自作したというオチです。

JPromise

一応、JavascriptPromise に準拠しつつ、スレッドセーフに作りました。(作ったつもりになっているだけかも?)

template <typename T, typename TT = typename std::remove_const<typename std::remove_reference<T>::type>::type>
auto pvalue(T&& value, int delay = 0) -> typename Promise<TT>::sp {
  auto _value = std::forward<T>(value);
  return Promise<>::create<TT>([&_value, delay](auto resolver) {
    if(delay == 0) resolver.resolve(_value);
    else{
      setTimeout([resolver, _value]() {
        resolver.resolve(std::move(_value));
      }, delay);
    }
  });
}

void test_7() {
  const auto r = pvalue(1, 100)
  ->then([](const auto& x){
    log() << x << std::endl;
    return pvalue(x + 1, 100)
    ->then([](const auto& x){
      log() << x << std::endl;
      return x + 1;
    })
    ->then([](const auto& x){
      log() << x << std::endl;
      return pvalue(x + 1, 100)
      ->then([](const auto& x){
        log() << x << std::endl;
        return pvalue(x + 1, 100);
      });
    });
  })
  ->then([](const auto& x){
    log() << x << std::endl;
    std::stringstream ss;
    ss << "result = " << x << std::endl;
    return ss.str();
  })
  ->wait();
  log() << r << std::endl;
}

Promise 破棄することで、メソッドチェーンの中断処理なんかも実装しています。
あと、異なる型の Promise を型安全にしたりしてます。

  {
    auto p1 = pvalue(1, 900);
    auto p2 = pvalue<double>(1.23, 1200);
    auto p3 = pvalue<std::string>("abc", 500);

    auto p = Promise<>::all_any(p1, p2, p3)
    ->then([](const auto& x){ /* x = std::tuple<int, double, std::string> */
      std::cout << std::get<0>(x) << std::endl; /** int 1 */
      std::cout << std::get<1>(x) << std::endl; /** double 1.23 */
      std::cout << std::get<2>(x) << std::endl; /** string "abc" */
      assert(std::get<0>(x) == 1);
      assert(std::get<1>(x) == 1.23);
      assert(std::get<2>(x) == "abc");
    });

    log() << "wait 2 sec" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
  }

と、まぁ、これはこれで運用で使い込んで行こうと思うのですが、Promise は値が1個しか発行することができず、やはり、連続的にデータを捌くケースでは ReactiveX 的なのが欲しい。

ということで、恐れ多いのですが、この勢いで RxCpp とは別の RxCpp 的なのを作ってみようと考えました。

another-rxcpp

RxCpp で生成されるシンボルが巨大過ぎてデバッガが死ぬという原因は、間違いなくテンプレートパラメータの第2引数以降にある(主に上流のソース(observable)の型)と考え、上流の型を関数オブジェクトでくるんでしまう手法で実装してみました。つまり、多くは固定的なテンプレートパラメータになるため、テンプレートの具体化時のコストも減るし、シンボルもそう長くならないのではと夢見てます。(もしかすると関数オブジェクトの生成量が増えてしまい、結果、夢に終わるかも知れませんが。。。)

template <typename T, typename TT = typename std::remove_const<typename std::remove_reference<T>::type>::type>
auto ovalue(T&& value, int delay = 0) -> observable<TT> {
  auto _value = std::forward<T>(value);
  return observable<>::create<TT>([_value, delay](subscriber<TT> s) {
    if(delay == 0){
      s.on_next(std::move(_value));
      s.on_completed();
    }
    else{
      setTimeout([s, _value]() {
        s.on_next(std::move(_value));
        s.on_completed();
      }, delay);
    }
  });
}

template <typename T> auto doSubscribe(T source) {
  log() << "doSubscribe" << std::endl;
  return source.subscribe({
    .on_next = [](auto&& x) {
      log() << "  [on_next] " << x << std::endl;
    },
    .on_error = [](std::exception_ptr err) {
      log() << "  [on_error] " << std::endl;
    },
    .on_completed = []() {
      log() << "  [on_completed] " << std::endl;
    }
  });
}

void test_observable() {
  log() << "test_observable -- begin" << std::endl;

  {
    log() << "#1" << std::endl;
    auto ob = ovalue(123)
    | flat_map([](int&& x){
      log() << x << std::endl;
      return ovalue(std::string("abc"))
      | map([](std::string&& x){
        log() << x << std::endl;
        return 456;
      });
    });
    doSubscribe(ob);
  }

  {
    auto ob = ovalue(1)
    | flat_map([](int&& x){
      log() << x << std::endl;
      return ovalue(std::string("abc"), 500);
    })
    | flat_map([](std::string&& x){
      log() << x << std::endl;
      return ovalue(5);
    })
    | flat_map([](int&& x){
      log() << x << std::endl;
      return ovalue(x + 1, 500);
    })
    | flat_map([](int&& x){
      log() << x << std::endl;
      return ovalue(x + 1);
    });
    {
      log() << "#2 wait with notify_on_unsubscribe()" << std::endl;
      auto x = doSubscribe(ob);
      std::mutex mtx;
      std::unique_lock<std::mutex> lock(mtx);
      x.unsubscribe_notice()->wait(lock, [x](){ return x.is_subscribed(); });
    }
    {
      log() << "#3 wait until is_subscribed() == true" << std::endl;
      auto x = doSubscribe(ob);
      while(x.is_subscribed()) {}
    }
  }
}

void test_subject() {
  log() << "test_subject -- begin" << std::endl;

  auto sbj = std::make_shared<subjects::subject<int>>();

  std::weak_ptr<subjects::subject<int>> weak_sbj = sbj;
  std::thread([weak_sbj]() mutable {
    for(int i = 0; i < 100; i++){
      std::this_thread::sleep_for(std::chrono::seconds(1));
      auto p = weak_sbj.lock();
      if(p) p->as_subscriber().on_next(i);
    }
    auto p = weak_sbj.lock();
    if(p) p->as_subscriber().on_completed();
  }).detach();

  wait(2500);
  auto s1 = doSubscribe(sbj->as_observable());

  wait(1000);
  auto s2 = doSubscribe(sbj->as_observable());

  wait(1000);
  auto s3 = doSubscribe(sbj->as_observable());

  wait(500);
  s1.unsubscribe();

  wait(500);
  s2.unsubscribe();

  wait(500);
  s3.unsubscribe();

  wait(1000);
  auto s4 = doSubscribe(sbj->as_observable());

  wait(2000);

  sbj.reset();

  log() << "test_subject -- end" << std::endl << std::endl;
}

ヨチヨチですが、まぁ動いています。(苦笑)
こちらは、もう少しオペレータを追加しないと業務では使い物にならないので、絶賛コーディング中になりますが、ある程度実装が終わったら現状の RxCpp と置き換えてみてコンパイル速度やデバッガの負荷などみてみたいと思います。
ただ、本家の RxCpp のデバッガに優しい感じになってきたら、元サヤに戻ろうかと。

2020/01/15追記

その後、 another-rxcpp にオペレータを追加したりバグ修正を行いました。
また、RxCpp を使用しているプロジェクトでコード修正しなくて済むよう互換オプションも追加しました。
そして、弊社で開発している RxCpp を使用している中規模アプリで置き換えてみたところ以下のような結果となりました。

  • コンパイル時間が約1/3以下に短縮
  • デバッグシンボルが1/10程度に減少
  • デバッガの起動時間は1/10程度に短縮

唯一、スケジューラの解放タイミングがRxCppよりも遅いため、一時的にスレッド数が増える傾向がありますが、それ以外は概ね問題なさそうです。(メモリリークはValgrindで確認しましたが問題ないです)
まぁ、RxCpp を採用しているプロジェクトは金融系なので、おいそれと基本ライブラリを変更する訳にはいかない(テストフェーズがハンパない)のですが、リリースタイミングを見計らって全てanother-rxcppに置き換えていこうかと思います。

年末・年始は、この関数型プログラミングの悪夢で何度も「はっ!!」と起きてコーディングする日々が続いてましたが、そろそろゆっくり眠れそうです。

1
0
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?