LoginSignup
15
16

More than 5 years have passed since last update.

groupcache のピアを memberlist で自動管理

Last updated at Posted at 2014-04-01

groupcache‎ で組み込み型分散キャッシュで groupcache‎ について説明しました。

groupcache を使うにはピア管理が課題になります。解決方法はいくつかありますが、今回は serf がメンバー管理に使っている memberlist パッケージを使って自動化してみます。

コード

package main

import (
    "encoding/json"
    "fmt"
    "github.com/codegangsta/martini"
    "github.com/golang/groupcache"
    "github.com/hashicorp/memberlist"
    "log"
    "net/http"
    "os"
    "strings"
    "time"
)

const GroupcachePort = 8000

type eventDelegate struct {
    peers []string
    pool  *groupcache.HTTPPool
}

func (e *eventDelegate) NotifyJoin(node *memberlist.Node) {
    uri := e.groupcacheURI(node.Addr.String())
    e.removePeer(uri)
    e.peers = append(e.peers, uri)
    if e.pool != nil {
        e.pool.Set(e.peers...)
    }
    log.Print("Add peer: " + uri)
    log.Printf("Current peers: %v", e.peers)
}

func (e *eventDelegate) NotifyLeave(node *memberlist.Node) {
    uri := e.groupcacheURI(node.Addr.String())
    e.removePeer(uri)
    e.pool.Set(e.peers...)
    log.Print("Remove peer: " + uri)
    log.Printf("Current peers: %v", e.peers)
}

func (e *eventDelegate) NotifyUpdate(node *memberlist.Node) {
    log.Print("Update the node: %+v\n", node)
}

func (e *eventDelegate) groupcacheURI(addr string) string {
    return fmt.Sprintf("http://%s:%d", addr, GroupcachePort)
}

func (e *eventDelegate) removePeer(uri string) {
    for i := 0; i < len(e.peers); i++ {
        if e.peers[i] == uri {
            e.peers = append(e.peers[:i], e.peers[i+1:]...)
            i--
        }
    }
}

func initGroupCache() {
    eventHandler := &eventDelegate{}
    conf := memberlist.DefaultLANConfig()
    conf.Events = eventHandler
    if addr := os.Getenv("GROUPCACHE_ADDR"); addr != "" {
        conf.AdvertiseAddr = addr
    }

    list, err := memberlist.Create(conf)
    if err != nil {
        panic("Failed to created memberlist: " + err.Error())
    }

    self := list.Members()[0]
    addr := fmt.Sprintf("%s:%d", self.Addr, GroupcachePort)
    eventHandler.pool = groupcache.NewHTTPPool("http://" + addr)
    go http.ListenAndServe(addr, eventHandler.pool)

    if nodes := os.Getenv("JOIN_TO"); nodes != "" {
        if _, err := list.Join(strings.Split(nodes, ",")); err != nil {
            panic("Failed to join cluster: " + err.Error())
        }
    }
}

func main() {
    initGroupCache()
    heavy := groupcache.NewGroup("heavy", 64<<20, groupcache.GetterFunc(heavyTask))

    m := martini.Classic()
    m.Get("/_stats", func() []byte {
        v, err := json.Marshal(&heavy.Stats)
        if err != nil {
            panic(err)
        }
        return v
    })
    m.Get("/:key", func(params martini.Params) string {
        var result string
        if err := heavy.Get(nil, params["key"], groupcache.StringSink(&result)); err != nil {
            panic(err)
        }
        return result
    })
    m.Run()
}

func heavyTask(ctx groupcache.Context, key string, dst groupcache.Sink) error {
    time.Sleep(400 * time.Millisecond)
    dst.SetString("Value of " + key)
    return nil
}

ポイントは memberlist.EventDelegate インタフェースを実装して memberlist.Config.Events に代入することです。これにより、クラスタ構成に変化があったときにコールバックしてもらえます。

使い方

このプログラムは以下のポートを使います。

  • 3000: サービス提供
  • 7946: memberlist のクラスタ管理
  • 8000: groupcache の内部 RPC

ポートを変更できるようにしていないので、VM や Docker を使って2台以上のホストを用意します。

1つめを以下のコマンドで起動します。

$ GROUPCACHE_ADDR=192.168.158.1 go run main.go

複数の IP アドレスを持つホストの場合、GROUPCACHE_ADDR に groupcache で使いたい IP アドレスを指定します。memberlist もこのアドレスを使って死活監視を行います。GROUPCACHE_ADDR が指定されなかった場合は自動判別を試みますが、うまくいかない場合は明示的に指定してください。

2つめ以降を以下のコマンドで起動します。

$ GROUPCACHE_ADDR=192.168.158.2 JOIN_TO=192.168.158.1 go run main.go

JOIN_TO で指定したホストに起動時に自動接続します。以降は memberlist で死活監視を行い、ピアの DOWN/UP を検知すると自動的に groupcache のピアリストが更新されます。

curl を使ってそれぞれのプロセスにリクエストを出せば、groupcache のクラスタが正しく構成されていることが確認できます。

$ curl 192.168.158.1:3000/foo  # 400ms
$ curl 192.168.158.2:3000/foo  # <1ms

おわりに

groupcache と memberlist を組み合わせると、ビルドしたバイナリファイルを各ホストに置いて起動するだけで、キャッシュを共有しながらシステムをスケールアウトさせることができます。

死活監視も同じプロセス内で行っているため、非常にシンプルなシステム構成になっています。デプロイについてあれこれ考えなくてすむので、便利ですね。

15
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
16