memberlist とは
go で記述されたクラスタメンバ管理ライブラリ(みんな大好きHashicorp製)です
gossipベースのプロトコルでクラスタメンバ管理やノード障害検出などの機能を備えています
https://github.com/hashicorp/memberlist
この記事の目的
memberlist は非常に簡単に使うことができるのと、使い方次第ではいろいろな実装パターンに応用ができるため個人的には好きなライブラリの一つです
いくつか実装を書いてきた中でよく使うベース機能のまとめを書いていて、前回の記事の続きになります
今回は memberlist で提供されている Delegate.NodeMeta() を使って、ノードに含まれるメタデータ管理についてまとめて行きたいなと思っています
なお今回の記事で扱っているものは github にも上がっています
https://github.com/octu0/example-memberlist/tree/master/03-metadata
Delegate について
前回の記事では EventDelegate のインタフェースを使って、ノードイベントを受け取る例となっているのですが、今回は Delegate インタフェースを使ってノード固有の値を管理・共有する例を紹介していきます
さっそく実装していきます
さきに、メタデータを管理する構造体として MyMetaData を用意します
これは、AWS EC2 のインスタンスをイメージしています
Region/AZとノード固有の重み付けのようなものを管理したいと思います
import "encoding/json"
type MyMetaData struct {
Region string `json:"region"`
Zone string `json:"zone"`
Status string `json:"status"`
Weight uint64 `json:"weight"`
}
func (m MyMetaData) Bytes() []byte {
data, err := json.Marshal(m)
if err != nil {
return []byte("")
}
return data
}
Bytes()
の結果を JSON に変換していますが、
これは次に出てくる Delegate で扱えるメタデータが []byte
で管理するため、今回は見慣れているであろう JSON にしています
Gob を使ったり任意のデータ構造を使っても構わないです
次に Delegate を実装していきます
type MyDelegate struct {
meta MyMetaData
}
func (d *MyDelegate) NodeMeta(limit int) []byte {
return d.meta.Bytes()
}
func (d *MyDelegate) LocalState(join bool) []byte {
// not use, noop
return []byte("")
}
func (d *MyDelegate) NotifyMsg(msg []byte) {
// not use
}
func (d *MyDelegate) GetBroadcasts(overhead, limit int) [][]byte {
// not use, noop
return nil
}
func (d *MyDelegate) MergeRemoteState(buf []byte, join bool) {
// not use
}
Delegate から NodeMeta が呼び出された時に 先ほどの MyMetaData の json バイナリを返すように実装しました
それでは、実行できるように
func main() {
m := MyMetaData{
Region: "ap-northeast-1",
Zone: "1a",
Status: "initial",
Weight: 0,
}
conf := memberlist.DefaultLocalConfig()
conf.Name = "node1"
conf.Delegate = &MyDelegate{
meta: m,
}
list, err := memberlist.Create(conf)
if err != nil {
log.Fatal(err)
}
local := list.LocalNode()
log.Printf("node1 at %s:%d", local.Addr.To4().String(), local.Port)
log.Printf("node1 meta = %s", string(local.Meta))
wait_signal()
}
wait_signal は以前作ったものにしています
また起動時に Node.Meta を使ってメタデータの表示をしています
$ go run foo.go
2019/02/03 21:15:10 node1 at 172.16.0.96:7946
2019/02/03 21:15:10 node1 meta = {"region":"ap-northeast-1","zone":"1a","status":"initial","weight":0}
では、この情報を他のノードで取得してみましょう
package main
import(
"log"
"encoding/json"
"github.com/hashicorp/memberlist"
)
func main() {
conf := memberlist.DefaultLocalConfig()
conf.Name = "node2"
conf.BindPort = 7947
conf.AdvertisePort = conf.BindPort
list, err := memberlist.Create(conf)
if err != nil {
log.Fatal(err)
}
joinAddr := "172.16.0.96:7946"
log.Printf("cluster join to %s", joinAddr)
if _, err := list.Join([]string{joinAddr}); err != nil {
log.Fatal(err)
}
log.Printf("join successful.")
type OtherNodeMeta struct {
R string `json:"region"`
Z string `json:"zone"`
S string `json:"status"`
W uint64 `json:"weight"`
}
for _, node := range list.Members() {
// node2 はメタデータを設定してないので skip
if len(node.Meta) < 1 {
continue
}
meta := new(OtherNodeMeta)
if err := json.Unmarshal(node.Meta, meta); err != nil {
log.Fatal(err)
}
log.Printf("node name: %s", node.Name)
log.Printf("- meta region:%s zone:%s status:%s weight:%d", meta.R, meta.Z, meta.S, meta.W)
}
}
この状態で起動すると、クラスタ参加時に node の情報を受け取る事ができます
$ go run bar.go
2019/02/03 21:30:02 cluster join to 172.16.0.96:7946
2019/02/03 21:30:02 join successful.
2019/02/03 21:30:02 node name: node1
2019/02/03 21:30:02 - meta region:ap-northeast-1 zone:1a status:initial weight:0
JSONのデータを使っているので Unmarshal して構造体に入れ直していますが、問題なくデータを取れたようです
この状態でも起動しているノードの情報を受け取れるのですが、動的にノードの情報を受け取りたいので EventDelegate と組み合わせて動的に metadata を利用しましょう
EventDelegate との組み合わせ
EventDelegate を使えばノード個別の更新イベントを受け取れるので、
3秒おきに status が変わり、5秒おきに weight が変わる(つまりセットアップ -> プロビジョン, 負荷状況の変化)的なことをやっているノードを擬似的に作ってみます
まずは、わかりやすいように状況を観察するノード
package main
import(
"log"
"encoding/json"
"github.com/hashicorp/memberlist"
)
type MyDelegate struct {
meta MyMetaData
}
func (d *MyDelegate) NodeMeta(limit int) []byte {
return d.meta.Bytes()
}
func (d *MyDelegate) LocalState(join bool) []byte {
// not use, noop
return []byte("")
}
func (d *MyDelegate) NotifyMsg(msg []byte) {
// not use
}
func (d *MyDelegate) GetBroadcasts(overhead, limit int) [][]byte {
// not use, noop
return nil
}
func (d *MyDelegate) MergeRemoteState(buf []byte, join bool) {
// not use
}
type MyMetaData struct {
Region string `json:"region"`
Zone string `json:"zone"`
Status string `json:"status"`
Weight uint64 `json:"weight"`
}
func (m MyMetaData) Bytes() []byte {
data, err := json.Marshal(m)
if err != nil {
return []byte("")
}
return data
}
type MyEventDelegate struct {
Notify chan *memberlist.Node
}
func (d *MyEventDelegate) NotifyJoin(node *memberlist.Node) {
}
func (d *MyEventDelegate) NotifyLeave(node *memberlist.Node) {
}
func (d *MyEventDelegate) NotifyUpdate(node *memberlist.Node) {
d.Notify <- node
}
func main() {
events := new(MyEventDelegate)
events.Notify = make(chan *memberlist.Node)
conf := memberlist.DefaultLocalConfig()
conf.Name = "node1"
conf.Events = events
list, err := memberlist.Create(conf)
if err != nil {
log.Fatal(err)
}
local := list.LocalNode()
log.Printf("node1 at %s:%d", local.Addr.To4().String(), local.Port)
for {
select {
case updates := <-events.Notify:
meta := new(MyMetaData)
if err := json.Unmarshal(updates.Meta, meta); err != nil {
log.Fatal(err)
}
log.Printf("node %s updated meta{status:%s, weight:%d}", updates.Name, meta.Status, meta.Weight)
}
}
}
次に 自身の情報を更新する node を作りましょう
package main
import(
"log"
"time"
"encoding/json"
"github.com/hashicorp/memberlist"
)
type MyDelegate struct {
meta MyMetaData
}
func (d *MyDelegate) NodeMeta(limit int) []byte {
return d.meta.Bytes()
}
func (d *MyDelegate) LocalState(join bool) []byte {
// not use, noop
return []byte("")
}
func (d *MyDelegate) NotifyMsg(msg []byte) {
// not use
}
func (d *MyDelegate) GetBroadcasts(overhead, limit int) [][]byte {
// not use, noop
return nil
}
func (d *MyDelegate) MergeRemoteState(buf []byte, join bool) {
// not use
}
type MyMetaData struct {
Region string `json:"region"`
Zone string `json:"zone"`
Status string `json:"status"`
Weight uint64 `json:"weight"`
}
func (m MyMetaData) Bytes() []byte {
data, err := json.Marshal(m)
if err != nil {
return []byte("")
}
return data
}
func main() {
delegate := new(MyDelegate)
delegate.meta = MyMetaData{
Region: "ap-northeast-1",
Zone: "1c",
Status: "initial",
Weight: 0,
}
conf := memberlist.DefaultLocalConfig()
conf.Name = "node2"
conf.BindPort = 7947
conf.AdvertisePort = conf.BindPort
conf.Delegate = delegate
list, err := memberlist.Create(conf)
if err != nil {
log.Fatal(err)
}
joinAddr := "172.16.0.96:7946"
log.Printf("cluster join to %s", joinAddr)
if _, err := list.Join([]string{joinAddr}); err != nil {
log.Fatal(err)
}
log.Printf("join successful.")
statusTick := time.NewTicker(3 * time.Second)
weightTick := time.NewTicker(5 * time.Second)
for {
select {
case <-statusTick.C:
switch delegate.meta.Status {
case "initial":
delegate.meta.Status = "running"
case "running":
delegate.meta.Status = "waiting"
case "waiting":
delegate.meta.Status = "initial"
}
timeout := 10 * time.Second
list.UpdateNode(timeout)
case <-weightTick.C:
switch {
case delegate.meta.Weight < 10:
delegate.meta.Weight = 50
case delegate.meta.Weight < 100:
delegate.meta.Weight = 10
}
timeout := 10 * time.Second
list.UpdateNode(timeout)
}
}
}
(なおMyMetaData は同じものを使っています、ここではわかりやすいようにどちらのコードにも登場してますが、中身は同じものです)
この状態で起動すると
# session1
$ go run foo.go
2019/02/03 23:25:54 node1 at 172.16.0.96:7946
# session2
$ go run bar.go
2019/02/03 23:25:57 cluster join to 172.16.0.96:7946
2019/02/03 23:25:57 join successful.
:
ちゃんと UpdateNode イベントを受け取ってリアルタイムにノードのメタデータが更新されているのが取れました。(下記)
2019/02/03 23:26:00 node node2 updated meta{status:running, weight:0}
2019/02/03 23:26:02 node node2 updated meta{status:running, weight:50}
2019/02/03 23:26:03 node node2 updated meta{status:waiting, weight:50}
2019/02/03 23:26:06 node node2 updated meta{status:initial, weight:50}
2019/02/03 23:26:07 node node2 updated meta{status:initial, weight:10}
2019/02/03 23:26:09 node node2 updated meta{status:running, weight:10}
メタデータの応用
簡単なサンプルでしたがメタデータをリアルタイムに利用できることがわかりました(ちゃんと実装すれば双方向でメタデータを受け取れる)
これを利用することで単純なイベントデータ以外にもメタデータを利用して応用する幅が広がりそうです
例えば
-
Zone
を利用した 冗長化設計 -
Status
を使った 高可用設計 -
Weight
による ロードバランシング
メタデータは自由に使える領域なのでZone/Status/Weight以外にも固有のデータを使ったクラスタリングができそうです。
今回紹介できなかった機能があるので、別途また書きたいなと思ってます