memberlist とは
go で記述されたクラスタメンバ管理ライブラリ(みんな大好きHashicorp製)です
gossipベースのプロトコルでクラスタメンバ管理やノード障害検出などの機能を備えています
https://github.com/hashicorp/memberlist
この記事の目的
memberlist は非常に簡単に使うことができるのと、使い方次第ではいろいろな実装パターンに応用ができるため個人的には好きなライブラリの一つです
いくつか実装を書いてきた中でよく使うベース機能のまとめを書いていて、前回の記事の続きになります
今回は memberlist で提供されている Delegate.NodeMeta() を使って、ノードに含まれるメタデータ管理についてまとめて行きたいなと思っています
なお今回の記事で扱っているものは github にも上がっています
https://github.com/octu0/example-memberlist/tree/master/03-metadata
Delegate について
前回の記事では EventDelegate のインタフェースを使って、ノードイベントを受け取る例となっているのですが、今回は Delegate インタフェースを使ってノード固有の値を管理・共有する例を紹介していきます
さっそく実装していきます
さきに、メタデータを管理する構造体として MyMetaData を用意します
これは、AWS EC2 のインスタンスをイメージしています
Region/AZとノード固有の重み付けのようなものを管理したいと思います
import "encoding/json"
type MyMetaData struct {
  Region   string   `json:"region"`
  Zone     string   `json:"zone"`
  Status   string   `json:"status"`
  Weight   uint64   `json:"weight"`
}
func (m MyMetaData) Bytes() []byte {
  data, err := json.Marshal(m)
  if err != nil {
    return []byte("")
  }
  return data
} 
Bytes() の結果を JSON に変換していますが、
これは次に出てくる Delegate で扱えるメタデータが []byte で管理するため、今回は見慣れているであろう JSON にしています
Gob を使ったり任意のデータ構造を使っても構わないです
次に Delegate を実装していきます
type MyDelegate struct {
  meta  MyMetaData
}
func (d *MyDelegate) NodeMeta(limit int) []byte {
  return d.meta.Bytes()
}
func (d *MyDelegate) LocalState(join bool) []byte {
  // not use, noop
  return []byte("")
}
func (d *MyDelegate) NotifyMsg(msg []byte) {
  // not use
}
func (d *MyDelegate) GetBroadcasts(overhead, limit int) [][]byte {
  // not use, noop
  return nil
}
func (d *MyDelegate) MergeRemoteState(buf []byte, join bool) {
  // not use
}
Delegate から NodeMeta が呼び出された時に 先ほどの MyMetaData の json バイナリを返すように実装しました
それでは、実行できるように
func main() {
  m := MyMetaData{
    Region: "ap-northeast-1",
    Zone: "1a",
    Status: "initial",
    Weight: 0,
  }
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node1"
  conf.Delegate = &MyDelegate{
    meta: m,
  }
  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }
  local := list.LocalNode()
  log.Printf("node1 at %s:%d", local.Addr.To4().String(), local.Port)
  log.Printf("node1 meta = %s", string(local.Meta))
  wait_signal()
}
wait_signal は以前作ったものにしています
また起動時に Node.Meta を使ってメタデータの表示をしています
$ go run foo.go 
2019/02/03 21:15:10 node1 at 172.16.0.96:7946
2019/02/03 21:15:10 node1 meta = {"region":"ap-northeast-1","zone":"1a","status":"initial","weight":0}
では、この情報を他のノードで取得してみましょう
package main
import(
  "log"
  "encoding/json"
  "github.com/hashicorp/memberlist"
)
func main() {
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node2"
  conf.BindPort = 7947
  conf.AdvertisePort = conf.BindPort
  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }
  joinAddr := "172.16.0.96:7946"
  log.Printf("cluster join to %s", joinAddr)
  if _, err := list.Join([]string{joinAddr}); err != nil {
    log.Fatal(err)
  }
  log.Printf("join successful.")
  type OtherNodeMeta struct {
    R string `json:"region"`
    Z string `json:"zone"`
    S string `json:"status"`
    W uint64 `json:"weight"`
  }
  for _, node := range list.Members() {
    // node2 はメタデータを設定してないので skip
    if len(node.Meta) < 1 {
      continue
    }
    meta := new(OtherNodeMeta)
    if err := json.Unmarshal(node.Meta, meta); err != nil {
      log.Fatal(err)
    }
    log.Printf("node name: %s", node.Name)
    log.Printf("- meta region:%s zone:%s status:%s weight:%d", meta.R, meta.Z, meta.S, meta.W)
  }
}
この状態で起動すると、クラスタ参加時に node の情報を受け取る事ができます
$ go run bar.go 
2019/02/03 21:30:02 cluster join to 172.16.0.96:7946
2019/02/03 21:30:02 join successful.
2019/02/03 21:30:02 node name: node1
2019/02/03 21:30:02 - meta region:ap-northeast-1 zone:1a status:initial weight:0
JSONのデータを使っているので Unmarshal して構造体に入れ直していますが、問題なくデータを取れたようです
この状態でも起動しているノードの情報を受け取れるのですが、動的にノードの情報を受け取りたいので EventDelegate と組み合わせて動的に metadata を利用しましょう
EventDelegate との組み合わせ
EventDelegate を使えばノード個別の更新イベントを受け取れるので、
3秒おきに status が変わり、5秒おきに weight が変わる(つまりセットアップ -> プロビジョン, 負荷状況の変化)的なことをやっているノードを擬似的に作ってみます
まずは、わかりやすいように状況を観察するノード
package main
 
import(
  "log"
  "encoding/json"
  "github.com/hashicorp/memberlist"
)
type MyDelegate struct {
  meta  MyMetaData
}
func (d *MyDelegate) NodeMeta(limit int) []byte {
  return d.meta.Bytes()
}
func (d *MyDelegate) LocalState(join bool) []byte {
  // not use, noop
  return []byte("")
}
func (d *MyDelegate) NotifyMsg(msg []byte) {
  // not use
}
func (d *MyDelegate) GetBroadcasts(overhead, limit int) [][]byte {
  // not use, noop
  return nil
}
func (d *MyDelegate) MergeRemoteState(buf []byte, join bool) {
  // not use
}
type MyMetaData struct {
  Region   string   `json:"region"`
  Zone     string   `json:"zone"`
  Status   string   `json:"status"`
  Weight   uint64   `json:"weight"`
}
func (m MyMetaData) Bytes() []byte {
  data, err := json.Marshal(m)
  if err != nil {
    return []byte("")
  }
  return data
}
type MyEventDelegate struct {
  Notify chan *memberlist.Node
}
func (d *MyEventDelegate) NotifyJoin(node *memberlist.Node) {
}
func (d *MyEventDelegate) NotifyLeave(node *memberlist.Node) {
}
func (d *MyEventDelegate) NotifyUpdate(node *memberlist.Node) {
  d.Notify <- node
}
func main() {
  events := new(MyEventDelegate)
  events.Notify = make(chan *memberlist.Node)
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node1"
  conf.Events = events
  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }
  local := list.LocalNode()
  log.Printf("node1 at %s:%d", local.Addr.To4().String(), local.Port)
  for {
    select {
    case updates := <-events.Notify:
      meta := new(MyMetaData)
      if err := json.Unmarshal(updates.Meta, meta); err != nil {
        log.Fatal(err)
      }
      log.Printf("node %s updated meta{status:%s, weight:%d}", updates.Name, meta.Status, meta.Weight)
    }
  }
}
次に 自身の情報を更新する node を作りましょう
package main
import(
  "log"
  "time"
  "encoding/json"
  "github.com/hashicorp/memberlist"
)
type MyDelegate struct {
  meta  MyMetaData
}
func (d *MyDelegate) NodeMeta(limit int) []byte {
  return d.meta.Bytes()
}
func (d *MyDelegate) LocalState(join bool) []byte {
  // not use, noop
  return []byte("")
}
func (d *MyDelegate) NotifyMsg(msg []byte) {
  // not use
}
func (d *MyDelegate) GetBroadcasts(overhead, limit int) [][]byte {
  // not use, noop
  return nil
}
func (d *MyDelegate) MergeRemoteState(buf []byte, join bool) {
  // not use
}
type MyMetaData struct {
  Region   string   `json:"region"`
  Zone     string   `json:"zone"`
  Status   string   `json:"status"`
  Weight   uint64   `json:"weight"`
}
func (m MyMetaData) Bytes() []byte {
  data, err := json.Marshal(m)
  if err != nil {
    return []byte("")
  }
  return data
}
func main() {
  delegate := new(MyDelegate)
  delegate.meta = MyMetaData{
    Region: "ap-northeast-1",
    Zone: "1c",
    Status: "initial",
    Weight: 0,
  }
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node2"
  conf.BindPort = 7947
  conf.AdvertisePort = conf.BindPort
  conf.Delegate = delegate
  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }
  joinAddr := "172.16.0.96:7946"
  log.Printf("cluster join to %s", joinAddr)
  if _, err := list.Join([]string{joinAddr}); err != nil {
    log.Fatal(err)
  }
  log.Printf("join successful.")
  statusTick := time.NewTicker(3 * time.Second)
  weightTick := time.NewTicker(5 * time.Second)
  for {
    select {
    case <-statusTick.C:
      switch delegate.meta.Status {
      case "initial":
        delegate.meta.Status = "running"
      case "running":
        delegate.meta.Status = "waiting"
      case "waiting":
        delegate.meta.Status = "initial"
      }
      timeout := 10 * time.Second
      list.UpdateNode(timeout)
    case <-weightTick.C:
      switch {
      case delegate.meta.Weight < 10:
        delegate.meta.Weight = 50
      case delegate.meta.Weight < 100:
        delegate.meta.Weight = 10
      }
      timeout := 10 * time.Second
      list.UpdateNode(timeout)
    }
  }
}
(なおMyMetaData は同じものを使っています、ここではわかりやすいようにどちらのコードにも登場してますが、中身は同じものです)
この状態で起動すると
# session1
$ go run foo.go
2019/02/03 23:25:54 node1 at 172.16.0.96:7946
# session2
$ go run bar.go 
2019/02/03 23:25:57 cluster join to 172.16.0.96:7946
2019/02/03 23:25:57 join successful.
:
ちゃんと UpdateNode イベントを受け取ってリアルタイムにノードのメタデータが更新されているのが取れました。(下記)
2019/02/03 23:26:00 node node2 updated meta{status:running, weight:0}
2019/02/03 23:26:02 node node2 updated meta{status:running, weight:50}
2019/02/03 23:26:03 node node2 updated meta{status:waiting, weight:50}
2019/02/03 23:26:06 node node2 updated meta{status:initial, weight:50}
2019/02/03 23:26:07 node node2 updated meta{status:initial, weight:10}
2019/02/03 23:26:09 node node2 updated meta{status:running, weight:10}
メタデータの応用
簡単なサンプルでしたがメタデータをリアルタイムに利用できることがわかりました(ちゃんと実装すれば双方向でメタデータを受け取れる)
これを利用することで単純なイベントデータ以外にもメタデータを利用して応用する幅が広がりそうです
例えば
- 
Zoneを利用した 冗長化設計
- 
Statusを使った 高可用設計
- 
Weightによる ロードバランシング
メタデータは自由に使える領域なのでZone/Status/Weight以外にも固有のデータを使ったクラスタリングができそうです。
今回紹介できなかった機能があるので、別途また書きたいなと思ってます
