Go
golang
groupcache
memberlist

groupcache のピアを memberlist で自動管理

More than 5 years have passed since last update.

groupcache‎ で組み込み型分散キャッシュで groupcache‎ について説明しました。

groupcache を使うにはピア管理が課題になります。解決方法はいくつかありますが、今回は serf がメンバー管理に使っている memberlist パッケージを使って自動化してみます。


コード

package main

import (
"encoding/json"
"fmt"
"github.com/codegangsta/martini"
"github.com/golang/groupcache"
"github.com/hashicorp/memberlist"
"log"
"net/http"
"os"
"strings"
"time"
)

const GroupcachePort = 8000

type eventDelegate struct {
peers []string
pool *groupcache.HTTPPool
}

func (e *eventDelegate) NotifyJoin(node *memberlist.Node) {
uri := e.groupcacheURI(node.Addr.String())
e.removePeer(uri)
e.peers = append(e.peers, uri)
if e.pool != nil {
e.pool.Set(e.peers...)
}
log.Print("Add peer: " + uri)
log.Printf("Current peers: %v", e.peers)
}

func (e *eventDelegate) NotifyLeave(node *memberlist.Node) {
uri := e.groupcacheURI(node.Addr.String())
e.removePeer(uri)
e.pool.Set(e.peers...)
log.Print("Remove peer: " + uri)
log.Printf("Current peers: %v", e.peers)
}

func (e *eventDelegate) NotifyUpdate(node *memberlist.Node) {
log.Print("Update the node: %+v\n", node)
}

func (e *eventDelegate) groupcacheURI(addr string) string {
return fmt.Sprintf("http://%s:%d", addr, GroupcachePort)
}

func (e *eventDelegate) removePeer(uri string) {
for i := 0; i < len(e.peers); i++ {
if e.peers[i] == uri {
e.peers = append(e.peers[:i], e.peers[i+1:]...)
i--
}
}
}

func initGroupCache() {
eventHandler := &eventDelegate{}
conf := memberlist.DefaultLANConfig()
conf.Events = eventHandler
if addr := os.Getenv("GROUPCACHE_ADDR"); addr != "" {
conf.AdvertiseAddr = addr
}

list, err := memberlist.Create(conf)
if err != nil {
panic("Failed to created memberlist: " + err.Error())
}

self := list.Members()[0]
addr := fmt.Sprintf("%s:%d", self.Addr, GroupcachePort)
eventHandler.pool = groupcache.NewHTTPPool("http://" + addr)
go http.ListenAndServe(addr, eventHandler.pool)

if nodes := os.Getenv("JOIN_TO"); nodes != "" {
if _, err := list.Join(strings.Split(nodes, ",")); err != nil {
panic("Failed to join cluster: " + err.Error())
}
}
}

func main() {
initGroupCache()
heavy := groupcache.NewGroup("heavy", 64<<20, groupcache.GetterFunc(heavyTask))

m := martini.Classic()
m.Get("/_stats", func() []byte {
v, err := json.Marshal(&heavy.Stats)
if err != nil {
panic(err)
}
return v
})
m.Get("/:key", func(params martini.Params) string {
var result string
if err := heavy.Get(nil, params["key"], groupcache.StringSink(&result)); err != nil {
panic(err)
}
return result
})
m.Run()
}

func heavyTask(ctx groupcache.Context, key string, dst groupcache.Sink) error {
time.Sleep(400 * time.Millisecond)
dst.SetString("Value of " + key)
return nil
}

ポイントは memberlist.EventDelegate インタフェースを実装して memberlist.Config.Events に代入することです。これにより、クラスタ構成に変化があったときにコールバックしてもらえます。


使い方

このプログラムは以下のポートを使います。


  • 3000: サービス提供

  • 7946: memberlist のクラスタ管理

  • 8000: groupcache の内部 RPC

ポートを変更できるようにしていないので、VM や Docker を使って2台以上のホストを用意します。

1つめを以下のコマンドで起動します。

$ GROUPCACHE_ADDR=192.168.158.1 go run main.go

複数の IP アドレスを持つホストの場合、GROUPCACHE_ADDR に groupcache で使いたい IP アドレスを指定します。memberlist もこのアドレスを使って死活監視を行います。GROUPCACHE_ADDR が指定されなかった場合は自動判別を試みますが、うまくいかない場合は明示的に指定してください。

2つめ以降を以下のコマンドで起動します。

$ GROUPCACHE_ADDR=192.168.158.2 JOIN_TO=192.168.158.1 go run main.go

JOIN_TO で指定したホストに起動時に自動接続します。以降は memberlist で死活監視を行い、ピアの DOWN/UP を検知すると自動的に groupcache のピアリストが更新されます。

curl を使ってそれぞれのプロセスにリクエストを出せば、groupcache のクラスタが正しく構成されていることが確認できます。

$ curl 192.168.158.1:3000/foo  # 400ms

$ curl 192.168.158.2:3000/foo # <1ms


おわりに

groupcache と memberlist を組み合わせると、ビルドしたバイナリファイルを各ホストに置いて起動するだけで、キャッシュを共有しながらシステムをスケールアウトさせることができます。

死活監視も同じプロセス内で行っているため、非常にシンプルなシステム構成になっています。デプロイについてあれこれ考えなくてすむので、便利ですね。