
Building an Autonomous Network System with IPv4 Multicast

Posted at 2019-01-01

1. Overview

This article presents Golang micro sample code for building an autonomous network system with IPv4 multicast.

2. Introduction

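In this article, I post Golang micro sample code for building an autonomous network system with IPv4 multicast, applying the approach from my previous article “IPv4 マルチキャストによるフェイルオーバー” (“Failover with IPv4 Multicast”).
The computers in a network group monitor one another via multicast: a computer that joins the group is recognized automatically, and when the master fails, the remaining nodes carry out failover by consensus.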

3. Environment

  • RHEL-7 family
  • Go 1.9

4. Configuration

  • master x 1
    • MIP: 192.0.2.1
    • VIP: 192.0.2.9
  • backup x 3
    • MIP: 192.0.2.2
    • MIP: 192.0.2.3
    • MIP: 192.0.2.4
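
Failover moves the VIP with `ip addr` and announces it with `arping`, so the setup presumably requires every node to share the VIP's subnet. A minimal sketch that checks the addresses above, assuming the MIPs are meant to lie inside the VIP's /24 prefix (the configuration list does not state their prefix length); `sameSubnet` is a hypothetical helper, not part of the sample code.

```go
package main

import (
	"fmt"
	"net"
)

// sameSubnet reports whether every address in mips falls inside the network of vipCIDR.
func sameSubnet(vipCIDR string, mips []string) (bool, error) {
	_, network, err := net.ParseCIDR(vipCIDR)
	if err != nil {
		return false, err
	}
	for _, s := range mips {
		ip := net.ParseIP(s)
		if ip == nil {
			return false, fmt.Errorf("invalid IP %q", s)
		}
		if !network.Contains(ip) {
			return false, nil
		}
	}
	return true, nil
}

func main() {
	// Master and backup MIPs from the configuration list
	mips := []string{"192.0.2.1", "192.0.2.2", "192.0.2.3", "192.0.2.4"}
	ok, err := sameSubnet("192.0.2.9/24", mips)
	fmt.Println(ok, err) // true <nil>
}
```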

5. Design

Figure: autonomous_network_by_multicast_ipv4.png

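  1. Each node        : Joins the network group
  2. All nodes        : Update the network group information
  3. All nodes        : Send and receive heartbeats to and from one another
  4. All nodes        : Monitor one another's heartbeats at a fixed interval
  5. Master           : Goes down
  6. Backups          : Detect that heartbeats from the master node have stopped arriving
  7. Backups          : Each nominates itself to the network group as a master candidate
  8. Backups          : Vote for a master candidate in the network group (itself or another node)
  9. Backups          : Confirm the master candidate by majority agreement
  10. Master candidate: Declares the master node down once the retry limit is exceeded
  11. Master candidate: Takes over the VIP and broadcasts the VIP update to the ARP tables of all nodes
  12. All nodes       : Update the network group information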
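
Steps 3, 4, 7 and 10 amount to a per-peer miss counter. The sketch below models the decision the sample code makes once per pulse interval, assuming the same rules: a new pulse resets the counter, the first miss from the master triggers a standing message, and the peer is declared down once the misses exceed the retry limit. `Detector`, `Tick` and the `Action` values are hypothetical names; networking is left out so that only the counting logic remains.

```go
package main

import "fmt"

// Action is what the checker decides after one interval (hypothetical names).
type Action int

const (
	Healthy Action = iota // a new pulse arrived since the last check
	Stand                 // first miss from the master: announce candidacy (step 7)
	Retry                 // missed, but still within the retry limit
	Down                  // retry limit exceeded: declare the peer down (step 10)
)

func (a Action) String() string {
	return [...]string{"healthy", "stand", "retry", "down"}[a]
}

// Detector tracks one peer, mirroring the prev/current/check fields of the sample's pulses map.
type Detector struct {
	prev, current int64 // timestamps of the previously seen and the latest pulse
	misses        int   // consecutive intervals without a new pulse
	retry         int   // misses allowed before the peer is declared down
}

// Pulse records the timestamp of a received heartbeat.
func (d *Detector) Pulse(ts int64) { d.current = ts }

// Tick is called once per pulse interval and returns the resulting action.
func (d *Detector) Tick(peerIsMaster bool) Action {
	if d.current != d.prev {
		d.prev = d.current
		d.misses = 0
		return Healthy
	}
	d.misses++
	if d.misses > d.retry {
		return Down
	}
	if d.misses == 1 && peerIsMaster {
		return Stand
	}
	return Retry
}

func main() {
	d := &Detector{retry: 5} // same limit as pulse_retry in the sample code
	d.Pulse(1)
	fmt.Println(d.Tick(true)) // healthy
	for i := 1; i <= 6; i++ {
		fmt.Println(i, d.Tick(true)) // 1 stand, 2 to 5 retry, 6 down
	}
}
```

With `retry: 5`, the first miss yields stand, misses two to five yield retry, and the sixth yields down, which mirrors the five "Retry pulse check" lines before "Detected master node down." in the failover log.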

6. Golang sample code

@ master, backup1, 2, 3
$ sudo nvim ~/go/autonomous_network_multicast_ipv4.go

~/go/autonomous_network_multicast_ipv4.go
package main

import (
	"fmt"
	"net"
	"os/exec"
	"strings"
	"sync"
	"time"
)

func main() {
	mcast_ip4 := "224.0.0.1"
	mcast_port := ":56789"
	mcast_addr := mcast_ip4 + mcast_port
	pulse_interval := 1
	pulse_retry := 5
	mip := ""
	mip_cidr := ""
	mip_mask := ""
	mip_port := ""
	mip_addr := ""
	vip4 := "192.0.2.9"
	vip4_cidr := "/24"
	vip4_mask := vip4 + vip4_cidr
	vip4_if := "eth0"
	master_ip := ""
	master_port := ""
	master_addr := ""
	state := ""
	nodes := make(map[string]map[string]string)
	pulses := make(map[string]map[string]int64)
	standings := make(map[string]bool)
	votings := make(map[string]int)
	sender_len := 0
	standing_message := ""
	voting_message := ""
	nominated_message := ""

	// Initial role: a node that already holds the VIP starts as master
	own_ip_masks, err := net.InterfaceAddrs()
	_Error(err)

	state = "backup"
	for _, own_ip_mask := range own_ip_masks {
		if own_ip_mask.String() != vip4_mask {
			continue
		}
		state = "master"
		break
	}
	fmt.Printf("State: %s\n", state)

	// Start pulse sender
	fmt.Printf("Start pulse sender to: %s\n", mcast_addr)
	connector, err := net.Dial("udp", mcast_addr)
	_Error(err)
	defer connector.Close()

	go func() {
		for {
			time.Sleep(time.Duration(pulse_interval) * time.Second)

			// Send pulse
			connector.Write([]byte(state))
			fmt.Printf("Sent pulse: %s\n", state)
		}
	}()

	// Start message receiver
	fmt.Printf("Start pulse receiver from: %s\n", mcast_addr)
	mcast_byte, err := net.ResolveUDPAddr("udp", mcast_addr)
	_Error(err)

	listener, err := net.ListenMulticastUDP("udp", nil, mcast_byte)
	_Error(err)
	defer listener.Close()

	buffer := make([]byte, 64)
	go func() {
		for {
			// Receive message
			length, sender_addr_byte, err := listener.ReadFrom(buffer)
			_Error(err)

			receive_message := string(buffer[:length])

			// Split sender address to ip and port
			sender_addr := sender_addr_byte.(*net.UDPAddr).String()
			sender_addr_parts := strings.Split(sender_addr, ":")
			sender_ip := sender_addr_parts[0]
			sender_port := sender_addr_parts[1]

			// Detect pulse message
			if receive_message == "master" || receive_message == "backup" {
				// Build pulse information
				pulses[sender_ip] = make(map[string]int64)
				pulses[sender_ip]["current"] = time.Now().UnixNano()

				// Build nodes information
				nodes[sender_ip] = make(map[string]string)
				nodes[sender_ip]["state"] = receive_message

				fmt.Printf("Received pulse: %s from: %s\n", receive_message, sender_ip)
			}

			// Build master information
			if master_ip == "" {
				if receive_message == "master" {
					master_ip = sender_ip
					master_port = sender_port
					master_addr = master_ip + ":" + master_port
				}
			}

			// Build own MIP information
			if mip == "" {
				for _, own_ip_mask := range own_ip_masks {
					own_addr_parts := strings.Split(own_ip_mask.String(), "/")
					if own_addr_parts[0] != sender_ip {
						continue
					}
					mip = own_addr_parts[0]
					mip_cidr = "/" + own_addr_parts[1]
					mip_mask = mip + mip_cidr
					mip_port = ":" + sender_port
					mip_addr = mip + mip_port
					break
				}
			}

			// Receive standing message
			_, flag := standings[receive_message]
			if strings.HasPrefix(receive_message, "standing,") && !flag {
				standings[receive_message] = true
				fmt.Printf("Received standing message: %s\n", receive_message)

				// Send voting message
				voting_message = "voting," + sender_ip + "," + receive_message
				connector.Write([]byte(voting_message))
				fmt.Printf("Sent voting message: %s\n", voting_message)
			}

			// Receive voting message
			if strings.HasPrefix(receive_message, "voting,"+mip+","+standing_message) {
				if _, flag := votings[receive_message]; !flag {
					votings[receive_message] = 0
				}
				votings[receive_message]++
				nominated_message = receive_message
				fmt.Printf("Receive voting message: %s\n", receive_message)
			}

		}
	}()

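	// Start the pulse checker
	for {
		// One wait group per round of checks
		wg := new(sync.WaitGroup)

		// Remember the node count; each checker returns when it changes
		sender_len = len(pulses)

		// No pulses received yet: wait instead of spinning
		if sender_len == 0 {
			time.Sleep(time.Duration(pulse_interval) * time.Second)
			continue
		}

		// Start one checker goroutine per known node
		for sender := range pulses {
			// Copy the loop variable so each goroutine checks its own node
			_sender := sender

			wg.Add(1)

			// Check this node's pulses in parallel with the others
			go func() {
				// Mark this checker as finished when it returns
				defer wg.Done()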

				for {
					// Check nodes count changing
					if sender_len != len(pulses) {
						return
					}

					// Wait one pulse interval
					time.Sleep(time.Duration(pulse_interval) * time.Second)

					// A new pulse arrived since the last check: reset the miss counter
					if pulses[_sender]["prev"] != pulses[_sender]["current"] {
						pulses[_sender]["prev"] = pulses[_sender]["current"]
						pulses[_sender]["check"] = 0
						continue
					}

					// No new pulse: count a miss
					pulses[_sender]["check"]++

					// On the first miss from the master, a backup stands as master candidate
					if pulses[_sender]["check"] == 1 &&
						nodes[_sender]["state"] == "master" &&
						nodes[mip]["state"] == "backup" {
						standing_message = "standing," + _sender
						connector.Write([]byte(standing_message))
						fmt.Printf("Sent standing message: %v\n", standing_message)
					}

					// Retry pulse check
					if pulses[_sender]["check"] <= int64(pulse_retry) {
						fmt.Printf("Retry pulse check: %v\n", pulses[_sender]["check"])
						continue
					}

					// Retries exhausted for the master: declare it down
					if nodes[_sender]["state"] == "master" {

						nodes[_sender]["state"] = "down"
						pulses[_sender]["current"] = 0
						fmt.Println("Detected master node down.")

						// This node is a backup
						if nodes[mip]["state"] == "backup" {

							// Promote to master once the votes reach half the known nodes
							if votings[nominated_message] >= (len(pulses) / 2) {

								// Take over the VIP; err is local so checker goroutines do not share it
								fmt.Printf("Reassign Unicast VIP: %s\n", vip4_mask)
								err := exec.Command("ip", "-f", "inet", "addr", "add", vip4_mask, "dev", vip4_if).Run()
								_Error(err)

								// Send unsolicited ARP so neighbours update their ARP tables
								fmt.Println("Replace ARP tables.")
								err = exec.Command("arping", "-q", "-U", "-c5", "-w1", vip4, "-I", vip4_if).Run()
								_Error(err)

								state = "master"
								master_ip = ""

								fmt.Println("Failover succeeded.")
							}

						} else if nodes[mip]["state"] == "master" {
							// This node is itself the master: release the VIP and step down
							fmt.Printf("Unassign Unicast VIP: %s\n", vip4_mask)
							err := exec.Command("ip", "-f", "inet", "addr", "delete", vip4_mask, "dev", vip4_if).Run()
							_Error(err)

							// Send unsolicited ARP so neighbours update their ARP tables
							fmt.Println("Replace ARP tables.")
							err = exec.Command("arping", "-q", "-U", "-c5", "-w1", vip4, "-I", vip4_if).Run()
							_Error(err)

							state = "backup"
							master_ip = ""

							fmt.Println("Failover succeeded.")
						}
					} else {
						// Retries exhausted for a backup: declare it down
						nodes[_sender]["state"] = "down"
						pulses[_sender]["current"] = 0

						fmt.Println("Detected backup node down.")
					}

					delete(pulses, _sender)
					return
				} // End for child process loop
			}() // End child processes
		} // End for range pulses

		// Wait until every checker in this round has returned
		wg.Wait()

	} // End for pulse checker
}

func _Error(_err error) {
	if _err != nil {
		panic(_err)
	}
}

7. Running the Golang sample

@ master, backup1, 2
$ sudo go run ~/go/autonomous_network_multicast_ipv4.go

State: master
Start pulse sender to: 224.0.0.1:56789
Start pulse receiver from: 224.0.0.1:56789
Sent pulse: master
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Sent pulse: master
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
...snip

@ backup3
$ sudo go run ~/go/autonomous_network_multicast_ipv4.go

State: backup
Start pulse sender to: 224.0.0.1:56789
Start pulse receiver from: 224.0.0.1:56789
Sent pulse: backup
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Received pulse: backup from: 192.0.2.4
Sent pulse: backup
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Received pulse: backup from: 192.0.2.4
...snip

@ master
[control]+[c]

State: master
Start pulse sender to: 224.0.0.1:56789
Start pulse receiver from: 224.0.0.1:56789
Sent pulse: master
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Sent pulse: master
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
...snip
Sent pulse: master
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Received pulse: backup from: 192.0.2.4
Sent pulse: master
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Received pulse: backup from: 192.0.2.4
...snip
^Csignal: interrupt

@ backup2

State: backup
Start pulse sender to: 224.0.0.1:56789
Start pulse receiver from: 224.0.0.1:56789
Sent pulse: backup
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Sent pulse: backup
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
...snip
Sent pulse: backup
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Received pulse: backup from: 192.0.2.4
Sent pulse: backup
Received pulse: master from: 192.0.2.1
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.3
Received pulse: backup from: 192.0.2.4
...snip
Sent standing message: standing,192.0.2.1
Retry pulse check: 1
Received standing message: standing,192.0.2.1
Sent voting message: voting,192.0.2.3,standing,192.0.2.1
Receive voting message: voting,192.0.2.3,standing,192.0.2.1
Receive voting message: voting,192.0.2.3,standing,192.0.2.1
Receive voting message: voting,192.0.2.3,standing,192.0.2.1
Retry pulse check: 2
Retry pulse check: 3
Retry pulse check: 4
Retry pulse check: 5
Detected master node down.
Reassign Unicast VIP: 192.0.2.9/24
Replace ARP tables.
Failover succeeded.
Sent pulse: master
Received pulse: master from: 192.0.2.3
Received pulse: backup from: 192.0.2.2
Received pulse: backup from: 192.0.2.4
...snip
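
The failover log above shows the three comma-separated message formats the sample exchanges: a pulse (`master` or `backup`), a standing message (`standing,<master IP>`), and a vote (`voting,<candidate IP>,standing,<master IP>`), where the voter's address comes from the UDP sender. The sketch below parses those formats and tallies votes, counting each voter once and requiring a strict majority of the live nodes. That is one reading of step 9 rather than the sample's own rule, which compares the vote count against `len(pulses) / 2`; `Parse`, `Message` and `Tally` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// Message is a parsed datagram from the group (hypothetical type).
type Message struct {
	Kind      string // "pulse", "standing" or "voting"
	State     string // pulses only: "master" or "backup"
	Candidate string // votes only: IP of the node being voted for
	Master    string // standing/voting: IP of the master reported down
}

// Parse decodes the message formats that appear in the failover log.
func Parse(s string) (Message, error) {
	parts := strings.Split(s, ",")
	switch {
	case len(parts) == 1 && (s == "master" || s == "backup"):
		return Message{Kind: "pulse", State: s}, nil
	case len(parts) == 2 && parts[0] == "standing":
		return Message{Kind: "standing", Master: parts[1]}, nil
	case len(parts) == 4 && parts[0] == "voting" && parts[2] == "standing":
		return Message{Kind: "voting", Candidate: parts[1], Master: parts[3]}, nil
	}
	return Message{}, fmt.Errorf("unknown message %q", s)
}

// Tally counts at most one vote per voter for each candidate.
type Tally struct {
	votes map[string]map[string]bool // candidate -> set of voter IPs
}

// NewTally returns an empty tally.
func NewTally() *Tally { return &Tally{votes: make(map[string]map[string]bool)} }

// Add records that voter voted for candidate.
func (t *Tally) Add(candidate, voter string) {
	if t.votes[candidate] == nil {
		t.votes[candidate] = make(map[string]bool)
	}
	t.votes[candidate][voter] = true
}

// HasMajority reports whether candidate holds more than half of liveNodes votes.
func (t *Tally) HasMajority(candidate string, liveNodes int) bool {
	return 2*len(t.votes[candidate]) > liveNodes
}

func main() {
	m, err := Parse("voting,192.0.2.3,standing,192.0.2.1")
	fmt.Printf("%+v %v\n", m, err)

	t := NewTally()
	// Votes from backup1, backup2 and backup3; the master 192.0.2.1 is down.
	for _, voter := range []string{"192.0.2.2", "192.0.2.3", "192.0.2.4"} {
		t.Add(m.Candidate, voter)
	}
	t.Add(m.Candidate, "192.0.2.2")            // a duplicate vote is not counted twice
	fmt.Println(t.HasMajority(m.Candidate, 3)) // true
}
```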

8. Summary

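In this article, I presented Golang micro sample code for building an autonomous network system with IPv4 multicast, applying the approach from my previous article “IPv4 マルチキャストによるフェイルオーバー” (“Failover with IPv4 Multicast”).
The computers in a network group monitor one another via multicast, so a computer that joins the group is recognized automatically, and when the master fails, failover is carried out by consensus.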

In upcoming articles I plan to post a simple synchronization mechanism. Combining these microservices makes it possible to build a very simple redundant configuration for data resources.
