memberlist とは
go で記述されたクラスタメンバ管理ライブラリ(みんな大好きHashicorp製)です
gossipベースのプロトコルでクラスタメンバ管理やノード障害検出などの機能を備えています
https://github.com/hashicorp/memberlist
この記事の目的
memberlist は非常に簡単に使うことができるのと、使い方次第ではいろいろな実装パターンに応用ができるため個人的には好きなライブラリの一つです
いくつか実装を書いてきた中でよく使うベース機能のまとめを書いていきたいなと思ってます
今回のこの記事では、memberlist を使ったクラスタリングで、クラスタに参加する方法とクラスタから削除する方法を書きたいなと思います
なおこの記事で扱っているものは github にも上げています
https://github.com/octu0/example-memberlist/tree/master/01-cluster-join
最初のノードの起動
memberlist ノードを起動するには、 memberlist.Config の作成と memberlist.Create によって memberlist.Memberlist インスタンスを作成します
import (
  "log"
  "github.com/hashicorp/memberlist"
)
func main() {
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node1"
  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }
  log.Printf("%s", list.LocalNode())
  // このままだとすぐに終了するので何かしらの処理を書く(後述)
}
memberlist.Config を作成する時は、クラスタを置くネットワークに応じて DefaultLANConfig や DefaultWANConfig を使い分けると良いでしょう。 (LANConfig/WANConfig の使い分けなどはたぶん別の記事で書きます)
今回はローカルネットワーク上に設置するため DefaultLocalConfig を使っています。
最初のステップのため細かくは記述しませんが、今回はノード名(Name)だけを変更しています
この状態で起動しても 2019/01/26 17:27:56 node1 のようにログを出力しただけで停止してしまうため、シグナルが入るまで起動させっぱなしにするようにしましょう
シグナルハンドリング
シグナルハンドリングを行うには os.Signal と signal.Notify そして syscall.Signal を使って実装します
今回は Ctrl - c を使って SIGINT をトラップしたいのと、 kill でも止められるように SIGTERM をトラップするように作ります
import(
  "log"
  "os"
  "os/signal"
  "syscall"
)
func main(){
  defer func(){
    log.Printf("bye")
  }()
  signal_chan := make(chan os.Signal, 2)
  signal.Notify(signal_chan, syscall.SIGTERM)
  signal.Notify(signal_chan, syscall.SIGINT)
  log.Printf("wait for signal: pid=%d", os.Getpid())
  for {
    select {
    case s := <-signal_chan:
      log.Printf("signal %s happen", s.String())
      return
    }
  }
}
この実装によって起動してもすぐに終了しなくなります
$ go run foo.go
2019/01/26 17:41:07 wait for signal: pid=16563
ここで、C-c を入力して停止させたり、 kill コマンドを使って kill ${pid} の term シグナルで停止することを確認しておきます
$ go run foo.go
2019/01/26 17:41:07 wait for signal: pid=16563
^C2019/02/16 17:41:31 signal interrupt happen # C-c を入力
2019/02/16 17:41:31 bye
$ go run foo.go
2019/01/16 17:43:04 wait for signal: pid=16586
2019/01/16 17:43:10 signal terminated happen # kill 16586 を実行
2019/01/16 17:43:10 bye
組み合わせて起動させる
脱線してしまいましたが、 シグナルハンドリングも組み合わせて memberlist のインスタンスを起動させっぱなしにします
package main
import(
  "log"
  "os"
  "os/signal"
  "syscall"
  "github.com/hashicorp/memberlist"
)
func main() {
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node1"
  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }
  local := list.LocalNode()
  log.Printf("node1 at %s:%d", local.Addr.To4().String(), local.Port)
  log.Printf("wait for other member connections")
  wait_signal()
}
func wait_signal(){
  signal_chan := make(chan os.Signal, 2)
  signal.Notify(signal_chan, syscall.SIGTERM)
  signal.Notify(signal_chan, syscall.SIGINT)
  for {
    select {
    case s := <-signal_chan:
      log.Printf("signal %s happen", s.String())
      return
    }
  }
}
正常に起動したでしょうか、最初のコードと少し違い node名の他に ipアドレスとport番号を表示するようにしています
$ go run foo.go
2019/01/26 17:52:51 node1 at 172.16.0.96:7946 # ←ここのアドレスを使う
2019/01/26 17:52:51 wait for other member connections
これはアドレスは次に書くノードを参加させる時に利用します
クラスタにノードを参加させる(2台目以降)
さて、ここまででノードの起動ができたので、実際にクラスタを作成していきましょう
最初のノードに参加する別のノードを作成します
memberlist インスタンスの作成方法はほとんど同じ(いくつか注意点があります)で、 Memberlist.Join を使って参加させます
package main
import(
  "log"
  "github.com/hashicorp/memberlist"
)
func main() {
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node2"
  list, err := memberlist.Create(conf)
  if err != nil {
    //
    // **このままではエラーでJoinできない**
    //
    log.Fatal(err)
  }
  local := list.LocalNode()
  log.Printf("node2 at %s:%d", local.Addr.To4().String(), local.Port)
  joinAddr := "172.16.0.96:7946"
  log.Printf("cluster join to %s", joinAddr)
  if _, err := list.Join([]string{joinAddr}); err != nil {
    log.Fatal(err)
  }
  log.Printf("join successful.")
}
実はこのコードには問題があります
DefaultLocalConfig で作成した Config では、すでに起動している node1 のアドレスと被ってしまっています
BindAddr/BindPort また、ノードが通信で使用する AdvertiseAddr/AdvertisePort は分離されている必要があります
被らないようにConfigを修正し Join できるところまで進めます
今回は同一マシン上で行っているため、port番号を分けるようにしました
package main
import(
  "log"
  "github.com/hashicorp/memberlist"
)
func main() {
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node2"
  conf.BindPort = 7947 // ポート番号が被らないように修正
  conf.AdvertisePort = conf.BindPort // AdvertisePort は今回はBindPortと同じにした
  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }
  local := list.LocalNode()
  log.Printf("node2 at %s:%d", local.Addr.To4().String(), local.Port)
  joinAddr := "172.16.0.96:7946"
  log.Printf("cluster join to %s", joinAddr)
  if _, err := list.Join([]string{joinAddr}); err != nil {
    log.Fatal(err)
  }
  log.Printf("join successful.")
}
これによって join successful が表示されるようになりました
$ go run bar.go
2019/01/26 18:09:00 node2 at 172.16.0.96:7947
2019/01/26 18:09:00 cluster join to 172.16.0.96:7946
2019/01/26 18:09:00 [DEBUG] memberlist: Initiating push/pull sync with: 172.16.0.96:7946
2019/01/26 18:09:00 join successful.
ここまでできたら、port番号を変えながら他のノードを複数参加させてみて Members() から返ってくるノード情報が変化することを確認しましょう
クラスタ内のノード情報を取得する
無事にノードに参加することが出来たため、ノードに参加中のメンバー情報を取得してみます
取得するには memberlist.Members を使います
join sucessful の後で実行するようにしましょう
for _, member := range list.Members() {
  log.Printf("Member: %s(%s:%d)", member.Name, member.Addr.To4().String(), member.Port)
}
これで参加中のノードの情報が取れるようになりました
2019/01/26 18:14:41 join successful.
2019/01/26 18:14:41 Member: node1(172.16.0.96:7946)
2019/01/26 18:14:41 Member: node2(172.16.0.96:7947)
さぁこれで memberlist によるクラスタリングの最初の一歩ができました
(まだノードの情報が取れるところまでですが...。)
クラスタからノードを削除する
ノードに参加することができたので、次はノードから削除(離脱)も実装していきましょう
memberlist に備わっている標準のヘルスチェックでノードがダウンした時は自動でクラスタから削除されますが、今回は明示的にクラスタから離脱させます
離脱には Leave のメソッドを使用します
先ほどのシグナルハンドリングと組み合わせて、 SIGINT でクラスタから離脱し、SIGTERM でプロセスが落ちるように作成してみます
package main
import(
  "log"
  "time"
  "os"
  "os/signal"
  "syscall"
  "github.com/hashicorp/memberlist"
)
func main() {
  conf := memberlist.DefaultLocalConfig()
  conf.Name = "node2"
  conf.BindPort = 7947 // ポート番号が被らないように修正
  conf.AdvertisePort = conf.BindPort
  list, err := memberlist.Create(conf)
  if err != nil {
    log.Fatal(err)
  }
  local := list.LocalNode()
  log.Printf("node2(pid=%d) at %s:%d", os.Getpid(), local.Addr.To4().String(), local.Port)
  joinAddr := "172.16.0.96:7946"
  log.Printf("cluster join to %s", joinAddr)
  if _, err := list.Join([]string{joinAddr}); err != nil {
    log.Fatal(err)
  }
  log.Printf("join successful.")
  for _, member := range list.Members() {
    log.Printf("Member: %s(%s:%d)", member.Name, member.Addr.To4().String(), member.Port)
  }
  //
  // シグナルハンドリングと組み合わせた Leave の実装
  //
  signal_chan := make(chan os.Signal, 2)
  signal.Notify(signal_chan, syscall.SIGTERM)
  signal.Notify(signal_chan, syscall.SIGINT)
  for {
    select {
    case s := <-signal_chan:
      switch s {
      case syscall.SIGINT:
        log.Printf("SIGINT happen. cluter leaving")
        timeout := 10 * time.Second
        if err := list.Leave(timeout); err != nil {
          log.Fatal(err)
        }
        log.Printf("cluter left.")
      case syscall.SIGTERM:
        log.Printf("SIGTERM happen. bye.")
        return
      }
    }
  }
}
実行してみましょう
$ go run bar.go 
2019/01/26 18:35:00 node2(pid=17060) at 172.16.0.96:7947
2019/01/26 18:35:00 cluster join to 172.16.0.96:7946
2019/01/26 18:35:00 [DEBUG] memberlist: Initiating push/pull sync with: 172.16.0.96:7946
2019/01/26 18:35:00 join successful.
2019/01/26 18:35:00 Member: node1(172.16.0.96:7946)
2019/01/26 18:35:00 Member: node2(172.16.0.96:7947)
2019/01/26 18:35:02 [DEBUG] memberlist: Stream connection from=172.16.0.96:55892
# ここで kill -INT 17060 を実行
2019/01/26 18:35:04 SIGINT happen. cluter leaving
2019/01/26 18:35:05 cluter left.
# ここで kill -TERM 17060 を実行
2019/01/26 18:35:13 SIGTERM happen. bye.
明示的にクラスタから離脱できるようになったので、これによって安全にクラスタから外す事ができりょうになります
例えば、離脱前に処理中の worker プロセスを確実に停止させてから離脱させて、クラスタ内の処理を健全に保つようにしたりです。
今回はここまで
記事が長くなったので今回はここまでにします
今回の実装だけでも、ノードの追加・削除の管理ができるようになっているため、次のような実装を作ることもできると思います
- 起動中のノードの死活管理(join/leave)
- ノード情報の取得(node名/ip/port)
- 自動構築プログラムとの連携
今回紹介できなかった機能があるので、別途また書きたいなと思ってます
- ノードの参加/削除/更新のイベントの取得
- ノードに固有のメタデータ設定
- ノード間通信
- ノード全体への通信
(書いたらリンクを張りたいと思います)
