groupcache で組み込み型分散キャッシュで groupcache について説明しました。
groupcache を使うにはピア管理が課題になります。解決方法はいくつかありますが、今回は serf がメンバー管理に使っている memberlist パッケージを使って自動化してみます。
コード
package main
import (
"encoding/json"
"fmt"
"github.com/codegangsta/martini"
"github.com/golang/groupcache"
"github.com/hashicorp/memberlist"
"log"
"net/http"
"os"
"strings"
"time"
)
const GroupcachePort = 8000
type eventDelegate struct {
peers []string
pool *groupcache.HTTPPool
}
func (e *eventDelegate) NotifyJoin(node *memberlist.Node) {
uri := e.groupcacheURI(node.Addr.String())
e.removePeer(uri)
e.peers = append(e.peers, uri)
if e.pool != nil {
e.pool.Set(e.peers...)
}
log.Print("Add peer: " + uri)
log.Printf("Current peers: %v", e.peers)
}
func (e *eventDelegate) NotifyLeave(node *memberlist.Node) {
uri := e.groupcacheURI(node.Addr.String())
e.removePeer(uri)
e.pool.Set(e.peers...)
log.Print("Remove peer: " + uri)
log.Printf("Current peers: %v", e.peers)
}
func (e *eventDelegate) NotifyUpdate(node *memberlist.Node) {
log.Print("Update the node: %+v\n", node)
}
func (e *eventDelegate) groupcacheURI(addr string) string {
return fmt.Sprintf("http://%s:%d", addr, GroupcachePort)
}
func (e *eventDelegate) removePeer(uri string) {
for i := 0; i < len(e.peers); i++ {
if e.peers[i] == uri {
e.peers = append(e.peers[:i], e.peers[i+1:]...)
i--
}
}
}
func initGroupCache() {
eventHandler := &eventDelegate{}
conf := memberlist.DefaultLANConfig()
conf.Events = eventHandler
if addr := os.Getenv("GROUPCACHE_ADDR"); addr != "" {
conf.AdvertiseAddr = addr
}
list, err := memberlist.Create(conf)
if err != nil {
panic("Failed to created memberlist: " + err.Error())
}
self := list.Members()[0]
addr := fmt.Sprintf("%s:%d", self.Addr, GroupcachePort)
eventHandler.pool = groupcache.NewHTTPPool("http://" + addr)
go http.ListenAndServe(addr, eventHandler.pool)
if nodes := os.Getenv("JOIN_TO"); nodes != "" {
if _, err := list.Join(strings.Split(nodes, ",")); err != nil {
panic("Failed to join cluster: " + err.Error())
}
}
}
func main() {
initGroupCache()
heavy := groupcache.NewGroup("heavy", 64<<20, groupcache.GetterFunc(heavyTask))
m := martini.Classic()
m.Get("/_stats", func() []byte {
v, err := json.Marshal(&heavy.Stats)
if err != nil {
panic(err)
}
return v
})
m.Get("/:key", func(params martini.Params) string {
var result string
if err := heavy.Get(nil, params["key"], groupcache.StringSink(&result)); err != nil {
panic(err)
}
return result
})
m.Run()
}
func heavyTask(ctx groupcache.Context, key string, dst groupcache.Sink) error {
time.Sleep(400 * time.Millisecond)
dst.SetString("Value of " + key)
return nil
}
ポイントは memberlist.EventDelegate
インタフェースを実装して memberlist.Config.Events
に代入することです。これにより、クラスタ構成に変化があったときにコールバックしてもらえます。
使い方
このプログラムは以下のポートを使います。
- 3000: サービス提供
- 7946: memberlist のクラスタ管理
- 8000: groupcache の内部 RPC
ポートを変更できるようにしていないので、VM や Docker を使って2台以上のホストを用意します。
1つめを以下のコマンドで起動します。
$ GROUPCACHE_ADDR=192.168.158.1 go run main.go
複数の IP アドレスを持つホストの場合、GROUPCACHE_ADDR
に groupcache で使いたい IP アドレスを指定します。memberlist もこのアドレスを使って死活監視を行います。GROUPCACHE_ADDR
が指定されなかった場合は自動判別を試みますが、うまくいかない場合は明示的に指定してください。
2つめ以降を以下のコマンドで起動します。
$ GROUPCACHE_ADDR=192.168.158.2 JOIN_TO=192.168.158.1 go run main.go
JOIN_TO
で指定したホストに起動時に自動接続します。以降は memberlist で死活監視を行い、ピアの DOWN/UP を検知すると自動的に groupcache のピアリストが更新されます。
curl を使ってそれぞれのプロセスにリクエストを出せば、groupcache のクラスタが正しく構成されていることが確認できます。
$ curl 192.168.158.1:3000/foo # 400ms
$ curl 192.168.158.2:3000/foo # <1ms
おわりに
groupcache と memberlist を組み合わせると、ビルドしたバイナリファイルを各ホストに置いて起動するだけで、キャッシュを共有しながらシステムをスケールアウトさせることができます。
死活監視も同じプロセス内で行っているため、非常にシンプルなシステム構成になっています。デプロイについてあれこれ考えなくてすむので、便利ですね。