1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

memberlist を使ってクラスタリング - Nodeメタ情報

Last updated at Posted at 2019-03-03

memberlist とは

go で記述されたクラスタメンバ管理ライブラリ(みんな大好きHashicorp製)です
gossipベースのプロトコルでクラスタメンバ管理やノード障害検出などの機能を備えています
https://github.com/hashicorp/memberlist

この記事の目的

memberlist は非常に簡単に使うことができるのと、使い方次第ではいろいろな実装パターンに応用ができるため個人的には好きなライブラリの一つです
いくつか実装を書いてきた中でよく使うベース機能のまとめを書いていて、前回の記事の続きになります

今回は memberlist で提供されている Delegate.NodeMeta() を使って、ノードに含まれるメタデータ管理についてまとめて行きたいなと思っています

なお今回の記事で扱っているものは github にも上がっています
https://github.com/octu0/example-memberlist/tree/master/03-metadata

Delegate について

前回の記事では EventDelegate のインタフェースを使って、ノードイベントを受け取る例となっているのですが、今回は Delegate インタフェースを使ってノード固有の値を管理・共有する例を紹介していきます

さっそく実装していきます

さきに、メタデータを管理する構造体として MyMetaData を用意します
これは、AWS EC2 のインスタンスをイメージしています
Region/AZとノード固有の重み付けのようなものを管理したいと思います

import "encoding/json"

type MyMetaData struct {
  Region   string   `json:"region"`
  Zone     string   `json:"zone"`
  Status   string   `json:"status"`
  Weight   uint64   `json:"weight"`
}
func (m MyMetaData) Bytes() []byte {
  data, err := json.Marshal(m)
  if err != nil {
    return []byte("")
  }
  return data
} 

Bytes() の結果を JSON に変換していますが、
これは次に出てくる Delegate で扱えるメタデータが []byte で管理するため、今回は見慣れているであろう JSON にしています
Gob を使ったり任意のデータ構造を使っても構わないです

次に Delegate を実装していきます

type MyDelegate struct {
  meta  MyMetaData
}
func (d *MyDelegate) NodeMeta(limit int) []byte {
  return d.meta.Bytes()
}
func (d *MyDelegate) LocalState(join bool) []byte {
  // not use, noop
  return []byte("")
}
func (d *MyDelegate) NotifyMsg(msg []byte) {
  // not use
}
func (d *MyDelegate) GetBroadcasts(overhead, limit int) [][]byte {
  // not use, noop
  return nil
}
func (d *MyDelegate) MergeRemoteState(buf []byte, join bool) {
  // not use
}

Delegate から NodeMeta が呼び出された時に 先ほどの MyMetaData の json バイナリを返すように実装しました

それでは、実行できるように

func main() {
  m := MyMetaData{
    Region: "ap-northeast-1",
    Zone: "1a",
    Status: "initial",
    Weight: 0,
  }

  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node1"
  conf.Delegate = &MyDelegate{
    meta: m,
  }

  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }

  local := list.LocalNode()
  log.Printf("node1 at %s:%d", local.Addr.To4().String(), local.Port)

  log.Printf("node1 meta = %s", string(local.Meta))

  wait_signal()
}

wait_signal は以前作ったものにしています
また起動時に Node.Meta を使ってメタデータの表示をしています

$ go run foo.go 
2019/02/03 21:15:10 node1 at 172.16.0.96:7946
2019/02/03 21:15:10 node1 meta = {"region":"ap-northeast-1","zone":"1a","status":"initial","weight":0}

では、この情報を他のノードで取得してみましょう

package main

import(
  "log"
  "encoding/json"
  "github.com/hashicorp/memberlist"
)

func main() {
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node2"
  conf.BindPort = 7947
  conf.AdvertisePort = conf.BindPort

  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }

  joinAddr := "172.16.0.96:7946"
  log.Printf("cluster join to %s", joinAddr)
  if _, err := list.Join([]string{joinAddr}); err != nil {
    log.Fatal(err)
  }
  log.Printf("join successful.")

  type OtherNodeMeta struct {
    R string `json:"region"`
    Z string `json:"zone"`
    S string `json:"status"`
    W uint64 `json:"weight"`
  }

  for _, node := range list.Members() {
    // node2 はメタデータを設定してないので skip
    if len(node.Meta) < 1 {
      continue
    }

    meta := new(OtherNodeMeta)
    if err := json.Unmarshal(node.Meta, meta); err != nil {
      log.Fatal(err)
    }

    log.Printf("node name: %s", node.Name)
    log.Printf("- meta region:%s zone:%s status:%s weight:%d", meta.R, meta.Z, meta.S, meta.W)
  }
}

この状態で起動すると、クラスタ参加時に node の情報を受け取る事ができます

$ go run bar.go 
2019/02/03 21:30:02 cluster join to 172.16.0.96:7946
2019/02/03 21:30:02 join successful.
2019/02/03 21:30:02 node name: node1
2019/02/03 21:30:02 - meta region:ap-northeast-1 zone:1a status:initial weight:0

JSONのデータを使っているので Unmarshal して構造体に入れ直していますが、問題なくデータを取れたようです
この状態でも起動しているノードの情報を受け取れるのですが、動的にノードの情報を受け取りたいので EventDelegate と組み合わせて動的に metadata を利用しましょう

EventDelegate との組み合わせ

EventDelegate を使えばノード個別の更新イベントを受け取れるので、
3秒おきに status が変わり、5秒おきに weight が変わる(つまりセットアップ -> プロビジョン, 負荷状況の変化)的なことをやっているノードを擬似的に作ってみます

まずは、わかりやすいように状況を観察するノード

package main
 
import(
  "log"
  "encoding/json"
  "github.com/hashicorp/memberlist"
)

type MyDelegate struct {
  meta  MyMetaData
}
func (d *MyDelegate) NodeMeta(limit int) []byte {
  return d.meta.Bytes()
}
func (d *MyDelegate) LocalState(join bool) []byte {
  // not use, noop
  return []byte("")
}
func (d *MyDelegate) NotifyMsg(msg []byte) {
  // not use
}
func (d *MyDelegate) GetBroadcasts(overhead, limit int) [][]byte {
  // not use, noop
  return nil
}
func (d *MyDelegate) MergeRemoteState(buf []byte, join bool) {
  // not use
}

type MyMetaData struct {
  Region   string   `json:"region"`
  Zone     string   `json:"zone"`
  Status   string   `json:"status"`
  Weight   uint64   `json:"weight"`
}
func (m MyMetaData) Bytes() []byte {
  data, err := json.Marshal(m)
  if err != nil {
    return []byte("")
  }
  return data
}

type MyEventDelegate struct {
  Notify chan *memberlist.Node
}
func (d *MyEventDelegate) NotifyJoin(node *memberlist.Node) {
}
func (d *MyEventDelegate) NotifyLeave(node *memberlist.Node) {
}
func (d *MyEventDelegate) NotifyUpdate(node *memberlist.Node) {
  d.Notify <- node
}

func main() {
  events := new(MyEventDelegate)
  events.Notify = make(chan *memberlist.Node)

  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node1"
  conf.Events = events

  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }

  local := list.LocalNode()
  log.Printf("node1 at %s:%d", local.Addr.To4().String(), local.Port)

  for {
    select {
    case updates := <-events.Notify:
      meta := new(MyMetaData)
      if err := json.Unmarshal(updates.Meta, meta); err != nil {
        log.Fatal(err)
      }

      log.Printf("node %s updated meta{status:%s, weight:%d}", updates.Name, meta.Status, meta.Weight)
    }
  }
}

次に 自身の情報を更新する node を作りましょう

package main

import(
  "log"
  "time"
  "encoding/json"
  "github.com/hashicorp/memberlist"
)

type MyDelegate struct {
  meta  MyMetaData
}
func (d *MyDelegate) NodeMeta(limit int) []byte {
  return d.meta.Bytes()
}
func (d *MyDelegate) LocalState(join bool) []byte {
  // not use, noop
  return []byte("")
}
func (d *MyDelegate) NotifyMsg(msg []byte) {
  // not use
}
func (d *MyDelegate) GetBroadcasts(overhead, limit int) [][]byte {
  // not use, noop
  return nil
}
func (d *MyDelegate) MergeRemoteState(buf []byte, join bool) {
  // not use
}

type MyMetaData struct {
  Region   string   `json:"region"`
  Zone     string   `json:"zone"`
  Status   string   `json:"status"`
  Weight   uint64   `json:"weight"`
}
func (m MyMetaData) Bytes() []byte {
  data, err := json.Marshal(m)
  if err != nil {
    return []byte("")
  }
  return data
}

func main() {
  delegate := new(MyDelegate)
  delegate.meta = MyMetaData{
    Region: "ap-northeast-1",
    Zone: "1c",
    Status: "initial",
    Weight: 0,
  }

  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node2"
  conf.BindPort = 7947
  conf.AdvertisePort = conf.BindPort
  conf.Delegate = delegate

  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }

  joinAddr := "172.16.0.96:7946"
  log.Printf("cluster join to %s", joinAddr)
  if _, err := list.Join([]string{joinAddr}); err != nil {
    log.Fatal(err)
  }
  log.Printf("join successful.")

  statusTick := time.NewTicker(3 * time.Second)
  weightTick := time.NewTicker(5 * time.Second)
  for {
    select {
    case <-statusTick.C:
      switch delegate.meta.Status {
      case "initial":
        delegate.meta.Status = "running"
      case "running":
        delegate.meta.Status = "waiting"
      case "waiting":
        delegate.meta.Status = "initial"
      }
      timeout := 10 * time.Second
      list.UpdateNode(timeout)

    case <-weightTick.C:
      switch {
      case delegate.meta.Weight < 10:
        delegate.meta.Weight = 50
      case delegate.meta.Weight < 100:
        delegate.meta.Weight = 10
      }
      timeout := 10 * time.Second
      list.UpdateNode(timeout)
    }
  }
}

(なおMyMetaData は同じものを使っています、ここではわかりやすいようにどちらのコードにも登場してますが、中身は同じものです)

この状態で起動すると

# session1
$ go run foo.go
2019/02/03 23:25:54 node1 at 172.16.0.96:7946

# session2
$ go run bar.go 
2019/02/03 23:25:57 cluster join to 172.16.0.96:7946
2019/02/03 23:25:57 join successful.
:

ちゃんと UpdateNode イベントを受け取ってリアルタイムにノードのメタデータが更新されているのが取れました。(下記)

2019/02/03 23:26:00 node node2 updated meta{status:running, weight:0}
2019/02/03 23:26:02 node node2 updated meta{status:running, weight:50}
2019/02/03 23:26:03 node node2 updated meta{status:waiting, weight:50}
2019/02/03 23:26:06 node node2 updated meta{status:initial, weight:50}
2019/02/03 23:26:07 node node2 updated meta{status:initial, weight:10}
2019/02/03 23:26:09 node node2 updated meta{status:running, weight:10}

メタデータの応用

簡単なサンプルでしたがメタデータをリアルタイムに利用できることがわかりました(ちゃんと実装すれば双方向でメタデータを受け取れる)
これを利用することで単純なイベントデータ以外にもメタデータを利用して応用する幅が広がりそうです

例えば

  • Zone を利用した 冗長化設計
  • Status を使った 高可用設計
  • Weight による ロードバランシング

メタデータは自由に使える領域なのでZone/Status/Weight以外にも固有のデータを使ったクラスタリングができそうです。
今回紹介できなかった機能があるので、別途また書きたいなと思ってます

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?