memberlist とは
go で記述されたクラスタメンバ管理ライブラリ(みんな大好きHashicorp製)です
gossipベースのプロトコルでクラスタメンバ管理やノード障害検出などの機能を備えています
https://github.com/hashicorp/memberlist
この記事の目的
memberlist は非常に簡単に使うことができるのと、使い方次第ではいろいろな実装パターンに応用ができるため個人的には好きなライブラリの一つです
いくつか実装を書いてきた中でよく使うベース機能のまとめを書いていて、前回の記事の続きになります
今回は memberlist で提供される EventDelegate を使ってノード参加イベントをリアルタイムにハンドリングしていく方法をまとめておきたいなと思います
なおこの記事で扱っているものは github にも上げています
https://github.com/octu0/example-memberlist/tree/master/02-channel-event
EventDelegate について
EventDelegate は ノードがクラスタに参加/離脱のイベント、またノード情報の更新イベントを受け取る事ができるインタフェースです
このイベントを受け取ることで、動的にノード情報を受け取ることができます
早速 MyEventDelegate
として実装していきましょう
package main
import(
"log"
"github.com/hashicorp/memberlist"
)
type MyEventDelegate struct {}
func (d *MyEventDelegate) NotifyJoin(node *memberlist.Node) {
log.Printf("notify join %s(%s:%d)", node.Name, node.Addr.To4().String(), node.Port)
}
func (d *MyEventDelegate) NotifyLeave(node *memberlist.Node) {
log.Printf("notify leave %s(%s:%d)", node.Name, node.Addr.To4().String(), node.Port)
}
func (d *MyEventDelegate) NotifyUpdate(node *memberlist.Node) {
log.Printf("notify update %s(%s:%d)", node.Name, node.Addr.To4().String(), node.Port)
}
func main() {
conf := memberlist.DefaultLocalConfig()
conf.Name = "node1"
conf.Events = new(MyEventDelegate)
list, err := memberlist.Create(conf)
if err != nil {
log.Fatal(err)
}
local := list.LocalNode()
log.Printf("node1 at %s:%d", local.Addr.To4().String(), local.Port)
wait_signal()
}
wait_signal は前回作ったものを使って待機させています
$ go run foo.go
2019/01/27 19:54:29 notify join node1(172.16.0.96:7946)
2019/01/27 19:54:29 node1 at 172.16.0.96:7946
さて、この状態で、下記のようなノードを作成し各種イベントが届くようにしましょう
package main
import(
"log"
"time"
"github.com/hashicorp/memberlist"
)
func main() {
conf := memberlist.DefaultLocalConfig()
conf.Name = "node2"
conf.BindPort = 7947
conf.AdvertisePort = conf.BindPort
list, err := memberlist.Create(conf)
if err != nil {
log.Fatal(err)
}
joinAddr := "172.16.0.96:7946"
log.Printf("cluster join to %s", joinAddr)
if _, err := list.Join([]string{joinAddr}); err != nil {
log.Fatal(err)
}
log.Printf("join successful.")
time.Sleep(3 * time.Second)
log.Printf("update.")
// 今回は空打ち、別途metaデータについて書きます
list.UpdateNode(1 * time.Second)
time.Sleep(3 * time.Second)
log.Printf("leave start")
if err := list.Leave(1 * time.Second); err != nil {
log.Fatal(err)
}
log.Printf("leave successful.")
time.Sleep(1 * time.Second)
log.Printf("bye.")
}
では、この状態で起動させてみます
数秒おきにイベントが発行されるようになっています
$ go run bar.go
2019/01/27 20:02:15 cluster join to 172.16.0.96:7946
2019/01/27 20:02:15 [DEBUG] memberlist: Initiating push/pull sync with: 172.16.0.96:7946
2019/01/27 20:02:15 join successful.
2019/01/27 20:02:18 update.
2019/01/27 20:02:21 leave start
2019/01/27 20:02:21 leave successful.
2019/01/27 20:02:22 bye.
このイベントは foo.go
側では下記のように出力されていると思います
$ go run foo.go
:
2019/01/27 20:02:15 [DEBUG] memberlist: Stream connection from=172.16.0.96:54943
2019/01/27 20:02:15 notify join node2(172.16.0.96:7947) # ←ここと
2019/01/27 20:02:21 notify leave node2(172.16.0.96:7947) # ←ここ
正しくイベントが取れている事がわかりました
ただ、 UpdateNode のイベントは届いていないようです
UpdateNode イベントについては次回以降に書きたいなと思います
Consistent Hashing への応用
さて、ここで動的にノードの情報を取得することができるようになったため、 このノード情報を使って Consistent Hashing のハッシュテーブルに使ってみます
今回は Consistent Hashing を作るために serialx/hashring を使って実装します
(ちなみに、 consistent hashing の実装は他にも沢山ありますので色々みてみると良さそうです)
package main
import(
"log"
"fmt"
"github.com/hashicorp/memberlist"
"github.com/serialx/hashring"
)
type MyEventDelegate struct {
consistent *hashring.HashRing
}
func (d *MyEventDelegate) NotifyJoin(node *memberlist.Node) {
hostPort := fmt.Sprintf("%s:%d", node.Addr.To4().String(), node.Port)
log.Printf("notify join %s(%s)", node.Name, hostPort)
if d.consistent != nil {
d.consistent = d.consistent.AddNode(hostPort)
} else {
d.consistent = hashring.New([]string{hostPort})
}
d.checkSearchKey()
}
func (d *MyEventDelegate) NotifyLeave(node *memberlist.Node) {
hostPort := fmt.Sprintf("%s:%d", node.Addr.To4().String(), node.Port)
log.Printf("notify leave %s(%s)", node.Name, hostPort)
if d.consistent != nil {
d.consistent = d.consistent.RemoveNode(hostPort)
}
d.checkSearchKey()
}
func (d *MyEventDelegate) NotifyUpdate(node *memberlist.Node) {
// nop
}
//
// ノードの増減でキーの結果が変化するか確認
//
func (d *MyEventDelegate) checkSearchKey() {
searchKeys := []string{"hello", "world"}
for _, key := range searchKeys {
node, ok := d.consistent.GetNode(key)
if ok != true {
log.Printf("key %s notfound", key)
continue
}
log.Printf("%s ==> %s", key, node)
}
}
func main() {
conf := memberlist.DefaultLocalConfig()
conf.Name = "node1"
conf.Events = new(MyEventDelegate)
list, err := memberlist.Create(conf)
if err != nil {
log.Fatal(err)
}
local := list.LocalNode()
log.Printf("node1 at %s:%d", local.Addr.To4().String(), local.Port)
wait_signal()
}
この実装ではノードが増減するたびに searchKey
として "hello"
と "world"
の文字列を検索させています
それではいくつかノードを参加させて、 searchKey の結果がどのように変化するかみてみましょう
$ go run bar.go
2019/01/27 20:22:03 cluster join to 172.16.0.96:7946
2019/01/27 20:22:03 [DEBUG] memberlist: Initiating push/pull sync with: 172.16.0.96:7946
2019/01/27 20:22:03 join successful.
2019/01/27 20:22:06 update.
2019/01/27 20:22:09 leave start
2019/01/27 20:22:10 leave successful.
2019/01/27 20:22:11 bye.
このときの searchKey の推移です
$ go run foo.go
:
# node1 しかいなかったとき(node1だけがハッシュテーブルにいる)
2019/01/27 20:21:57 notify join node1(172.16.0.96:7946)
2019/01/27 20:21:57 hello ==> 172.16.0.96:7946
2019/01/27 20:21:57 world ==> 172.16.0.96:7946
:
# node2 が参加したとき(node1,node2がハッシュテーブルにいる)
2019/01/27 20:22:03 notify join node2(172.16.0.96:7947)
2019/01/27 20:22:03 hello ==> 172.16.0.96:7946
2019/01/27 20:22:03 world ==> 172.16.0.96:7947
:
# node2 が離脱したとき(node1だけがハッシュテーブルにいる)
2019/01/27 20:22:09 notify leave node2(172.16.0.96:7947)
2019/01/27 20:22:09 hello ==> 172.16.0.96:7946
2019/01/27 20:22:09 world ==> 172.16.0.96:7946
"hello"
のキーはノード増減しても常に node1 を参照することができているようです
今回は 2台のノードで行っていますが、 複数台で試してみるといいかもしれません
また、分かりやすさのために、consistent hashingを片方のnodeで行っていますが、複数のnodeで同じ値を持つように実装することもできます(今回は省略してます)
EventDelegateの応用
memberlist のノードイベントを取得できるようになったため、リアルタイムにノード情報を利用できるようになりました
今回は Consistent Hashing を利用しましたが、他にも応用ができる事が増えそうです
例えば
- 分散キャッシュ(memcachedのような) またはそのための補助として
- LoadBalancer(stickeyだったりroundrobinだったり)
- サービス・ディスカバリ
- 自動構築プログラムとの連携
今回紹介できなかった機能があるので、別途また書きたいなと思ってます