EOSDay 1

EOSのstatic_variantによるネットワークメッセージの定義を調べる

EOSのアドベントカレンダー初日です。

EOSについて深掘りした内容を書くほどEOSについて詳しくないので、被らないように最初にしました...

この記事はEOSのv1.4.4のソースコードを元に書いています。


static_variant

plugins/net_plugin/include/eosio/net_plugin/protocol.hppに EOSで使うメッセージの定義が下記のようにあります。


struct chain_size_message {
uint32_t last_irreversible_block_num = 0;
block_id_type last_irreversible_block_id;
uint32_t head_num = 0;
block_id_type head_id;
};

struct handshake_message {
uint16_t network_version = 0; ///< incremental value above a computed base
chain_id_type chain_id; ///< used to identify chain
fc::sha256 node_id; ///< used to identify peers and prevent self-connect
chain::public_key_type key; ///< authentication key; may be a producer or peer key, or empty
tstamp time;
fc::sha256 token; ///< digest of time to prove we own the private key of the key above
chain::signature_type sig; ///< signature for the digest
string p2p_address;
uint32_t last_irreversible_block_num = 0;
block_id_type last_irreversible_block_id;
uint32_t head_num = 0;
block_id_type head_id;
string os;
string agent;
int16_t generation;
};

enum go_away_reason {
no_reason, ///< no reason to go away
self, ///< the connection is to itself
duplicate, ///< the connection is redundant
wrong_chain, ///< the peer's chain id doesn't match
wrong_version, ///< the peer's network version doesn't match
forked, ///< the peer's irreversible blocks are different
unlinkable, ///< the peer sent a block we couldn't use
bad_transaction, ///< the peer sent a transaction that failed verification
validation, ///< the peer sent a block that failed validation
benign_other, ///< reasons such as a timeout. not fatal but warrant resetting
fatal_other, ///< a catch-all for errors we don't have discriminated
authentication ///< peer failed authenicatio
};

constexpr auto reason_str( go_away_reason rsn ) {
switch (rsn ) {
case no_reason : return "no reason";
case self : return "self connect";
case duplicate : return "duplicate";
case wrong_chain : return "wrong chain";
case wrong_version : return "wrong version";
case forked : return "chain is forked";
case unlinkable : return "unlinkable block received";
case bad_transaction : return "bad transaction";
case validation : return "invalid block";
case authentication : return "authentication failure";
case fatal_other : return "some other failure";
case benign_other : return "some other non-fatal condition";
default : return "some crazy reason";
}
}

struct go_away_message {
go_away_message (go_away_reason r = no_reason) : reason(r), node_id() {}
go_away_reason reason;
fc::sha256 node_id; ///< for duplicate notification
};

struct time_message {
tstamp org; //!< origin timestamp
tstamp rec; //!< receive timestamp
tstamp xmt; //!< transmit timestamp
mutable tstamp dst; //!< destination timestamp
};

enum id_list_modes {
none,
catch_up,
last_irr_catch_up,
normal
};

constexpr auto modes_str( id_list_modes m ) {
switch( m ) {
case none : return "none";
case catch_up : return "catch up";
case last_irr_catch_up : return "last irreversible";
case normal : return "normal";
default: return "undefined mode";
}
}

template<typename T>
struct select_ids {
select_ids () : mode(none),pending(0),ids() {}
id_list_modes mode;
uint32_t pending;
vector<T> ids;
bool empty () const { return (mode == none || ids.empty()); }
};

using ordered_txn_ids = select_ids<transaction_id_type>;
using ordered_blk_ids = select_ids<block_id_type>;

struct notice_message {
notice_message () : known_trx(), known_blocks() {}
ordered_txn_ids known_trx;
ordered_blk_ids known_blocks;
};

struct request_message {
request_message () : req_trx(), req_blocks() {}
ordered_txn_ids req_trx;
ordered_blk_ids req_blocks;
};

struct sync_request_message {
uint32_t start_block;
uint32_t end_block;
};

using net_message = static_variant<handshake_message,
chain_size_message,
go_away_message,
time_message,
notice_message,
request_message,
sync_request_message,
signed_block,
packed_transaction>;

注目するところは上の最後のところで下記に再度記載します。static_variantにメッセージのstructがジェリクスとして定義されています。

ジェネリクスがある言語はやっぱり、かっこいいですね。(個人の感想です...)

using net_message = static_variant<handshake_message,

chain_size_message,
go_away_message,
time_message,
notice_message,
request_message,
sync_request_message,
signed_block,
packed_transaction>;

net_messageを使っているのはplugins/net_plugin/net_plugin.cpp

とかです。ジェネリクスが上手く使われている感じがします。

   void connection::enqueue( const net_message &m, bool trigger_send ) {

go_away_reason close_after_send = no_reason;
if (m.contains<go_away_message>()) {
close_after_send = m.get<go_away_message>().reason;
}

uint32_t payload_size = fc::raw::pack_size( m );
char * header = reinterpret_cast<char*>(&payload_size);
size_t header_size = sizeof(payload_size);

size_t buffer_size = header_size + payload_size;

auto send_buffer = std::make_shared<vector<char>>(buffer_size);
fc::datastream<char*> ds( send_buffer->data(), buffer_size);
ds.write( header, header_size );
fc::raw::pack( ds, m );
connection_wptr weak_this = shared_from_this();
queue_write(send_buffer,trigger_send,
[weak_this, close_after_send](boost::system::error_code ec, std::size_t ) {
connection_ptr conn = weak_this.lock();
if (conn) {
if (close_after_send != no_reason) {
elog ("sent a go away message: ${r}, closing connection to ${p}",("r", reason_str(close_after_send))("p", conn->peer_name()));
my_impl->close(conn);
return;
}
} else {
fc_wlog(logger, "connection expired before enqueued net_message called callback!");
}
});
}

static_variantはサブモジュールのinclude/fc/static_variant.hppにあります。

ソースの上ののコメントを見ると https://github.com/kmicklas/variadic-static_variant から持って来たみたいですが、現在のリポジトリは https://github.com/kmicklas/variadic-variantこっちポイです。

 

で、 static_variant.hppを読んでみようと思うと、可変引数テンプレートと再帰の嵐でC++力の低い自分にはよく理解が出来ないです...

可変引数テンプレートreinterpret_castを駆使して任意の方のキャストが出来るようです。

`

    template<typename visitor>

typename visitor::result_type visit(const visitor& v)const {
return impl::storage_ops<0, Types...>::apply(_tag, storage, v);
}

上のコードの戻り値の型がなんなのかよく分かってないんですが、多分、下のコードの戻り値の型を抽象的に書いているのでしょうか?

template<typename Result>

struct visitor {
typedef Result result_type;
};

とりあえず、static_variantで任意の構造体を扱えるようになっているみたいですね。

Scalaのshapelessみたいなものでしょうか。


pack unpack

もう一つ興味深いのが、 connection::enqueueでも使われているのですが、 fc::raw::pack( ds, m );で任意の型をシリアライズしています。

include/fc/io/raw.hppにstatic_variantのpack、unpackの方法があります。

ここでも、ジェネリクスプログラミングを駆使しています...

    template<typename Stream> void pack( Stream& s, const UInt<256>& n );

template<typename Stream> void pack( Stream& s, const Int<256>& n );
template<typename Stream, typename T> void pack( Stream& s, const boost::multiprecision::number<T>& n );

template<typename Stream, typename Arg0, typename... Args>
inline void pack( Stream& s, const Arg0& a0, Args... args ) {
pack( s, a0 );
pack( s, args... );
}

template<typename Stream>
inline void pack( Stream& s, const fc::exception& e )
{
fc::raw::pack( s, e.code() );
fc::raw::pack( s, std::string(e.name()) );
fc::raw::pack( s, std::string(e.what()) );
fc::raw::pack( s, e.get_log() );
}

template<typename Stream>
inline void pack( Stream& s, const fc::log_message& msg )
{
fc::raw::pack( s, variant(msg) );
}

template<typename Stream>
inline void pack( Stream& s, const fc::path& tp )
{
fc::raw::pack( s, tp.generic_string() );
}

template<typename Stream>
inline void pack( Stream& s, const fc::time_point_sec& tp )
{
uint32_t usec = tp.sec_since_epoch();
s.write( (const char*)&usec, sizeof(usec) );
}

template<typename Stream>
inline void pack( Stream& s, const fc::time_point& tp )
{
uint64_t usec = tp.time_since_epoch().count();
s.write( (const char*)&usec, sizeof(usec) );
}

template<typename Stream>
inline void pack( Stream& s, const fc::microseconds& usec )
{
uint64_t usec_as_int64 = usec.count();
s.write( (const char*)&usec_as_int64, sizeof(usec_as_int64) );
}

template<typename Stream, typename T, size_t N>
inline auto pack( Stream& s, const fc::array<T,N>& v) -> std::enable_if_t<!is_trivial_array<T>>
{
static_assert( N <= MAX_NUM_ARRAY_ELEMENTS, "number of elements in array is too large" );
for (uint64_t i = 0; i < N; ++i)
fc::raw::pack(s, v.data[i]);
}

template<typename Stream, typename T, size_t N>
inline auto pack( Stream& s, const fc::array<T,N>& v) -> std::enable_if_t<is_trivial_array<T>>
{
static_assert( N <= MAX_NUM_ARRAY_ELEMENTS, "number of elements in array is too large" );
s.write((const char*)&v.data[0], N*sizeof(T));
}

template<typename Stream, typename T, size_t N>
inline auto unpack( Stream& s, fc::array<T,N>& v) -> std::enable_if_t<is_trivial_array<T>>
{ try {
static_assert( N <= MAX_NUM_ARRAY_ELEMENTS, "number of elements in array is too large" );
s.read((char*)&v.data[0], N*sizeof(T));
} FC_RETHROW_EXCEPTIONS( warn, "fc::array<${type},${length}>", ("type",fc::get_typename<T>::name())("length",N) ) }

template<typename Stream, typename T, size_t N>
inline void pack( Stream& s, T (&v)[N]) {
fc::raw::pack( s, unsigned_int((uint32_t)N) );
for (uint64_t i = 0; i < N; ++i)
fc::raw::pack(s, v[i]);
}

template<typename Stream, typename T>
inline void pack( Stream& s, const std::shared_ptr<T>& v)
{
fc::raw::pack( s, bool(!!v) );
if( !!v ) fc::raw::pack( s, *v );
}

template<typename Stream> inline void pack( Stream& s, const signed_int& v ) {
uint32_t val = (v.value<<1) ^ (v.value>>31);
do {
uint8_t b = uint8_t(val) & 0x7f;
val >>= 7;
b |= ((val > 0) << 7);
s.write((char*)&b,1);//.put(b);
} while( val );
}

template<typename Stream> inline void pack( Stream& s, const unsigned_int& v ) {
uint64_t val = v.value;
do {
uint8_t b = uint8_t(val) & 0x7f;
val >>= 7;
b |= ((val > 0) << 7);
s.write((char*)&b,1);//.put(b);
}while( val );
}

template<typename Stream> inline void pack( Stream& s, const char* v ) { fc::raw::pack( s, fc::string(v) ); }

template<typename Stream, typename T>
void pack( Stream& s, const safe<T>& v ) { fc::raw::pack( s, v.value ); }

template<typename Stream, typename T, unsigned int S, typename Align>
void pack( Stream& s, const fc::fwd<T,S,Align>& v ) {
fc::raw::pack( *v );
}

template<typename Stream, typename T>
void pack( Stream& s, const fc::smart_ref<T>& v ) { fc::raw::pack( s, *v ); }

// optional
template<typename Stream, typename T>
void pack( Stream& s, const fc::optional<T>& v ) {
fc::raw::pack( s, bool(!!v) );
if( !!v ) fc::raw::pack( s, *v );
}

// std::vector<char>
template<typename Stream> inline void pack( Stream& s, const std::vector<char>& value ) {
FC_ASSERT( value.size() <= MAX_SIZE_OF_BYTE_ARRAYS );
fc::raw::pack( s, unsigned_int((uint32_t)value.size()) );
if( value.size() )
s.write( &value.front(), (uint32_t)value.size() );
}

// fc::string
template<typename Stream> inline void pack( Stream& s, const fc::string& v ) {
FC_ASSERT( v.size() <= MAX_SIZE_OF_BYTE_ARRAYS );
fc::raw::pack( s, unsigned_int((uint32_t)v.size()));
if( v.size() ) s.write( v.c_str(), v.size() );
}

// bip::basic_string
template<typename Stream> inline void pack( Stream& s, const shared_string& v ) {
FC_ASSERT( v.size() <= MAX_SIZE_OF_BYTE_ARRAYS );
fc::raw::pack( s, unsigned_int((uint32_t)v.size()));
if( v.size() ) s.write( v.c_str(), v.size() );
}

// bool
template<typename Stream> inline void pack( Stream& s, const bool& v ) { fc::raw::pack( s, uint8_t(v) ); }
template<typename Stream> inline void unpack( Stream& s, bool& v )
{
uint8_t b;
fc::raw::unpack( s, b );
FC_ASSERT( (b & ~1) == 0 );
v=(b!=0);
}

namespace detail {

template<typename Stream, typename Class>
struct pack_object_visitor {
pack_object_visitor(const Class& _c, Stream& _s)
:c(_c),s(_s){}

template<typename T, typename C, T(C::*p)>
void operator()( const char* name )const {
fc::raw::pack( s, c.*p );
}
private:
const Class& c;
Stream& s;
};

template<typename IsClass=fc::true_type>
struct if_class{
template<typename Stream, typename T>
static inline void pack( Stream& s, const T& v ) { s << v; }
};

template<>
struct if_class<fc::false_type> {
template<typename Stream, typename T>
static inline void pack( Stream& s, const T& v ) {
s.write( (char*)&v, sizeof(v) );
}
};

template<typename IsEnum=fc::false_type>
struct if_enum {
template<typename Stream, typename T>
static inline void pack( Stream& s, const T& v ) {
fc::reflector<T>::visit( pack_object_visitor<Stream,T>( v, s ) );
}
};
template<>
struct if_enum<fc::true_type> {
template<typename Stream, typename T>
static inline void pack( Stream& s, const T& v ) {
fc::raw::pack(s, (int64_t)v);
}
template<typename Stream, typename T>
static inline void unpack( Stream& s, T& v ) {
int64_t temp;
fc::raw::unpack(s, temp);
v = (T)temp;
}
};

template<typename IsReflected=fc::false_type>
struct if_reflected {
template<typename Stream, typename T>
static inline void pack( Stream& s, const T& v ) {
if_class<typename fc::is_class<T>::type>::pack(s,v);
}
};
template<>
struct if_reflected<fc::true_type> {
template<typename Stream, typename T>
static inline void pack( Stream& s, const T& v ) {
if_enum< typename fc::reflector<T>::is_enum >::pack(s,v);
}
};

} // namesapce detail

template<typename Stream, typename T>
inline void pack( Stream& s, const std::unordered_set<T>& value ) {
FC_ASSERT( value.size() <= MAX_NUM_ARRAY_ELEMENTS );
fc::raw::pack( s, unsigned_int((uint32_t)value.size()) );
auto itr = value.begin();
auto end = value.end();
while( itr != end ) {
fc::raw::pack( s, *itr );
++itr;
}
}

template<typename Stream, typename K, typename V>
inline void pack( Stream& s, const std::pair<K,V>& value ) {
fc::raw::pack( s, value.first );
fc::raw::pack( s, value.second );
}

template<typename Stream, typename K, typename V>
inline void pack( Stream& s, const std::unordered_map<K,V>& value ) {
FC_ASSERT( value.size() <= MAX_NUM_ARRAY_ELEMENTS );
fc::raw::pack( s, unsigned_int((uint32_t)value.size()) );
auto itr = value.begin();
auto end = value.end();
while( itr != end ) {
fc::raw::pack( s, *itr );
++itr;
}
}
template<typename Stream, typename K, typename V>
inline void pack( Stream& s, const std::map<K,V>& value ) {
FC_ASSERT( value.size() <= MAX_NUM_ARRAY_ELEMENTS );
fc::raw::pack( s, unsigned_int((uint32_t)value.size()) );
auto itr = value.begin();
auto end = value.end();
while( itr != end ) {
fc::raw::pack( s, *itr );
++itr;
}
}

template<typename Stream, typename T>
inline void pack( Stream& s, const std::deque<T>& value ) {
FC_ASSERT( value.size() <= MAX_NUM_ARRAY_ELEMENTS );
fc::raw::pack( s, unsigned_int((uint32_t)value.size()) );
auto itr = value.begin();
auto end = value.end();
while( itr != end ) {
fc::raw::pack( s, *itr );
++itr;
}
}

template<typename Stream, typename T>
inline void pack( Stream& s, const std::vector<T>& value ) {
FC_ASSERT( value.size() <= MAX_NUM_ARRAY_ELEMENTS );
fc::raw::pack( s, unsigned_int((uint32_t)value.size()) );
auto itr = value.begin();
auto end = value.end();
while( itr != end ) {
fc::raw::pack( s, *itr );
++itr;
}
}

template<typename Stream, typename T>
inline void pack( Stream& s, const std::set<T>& value ) {
FC_ASSERT( value.size() <= MAX_NUM_ARRAY_ELEMENTS );
fc::raw::pack( s, unsigned_int((uint32_t)value.size()) );
auto itr = value.begin();
auto end = value.end();
while( itr != end ) {
fc::raw::pack( s, *itr );
++itr;
}
}

template<typename Stream, typename T, std::size_t S>
inline auto pack( Stream& s, const std::array<T, S>& value ) -> std::enable_if_t<is_trivial_array<T>>
{
s.write((const char*)value.data(), S * sizeof(T));
}

template<typename Stream, typename T, std::size_t S>
inline auto pack( Stream& s, const std::array<T, S>& value ) -> std::enable_if_t<!is_trivial_array<T>>
{
for( std::size_t i = 0; i < S; ++i ) {
fc::raw::pack( s, value[i] );
}
}

template<typename Stream, typename T>
inline void pack( Stream& s, const T& v ) {
fc::raw::detail::if_reflected< typename fc::reflector<T>::is_defined >::pack(s,v);
}

template<typename T>
inline size_t pack_size( const T& v )
{
datastream<size_t> ps;
fc::raw::pack(ps,v );
return ps.tellp();
}

template<typename T>
inline std::vector<char> pack( const T& v ) {
datastream<size_t> ps;
fc::raw::pack(ps,v );
std::vector<char> vec(ps.tellp());

if( vec.size() ) {
datastream<char*> ds( vec.data(), size_t(vec.size()) );
fc::raw::pack(ds,v);
}
return vec;
}

template<typename T, typename... Next>
inline std::vector<char> pack( const T& v, Next... next ) {
datastream<size_t> ps;
fc::raw::pack(ps,v,next...);
std::vector<char> vec(ps.tellp());

if( vec.size() ) {
datastream<char*> ds( vec.data(), size_t(vec.size()) );
fc::raw::pack(ds,v,next...);
}
return vec;
}

template<typename T>
inline void pack( char* d, uint32_t s, const T& v ) {
datastream<char*> ds(d,s);
fc::raw::pack(ds,v );
}

template<typename Stream>
struct pack_static_variant
{
Stream& stream;
pack_static_variant( Stream& s ):stream(s){}

typedef void result_type;
template<typename T> void operator()( const T& v )const
{
fc::raw::pack( stream, v );
}
};

};

template<typename Stream, typename... T>
void pack( Stream& s, const static_variant<T...>& sv )
{
fc::raw::pack( s, unsigned_int(sv.which()) );
sv.visit( pack_static_variant<Stream>(s) );
}

template<typename Stream, typename T> void pack( Stream& s, const boost::multiprecision::number<T>& n ) {
static_assert( sizeof( n ) == (std::numeric_limits<boost::multiprecision::number<T>>::digits+1)/8, "unexpected padding" );
s.write( (const char*)&n, sizeof(n) );
}

template<typename Stream> void pack( Stream& s, const UInt<256>& n ) {
pack( s, static_cast<UInt<128>>(n) );
pack( s, static_cast<UInt<128>>(n >> 128) );
}

packだけ抽出してきましたが、ひたすら色々な方のpackを実装しています。


EOSはこのシリアライズのあとzlibをつかって圧縮してネットワーク転送をしているようです。

EthereumはRLPを定義して色んな言語でノードを作れるようにして、1つの言語のノードに問題があった時でも、全体としては稼働する戦略をとっていますが、EOSはfcライブラリにどっぷり依存しているせいで多言語で実装するのは難しいのかなと感じました...