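1. Overview
This article presents a small Golang sample for building an autonomous network system on IPv4 multicast.
2. Introduction
This article builds on the previous one, “Failover with IPv4 Multicast”, and presents a small Golang sample for building an autonomous network system on IPv4 multicast.
The computers in a network group monitor one another over multicast. When a computer joins the group, the other members recognize it automatically; when the master fails, the backups agree on a successor by vote and fail over to it.
3. Environment
- RHEL 7 family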
- Go 1.9
4. Configuration
- master x 1
- MIP: 192.0.2.1
- VIP: 192.0.2.9
- backup x 3
- MIP: 192.0.2.2
- MIP: 192.0.2.3
- MIP: 192.0.2.4
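In this configuration the master is simply the node that currently holds the VIP. As a minimal sketch of that rule, assuming the VIP is written in CIDR form (`192.0.2.9/24`), the initial state can be derived from the local interface addresses. The helper name `initialState` is hypothetical and used only for illustration.

```go
package main

import (
	"fmt"
	"net"
)

// initialState returns "master" when vipCIDR appears among the given
// CIDR-formatted addresses, and "backup" otherwise.
// (Hypothetical helper, not part of the original sample.)
func initialState(addrs []string, vipCIDR string) string {
	for _, a := range addrs {
		if a == vipCIDR {
			return "master"
		}
	}
	return "backup"
}

func main() {
	// Collect every address assigned to the local interfaces.
	ifAddrs, err := net.InterfaceAddrs()
	if err != nil {
		fmt.Println("cannot list interface addresses:", err)
		return
	}
	addrs := make([]string, 0, len(ifAddrs))
	for _, a := range ifAddrs {
		addrs = append(addrs, a.String())
	}
	fmt.Println("State:", initialState(addrs, "192.0.2.9/24"))
}
```

Keeping the comparison in a pure function makes the rule easy to check without touching real interfaces.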
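5. Design
- Each node : joins the network group individually
- All nodes : update the network group information
- All nodes : send and receive heartbeats to and from one another
- All nodes : check one another's heartbeats at a fixed interval
- Master : goes down
- Backup: detects that heartbeats from the master node have stopped
- Backup: stands for election in the network group as a master candidate
- Backup: votes for a master candidate in the network group (for itself or for another node)
- Backup: confirms the master candidate by majority agreement
- Master candidate: declares the master node down once the retry count is exceeded
- Master candidate: takes over the VIP and broadcasts the VIP update to the ARP tables of all nodes
- All nodes : update the network group information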
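The voting steps above can be sketched as a small tally. This is only an illustration under stated assumptions: the message shape `voting,<candidate>,standing,<master>` follows the sample logs, while the helper names `parseVote` and `electWinner` and the strict-majority rule (`votes > total/2`) are assumptions made here; the sample code itself compares with `>= len(pulses) / 2`.

```go
package main

import (
	"fmt"
	"strings"
)

// parseVote extracts the candidate IP and the failed master IP from a
// message shaped like "voting,<candidate>,standing,<master>".
func parseVote(msg string) (candidate, master string, ok bool) {
	parts := strings.Split(msg, ",")
	if len(parts) != 4 || parts[0] != "voting" || parts[2] != "standing" {
		return "", "", false
	}
	return parts[1], parts[3], true
}

// electWinner returns the candidate that holds a strict majority among
// totalVoters, or "" when no candidate does. At most one candidate can
// hold a strict majority, so map iteration order does not matter.
func electWinner(msgs []string, totalVoters int) string {
	counts := make(map[string]int)
	for _, m := range msgs {
		if c, _, ok := parseVote(m); ok {
			counts[c]++
		}
	}
	for c, n := range counts {
		if n > totalVoters/2 {
			return c
		}
	}
	return ""
}

func main() {
	msgs := []string{
		"voting,192.0.2.3,standing,192.0.2.1",
		"voting,192.0.2.3,standing,192.0.2.1",
		"voting,192.0.2.2,standing,192.0.2.1",
	}
	// Three backups voted; 192.0.2.3 holds two of the three votes.
	fmt.Println(electWinner(msgs, 3))
}
```

A strict majority guarantees that two candidates can never both win the same round, which is why it is used in this sketch.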
6. Golang sample code
@ master, backup1, 2, 3
$ sudo nvim ~/go/autonomous_network_multicast_ipv4.go
package main
import (
"fmt"
"net"
"os/exec"
"strings"
"sync"
"time"
)
func main() {
mcast_ip4 := "224.0.0.1"
mcast_port := ":56789"
mcast_addr := mcast_ip4 + mcast_port
pulse_interval := 1
pulse_retry := 5
mip := ""
mip_cidr := ""
mip_mask := ""
mip_port := ""
mip_addr := ""
vip4 := "192.0.2.9"
vip4_cidr := "/24"
vip4_mask := vip4 + vip4_cidr
vip4_if := "eth0"
master_ip := ""
master_port := ""
master_addr := ""
state := ""
nodes := make(map[string]map[string]string)
pulses := make(map[string]map[string]int64)
standings := make(map[string]bool)
votings := make(map[string]int)
sender_len := 0
standing_message := ""
voting_message := ""
nominated_message := ""
// Get node state
own_ip_masks, err := net.InterfaceAddrs()
_Error(err)
state = "backup"
for _, own_ip_mask := range own_ip_masks {
if own_ip_mask.String() != vip4_mask {
continue
}
state = "master"
break
}
fmt.Printf("State: %s\n", state)
// Start pulse sender
fmt.Printf("Start pulse sender to: %s\n", mcast_addr)
connector, err := net.Dial("udp", mcast_addr)
_Error(err)
defer connector.Close()
go func() {
for {
time.Sleep(time.Duration(pulse_interval) * time.Second)
// Send pulse
connector.Write([]byte(state))
fmt.Printf("Sent pulse: %s\n", state)
}
}()
// Start message receiver
fmt.Printf("Start pulse receiver from: %s\n", mcast_addr)
mcast_byte, err := net.ResolveUDPAddr("udp", mcast_addr)
_Error(err)
listener, err := net.ListenMulticastUDP("udp", nil, mcast_byte)
_Error(err)
defer listener.Close()
buffer := make([]byte, 64)
go func() {
for {
// Receive message
length, sender_addr_byte, err := listener.ReadFrom(buffer)
_Error(err)
receive_message := string(buffer[:length])
// Split sender address to ip and port
sender_addr := sender_addr_byte.(*net.UDPAddr).String()
sender_addr_parts := strings.Split(sender_addr, ":")
sender_ip := sender_addr_parts[0]
sender_port := sender_addr_parts[1]
// Detect pulse message
if receive_message == "master" || receive_message == "backup" {
// Build pulse information
pulses[sender_ip] = make(map[string]int64)
pulses[sender_ip]["current"] = time.Now().UnixNano()
// Build nodes information
nodes[sender_ip] = make(map[string]string)
nodes[sender_ip]["state"] = receive_message
fmt.Printf("Received pulse: %s from: %s\n", receive_message, sender_ip)
}
// Build master information
if master_ip == "" {
if receive_message == "master" {
master_ip = sender_ip
master_port = sender_port
master_addr = master_ip + ":" + master_port
}
}
// Build own MIP information
if mip == "" {
for _, own_ip_mask := range own_ip_masks {
own_addr_parts := strings.Split(own_ip_mask.String(), "/")
if own_addr_parts[0] != sender_ip {
continue
}
mip = own_addr_parts[0]
mip_cidr = "/" + own_addr_parts[1]
mip_mask = mip + mip_cidr
mip_port = ":" + sender_port
mip_addr = mip + mip_port
break
}
}
// Receive standing message
_, flag := standings[receive_message]
if 0 == strings.Index(receive_message, "standing,") && !flag {
standings[receive_message] = true
fmt.Printf("Reveived standing message: %s\n", receive_message)
// Send voting message
voting_message = "voting," + sender_ip + "," + receive_message
connector.Write([]byte(voting_message))
fmt.Printf("Sent voting message: %s\n", voting_message)
}
// Receive voting message
if 0 == strings.Index(receive_message, "voting,"+mip+","+standing_message) {
if _, flag := votings[receive_message]; !flag {
votings[receive_message] = 0
}
votings[receive_message]++
nominated_message = receive_message
fmt.Printf("Receive voting message: %s\n", receive_message)
}
}
}()
// Start message checker
for {
// Init waiting group
wg := new(sync.WaitGroup)
// Check nodes count
sender_len = len(pulses)
// To serial processing
for sender := range pulses {
// For parallel processing
_sender := sender
// Add waiting group
wg.Add(1)
// To parallel processing
go func() {
// Defer waiting group
defer wg.Done()
for {
// Check nodes count changing
if sender_len != len(pulses) {
return
}
// Sleep interval seconds
time.Sleep(time.Duration(pulse_interval) * time.Second)
// Detected normal pulse
if pulses[_sender]["prev"] != pulses[_sender]["current"] {
pulses[_sender]["prev"] = pulses[_sender]["current"]
pulses[_sender]["check"] = 0
continue
}
// Detected abnormal pulse
pulses[_sender]["check"]++
// Send master standing message
if pulses[_sender]["check"] == 1 &&
nodes[_sender]["state"] == "master" &&
nodes[mip]["state"] == "backup" {
standing_message = "standing," + _sender
connector.Write([]byte(standing_message))
fmt.Printf("Sent standing message: %v\n", standing_message)
}
// Retry pulse check
if pulses[_sender]["check"] <= int64(pulse_retry) {
fmt.Printf("Retry pulse check: %v\n", pulses[_sender]["check"])
continue
}
// Decided master node down
if nodes[_sender]["state"] == "master" {
nodes[_sender]["state"] = "down"
pulses[_sender]["current"] = 0
fmt.Println("Detected master node down.")
// Case self is backup
if nodes[mip]["state"] == "backup" {
// Promote to master
if votings[nominated_message] >= (len(pulses) / 2) {
// Reassign VIP
fmt.Printf("Reasign Unicast VIP: %s\n", vip4_mask)
err = exec.Command("ip", "-f", "inet", "addr", "add", vip4_mask, "dev", vip4_if).Run()
_Error(err)
// Execute arping
fmt.Println("Replace ARP tables.")
err = exec.Command("arping", "-q", "-U", "-c5", "-w1", vip4, "-I", vip4_if).Run()
_Error(err)
state = "master"
master_ip = ""
fmt.Println("Suceeded failover.")
}
// Case self is master
} else if nodes[mip]["state"] == "master" {
// Unassign VIP
fmt.Printf("unasign Unicast VIP: %s\n", vip4_mask)
err = exec.Command("ip", "-f", "inet", "addr", "delete", vip4_mask, "dev", vip4_if).Run()
_Error(err)
// Execute arping
fmt.Println("Replace ARP tables.")
err = exec.Command("arping", "-q", "-U", "-c5", "-w1", vip4, "-I", vip4_if).Run()
_Error(err)
state = "backup"
master_ip = ""
fmt.Println("Suceeded failover.")
}
// Decided backup node down
} else {
nodes[_sender]["state"] = "down"
pulses[_sender]["current"] = 0
fmt.Println("Detected backup node down.")
}
delete(pulses, _sender)
return
} // End for child process loop
}() // End child processes
} // End for range pulses
// Wait waiting group
wg.Wait()
} // End for pulse checker
}
func _Error(_err error) {
if _err != nil {
panic(_err)
}
}
7. Running the Golang sample
@ master, backup1, 2
$ sudo go run ~/go/autonomous_network_multicast_ipv4.go
State: master
Start pulse sender to: 224.0.0.1:56789
Start pulse receiver from: 224.0.0.1:56789
Sent pulse: master
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Sent pulse: master
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
...snip
@ backup3
$ sudo go run ~/go/autonomous_network_multicast_ipv4.go
State: backup
Start pulse sender to: 224.0.0.1:56789
Start pulse receiver from: 224.0.0.1:56789
Sent pulse: backup
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Received pulse: backup from: 192.0.2.4
Sent pulse: backup
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Received pulse: backup from: 192.0.2.4
...snip
@ master
[control]+[c]
State: master
Start pulse sender to: 224.0.0.1:56789
Start pulse receiver from: 224.0.0.1:56789
Sent pulse: master
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Sent pulse: master
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
...snip
Sent pulse: master
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Received pulse: backup from: 192.0.2.4
Sent pulse: master
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Received pulse: backup from: 192.0.2.4
...snip
^Csignal: interrupt
@ backup 2
State: backup
Start pulse sender to: 224.0.0.1:56789
Start pulse receiver from: 224.0.0.1:56789
Sent pulse: backup
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Sent pulse: backup
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
...snip
Sent pulse: backup
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Received pulse: backup from: 192.0.2.4
Sent pulse: backup
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Received pulse: backup from: 192.0.2.4
...snip
Sent standing message: standing,192.0.2.1
Retry pulse check: 1
Reveived standing message: standing,192.0.2.1
Sent voting message: voting,192.0.2.3,standing,192.0.2.1
Receive voting message: voting,192.0.2.3,standing,192.0.2.1
Receive voting message: voting,192.0.2.3,standing,192.0.2.1
Receive voting message: voting,192.0.2.3,standing,192.0.2.1
Retry pulse check: 2
Retry pulse check: 3
Retry pulse check: 4
Retry pulse check: 5
Detected master node down.
Reasign Unicast VIP: 192.0.2.9/24
Replace ARP tables.
Suceeded failover.
Sent pulse: master
Received pulse: master from: 192.0.2.3
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.4
...snip
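8. Summary
This article built on the previous one, “Failover with IPv4 Multicast”, and presented a small Golang sample for building an autonomous network system on IPv4 multicast.
The computers in a network group monitor one another over multicast. When a computer joins the group, the other members recognize it automatically; when the master fails, the backups agree on a successor by vote and fail over to it.
Upcoming articles will cover a simple synchronization mechanism. Combining these small services makes it possible to build a very simple redundant configuration for data resources.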