今回の趣旨
P2Pはじめたい!
なので大人気の言語Rustで書かれているほうのLibp2pを使ってまずはかんたんにmDNSで同じマシンのターミナルで動いてるピアを見つけようと思います。
背景の知識の簡単な説明
Tokio
RustのAsyncな関数や入出力や通信を実行するために使うものです。(人気だけで選びました)
mDNS
LAN中でのサービスを見つけるためのものです。
今回は同じマシンのピアを見つけるために使います。
(ゆくゆくはKademliaが使いたいです……)
とにかくやってみる
今回はTokioとmDNSを使いますので、Cargo.toml
はこんな感じにします。
[package]
name="qiita-article-libp2p"
version="0.0.0"
edition="2021"
[dependencies.libp2p]
version="^0.53.2"
features=[
"mdns",
"noise",
"tcp",
"tokio",
"yamux",
]
[dependencies.tokio]
version="^1.37.0"
features=[
"rt-multi-thread",
"macros",
]
src/main.rs
はこうしてみます。(こうできるまでが長かった……)
use libp2p::futures::StreamExt as _;
use std::error::Error;
#[tokio::main]
async fn main(
)->Result<(),Box<dyn Error>>{
let mut swarm=libp2p::SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
libp2p::tcp::Config::default(),
libp2p::noise::Config::new,
||libp2p::yamux::Config::default(),
)?
.with_behaviour(
|keypair|{
libp2p::mdns::tokio::Behaviour::new(
libp2p::mdns::Config::default(),
keypair.public().into(),
).unwrap()
}
)?
.build();
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
loop{
println!("{:#?}",swarm.select_next_some().await);
}
}
まずSwarm
はmut
にします。あとでlisten_on
やselect_next_some
する時に必要です。
let mut swarm=libp2p::SwarmBuilder::with_new_identity()
tokio使用のためtokioで、
.with_tokio()
TCPをいい感じに設定します。
.with_tcp(
libp2p::tcp::Config::default(),
libp2p::noise::Config::new,
||libp2p::yamux::Config::default(),
)?
お目当てのmDNSはデフォのままです。
.with_behaviour(
|keypair|{
libp2p::mdns::tokio::Behaviour::new(
libp2p::mdns::Config::default(),
keypair.public().into(),
).unwrap()
}
)?
IPv4のアドレスは全部のTCPのポートはおまかせでLISTENします。
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
イベント全部println!
する無限ループにします。
loop{
println!("{:#?}",swarm.select_next_some().await);
}
結果
cargo run
します。無限ループするので2つ動かします。
2つ動かしたものはそれぞれこんなことになっています:
peer id: 12D3KooWKY715KuGyDdQTAL9qhbhvwqhyne7ARGRWWdH7fzhVhjt
NewListenAddr {
listener_id: ListenerId(
1,
),
address: "/ip4/127.0.0.1/tcp/40867",
}
NewListenAddr {
listener_id: ListenerId(
1,
),
address: "/ip4/10.0.2.15/tcp/40867",
}
Behaviour(
Discovered(
[
(
PeerId(
"12D3KooWJ1tK1vX2yggwwpriAkDaReCjiziY6xzau45u8Ka5ScZw",
),
"/ip4/10.0.2.15/tcp/35731/p2p/12D3KooWJ1tK1vX2yggwwpriAkDaReCjiziY6xzau45u8Ka5ScZw",
),
],
),
)
peer id: 12D3KooWJ1tK1vX2yggwwpriAkDaReCjiziY6xzau45u8Ka5ScZw
NewListenAddr {
listener_id: ListenerId(
1,
),
address: "/ip4/127.0.0.1/tcp/35731",
}
NewListenAddr {
listener_id: ListenerId(
1,
),
address: "/ip4/10.0.2.15/tcp/35731",
}
Behaviour(
Discovered(
[
(
PeerId(
"12D3KooWKY715KuGyDdQTAL9qhbhvwqhyne7ARGRWWdH7fzhVhjt",
),
"/ip4/10.0.2.15/tcp/40867/p2p/12D3KooWKY715KuGyDdQTAL9qhbhvwqhyne7ARGRWWdH7fzhVhjt",
),
],
),
)
ちゃんとお互いのノードを発見しているようです。
Behaviour(
Discovered(
[
(
PeerId(
"12D3KooWKY715KuGyDdQTAL9qhbhvwqhyne7ARGRWWdH7fzhVhjt",
),
"/ip4/10.0.2.15/tcp/40867/p2p/12D3KooWKY715KuGyDdQTAL9qhbhvwqhyne7ARGRWWdH7fzhVhjt",
),
],
),
)
こんな感じの出力のところに(自分のではなく)相手のIPやポート番号やPeerIdがしっかり出ています。
(一旦)やったね✨🎉
次はPingする
Cargo.toml
にはmacros
とping
を追加です。
[package]
name="handson-libp2p-cource-generated-by-ai"
version="0.0.0"
edition="2021"
[dependencies.libp2p]
version="^0.53.2"
features=[
"macros",
"mdns",
"noise",
"ping",
"tcp",
"tokio",
"yamux",
]
[dependencies.tokio]
version="^1.37.0"
features=[
"rt-multi-thread",
"macros",
]
src/main.rs
はちょっと変更が多めです。
use libp2p::futures::StreamExt as _;
use std::error::Error;
use std::time::Duration;
#[derive(libp2p::swarm::NetworkBehaviour)]
struct Behaviour{
mdns:libp2p::mdns::tokio::Behaviour,
ping:libp2p::ping::Behaviour,
}
impl Behaviour{
pub fn new(
mdns:libp2p::mdns::tokio::Behaviour,
ping:libp2p::ping::Behaviour,
)->Self{
Self{
mdns,
ping,
}
}
}
#[tokio::main]
async fn main(
)->Result<(),Box<dyn Error>>{
let mut swarm=libp2p::SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
libp2p::tcp::Config::default(),
libp2p::noise::Config::new,
||libp2p::yamux::Config::default(),
)?
.with_behaviour(
|keypair|{
Behaviour::new(
libp2p::mdns::tokio::Behaviour::new(
libp2p::mdns::Config::default(),
keypair.public().into(),
).unwrap(),
libp2p::ping::Behaviour::new(
libp2p::ping::Config::new()
.with_timeout(Duration::from_secs(5))
.with_interval(Duration::from_secs(1)),
),
)
}
)?
.with_swarm_config(|config|config.with_idle_connection_timeout(Duration::from_secs(5)))
.build();
println!("peer id: {}",swarm.local_peer_id().to_string());
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
loop{
let ev=swarm
.select_next_some()
.await;
println!("{:#?}",ev);
if let libp2p::swarm::SwarmEvent::Behaviour(BehaviourEvent::Mdns(libp2p::mdns::Event::Discovered(e)))=ev{
for peer in e{
swarm.dial(peer.1)?;
}
}
}
}
やってることとしてはまず、NetworkBehaviourを2つ使うのでderive macroで1つにくっつけてます。
ここでderive
を使うためにCargo.toml
でlibp2p
のmacros
を使っています
#[derive(libp2p::swarm::NetworkBehaviour)]
struct Behaviour{
mdns:libp2p::mdns::tokio::Behaviour,
ping:libp2p::ping::Behaviour,
}
タイムアウト5秒の1秒周期でPingして
libp2p::ping::Behaviour::new(
libp2p::ping::Config::new()
.with_timeout(Duration::from_secs(5))
.with_interval(Duration::from_secs(1)),
),
なにもしてないとすぐ切断されるのでそのタイムアウトを5秒に伸ばします。
.with_swarm_config(|config|config.with_idle_connection_timeout(Duration::from_secs(5)))
ピアを見つけたらとにかく接続です。
if let libp2p::swarm::SwarmEvent::Behaviour(BehaviourEvent::Mdns(libp2p::mdns::Event::Discovered(e)))=ev{
for peer in e{
let _=swarm.dial(peer.1);
}
}
結果
cargo run
します。今回もやはり無限ループするので2つ動かします。
2つ動かしたものはそれぞれこんなことになっています:
peer id: 12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy
NewListenAddr {
listener_id: ListenerId(
1,
),
address: "/ip4/127.0.0.1/tcp/38257",
}
NewListenAddr {
listener_id: ListenerId(
1,
),
address: "/ip4/10.0.2.15/tcp/38257",
}
Behaviour(
BehaviourEvent: Discovered([(PeerId("12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a"), "/ip4/10.0.2.15/tcp/37901/p2p/12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a")]),
)
IncomingConnection {
connection_id: ConnectionId(
2,
),
local_addr: "/ip4/10.0.2.15/tcp/38257",
send_back_addr: "/ip4/10.0.2.15/tcp/41440",
}
ConnectionEstablished {
peer_id: PeerId(
"12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a",
),
connection_id: ConnectionId(
1,
),
endpoint: Dialer {
address: "/ip4/10.0.2.15/tcp/37901/p2p/12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a",
role_override: Dialer,
},
num_established: 1,
concurrent_dial_errors: Some(
[],
),
established_in: 12.930926ms,
}
ConnectionEstablished {
peer_id: PeerId(
"12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a",
),
connection_id: ConnectionId(
2,
),
endpoint: Listener {
local_addr: "/ip4/10.0.2.15/tcp/38257",
send_back_addr: "/ip4/10.0.2.15/tcp/41440",
},
num_established: 2,
concurrent_dial_errors: None,
established_in: 41.065112ms,
}
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a"), connection: ConnectionId(1), result: Ok(44.038478ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a"), connection: ConnectionId(2), result: Ok(44.330714ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a"), connection: ConnectionId(1), result: Ok(918.144µs) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a"), connection: ConnectionId(2), result: Ok(1.02102ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a"), connection: ConnectionId(1), result: Ok(851.047µs) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a"), connection: ConnectionId(2), result: Ok(1.389371ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a"), connection: ConnectionId(1), result: Ok(1.024088ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a"), connection: ConnectionId(2), result: Ok(900.088µs) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a"), connection: ConnectionId(1), result: Ok(506.884µs) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a"), connection: ConnectionId(2), result: Ok(1.833147ms) },
)
ConnectionClosed {
peer_id: PeerId(
"12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a",
),
connection_id: ConnectionId(
2,
),
endpoint: Listener {
local_addr: "/ip4/10.0.2.15/tcp/38257",
send_back_addr: "/ip4/10.0.2.15/tcp/41440",
},
num_established: 1,
cause: Some(
IO(
Custom {
kind: Other,
error: Error(
Right(
Closed,
),
),
},
),
),
}
ConnectionClosed {
peer_id: PeerId(
"12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a",
),
connection_id: ConnectionId(
1,
),
endpoint: Dialer {
address: "/ip4/10.0.2.15/tcp/37901/p2p/12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a",
role_override: Dialer,
},
num_established: 0,
cause: Some(
KeepAliveTimeout,
),
}
peer id: 12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a
NewListenAddr {
listener_id: ListenerId(
1,
),
address: "/ip4/127.0.0.1/tcp/37901",
}
NewListenAddr {
listener_id: ListenerId(
1,
),
address: "/ip4/10.0.2.15/tcp/37901",
}
Behaviour(
BehaviourEvent: Discovered([(PeerId("12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy"), "/ip4/10.0.2.15/tcp/38257/p2p/12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy")]),
)
IncomingConnection {
connection_id: ConnectionId(
2,
),
local_addr: "/ip4/10.0.2.15/tcp/37901",
send_back_addr: "/ip4/10.0.2.15/tcp/47314",
}
ConnectionEstablished {
peer_id: PeerId(
"12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy",
),
connection_id: ConnectionId(
1,
),
endpoint: Dialer {
address: "/ip4/10.0.2.15/tcp/38257/p2p/12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy",
role_override: Dialer,
},
num_established: 1,
concurrent_dial_errors: Some(
[],
),
established_in: 6.793408ms,
}
ConnectionEstablished {
peer_id: PeerId(
"12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy",
),
connection_id: ConnectionId(
2,
),
endpoint: Listener {
local_addr: "/ip4/10.0.2.15/tcp/37901",
send_back_addr: "/ip4/10.0.2.15/tcp/47314",
},
num_established: 2,
concurrent_dial_errors: None,
established_in: 51.313413ms,
}
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy"), connection: ConnectionId(1), result: Ok(44.250221ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy"), connection: ConnectionId(2), result: Ok(44.474519ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy"), connection: ConnectionId(1), result: Ok(1.071157ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy"), connection: ConnectionId(2), result: Ok(1.016736ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy"), connection: ConnectionId(1), result: Ok(1.020467ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy"), connection: ConnectionId(2), result: Ok(1.291714ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy"), connection: ConnectionId(1), result: Ok(1.086732ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy"), connection: ConnectionId(2), result: Ok(895.066µs) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy"), connection: ConnectionId(1), result: Ok(1.078993ms) },
)
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy"), connection: ConnectionId(2), result: Ok(903.222µs) },
)
ConnectionClosed {
peer_id: PeerId(
"12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy",
),
connection_id: ConnectionId(
1,
),
endpoint: Dialer {
address: "/ip4/10.0.2.15/tcp/38257/p2p/12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy",
role_override: Dialer,
},
num_established: 1,
cause: Some(
KeepAliveTimeout,
),
}
ConnectionClosed {
peer_id: PeerId(
"12D3KooWHbnn6as4CnjqkWdUeAeTS9BNPcufQfqxm7dtctFw5WWy",
),
connection_id: ConnectionId(
2,
),
endpoint: Listener {
local_addr: "/ip4/10.0.2.15/tcp/37901",
send_back_addr: "/ip4/10.0.2.15/tcp/47314",
},
num_established: 0,
cause: Some(
IO(
Custom {
kind: Other,
error: Error(
Right(
Closed,
),
),
},
),
),
}
Pingのイベントが生じているようです。
Behaviour(
BehaviourEvent: Event { peer: PeerId("12D3KooWEQDUXZAA2QUSV4YLRCfjhHzf1kGPinxExnGDTtEy2Q9a"), connection: ConnectionId(2), result: Ok(1.02102ms) },
)
↑こういうのがいっぱい出てます。そしてOkとあります。
やったね✨🎉🥳