libp2p という P2P 用のネットワークライブラリがあります。
Go言語用のライブラリ go-libp2p を使ってピア間のメッセージ通信を試してみました。
はじめに
今回は、libp2p のノードを 2種類(とりあえず server と client にしました1)実行し、以下のようなメッセージ通信を libp2p のストリームで実施します。
client が now\n
というメッセージを送信すると server から現在時刻を返します。
client はこれを 3秒毎に 5回繰り返して終了する事にします。
改行(\n
)をメッセージの区切り文字とします。
(a) アドレス指定の接続
まずは、接続先をアドレス指定して client から server へ接続してみます。
libp2p.New()
でノードが起動しますが、デフォルトでは複数のプロトコル構成 2 で起動するようなので、ここでは ip4 の tcp に限定しました。
この場合は client から /ip4/<ip address>/tcp/<port>/p2p/<peer id>
というアドレスで server へ接続する事になります。
リッスンするポート番号を 0
にした場合は空いているポート番号が使われます。
server 実装
server はこのようになりました。
package main
import (
...省略
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
)
func main() {
node, err := libp2p.New(
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
)
if err != nil {
log.Fatal(err)
}
defer node.Close()
node.SetStreamHandler("/clock", func(s network.Stream) {
defer s.Close()
log.Println("called stream handler")
buf := bufio.NewReader(s)
for {
// ストリームからの読み取り
msg, err := buf.ReadString('\n')
if err != nil {
log.Printf("failed read stream: %v", err)
s.Reset() // ストリームのリセット
break
}
t := ""
if msg == "now\n" {
t = time.Now().Format(time.RFC3339)
}
// ストリームへの書き込み
s.Write([]byte(t + "\n"))
}
})
addr := fmt.Sprintf("%s/p2p/%s", node.Addrs()[0], node.ID())
// 接続用のアドレスを出力
log.Printf("started: peer=%s", addr)
// ctrl + c 等の実施まで待機
awaitTerminate()
}
func awaitTerminate() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
<-sig
log.Print("stop")
cancel()
}()
<-ctx.Done()
}
SetStreamHandler
でストリームのハンドラ関数を設定します。(ストリームのプロトコルIDは /clock
としました)
改行(\n
)を区切りにストリームからメッセージを読み取って現在時刻を書き込んでいます。
client と複数回のやりとりを行うためエラーになるまでループ処理するようにしました。
ついでに、エラーが発生した場合はストリームを Reset するようにしています。3
そのままだとプロセスが終了してしまうので、context を使って終了しないようにしています。
また、ここでは実施していませんが、github.com/ipfs/go-log/v2
を使ってログレベルを変更すると go-libp2p の詳細なログを出力する事も可能でした。
client 実装
client はこのようになりました。
package main
import (
...省略
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
)
func main() {
targetPeer := os.Args[1]
node, err := libp2p.New(
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
)
if err != nil {
log.Fatal(err)
}
defer node.Close()
info, err := peer.AddrInfoFromString(targetPeer)
if err != nil {
log.Fatal(err)
}
// server への接続
node.Connect(context.Background(), *info)
// ストリームのオープン
s, err := node.NewStream(context.Background(), info.ID, "/clock")
if err != nil {
log.Fatal(err)
}
defer s.Close()
buf := bufio.NewReader(s)
for _ = range 5 {
_, err = s.Write([]byte("now\n"))
if err != nil {
log.Fatal(err)
}
res, _ := buf.ReadString('\n')
log.Printf("now=%s", res)
time.Sleep(3 * time.Second)
}
}
server の実行時に出力したアドレスをコマンドライン引数で渡すようにしています。
このアドレスを peer.AddrInfoFromString
で処理すると peer.AddrInfo
を取得でき、これを Connect
する事で接続できます。4
NewStream
で server のピアIDとSetStreamHandler
時に指定したプロトコルID(/clock
)を指定する事でストリームをオープンできます。
動作確認
server を実行した状態で client を実行した結果はこのようになりました。
server 実行結果
$ cd sample1/server
$ go run main.go
2024/09/28 18:13:31 started: peer=/ip4/127.0.0.1/tcp/58385/p2p/61A0CbiWKN3Kbq1FJB8spoZ4zJi6MNpmAMschHCgDkMmkxE2zk7x
2024/09/28 18:13:51 called stream handler
2024/09/28 18:14:06 failed read stream: EOF
client 実行結果
$ cd sample1/client
$ go run main.go /ip4/127.0.0.1/tcp/58385/p2p/61A0CbiWKN3Kbq1FJB8spoZ4zJi6MNpmAMschHCgDkMmkxE2zk7x
2024/09/28 18:13:51 now=2024-09-28T18:13:51+09:00
2024/09/28 18:13:54 now=2024-09-28T18:13:54+09:00
2024/09/28 18:13:57 now=2024-09-28T18:13:57+09:00
2024/09/28 18:14:00 now=2024-09-28T18:14:00+09:00
2024/09/28 18:14:03 now=2024-09-28T18:14:03+09:00
(b) mDNSによる検出
次は、mDNS(マルチキャストDNS)で接続先を自動検出するようにしてみます。
mDNS は NewMdnsService
で作成し、Start
で適用できるようです。
NewMdnsService にはサービス名と検出した際のハンドラメソッド(HandlePeerFound
)を定義した構造体を渡せば良さそうです。
サービス名は mDNS による検出グループのようなもので、同一のサービス名を使っているものが検出対象となるようです。
server 実装
serverは検出した AddrInfo を処理する必要が特にないので、ここではログ出力するだけにしました。
package main
import (
...省略
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
)
type notifee struct{}
// mDNS 検出時のハンドリング
func (n *notifee) HandlePeerFound(info peer.AddrInfo) {
log.Printf("peer found: id=%s, addrs=%v", info.ID, info.Addrs)
}
func main() {
serviceName := "sample"
node, err := libp2p.New(
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
)
if err != nil {
log.Fatal(err)
}
defer node.Close()
node.SetStreamHandler("/clock", func(s network.Stream) {
...省略
})
// mDNS 適用
ds := mdns.NewMdnsService(node, serviceName, ¬ifee{})
if err = ds.Start(); err != nil {
log.Fatal(err)
}
log.Printf("started: id=%s", node.ID())
awaitTerminate()
}
...省略
client 実装
serverと同じサービス名を指定して NewMdnsService
します。
HandlePeerFound
メソッドへ渡ってきた AddrInfo を使って server へ Connect
で接続し、/clock
のストリームを開始するようにしました。
client を複数起動した際に他の client も検出してしまうので、ストリームを開始できた場合にのみチャネルを使って結果(ストリーム)を返すようにしました。
package main
import (
...省略
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
)
type notifee struct {
Node host.Host
StreamChan chan network.Stream
}
func (n *notifee) HandlePeerFound(info peer.AddrInfo) {
log.Printf("peer found: id=%s, addrs=%v", info.ID, info.Addrs)
if err := n.Node.Connect(context.Background(), info); err != nil {
log.Printf("failed connect: %v", err)
return
}
s, err := n.Node.NewStream(context.Background(), info.ID, "/clock")
if err != nil {
log.Printf("failed new stream: %v", err)
return
}
n.StreamChan <- s
}
func main() {
serviceName := "sample"
node, err := libp2p.New(
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
)
if err != nil {
log.Fatal(err)
}
defer node.Close()
n := notifee{node, make(chan network.Stream)}
// mDNS 適用
ds := mdns.NewMdnsService(node, serviceName, &n)
if err = ds.Start(); err != nil {
log.Fatal(err)
}
s := <-n.StreamChan
defer s.Close()
buf := bufio.NewReader(s)
for _ = range 5 {
_, err = s.Write([]byte("now\n"))
...省略
}
}
動作確認
server と client を 2つ実行した場合の結果は次のようになりました。
server 実行結果
まずは server の実行結果です。
$ cd sample2/server
$ go run main.go
2024/09/28 21:15:26 started: id=61A0CbiWQhcFpo7b2N99T3QtVyXnLPeehPTjHuM8wMpBMtPz14JF
2024/09/28 21:15:29 peer found: id=61A0CbiWGwN7UZ5Yb1yTiuSmDz8S11nxY7AyjVVW8EKxPuLML5qX, addrs=[/ip4/127.0.0.1/tcp/58622]
2024/09/28 21:15:29 called stream handler
2024/09/28 21:15:34 peer found: id=61A0CbiWGSNgxiXmWoE1P13Qwz4Y3VevgxitmJGCKAZHNc84Y3zG, addrs=[/ip4/127.0.0.1/tcp/58625]
2024/09/28 21:15:34 called stream handler
2024/09/28 21:15:44 failed read stream: EOF
2024/09/28 21:15:49 failed read stream: EOF
client 実行結果
1つ目の client の実行結果です。
$ cd sample2/client
$ go run main.go
2024/09/28 21:15:29 peer found: id=61A0CbiWQhcFpo7b2N99T3QtVyXnLPeehPTjHuM8wMpBMtPz14JF, addrs=[/ip4/127.0.0.1/tcp/58619]
2024/09/28 21:15:29 now=2024-09-28T21:15:29+09:00
2024/09/28 21:15:32 now=2024-09-28T21:15:32+09:00
2024/09/28 21:15:34 peer found: id=61A0CbiWGSNgxiXmWoE1P13Qwz4Y3VevgxitmJGCKAZHNc84Y3zG, addrs=[/ip4/127.0.0.1/tcp/58625]
2024/09/28 21:15:34 failed new stream: failed to negotiate protocol: protocols not supported: [/clock]
2024/09/28 21:15:35 now=2024-09-28T21:15:35+09:00
2024/09/28 21:15:38 now=2024-09-28T21:15:38+09:00
2024/09/28 21:15:41 now=2024-09-28T21:15:41+09:00
2つ目の client の実行結果です。
$ cd sample2/client
$ go run main.go
2024/09/28 21:15:34 peer found: id=61A0CbiWQhcFpo7b2N99T3QtVyXnLPeehPTjHuM8wMpBMtPz14JF, addrs=[/ip4/127.0.0.1/tcp/58619]
2024/09/28 21:15:34 peer found: id=61A0CbiWGwN7UZ5Yb1yTiuSmDz8S11nxY7AyjVVW8EKxPuLML5qX, addrs=[/ip4/127.0.0.1/tcp/58622]
2024/09/28 21:15:34 now=2024-09-28T21:15:34+09:00
2024/09/28 21:15:34 failed new stream: failed to negotiate protocol: protocols not supported: [/clock]
2024/09/28 21:15:37 now=2024-09-28T21:15:37+09:00
2024/09/28 21:15:40 now=2024-09-28T21:15:40+09:00
2024/09/28 21:15:43 now=2024-09-28T21:15:43+09:00
2024/09/28 21:15:46 now=2024-09-28T21:15:46+09:00
-
P2Pに適さない表現かもしれませんが ↩
-
ip4とip6でそれぞれtcpとudp(quic や webrtc)の構成で起動しました ↩
-
mplexのドキュメント によると、Reset はエラー時にストリームを即時クローズするために使うようでした ↩
-
Connectで接続しなくても、Peerstoreへ追加する事で接続できるようにする方法もある ↩