parityのNetwork周りについて調べてみた

Ethereum Advent Calendar 2018 6日目の記事です。

parity-ethereumのネットワーク周りについて調べたので、それを紹介します。

多分、Ethereumでもノードの実装がどうなっているかなんて興味ある人はほとんどいないと思いますが...(ましてやparityなんて...)

parity-ethereumはpartity社が作っているEthereumのRust実装です。

この記事はv2.1.8のソースコードを元に記述しています。

parity-ethereumのネットワーク周りで関係してくるのが下記のライブラリです。(全部parity-ethereum内にあり、サブモジュール的な扱いになっています)


  • ethcore/service


    • ethereumのノードとなるClientServiceがある



  • util/io/


    • IoHandler・IoMessage・IoContext・IoManager・IoServiceなどparity-ethereumで使われるIOのラッパーがある



  • util/network/


    • ClientServiceで使われるp2pのNetworkConfiguration・NetworkContextの定義がある



  • ethcore/src/client



    • struct Clientchain: RwLock<Arc<BlockChain>>notify: RwLock<Vec<Weak<ChainNotify>>> をもっていて、イベントの通知などを管理している。


    • trait ChainNotifyがあり、blockchainの変更イベントのインターフェースが存在する。



  • ethcore/sync


    • 起動時などにblockをsyncするためのライブラリ、ブロック・トランザクションの受け取り・伝播のロジックがある

    • peerで接続しているnodeからtransaction情報やblock情報を伝播して貰う


    • impl ChainNotify for EthSyncしている


    • struct ChainSynctransactions_receivedなどがある



  • network-devp2p


    • p2pの接続ライブラリ、ここで接続したpeerとブロック・トランザクションの伝播を行う



他にもlight clientやwhisper用の処理もあるのですが、今回は割愛します。


全体の流れ

parity起動 -> devp2pで接続ノードを決定 -> 接続ノードからsync

という流れです。


ClientService周り

pub struct ClientService {

io_service: Arc<IoService<ClientIoMessage>>,
client: Arc<Client>,
...
}

io_service: Arc<IoService<ClientIoMessage>>channelでメッセージのやり取りをするチャンネルを作って、register_handlerでevent loopをつくるようです。

client: Arc<Client>はマイニング、メッセージの送受信、dbへの書き込み等何でもやるみたいです。

下がclientのネットワーク周りだけ抜粋しものです。

pub struct Client {

...
io_channel: RwLock<IoChannel<ClientIoMessage>>,
...
notify: RwLock<Vec<Weak<ChainNotify>>>,
...
queue_transactions: IoChannelQueue,
queue_ancient_blocks: IoChannelQueue,
...
queue_consensus_message: IoChannelQueue,
...
}

#[derive(Debug)]
pub enum ClientIoMessage {
/// Best Block Hash in chain has been changed
NewChainHead,
/// A block is ready
BlockVerified,
/// Begin snapshot restoration
BeginRestoration(ManifestData),
/// Feed a state chunk to the snapshot service
FeedStateChunk(H256, Bytes),
/// Feed a block chunk to the snapshot service
FeedBlockChunk(H256, Bytes),
/// Take a snapshot for the block with given number.
TakeSnapshot(u64),
/// Execute wrapped closure
Execute(Callback),
}

/// Queue some items to be processed by IO client.
struct IoChannelQueue {
currently_queued: Arc<AtomicUsize>,
limit: usize,
}

/// Represents what has to be handled by actor listening to chain events
pub trait ChainNotify : Send + Sync {

fn new_blocks(
&self,
_imported: Vec<H256>,
_invalid: Vec<H256>,
_route: ChainRoute,
_sealed: Vec<H256>,
// Block bytes.
_proposed: Vec<Bytes>,
_duration: Duration,
) {
// does nothing by default
}

fn start(&self) {
// does nothing by default
}

fn stop(&self) {
// does nothing by default
}

fn broadcast(&self, _message_type: ChainMessageType) {}

fn transactions_received(&self,
_txs: &[UnverifiedTransaction],
_peer_id: usize,
) {
// does nothing by default
}
}

ClientIoMessageがenumになっているのがジェネリクスがある言語の良いところですね。

あと、IoChannelQueueでメッセージをqueueで処理しているようです。

それと、ChainNotifyでObserverパターン的に通知をしているっぽいですね。

上の ChainNotify はinterface的な使い方のだとおもいますが、なんで何もしない実装を定義したのか?

テンプレートメソッドパターン的にimplで使わないのは何もしない処理を書きたくなかったのか...?

(OOP的にアンチパターンな気もしますが...)

実際のブロックチェーン・トランザクションの同期・伝播は下記の箇所で行っています。

// ethcore/sync/src/api.rs

impl ChainNotify for EthSync {
...
}


util/io/

IO周りのライブラリーです。IOがある他のプログラムは大体ここに依存しています。

型の定義にあるコメントを翻訳してます。

IoChannel、IoContext、IoHandler辺りがよく出てきますかね。

/// メッセージが他のスレッドのイベントループとコミニケーションするのに使う

#[derive(Clone)]
pub enum IoMessage<Message> where Message: Send + Sized {
/// Shutdown イベントループ
Shutdown,
/// 新しいプロトコルハンドラの登録
AddHandler {
handler: Arc<IoHandler<Message>+Send>,
},
RemoveHandler {
handler_id: HandlerId,
},
AddTimer {
handler_id: HandlerId,
token: TimerToken,
delay: Duration,
once: bool,
},
RemoveTimer {
handler_id: HandlerId,
token: TimerToken,
},
RegisterStream {
handler_id: HandlerId,
token: StreamToken,
},
DeregisterStream {
handler_id: HandlerId,
token: StreamToken,
},
UpdateStreamRegistration {
handler_id: HandlerId,
token: StreamToken,
},
/// 全てのプロトコルハンドラにメッセージをブロードキャスト
UserMessage(Arc<Message>)
}

/// ルートIOハンドラ。ユーザーハンドラメッセージIOタイマーの管理をする
pub struct IoManager<Message> where Message: Send + Sync {
timers: Arc<RwLock<HashMap<HandlerId, UserTimer>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
workers: Vec<Worker>,
worker_channel: deque::Worker<Work<Message>>,
work_ready: Arc<Condvar>,
}

/// メッセージをイベントループに送信できるようにする。すべてのIOハンドラは `message`コールバックでメッセージを取得する
pub struct IoChannel<Message> where Message: Send {
channel: Option<Sender<IoMessage<Message>>>,
handlers: Handlers<Message>,
}

/// 汎用的なIO Service。イベントループを開始し、IO リクエストをディスパッチする
/// 'Message' は通知メッセージの種類
pub struct IoService<Message> where Message: Send + Sync + 'static {
thread: Option<JoinHandle<()>>,
host_channel: Mutex<Sender<IoMessage<Message>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
}

/// IOアクセスポイント。 これはすべてのIOハンドラ に渡され、IOサブシステムへのインタフェースを提供する
pub struct IoContext<Message> where Message: Send + Sync + 'static {
channel: IoChannel<Message>,
handler: HandlerId,
}

/// 汎用的な IO handler
/// 全てのハンドラ関数はIOイベントループの中で呼ばれる
/// `Message`タイプは通知のデータに使われる
pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + 'static {
/// handlerの初期化
fn initialize(&self, _io: &IoContext<Message>) {}
/// `HandlerIo::timeout`によってくつられるタイムアウトの後に呼ばれるタイマー関数
fn timeout(&self, _io: &IoContext<Message>, _timer: TimerToken) {}
/// ブロードキャストされたメッセージが受信されたときに呼び出される。メッセージは、異なるIOハンドラからのみ送信できる.
fn message(&self, _io: &IoContext<Message>, _message: &Message) {}
/// IOストリームがクローズされたときに呼び出される
fn stream_hup(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
/// IOストリームを読み込めるときに呼び出される
fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
/// IOストリームに書き込むことができるときに呼び出される
fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
/// 新しいストリームをイベントループに登録する
fn register_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
/// イベントループでストリームを再登録する
fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
/// ストリームを登録解除する。イベントループからストリームが削除されたときに呼び出される
fn deregister_stream(&self, _stream: StreamToken, _event_loop: &mut EventLoop<IoManager<Message>>) {}
}


util/network/

ネットワークのセットアップ時に使われるライブラリですかね。

そんなに使われている場所はないですが、ネットワークの接続や設定の型があります。

/// Messages used to communitate with the event loop from other threads.

#[derive(Clone)]
pub enum NetworkIoMessage {
/// Register a new protocol handler.
AddHandler {
/// Handler shared instance.
handler: Arc<NetworkProtocolHandler + Sync>,
/// Protocol Id.
protocol: ProtocolId,
/// Supported protocol versions and number of packet IDs reserved by the protocol (packet count).
versions: Vec<(u8, u8)>,
},
/// Register a new protocol timer
AddTimer {
/// Protocol Id.
protocol: ProtocolId,
/// Timer token.
token: TimerToken,
/// Timer delay.
delay: Duration,
},
/// Initliaze public interface.
InitPublicInterface,
/// Disconnect a peer.
Disconnect(PeerId),
/// Disconnect and temporary disable peer.
DisablePeer(PeerId),
/// Network has been started with the host as the given enode.
NetworkStarted(String),
}

/// Network service configuration
#[derive(Debug, PartialEq, Clone)]
pub struct NetworkConfiguration {
/// Directory path to store general network configuration. None means nothing will be saved
pub config_path: Option<String>,
/// Directory path to store network-specific configuration. None means nothing will be saved
pub net_config_path: Option<String>,
/// IP address to listen for incoming connections. Listen to all connections by default
pub listen_address: Option<SocketAddr>,
/// IP address to advertise. Detected automatically if none.
pub public_address: Option<SocketAddr>,
/// Port for UDP connections, same as TCP by default
pub udp_port: Option<u16>,
/// Enable NAT configuration
pub nat_enabled: bool,
/// Enable discovery
pub discovery_enabled: bool,
/// List of initial node addresses
pub boot_nodes: Vec<String>,
/// Use provided node key instead of default
pub use_secret: Option<Secret>,
/// Minimum number of connected peers to maintain
pub min_peers: u32,
/// Maximum allowed number of peers
pub max_peers: u32,
/// Maximum handshakes
pub max_handshakes: u32,
/// Reserved protocols. Peers with <key> protocol get additional <value> connection slots.
pub reserved_protocols: HashMap<ProtocolId, u32>,
/// List of reserved node addresses.
pub reserved_nodes: Vec<String>,
/// The non-reserved peer mode.
pub non_reserved_mode: NonReservedPeerMode,
/// IP filter
pub ip_filter: IpFilter,
/// Client identifier
pub client_version: String,
}

/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub trait NetworkContext {
/// Send a packet over the network to another peer.
fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;

/// Send a packet over the network to another peer using specified protocol.
fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;

/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;

/// Disconnect a peer and prevent it from connecting again.
fn disable_peer(&self, peer: PeerId);

/// Disconnect peer. Reconnect can be attempted later.
fn disconnect_peer(&self, peer: PeerId);

/// Check if the session is still active.
fn is_expired(&self) -> bool;

/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error>;

/// Returns peer identification string
fn peer_client_version(&self, peer: PeerId) -> String;

/// Returns information on p2p session
fn session_info(&self, peer: PeerId) -> Option<SessionInfo>;

/// Returns max version for a given protocol.
fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8>;

/// Returns this object's subprotocol name.
fn subprotocol_name(&self) -> ProtocolId;

/// Returns whether the given peer ID is a reserved peer.
fn is_reserved_peer(&self, peer: PeerId) -> bool;
}

下が、util/io/src/lib.rsにあるIOのexampleになります。


//! General IO module.
//!
//! Example usage for creating a network service and adding an IO handler:
//!
extern crate ethcore_io;
use ethcore_io::*;
use std::sync::Arc;
use std::time::Duration;

struct MyHandler;

#[derive(Clone)]
struct MyMessage {
data: u32
}

impl IoHandler<MyMessage> for MyHandler {
fn initialize(&self, io: &IoContext<MyMessage>) {
io.register_timer(0, Duration::from_secs(1)).unwrap();
}

fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
println!("Timeout {}", timer);
}

fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) {
println!("Message {}", message.data);
}
}

fn main () {
let mut service = IoService::<MyMessage>::start().expect("Error creating network service");
service.register_handler(Arc::new(MyHandler)).unwrap();

// Wait for quit condition
// ...
// Drop the service

ドキュメントによるとIoServiceIoHandlerIoContextをこんな感じで使う用です。

util/ioは多分、parity-ethereum以外でもハイレベルな汎用的なIOとして使えるる気がします?


ethcore/src/client/

ここにあるclientがノード全体のハンドリングを担っているようです。

Client ChainNotify ClientIoMessageなどがここにあります。

parityはminingの処理が実装されていないせいで(ethminerなど別のライブラリを使ってマイニングする)、submit_workなどの関数を呼び出している箇所がなくこの辺りの処理を追うのがgethより辛いです...


ethcore/sync/src/chain/

ソースを追っていくと、worker -> network-devp2pでreadか呼ばれ、そこからSyncHandlerのon_packetまで流れていくので、network-devp2pでp2p接続時にコネクションをはってそのコネクションを利用してブロック・トランザクションの伝播しているようです。

// util/io/src/worker.rs

impl Worker {
...
fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>) where Message: Send + Sync + 'static {
match work.work_type {
WorkType::Readable => {
work.handler.stream_readable(&IoContext::new(channel, work.handler_id), work.token);
},
WorkType::Writable => {
...
}
}

// util/io/src/lib.rs
pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + 'static {
...
fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken)
...
}

// util/network-devp2p/src/host.rs
impl IoHandler<NetworkIoMessage> for Host {
...
fn stream_readable(&self, io: &IoContext<NetworkIoMessage>, stream: StreamToken) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
DISCOVERY => self.discovery_readable(io),
TCP_ACCEPT => self.accept(io),
_ => panic!("Received unknown readable token"),
}
}
...
}

// util/network-devp2p/src/host.rs
impl Host {
...
fn session_readable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage>) {
let mut ready_data: Vec<ProtocolId> = Vec::new();
let mut packet_data: Vec<(ProtocolId, PacketId, Vec<u8>)> = Vec::new();
let mut kill = false;
let session = { self.sessions.read().get(token).cloned() };
let mut ready_id = None;
...
for (p, packet_id, data) in packet_data {
let reserved = self.reserved_nodes.read();
if let Some(h) = handlers.get(&p) {
h.read(&NetworkContext::new(io, p, Some(session.clone()), self.sessions.clone(), &reserved), &token, packet_id, &data);
}
}
}
}
...
}


impl NetworkProtocolHandler for SyncProtocolHandler {
...
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
}
...
}

impl ChainSync {
...
/// Dispatch incoming requests and responses
pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
SyncSupplier::dispatch_packet(sync, io, peer, packet_id, data)
}
...
}

impl SyncSupplier {
/// Dispatch incoming requests and responses
pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
let rlp = Rlp::new(data);
let result = match packet_id {
GET_BLOCK_BODIES_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
SyncSupplier::return_block_bodies,
|e| format!("Error sending block bodies: {:?}", e)),

GET_BLOCK_HEADERS_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
SyncSupplier::return_block_headers,
|e| format!("Error sending block headers: {:?}", e)),

GET_RECEIPTS_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
SyncSupplier::return_receipts,
|e| format!("Error sending receipts: {:?}", e)),

GET_NODE_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
SyncSupplier::return_node_data,
|e| format!("Error sending nodes: {:?}", e)),

GET_SNAPSHOT_MANIFEST_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
SyncSupplier::return_snapshot_manifest,
|e| format!("Error sending snapshot manifest: {:?}", e)),

GET_SNAPSHOT_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
SyncSupplier::return_snapshot_data,
|e| format!("Error sending snapshot data: {:?}", e)),
CONSENSUS_DATA_PACKET => ChainSync::on_consensus_packet(io, peer, &rlp),
_ => {
sync.write().on_packet(io, peer, packet_id, data);
Ok(())
}
};
result.unwrap_or_else(|e| {
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e);
})
}
...
}

impl ChainSync {
...
pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id);
SyncHandler::on_packet(self, io, peer, packet_id, data);
}
...
}

impl SyncHandler {
/// Handle incoming packet from peer
pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
if packet_id != STATUS_PACKET && !sync.peers.contains_key(&peer) {
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer));
return;
}
let rlp = Rlp::new(data);
let result = match packet_id {
STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp),
TRANSACTIONS_PACKET => SyncHandler::on_peer_transactions(sync, io, peer, &rlp),
BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp),
BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp),
RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
NEW_BLOCK_PACKET => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
NEW_BLOCK_HASHES_PACKET => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
SNAPSHOT_MANIFEST_PACKET => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
SNAPSHOT_DATA_PACKET => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
PRIVATE_TRANSACTION_PACKET => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
SIGNED_PRIVATE_TRANSACTION_PACKET => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp),
_ => {
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
Ok(())
}
};
...
}

SyncHandlerのon_packetに全て集約されている?


余談

devp2pとIOを使ったサンプルもプログラム内にありました(使う人いるのかな...?)


//! Network and general IO module.
//!
//! Example usage for creating a network service and adding an IO handler:

extern crate ethcore_network as net;
extern crate ethcore_network_devp2p as devp2p;
use net::*;
use devp2p::NetworkService;
use std::sync::Arc;
use std::time::Duration;

struct MyHandler;
impl NetworkProtocolHandler for MyHandler {
fn initialize(&self, io: &NetworkContext) {
io.register_timer(0, Duration::from_secs(1));
}

fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer);
}

fn connected(&self, io: &NetworkContext, peer: &PeerId) {
println!("Connected {}", peer);
}

fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
println!("Disconnected {}", peer);
}
}

fn main () {
let mut service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
service.start().expect("Error starting service");
service.register_protocol(Arc::new(MyHandler), *b"myp", &[(1u8, 1u8)]);

// Wait for quit condition
// ...
// Drop the service
}

...

本当はmioについて書こうと思ったんですが、調べてたら、このissueにも上がってますが、parity-ethereumもmioをtokioにしたいみたいだったり、このPull Request

によると、軽量クライアントに対応したいが、mioがWASMに対応していないらしく、少しずつ依存が減らされてるため、その上のレイヤー処理を調べました...