0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

libp2pでピア間をストリーム通信

Posted at

libp2p という P2P 用のネットワークライブラリがあります。
Go言語用のライブラリ go-libp2p を使ってピア間のメッセージ通信を試してみました。

はじめに

今回は、libp2p のノードを 2種類(とりあえず server と client にしました1)実行し、以下のようなメッセージ通信を libp2p のストリームで実施します。

client が now\n というメッセージを送信すると server から現在時刻を返します。
client はこれを 3秒毎に 5回繰り返して終了する事にします。

改行(\n)をメッセージの区切り文字とします。

(a) アドレス指定の接続

まずは、接続先をアドレス指定して client から server へ接続してみます。

libp2p.New() でノードが起動しますが、デフォルトでは複数のプロトコル構成 2 で起動するようなので、ここでは ip4 の tcp に限定しました。

この場合は client から /ip4/<ip address>/tcp/<port>/p2p/<peer id> というアドレスで server へ接続する事になります。

リッスンするポート番号を 0 にした場合は空いているポート番号が使われます。

server 実装

server はこのようになりました。

sample1/server/main.go
package main

import (
    ...省略
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/network"
)

func main() {
	node, err := libp2p.New(
		libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
	)

	if err != nil {
		log.Fatal(err)
	}
	defer node.Close()

	node.SetStreamHandler("/clock", func(s network.Stream) {
		defer s.Close()

		log.Println("called stream handler")

		buf := bufio.NewReader(s)

		for {
            // ストリームからの読み取り
			msg, err := buf.ReadString('\n')

			if err != nil {
				log.Printf("failed read stream: %v", err)
				s.Reset() // ストリームのリセット
				break
			}

			t := ""

			if msg == "now\n" {
				t = time.Now().Format(time.RFC3339)
			}
            // ストリームへの書き込み
			s.Write([]byte(t + "\n"))
		}
	})

	addr := fmt.Sprintf("%s/p2p/%s", node.Addrs()[0], node.ID())
    // 接続用のアドレスを出力
	log.Printf("started: peer=%s", addr)

    // ctrl + c 等の実施まで待機
	awaitTerminate()
}

func awaitTerminate() {
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, os.Interrupt)

		<-sig

		log.Print("stop")

		cancel()
	}()

	<-ctx.Done()
}

SetStreamHandler でストリームのハンドラ関数を設定します。(ストリームのプロトコルIDは /clock としました)

改行(\n)を区切りにストリームからメッセージを読み取って現在時刻を書き込んでいます。
client と複数回のやりとりを行うためエラーになるまでループ処理するようにしました。

ついでに、エラーが発生した場合はストリームを Reset するようにしています。3

そのままだとプロセスが終了してしまうので、context を使って終了しないようにしています。

また、ここでは実施していませんが、github.com/ipfs/go-log/v2 を使ってログレベルを変更すると go-libp2p の詳細なログを出力する事も可能でした。

client 実装

client はこのようになりました。

sample1/client/main.go
package main

import (
    ...省略
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	targetPeer := os.Args[1]

	node, err := libp2p.New(
		libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
	)

	if err != nil {
		log.Fatal(err)
	}
	defer node.Close()

	info, err := peer.AddrInfoFromString(targetPeer)

	if err != nil {
		log.Fatal(err)
	}
    // server への接続
	node.Connect(context.Background(), *info)
    // ストリームのオープン
	s, err := node.NewStream(context.Background(), info.ID, "/clock")

	if err != nil {
		log.Fatal(err)
	}
	defer s.Close()

	buf := bufio.NewReader(s)

	for _ = range 5 {
		_, err = s.Write([]byte("now\n"))

		if err != nil {
			log.Fatal(err)
		}

		res, _ := buf.ReadString('\n')

		log.Printf("now=%s", res)

		time.Sleep(3 * time.Second)
	}
}

server の実行時に出力したアドレスをコマンドライン引数で渡すようにしています。

このアドレスを peer.AddrInfoFromString で処理すると peer.AddrInfo を取得でき、これを Connect する事で接続できます。4

NewStream で server のピアIDとSetStreamHandler時に指定したプロトコルID(/clock)を指定する事でストリームをオープンできます。

動作確認

server を実行した状態で client を実行した結果はこのようになりました。

server 実行結果

server 実行
$ cd sample1/server
$ go run main.go
server 出力結果
2024/09/28 18:13:31 started: peer=/ip4/127.0.0.1/tcp/58385/p2p/61A0CbiWKN3Kbq1FJB8spoZ4zJi6MNpmAMschHCgDkMmkxE2zk7x
2024/09/28 18:13:51 called stream handler
2024/09/28 18:14:06 failed read stream: EOF

client 実行結果

client 実行
$ cd sample1/client
$ go run main.go /ip4/127.0.0.1/tcp/58385/p2p/61A0CbiWKN3Kbq1FJB8spoZ4zJi6MNpmAMschHCgDkMmkxE2zk7x
client 出力結果
2024/09/28 18:13:51 now=2024-09-28T18:13:51+09:00
2024/09/28 18:13:54 now=2024-09-28T18:13:54+09:00
2024/09/28 18:13:57 now=2024-09-28T18:13:57+09:00
2024/09/28 18:14:00 now=2024-09-28T18:14:00+09:00
2024/09/28 18:14:03 now=2024-09-28T18:14:03+09:00

(b) mDNSによる検出

次は、mDNS(マルチキャストDNS)で接続先を自動検出するようにしてみます。

mDNS は NewMdnsService で作成し、Start で適用できるようです。

NewMdnsService にはサービス名と検出した際のハンドラメソッド(HandlePeerFound)を定義した構造体を渡せば良さそうです。

サービス名は mDNS による検出グループのようなもので、同一のサービス名を使っているものが検出対象となるようです。

server 実装

serverは検出した AddrInfo を処理する必要が特にないので、ここではログ出力するだけにしました。

sample2/server/main.go
package main

import (
    ...省略
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
)

type notifee struct{}
// mDNS 検出時のハンドリング
func (n *notifee) HandlePeerFound(info peer.AddrInfo) {
	log.Printf("peer found: id=%s, addrs=%v", info.ID, info.Addrs)
}

func main() {
	serviceName := "sample"

	node, err := libp2p.New(
		libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
	)

	if err != nil {
		log.Fatal(err)
	}
	defer node.Close()

	node.SetStreamHandler("/clock", func(s network.Stream) {
        ...省略
	})
    // mDNS 適用
	ds := mdns.NewMdnsService(node, serviceName, &notifee{})

	if err = ds.Start(); err != nil {
		log.Fatal(err)
	}

	log.Printf("started: id=%s", node.ID())

	awaitTerminate()
}
...省略

client 実装

serverと同じサービス名を指定して NewMdnsService します。

HandlePeerFound メソッドへ渡ってきた AddrInfo を使って server へ Connect で接続し、/clock のストリームを開始するようにしました。

client を複数起動した際に他の client も検出してしまうので、ストリームを開始できた場合にのみチャネルを使って結果(ストリーム)を返すようにしました。

sample2/client/main.go
package main

import (
    ...省略
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
)

type notifee struct {
	Node       host.Host
	StreamChan chan network.Stream
}

func (n *notifee) HandlePeerFound(info peer.AddrInfo) {
	log.Printf("peer found: id=%s, addrs=%v", info.ID, info.Addrs)

	if err := n.Node.Connect(context.Background(), info); err != nil {
		log.Printf("failed connect: %v", err)
		return
	}

	s, err := n.Node.NewStream(context.Background(), info.ID, "/clock")

	if err != nil {
		log.Printf("failed new stream: %v", err)
		return
	}

	n.StreamChan <- s
}

func main() {
	serviceName := "sample"

	node, err := libp2p.New(
		libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
	)

	if err != nil {
		log.Fatal(err)
	}
	defer node.Close()

	n := notifee{node, make(chan network.Stream)}
    // mDNS 適用
	ds := mdns.NewMdnsService(node, serviceName, &n)

	if err = ds.Start(); err != nil {
		log.Fatal(err)
	}

	s := <-n.StreamChan
	defer s.Close()

	buf := bufio.NewReader(s)

	for _ = range 5 {
		_, err = s.Write([]byte("now\n"))

        ...省略
	}
}

動作確認

server と client を 2つ実行した場合の結果は次のようになりました。

server 実行結果

まずは server の実行結果です。

server 実行
$ cd sample2/server
$ go run main.go
server 出力結果
2024/09/28 21:15:26 started: id=61A0CbiWQhcFpo7b2N99T3QtVyXnLPeehPTjHuM8wMpBMtPz14JF
2024/09/28 21:15:29 peer found: id=61A0CbiWGwN7UZ5Yb1yTiuSmDz8S11nxY7AyjVVW8EKxPuLML5qX, addrs=[/ip4/127.0.0.1/tcp/58622]
2024/09/28 21:15:29 called stream handler
2024/09/28 21:15:34 peer found: id=61A0CbiWGSNgxiXmWoE1P13Qwz4Y3VevgxitmJGCKAZHNc84Y3zG, addrs=[/ip4/127.0.0.1/tcp/58625]
2024/09/28 21:15:34 called stream handler
2024/09/28 21:15:44 failed read stream: EOF
2024/09/28 21:15:49 failed read stream: EOF

client 実行結果

1つ目の client の実行結果です。

client1 実行
$ cd sample2/client
$ go run main.go
client1 出力結果
2024/09/28 21:15:29 peer found: id=61A0CbiWQhcFpo7b2N99T3QtVyXnLPeehPTjHuM8wMpBMtPz14JF, addrs=[/ip4/127.0.0.1/tcp/58619]
2024/09/28 21:15:29 now=2024-09-28T21:15:29+09:00
2024/09/28 21:15:32 now=2024-09-28T21:15:32+09:00
2024/09/28 21:15:34 peer found: id=61A0CbiWGSNgxiXmWoE1P13Qwz4Y3VevgxitmJGCKAZHNc84Y3zG, addrs=[/ip4/127.0.0.1/tcp/58625]
2024/09/28 21:15:34 failed new stream: failed to negotiate protocol: protocols not supported: [/clock]
2024/09/28 21:15:35 now=2024-09-28T21:15:35+09:00
2024/09/28 21:15:38 now=2024-09-28T21:15:38+09:00
2024/09/28 21:15:41 now=2024-09-28T21:15:41+09:00

2つ目の client の実行結果です。

client2 実行
$ cd sample2/client
$ go run main.go
client2 出力結果
2024/09/28 21:15:34 peer found: id=61A0CbiWQhcFpo7b2N99T3QtVyXnLPeehPTjHuM8wMpBMtPz14JF, addrs=[/ip4/127.0.0.1/tcp/58619]
2024/09/28 21:15:34 peer found: id=61A0CbiWGwN7UZ5Yb1yTiuSmDz8S11nxY7AyjVVW8EKxPuLML5qX, addrs=[/ip4/127.0.0.1/tcp/58622]
2024/09/28 21:15:34 now=2024-09-28T21:15:34+09:00
2024/09/28 21:15:34 failed new stream: failed to negotiate protocol: protocols not supported: [/clock]
2024/09/28 21:15:37 now=2024-09-28T21:15:37+09:00
2024/09/28 21:15:40 now=2024-09-28T21:15:40+09:00
2024/09/28 21:15:43 now=2024-09-28T21:15:43+09:00
2024/09/28 21:15:46 now=2024-09-28T21:15:46+09:00
  1. P2Pに適さない表現かもしれませんが

  2. ip4とip6でそれぞれtcpとudp(quic や webrtc)の構成で起動しました

  3. mplexのドキュメント によると、Reset はエラー時にストリームを即時クローズするために使うようでした

  4. Connectで接続しなくても、Peerstoreへ追加する事で接続できるようにする方法もある

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?