Posted at

etcd で分散ロックマネージャをつくる

More than 1 year has passed since last update.


はじめに


  • Google's SRE Book を読んで分散合意アルゴリズムかっこいいと思ったので etcd を試してみた

  • せっかくなので分散ロックマネージャを実装してみる


etcd とは


  • 分散合意アルゴリズムを使った KVS

  • 10000 write/sec 程度と高速で信頼性が高く k8s のバックエンドで利用されるなど実績もある

  • 公式ドキュメント


使ってみる


ダウンロード


実行

./etcd.ext


クライアントで動作確認

# 環境変数の設定

$ export ETCDCTL_API=3

# 値の記録
$ ./etcdctl.ext put mykey "value1"
OK

# 値の取得
./etcdctl.ext get mykey
mykey
value1

# 値の取得(メタデータも含めて JSON で取得)
$ ./etcdctl.ext get --write-out="json" mykey
{"header":{"cluster_id":14841639068965178418,"member_id":10276657743932975437,"revision":4,"raft_term":2},"kvs":[{"key":"bXlrZXk=","create_revision":2,"mod_revision":4,"version":3,"value":"dmFsdWUgMQ=="}],"count":1}


複数サーバーで動作させる


  • 待ち受けポートを変えれば同じマシンで複数の etcd を動かせる


サーバー用ウィンドウその1で実行

TOKEN=token-01

CLUSTER_STATE=new
NAME_1=machine-1
NAME_2=machine-2
NAME_3=machine-3
HOST_1=127.0.0.1
HOST_2=127.0.0.1
HOST_3=127.0.0.1
PORT_1A=2380
PORT_1B=2379
PORT_2A=2390
PORT_2B=2389
PORT_3A=2400
PORT_3B=2399
CLUSTER=${NAME_1}=http://${HOST_1}:${PORT_1A},${NAME_2}=http://${HOST_2}:${PORT_2A},${NAME_3}=http://${HOST_3}:${PORT_3A}

THIS_NAME=${NAME_1}
THIS_IP=${HOST_1}
THIS_PORT_A=${PORT_1A}
THIS_PORT_B=${PORT_1B}
./etcd --data-dir=data.1.etcd --name ${THIS_NAME} \
--initial-advertise-peer-urls "http://${THIS_IP}:${THIS_PORT_A}" --listen-peer-urls "http://${THIS_IP}:${THIS_PORT_A}" \
--advertise-client-urls "http://${THIS_IP}:${THIS_PORT_B}" --listen-client-urls "http://${THIS_IP}:${THIS_PORT_B}" \
--initial-cluster ${CLUSTER} \
--initial-cluster-state ${CLUSTER_STATE} --initial-cluster-token ${TOKEN}


サーバー用ウィンドウその2で実行

TOKEN=token-01

CLUSTER_STATE=new
NAME_1=machine-1
NAME_2=machine-2
NAME_3=machine-3
HOST_1=127.0.0.1
HOST_2=127.0.0.1
HOST_3=127.0.0.1
PORT_1A=2380
PORT_1B=2379
PORT_2A=2390
PORT_2B=2389
PORT_3A=2400
PORT_3B=2399
CLUSTER=${NAME_1}=http://${HOST_1}:${PORT_1A},${NAME_2}=http://${HOST_2}:${PORT_2A},${NAME_3}=http://${HOST_3}:${PORT_3A}

THIS_NAME=${NAME_2}
THIS_IP=${HOST_2}
THIS_PORT_A=${PORT_2A}
THIS_PORT_B=${PORT_2B}
./etcd --data-dir=data.2.etcd --name ${THIS_NAME} \
--initial-advertise-peer-urls "http://${THIS_IP}:${THIS_PORT_A}" --listen-peer-urls "http://${THIS_IP}:${THIS_PORT_A}" \
--advertise-client-urls "http://${THIS_IP}:${THIS_PORT_B}" --listen-client-urls "http://${THIS_IP}:${THIS_PORT_B}" \
--initial-cluster ${CLUSTER} \
--initial-cluster-state ${CLUSTER_STATE} --initial-cluster-token ${TOKEN}


サーバー用ウィンドウその3で実行

TOKEN=token-01

CLUSTER_STATE=new
NAME_1=machine-1
NAME_2=machine-2
NAME_3=machine-3
HOST_1=127.0.0.1
HOST_2=127.0.0.1
HOST_3=127.0.0.1
PORT_1A=2380
PORT_1B=2379
PORT_2A=2390
PORT_2B=2389
PORT_3A=2400
PORT_3B=2399
CLUSTER=${NAME_1}=http://${HOST_1}:${PORT_1A},${NAME_2}=http://${HOST_2}:${PORT_2A},${NAME_3}=http://${HOST_3}:${PORT_3A}

THIS_NAME=${NAME_3}
THIS_IP=${HOST_3}
THIS_PORT_A=${PORT_3A}
THIS_PORT_B=${PORT_3B}
./etcd --data-dir=data.3.etcd --name ${THIS_NAME} \
--initial-advertise-peer-urls "http://${THIS_IP}:${THIS_PORT_A}" --listen-peer-urls "http://${THIS_IP}:${THIS_PORT_A}" \
--advertise-client-urls "http://${THIS_IP}:${THIS_PORT_B}" --listen-client-urls "http://${THIS_IP}:${THIS_PORT_B}" \
--initial-cluster ${CLUSTER} \
--initial-cluster-state ${CLUSTER_STATE} --initial-cluster-token ${TOKEN}


クライアント用のウィンドウで動作確認

export ETCDCTL_API=3

HOST_1=10.240.0.17
HOST_2=10.240.0.18
HOST_3=10.240.0.19
ENDPOINTS=127.0.0.1:2379,127.0.0.1:2389,127.0.0.1:2399

./etcdctl --endpoints=$ENDPOINTS member list


Go のクライアントライブラリからアクセスしてみる


コード


client.go

package main

import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"log"
"golang.org/x/net/context"
)

var requestTimeout = 5 * time.Second

func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379", "127.0.0.1:2389", "127.0.0.1:2399"},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
defer cli.Close()

_, err = cli.Put(context.TODO(), "foo", "bar")
if err != nil {
log.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := cli.Get(ctx, "foo")
cancel()
if err != nil {
log.Fatal(err)
}
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
}



インストール



  • github.com/coreos/etcd は glide で依存パッケージを管理しているので glide から最新のバージョンをインストールする



    • go get とか govendor とかでインストールすると依存パッケージのバージョンがそろわずに正常に動作しなかった



# glide のインストール

go get github.com/Masterminds/glide
go install github.com/Masterminds/glide

# パッケージのインストール
glide create
glide install


動作確認

$ go run client.go

foo : bar


分散ロックマネージャを実装する


  • etcd はトランザクション機能で一貫性が保証されるので、これを利用して分散環境でリソースのロック情報を共有するための分散ロックマネージャを実装してみる


コード


  • REST API を提供するウェブサーバーとして実装した


    • PUT でロック取得

    • GET でステータス確認

    • DELETE でロック開放



  • etcd サーバーの設定は先に起動しているものをハードコーディングしてある

package main

import (
"encoding/json"
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"log"
"net/http"
"time"
)

var servers = []string{"127.0.0.1:2379", "127.0.0.1:2389", "127.0.0.1:2399"}
var timeout = 5 * time.Second

func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "PUT":
lockHandler(w, r)
case "GET":
statusHandler(w, r)
case "DELETE":
unlockHandler(w, r)
}
})
log.Fatal(http.ListenAndServe(":8888", nil))
}

func lockHandler(w http.ResponseWriter, r *http.Request) {
// decode request
req := struct {
Resource string `json:"resource"`
UserID string `json:"user_id"`
}{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

// create client
cli, err := clientv3.New(clientv3.Config{
Endpoints: servers,
DialTimeout: timeout,
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer cli.Close()

// lock resource if unlocked
key, val := req.Resource, req.UserID
ctx, cancel := context.WithTimeout(context.Background(), timeout)
txnRes, err := cli.Txn(ctx).
If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
Then(clientv3.OpPut(key, val)).
Commit()
cancel()

// create result
res := struct {
Succeeded bool `json:"succeeded"`
}{
Succeeded: txnRes.Succeeded,
}
json.NewEncoder(w).Encode(&res)
}

func statusHandler(w http.ResponseWriter, r *http.Request) {
// decode request
req := struct {
Resource string `json:"resource"`
}{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

// create client
cli, err := clientv3.New(clientv3.Config{
Endpoints: servers,
DialTimeout: timeout,
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer cli.Close()

// get status
ctx, cancel := context.WithTimeout(context.Background(), timeout)
getRes, err := cli.Get(ctx, req.Resource)
cancel()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// create result
res := struct {
Locked bool `json:"locked"`
UserID string `json:"user_id"`
}{
Locked: len(getRes.Kvs) > 0,
}
if res.Locked {
res.UserID = string(getRes.Kvs[0].Value)
}
json.NewEncoder(w).Encode(&res)
}

func unlockHandler(w http.ResponseWriter, r *http.Request) {
// decode request
req := struct {
Resource string `json:"resource"`
UserID string `json:"user_id"`
}{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

// create client
cli, err := clientv3.New(clientv3.Config{
Endpoints: servers,
DialTimeout: timeout,
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer cli.Close()

// unlock resource if locked
key, val := req.Resource, req.UserID
ctx, cancel := context.WithTimeout(context.Background(), timeout)
txnRes, err := cli.Txn(ctx).
If(clientv3.Compare(clientv3.Value(key), "=", val)).
Then(clientv3.OpDelete(key)).
Commit()
cancel()

// create response
res := struct {
Succeeded bool `json:"succeeded"`
}{
Succeeded: txnRes.Succeeded,
}
json.NewEncoder(w).Encode(&res)
}


動作確認

# ロックの取得

$ curl -H "Accept: application/json" -H "Content-type: application/json" -X PUT -d '{"resource":"/path/to/my/file1","user_id":"user1"}' http://localhost:8888
{"succeeded":true}

# ロックの確認
$ curl -H "Accept: application/json" -H "Content-type: application/json" -X GET -d '{"resource":"/path/to/my/file1"}' http://localhost:8888
{"locked":true,"user_id":"user1"}

# ロックの開放
$ curl -H "Accept: application/json" -H "Content-type: application/json" -X DELETE -d '{"resource":"/path/to/my/file1","user_id":"user1"}' http://localhost:8888
{"succeeded":true}

# 開放できていることの確認
$ curl -H "Accept: application/json" -H "Content-type: application/json" -X GET -d '{"resource":"/path/to/my/file1"}' http://localhost:8888
{"locked":false,"user_id":""}


簡単な解説

    // lock resource if unlocked

key, val := req.Resource, req.UserID
ctx, cancel := context.WithTimeout(context.Background(), timeout)
txnRes, err := cli.Txn(ctx).
If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
Then(clientv3.OpPut(key, val)).
Commit()
cancel()

// create result
res := struct {
Succeeded bool `json:"succeeded"`
}{
Succeeded: txnRes.Succeeded,
}


  • トランザクション処理の部分が肝なのでそこだけ

  • トランザクション処理は (*clientv3.Client).Txn で開始


  • Txn.If でトランザクションの実行条件を指定


    • 上のコードの clientv3.Compare(clientv3.CreateRevision(key), "=", 0) は作成リビジョンが 0 の場合ということでキーが未登録の場合に成功処理を実行する




  • Txn.Then で実行条件が真の場合の処理を指定


    • 上のコードでは clientv3.OpPut(key, val) ということで、条件とあわせると「キーが未登録ならバリューをセットする」という処理になる




  • Txn.Else で実行条件が偽の場合の処理を指定


    • 上のコードでは必要がないので記載していない




  • Txn.Commit で一連の処理を実行する


  • github.com/coreos/etcd/clientv3/concurrency パッケージを使うと口座間の預金の振込みのようなさらに複雑なトランザクション処理もできるということだがこちらはまだ試していない