LoginSignup
0

More than 5 years have passed since last update.

memberlist を使ってクラスタリング - ノードイベントを受け取る

Last updated at Posted at 2019-02-17

memberlist とは

go で記述されたクラスタメンバ管理ライブラリ(みんな大好きHashicorp製)です
gossipベースのプロトコルでクラスタメンバ管理やノード障害検出などの機能を備えています
https://github.com/hashicorp/memberlist

この記事の目的

memberlist は非常に簡単に使うことができるのと、使い方次第ではいろいろな実装パターンに応用ができるため個人的には好きなライブラリの一つです
いくつか実装を書いてきた中でよく使うベース機能のまとめを書いていて、前回の記事の続きになります

今回は memberlist で提供される EventDelegate を使ってノード参加イベントをリアルタイムにハンドリングしていく方法をまとめておきたいなと思います

なおこの記事で扱っているものは github にも上げています
https://github.com/octu0/example-memberlist/tree/master/02-channel-event

EventDelegate について

EventDelegate は ノードがクラスタに参加/離脱のイベント、またノード情報の更新イベントを受け取る事ができるインタフェースです
このイベントを受け取ることで、動的にノード情報を受け取ることができます

早速 MyEventDelegate として実装していきましょう

package main

import(
  "log"
  "github.com/hashicorp/memberlist"
)

type MyEventDelegate struct {}
func (d *MyEventDelegate) NotifyJoin(node *memberlist.Node) {
  log.Printf("notify join %s(%s:%d)", node.Name, node.Addr.To4().String(), node.Port)
}
func (d *MyEventDelegate) NotifyLeave(node *memberlist.Node) {
  log.Printf("notify leave %s(%s:%d)", node.Name, node.Addr.To4().String(), node.Port)
}
func (d *MyEventDelegate) NotifyUpdate(node *memberlist.Node) {
  log.Printf("notify update %s(%s:%d)", node.Name, node.Addr.To4().String(), node.Port)
}

func main() {
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node1"
  conf.Events = new(MyEventDelegate)

  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }

  local := list.LocalNode()
  log.Printf("node1 at %s:%d", local.Addr.To4().String(), local.Port)

  wait_signal()
}

wait_signal は前回作ったものを使って待機させています

$ go run foo.go 
2019/01/27 19:54:29 notify join node1(172.16.0.96:7946)
2019/01/27 19:54:29 node1 at 172.16.0.96:7946

さて、この状態で、下記のようなノードを作成し各種イベントが届くようにしましょう

package main

import(
  "log"
  "time"
  "github.com/hashicorp/memberlist"
)

func main() {
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node2"
  conf.BindPort = 7947
  conf.AdvertisePort = conf.BindPort

  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }

  joinAddr := "172.16.0.96:7946"
  log.Printf("cluster join to %s", joinAddr)
  if _, err := list.Join([]string{joinAddr}); err != nil {
    log.Fatal(err)
  }
  log.Printf("join successful.")

  time.Sleep(3 * time.Second)

  log.Printf("update.")
  // 今回は空打ち、別途metaデータについて書きます
  list.UpdateNode(1 * time.Second)

  time.Sleep(3 * time.Second)
  log.Printf("leave start")
  if err := list.Leave(1 * time.Second); err != nil {
    log.Fatal(err)
  }
  log.Printf("leave successful.")

  time.Sleep(1 * time.Second)
  log.Printf("bye.")
}

では、この状態で起動させてみます
数秒おきにイベントが発行されるようになっています

$ go run bar.go 
2019/01/27 20:02:15 cluster join to 172.16.0.96:7946
2019/01/27 20:02:15 [DEBUG] memberlist: Initiating push/pull sync with: 172.16.0.96:7946
2019/01/27 20:02:15 join successful.
2019/01/27 20:02:18 update.
2019/01/27 20:02:21 leave start
2019/01/27 20:02:21 leave successful.
2019/01/27 20:02:22 bye.

このイベントは foo.go 側では下記のように出力されていると思います

$ go run foo.go 
:
2019/01/27 20:02:15 [DEBUG] memberlist: Stream connection from=172.16.0.96:54943
2019/01/27 20:02:15 notify join node2(172.16.0.96:7947) # ←ここと
2019/01/27 20:02:21 notify leave node2(172.16.0.96:7947) # ←ここ

正しくイベントが取れている事がわかりました
ただ、 UpdateNode のイベントは届いていないようです
UpdateNode イベントについては次回以降に書きたいなと思います

Consistent Hashing への応用

さて、ここで動的にノードの情報を取得することができるようになったため、 このノード情報を使って Consistent Hashing のハッシュテーブルに使ってみます

今回は Consistent Hashing を作るために serialx/hashring を使って実装します
(ちなみに、 consistent hashing の実装は他にも沢山ありますので色々みてみると良さそうです)

package main

import(
  "log"
  "fmt"
  "github.com/hashicorp/memberlist"
  "github.com/serialx/hashring"
)

type MyEventDelegate struct {
  consistent *hashring.HashRing
}
func (d *MyEventDelegate) NotifyJoin(node *memberlist.Node) {
  hostPort := fmt.Sprintf("%s:%d", node.Addr.To4().String(), node.Port)
  log.Printf("notify join %s(%s)", node.Name, hostPort)

  if d.consistent != nil {
    d.consistent = d.consistent.AddNode(hostPort)
  } else {
    d.consistent = hashring.New([]string{hostPort})
  }

  d.checkSearchKey()
}
func (d *MyEventDelegate) NotifyLeave(node *memberlist.Node) {
  hostPort := fmt.Sprintf("%s:%d", node.Addr.To4().String(), node.Port)
  log.Printf("notify leave %s(%s)", node.Name, hostPort)

  if d.consistent != nil {
    d.consistent = d.consistent.RemoveNode(hostPort)
  }

  d.checkSearchKey()
}
func (d *MyEventDelegate) NotifyUpdate(node *memberlist.Node) {
  // nop
}

//
// ノードの増減でキーの結果が変化するか確認
//
func (d *MyEventDelegate) checkSearchKey() {
  searchKeys := []string{"hello", "world"}

  for _, key := range searchKeys {
    node, ok := d.consistent.GetNode(key)
    if ok != true {
      log.Printf("key %s notfound", key)
      continue
    }
    log.Printf("%s ==> %s", key, node)
  }
}

func main() {
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node1"
  conf.Events = new(MyEventDelegate)

  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }

  local := list.LocalNode()
  log.Printf("node1 at %s:%d", local.Addr.To4().String(), local.Port)

  wait_signal()
}

この実装ではノードが増減するたびに searchKey として "hello""world" の文字列を検索させています

それではいくつかノードを参加させて、 searchKey の結果がどのように変化するかみてみましょう

$ go run bar.go 
2019/01/27 20:22:03 cluster join to 172.16.0.96:7946
2019/01/27 20:22:03 [DEBUG] memberlist: Initiating push/pull sync with: 172.16.0.96:7946
2019/01/27 20:22:03 join successful.
2019/01/27 20:22:06 update.
2019/01/27 20:22:09 leave start
2019/01/27 20:22:10 leave successful.
2019/01/27 20:22:11 bye.

このときの searchKey の推移です

$ go run foo.go 
:
# node1 しかいなかったとき(node1だけがハッシュテーブルにいる)
2019/01/27 20:21:57 notify join node1(172.16.0.96:7946)
2019/01/27 20:21:57 hello ==> 172.16.0.96:7946
2019/01/27 20:21:57 world ==> 172.16.0.96:7946
:
# node2 が参加したとき(node1,node2がハッシュテーブルにいる)
2019/01/27 20:22:03 notify join node2(172.16.0.96:7947)
2019/01/27 20:22:03 hello ==> 172.16.0.96:7946
2019/01/27 20:22:03 world ==> 172.16.0.96:7947
:
# node2 が離脱したとき(node1だけがハッシュテーブルにいる)
2019/01/27 20:22:09 notify leave node2(172.16.0.96:7947)
2019/01/27 20:22:09 hello ==> 172.16.0.96:7946
2019/01/27 20:22:09 world ==> 172.16.0.96:7946

"hello" のキーはノード増減しても常に node1 を参照することができているようです

今回は 2台のノードで行っていますが、 複数台で試してみるといいかもしれません
また、分かりやすさのために、consistent hashingを片方のnodeで行っていますが、複数のnodeで同じ値を持つように実装することもできます(今回は省略してます)

EventDelegateの応用

memberlist のノードイベントを取得できるようになったため、リアルタイムにノード情報を利用できるようになりました
今回は Consistent Hashing を利用しましたが、他にも応用ができる事が増えそうです

例えば
- 分散キャッシュ(memcachedのような) またはそのための補助として
- LoadBalancer(stickeyだったりroundrobinだったり)
- サービス・ディスカバリ
- 自動構築プログラムとの連携

今回紹介できなかった機能があるので、別途また書きたいなと思ってます

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0