C++ Advent Calendar 2018

Day 7

Understanding boost::signals2 and boost::asio::io_service by reading the EOS source

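This is the Day 7 article for C++ Advent Calendar 2018.

I don't normally write C++, but I was invited to take part, so I gave it a try...
Please go easy on me.

I'd like to see how boost::signals2 and boost::asio::io_service are used by tracing through the EOS v1.4.4 source.

Existing references explain most of boost::signals2, so here I follow the places where the EOS code actually uses it.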

// libraries/chain/include/eosio/chain/controller.hpp
   class controller {
      public:
         ...
         signal<void(const block_state_ptr&)>          irreversible_block;
         ...
   }

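This defines a signal named irreversible_block whose handlers receive a const block_state_ptr&.
As a side note, an irreversible block in EOS is a block that has been finalized. This article traces the code around where that notification is received. (EOS has finality, but the chain can fork temporarily before a block becomes irreversible.)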
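To make the define/connect/emit cycle concrete, here is a minimal sketch. It deliberately uses only the standard library: mini_signal is a hypothetical stand-in for boost::signals2::signal, and block_state and controller_sketch are invented placeholders, not the EOS definitions. The real signal type also provides connection management, combiners and thread safety, none of which are modeled here.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical placeholders for the EOS types; not the real definitions.
struct block_state { int block_num; };
using block_state_ptr = std::shared_ptr<block_state>;

// Tiny std-only stand-in for boost::signals2::signal<void(const Arg&)>.
template <typename Arg>
class mini_signal {
public:
   using slot_type = std::function<void(const Arg&)>;

   // Register a handler; handlers run in connection order.
   void connect(slot_type slot) { slots_.push_back(std::move(slot)); }

   // Emitting the signal calls every connected handler with the same argument.
   void operator()(const Arg& a) const {
      for (const auto& slot : slots_) slot(a);
   }

private:
   std::vector<slot_type> slots_;
};

// The emitter only exposes the signal; it knows nothing about its observers.
struct controller_sketch {
   mini_signal<block_state_ptr> irreversible_block;
};

// Connects two observers, emits one block, and returns what they recorded.
inline std::vector<int> demo_signal() {
   controller_sketch c;
   std::vector<int> seen;
   c.irreversible_block.connect(
      [&seen](const block_state_ptr& b) { seen.push_back(b->block_num); });
   c.irreversible_block.connect(
      [&seen](const block_state_ptr& b) { seen.push_back(b->block_num * 10); });
   c.irreversible_block(std::make_shared<block_state>(block_state{7}));
   return seen;
}
```

Calling demo_signal() runs both handlers once, in the order they were connected.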

// libraries/chain/controller.cpp
struct controller_impl {
   /**
    *  Plugins / observers listening to signals emited (such as accepted_transaction) might trigger
    *  errors and throw exceptions. Unless those exceptions are caught it could impact consensus and/or
    *  cause a node to fork.
    *
    *  If it is ever desirable to let a signal handler bubble an exception out of this method
    *  a full audit of its uses needs to be undertaken.
    *
    */
   template<typename Signal, typename Arg>
   void emit( const Signal& s, Arg&& a ) {
      try {
        s(std::forward<Arg>(a));
      } catch (boost::interprocess::bad_alloc& e) {
         wlog( "bad alloc" );
         throw e;
      } catch ( controller_emit_signal_exception& e ) {
         wlog( "${details}", ("details", e.to_detail_string()) );
         throw e;
      } catch ( fc::exception& e ) {
         wlog( "${details}", ("details", e.to_detail_string()) );
      } catch ( ... ) {
         wlog( "signal handler threw exception" );
      }
   }

   void on_irreversible( const block_state_ptr& s ) {
         ...
         emit(self.irreversible_block, s);
      }
   }

   void push_block( const signed_block_ptr& b, controller::block_status s ) {
         ...
         bool trust = !conf.force_all_checks && (s == controller::block_status::irreversible || s == controller::block_status::validated);
         auto new_header_state = fork_db.add( b, trust );
         ...
         if( s == controller::block_status::irreversible )
            emit( self.irreversible_block, new_header_state );

      } FC_LOG_AND_RETHROW( )
   }
}

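The signal handlers are invoked through std::forward, which is hard going for a C++ beginner.
To understand std::forward you probably need to understand rvalues, lvalues and move semantics in C++... Reading up on those topics should make it click.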
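The following sketch isolates the two ideas in emit(): perfect forwarding and containing handler exceptions. All names (category, relay, relay_without_forward, emit_sketch) are hypothetical, and std::bad_alloc and std::exception stand in for the Boost and fc exception types used in the EOS code.

```cpp
#include <cassert>
#include <functional>
#include <new>
#include <stdexcept>
#include <string>
#include <utility>

// Reports which overload was chosen, to make the value category visible.
inline std::string category(const std::string&) { return "lvalue"; }
inline std::string category(std::string&&)      { return "rvalue"; }

// Arg&& is a forwarding reference: std::forward<Arg> passes an lvalue on as
// an lvalue and an rvalue on as an rvalue.
template <typename Arg>
std::string relay(Arg&& a) { return category(std::forward<Arg>(a)); }

// Without std::forward the named parameter `a` is always an lvalue.
template <typename Arg>
std::string relay_without_forward(Arg&& a) { return category(a); }

// Sketch of the emit() shape: forward the argument, let bad_alloc escape,
// contain every other handler exception. Returns true if the handler finished.
template <typename Signal, typename Arg>
bool emit_sketch(const Signal& s, Arg&& a) {
   try {
      s(std::forward<Arg>(a));
      return true;
   } catch (const std::bad_alloc&) {
      throw;          // rethrow the original exception object unchanged
   } catch (const std::exception&) {
      return false;   // the real code logs here instead
   } catch (...) {
      return false;
   }
}
```

relay(s) on a named string reports "lvalue", relay(std::string("x")) reports "rvalue", and relay_without_forward reports "lvalue" either way. One deliberate difference from the excerpt: the sketch rethrows with a bare `throw;`, whereas `throw e;` throws a copy built from the static type of `e`.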

// plugins/chain_plugin/chain_plugin.cpp
class chain_plugin_impl {
public:
   chain_plugin_impl()
   ...
   ,irreversible_block_channel(app().get_channel<channels::irreversible_block>())
   ...
   channels::irreversible_block::channel_type&     irreversible_block_channel;
   ...
}

void chain_plugin::plugin_initialize(const variables_map& options) {
      ...
      my->irreversible_block_connection = my->chain->irreversible_block.connect( [this]( const block_state_ptr& blk ) {
         my->irreversible_block_channel.publish( blk );
      } );
      ...
}

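Here a handler is connected to the signal, and that handler in turn publishes the block to a separate object, irreversible_block_channel.
This looks like the Observer pattern; it shows up a lot in the networking code of blockchain software.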

   namespace channels {
      ...
      using irreversible_block     = channel_decl<struct irreversible_block_tag,    block_state_ptr>;
      ...
}

channels::irreversible_block turns out to be an alias for an instantiation of a template called channel_decl.
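The tag parameter is what lets two channels that carry the same data type remain distinct declarations. Below is a minimal sketch of that idea, assuming simplified placeholders: channel_sketch, drop_exceptions_sketch, block_state and example_other_block are hypothetical, while the is_channel_decl trait follows the shape used in appbase's channel.hpp.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>
#include <utility>

struct block_state {};                        // hypothetical placeholder
using block_state_ptr = std::shared_ptr<block_state>;

struct drop_exceptions_sketch {};             // placeholder dispatch policy
template <typename Data, typename DispatchPolicy>
class channel_sketch {};                      // placeholder channel

// A declaration type: it carries no data, only the channel type and the tag.
template <typename Tag, typename Data, typename DispatchPolicy = drop_exceptions_sketch>
struct channel_decl {
   using channel_type = channel_sketch<Data, DispatchPolicy>;
   using tag_type = Tag;
};

// Same Data, different Tag. The tag structs are only declared, never defined.
using irreversible_block  = channel_decl<struct irreversible_block_tag, block_state_ptr>;
using example_other_block = channel_decl<struct example_other_block_tag, block_state_ptr>;

// Overload-resolution trait: the template overload is viable only for
// pointers to some channel_decl<...>; everything else hits the ellipsis.
template <typename... Ts>
std::true_type is_channel_decl_impl(const channel_decl<Ts...>*);
std::false_type is_channel_decl_impl(...);

template <typename T>
using is_channel_decl = decltype(is_channel_decl_impl(std::declval<T*>()));

static_assert(!std::is_same<irreversible_block, example_other_block>::value,
              "different tags give different declaration types");
static_assert(std::is_same<irreversible_block::channel_type,
                           example_other_block::channel_type>::value,
              "yet both describe the same kind of channel");
```

Because the two declarations are different types, a lookup keyed on the declaration, like the app().get_channel<channels::irreversible_block>() call in the excerpt above, can tell them apart even though both carry block_state_ptr.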

// libraries/appbase/include/appbase/channel.hpp
namespace appbase {

   using erased_channel_ptr = std::unique_ptr<void, void(*)(void*)>;
   ...
   /**
    * A channel is a loosely bound asynchronous data pub/sub concept.
    *
    * This removes the need to tightly couple different plugins in the application for the use-case of
    * sending data around
    *
    * Data passed to a channel is *copied*, consider using a shared_ptr if the use-case allows it
    *
    * @tparam Data - the type of data to publish
    */
   template<typename Data, typename DispatchPolicy>
   class channel final {
      public:
         using ios_ptr_type = std::shared_ptr<boost::asio::io_service>;

         /**
          * Type that represents an active subscription to a channel allowing
          * for ownership via RAII and also explicit unsubscribe actions
          */
         class handle {
            public:
               ~handle() {
                  unsubscribe();
               }

               /**
                * Explicitly unsubcribe from channel before the lifetime
                * of this object expires
                */
               void unsubscribe() {
                  if (_handle.connected()) {
                     _handle.disconnect();
                  }
               }
               ...
            private:
               using handle_type = boost::signals2::connection;
               handle_type _handle;

               /**
                * Construct a handle from an internal represenation of a handle
                * In this case a boost::signals2::connection
                *
                * @param _handle - the boost::signals2::connection to wrap
                */
               handle(handle_type&& _handle)
               :_handle(std::move(_handle))
               {}
               friend class channel;
         };

         /**
          * Publish data to a channel.  This data is *copied* on publish.
          * @param data - the data to publish
          */
         void publish(const Data& data) {
            if (has_subscribers()) {
               // this will copy data into the lambda
               ios_ptr->post([this, data]() {
                  _signal(data);
               });
            }
         }

         /**
          * subscribe to data on a channel
          * @tparam Callback the type of the callback (functor|lambda)
          * @param cb the callback
          * @return handle to the subscription
          */
         template<typename Callback>
         handle subscribe(Callback cb) {
            return handle(_signal.connect(cb));
         }

         /**
          * set the dispatcher according to the DispatchPolicy
          * this can be used to set a stateful dispatcher
          *
          * This method is only available when the DispatchPolicy is copy constructible due to implementation details
          *
          * @param policy - the DispatchPolicy to copy
          */
         auto set_dispatcher(const DispatchPolicy& policy ) -> std::enable_if_t<std::is_copy_constructible<DispatchPolicy>::value,void>
         {
            _signal.set_combiner(policy);
         }

         /**
          * Returns whether or not there are subscribers
          */
         bool has_subscribers() {
            auto connections = _signal.num_slots();
            return connections > 0;
         }

      private:
         explicit channel(const ios_ptr_type& ios_ptr)
         :ios_ptr(ios_ptr)
         {
         }

         virtual ~channel() = default;

         /**
          * Proper deleter for type-erased channel
          * note: no type checking is performed at this level
          *
          * @param erased_channel_ptr
          */
         static void deleter(void* erased_channel_ptr) {
            auto ptr = reinterpret_cast<channel*>(erased_channel_ptr);
            delete ptr;
         }

         /**
          * get the channel back from an erased pointer
          *
          * @param ptr - the type-erased channel pointer
          * @return - the type safe channel pointer
          */
         static channel* get_channel(erased_channel_ptr& ptr) {
            return reinterpret_cast<channel*>(ptr.get());
         }

         /**
          * Construct a unique_ptr for the type erased method poiner
          * @return
          */
         static erased_channel_ptr make_unique(const ios_ptr_type& ios_ptr)
         {
            return erased_channel_ptr(new channel(ios_ptr), &deleter);
         }

         ios_ptr_type ios_ptr;
         boost::signals2::signal<void(const Data&), DispatchPolicy> _signal;

         friend class appbase::application;
   };

   /**
    *
    * @tparam Tag - API specific discriminator used to distinguish between otherwise identical data types
    * @tparam Data - the typ of the Data the channel carries
    * @tparam DispatchPolicy - The dispatch policy to use for this channel (defaults to @ref drop_exceptions)
    */
   template< typename Tag, typename Data, typename DispatchPolicy = drop_exceptions >
   struct channel_decl {
      using channel_type = channel<Data, DispatchPolicy>;
      using tag_type = Tag;
   };

   template <typename...Ts>
   std::true_type is_channel_decl_impl(const channel_decl<Ts...>*);

   std::false_type is_channel_decl_impl(...);

   template <typename T>
   using is_channel_decl = decltype(is_channel_decl_impl(std::declval<T*>()));
}

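It looks like EOS has written its own small library that combines boost::signals2 with boost::asio::io_service to turn signals into asynchronous notifications...
boost::asio::io_service is the event loop object of Boost.Asio's asynchronous I/O library: a handler passed to post() is queued and runs later on a thread that is calling run().
There are more detailed write-ups of io_service elsewhere.
Inside publish, the data is copied into a lambda and handed to post, so subscribers are called later from the event loop rather than from inside publish itself.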
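The effect of posting inside publish can be seen in a small single-threaded sketch. task_queue_sketch is a hypothetical std-only stand-in for the post()/run() part of boost::asio::io_service, and channel_sketch is a simplified stand-in for appbase's channel. The real io_service is thread-safe and is usually driven by one or more threads, which this sketch does not model.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Stand-in event loop: post() queues a handler, run() executes queued handlers.
class task_queue_sketch {
public:
   void post(std::function<void()> task) { tasks_.push(std::move(task)); }

   // Runs handlers in FIFO order until the queue is empty; returns how many ran.
   std::size_t run() {
      std::size_t n = 0;
      while (!tasks_.empty()) {
         auto task = std::move(tasks_.front());
         tasks_.pop();
         task();
         ++n;
      }
      return n;
   }

private:
   std::queue<std::function<void()>> tasks_;
};

template <typename Data>
class channel_sketch {
public:
   explicit channel_sketch(task_queue_sketch& q) : queue_(q) {}

   void subscribe(std::function<void(const Data&)> cb) {
      subscribers_.push_back(std::move(cb));
   }

   // Copies `data` into the queued task; no subscriber is called here.
   void publish(const Data& data) {
      if (subscribers_.empty()) return;
      queue_.post([this, data]() {
         for (const auto& cb : subscribers_) cb(data);
      });
   }

private:
   task_queue_sketch& queue_;
   std::vector<std::function<void(const Data&)>> subscribers_;
};

// Returns {value seen right after publish(), value seen after run()}.
inline std::pair<int, int> demo_deferred_publish() {
   task_queue_sketch q;
   channel_sketch<int> ch(q);
   int received = 0;
   ch.subscribe([&received](const int& v) { received = v; });
   ch.publish(42);
   int before_run = received;   // still 0: the handler is only queued
   q.run();
   return {before_run, received};
}
```

The subscriber sees nothing until run() drains the queue, which is why the publisher never executes subscriber code on its own call stack.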

// plugins/bnet_plugin/bnet_plugin.cpp
  class session : public std::enable_shared_from_this<session>
  {
        ...
        boost::asio::io_service&                                       _ios;
        boost::asio::io_service&                                       _app_ios;
        ...
  }

   class bnet_plugin_impl : public std::enable_shared_from_this<bnet_plugin_impl> {
      public:
         ...
         channels::irreversible_block::channel_type::handle     _on_irb_handle;
         ...
         template<typename Call>
         void for_each_session( Call callback ) {
            app().get_io_service().post([this, callback = callback] {
               for (const auto& item : _sessions) {
                  if (auto ses = item.second.lock()) {
                     ses->_ios.post(boost::asio::bind_executor(
                           ses->_strand,
                           [ses, cb = callback]() { cb(ses); }
                     ));
                  }
               }
            });
         }

         /**
          * Notify all active connection of the new irreversible block so they
          * can purge their block cache
          */
         void on_irreversible_block( block_state_ptr s ) {
            for_each_session( [s]( auto ses ){ ses->on_new_lib( s ); } );
         }
   ...
   void bnet_plugin::plugin_startup() {
      ...
      my->_on_irb_handle = app().get_channel<channels::irreversible_block>()
                                .subscribe( [this]( block_state_ptr s ){
                                       my->on_irreversible_block(s);
                                });
      ...
}

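Around here, the plugin subscribes to the channel at startup so that it receives the notification.
on_irreversible_block then posts the work asynchronously through for_each_session: first to the application's io_service, and from there to each session's own io_service, wrapped in that session's strand.
app().get_io_service() is a method defined in libraries/appbase/include/appbase/application.hpp,
where EOS has once again built its own application framework.

Hmm, is this easy to follow for people who use C++ every day?
As someone who doesn't normally write C++, the storm of complex templates and the heavy use of Boost leave me scratching my head, but this may be a useful program to study if you want to do asynchronous processing and event handling in C++.
I find it fun to read.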
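The value returned by subscribe and stored in _on_irb_handle is an RAII handle: per the channel.hpp excerpt, its destructor calls unsubscribe(). A rough std-only sketch of that ownership idea follows. notifier_sketch and its members are hypothetical; the real handle wraps a boost::signals2::connection and calls disconnect() on it.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <utility>

class notifier_sketch {
   using slot_map = std::map<int, std::function<void(int)>>;

public:
   // Move-only subscription handle: destroying it removes the callback.
   class handle {
   public:
      handle() = default;
      handle(handle&& o) noexcept : slots_(std::move(o.slots_)), id_(o.id_) {}
      handle& operator=(handle&& o) noexcept {
         if (this != &o) {
            unsubscribe();
            slots_ = std::move(o.slots_);
            id_ = o.id_;
         }
         return *this;
      }
      handle(const handle&) = delete;
      handle& operator=(const handle&) = delete;
      ~handle() { unsubscribe(); }

      // Safe to call more than once, and after the notifier is gone.
      void unsubscribe() {
         if (auto s = slots_.lock()) s->erase(id_);
         slots_.reset();
      }

   private:
      friend class notifier_sketch;
      handle(std::weak_ptr<slot_map> slots, int id)
         : slots_(std::move(slots)), id_(id) {}
      std::weak_ptr<slot_map> slots_;
      int id_ = 0;
   };

   handle subscribe(std::function<void(int)> cb) {
      int id = next_id_++;
      (*slots_)[id] = std::move(cb);
      return handle(slots_, id);
   }

   std::size_t num_slots() const { return slots_->size(); }

private:
   std::shared_ptr<slot_map> slots_ = std::make_shared<slot_map>();
   int next_id_ = 0;
};

// Returns the slot counts while the handle is alive and after it is destroyed.
inline std::pair<std::size_t, std::size_t> demo_handle_lifetime() {
   notifier_sketch n;
   std::size_t alive = 0;
   {
      auto h = n.subscribe([](int) {});
      alive = n.num_slots();
   }   // h goes out of scope here and unsubscribes
   return {alive, n.num_slots()};
}
```

Keeping the handle as a member, as bnet_plugin_impl does, ties the subscription's lifetime to the owning object. A plain boost::signals2::connection does not disconnect when it is destroyed (scoped_connection does), which is presumably why the channel wraps it in its own handle class.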
