Consul用分散環境をDockerComposeで作る
分散環境の用意
分散環境を再現するのが面倒で色々試していたが、色々なプロジェクトでDockerComposeが用意されているので、
自分でも使いたくなり簡単な分散環境用プログラムを作ったので、そのメモ。
Consulへのjoin,leaveを検出したときに、特定の動作をするgoプログラム。
これを拡大すれば、死活監視やノード追加時の初期化処理(デプロイとか)などできる気がする。
Consul
Consulは複数ノードを管理、解決するためのもの。
https://www.consul.io/
DockerCompose
Dockerを複数実行するときに、一括管理できるツール。
一括管理が前提なので本番では使わない。
https://docs.docker.com/compose/overview/
Consulによるメンバー管理
Dockerfile
Consul用のDockerはあるが、練習のために最初から全部作る
公式からconsulをダウンロードする。基本的なコマンドもないので使うものはダウンロードする。
FROM ubuntu:16.10
RUN apt update
RUN apt install -y wget
RUN apt install -y unzip
RUN apt install -y net-tools
RUN apt install -y gawk
WORKDIR /mine
RUN wget https://releases.hashicorp.com/consul/0.6.4/consul_0.6.4_linux_amd64.zip
RUN unzip consul_0.6.4_linux_amd64.zip
docker-compose.yml
Consulにつなげるために、一つだけポートを空けておく。
Consulにjoinするために、自分のipを知りたかったが、
コンテナ内からipを取得する方法がわからなかったので、とりあえずifconfigからとってきた。
また、プログラムを楽に共有するためにマウントしておく。
version: "2"
services:
consul_main:
build: .
ports:
- "8500:8500"
command: bash -c "ifconfig eth0 | grep 172 | awk '{print $$2}'| xargs -i ./consul agent -dev -bind={}"
volumes:
- ./:/code
consul:
build: .
depends_on:
- consul_server
command: bash -c "ifconfig eth0 | grep 172 | awk '{print $$2}'| xargs -i ./consul agent -dev -bind={} -join=consul_main"
#command: bash -c "ifconfig eth0 | grep 172 | awk '{print $$2}'| xargs -i ./consul agent -dev -bind={}"
consul
Consulのクライアントにはwatchコマンドがあるが、apiにはwatchがないので、作る。
watchはBlockingQueryのエイリアスらしいのでBlockingQueryを利用する。
BlockingQueryは、指定したidを超えたら返信してくる。idが変わるタイミングは対象リソースに何かが起きたとき(変更、追加、削除)
(命令伝達の前後関係を把握するために使われるidじゃないかと、妄想している)
コードを全部書くと長いので、端折って書く
func watchNode(client *api.Client, joinFunc func(*api.Node), leaveFunc func(*api.Node)) error {
currentNodes, meta, _ := getNodes(client, 0, 0 * time.Second)
lastIndex := meta.LastIndex
for true {
nodes, meta, _ := getNodes(client, lastIndex, 3 * time.Second)
lastIndex = meta.LastIndex + 1
newNodes, leavedNodes := extractDifferentNodes(nodes, currentNodes)
if len(newNodes) != 0 {
for _, node := range(newNodes) {
joinFunc(node)
}
}
if len(leavedNodes) != 0 {
for _, node := range(leavedNodes) {
leaveFunc(node)
}
}
currentNodes = nodes
}
return nil
}
func getNodes(client *api.Client, index uint64, duration time.Duration) ([]*api.Node, *api.QueryMeta, error){
nodes, meta, _ := client.Catalog().Nodes(&api.QueryOptions{
WaitIndex: index,
WaitTime: duration,
})
return nodes, meta, nil
}
func extractDifferentNodes(currentNodes []*api.Node, oldNodes []*api.Node) ([]*api.Node, []*api.Node) {
currentNodeMap := map[string]*api.Node{}
oldNodeMap := map[string]*api.Node{}
for _, node := range(currentNodes) {
currentNodeMap[node.Node] = node
}
for _, node := range(oldNodes) {
oldNodeMap[node.Node] = node
}
newNodes := []*api.Node{}
leavedNodes := []*api.Node{}
for name, node := range(currentNodeMap) {
if _, ok := oldNodeMap[name]; ok == false {
newNodes = append(newNodes, node)
}
}
for name, node := range(oldNodeMap) {
if _, ok := currentNodeMap[name]; ok == false {
leavedNodes = append(leavedNodes, node)
}
}
return newNodes, leavedNodes
}
func main() {
client, _ := api.NewClient(api.DefaultConfig())
watchNode(
client,
func (node *api.Node) {
fmt.Printf("NEW : %s (%s)\n", node.Node, node.Address)
},
func (node *api.Node) {
fmt.Printf("LEAVE: %s (%s)\n", node.Node, node.Address)
}
)
}