1. 概要
この記事では、IPv4 マルチユニキャスト1 、ニアキャスト2 、グループキャスト3 による自律型分散ネットワークシステム構築のための Golang サンプルコードを投稿します。
2. はじめに
自律型の分散システムにおいては、IPv4 や TCP ユニキャストだけでなく、UDP ユニキャストや、マルチキャスト、IPv6 エニーキャストなどを目的に応じて適宜組み込むことによって、より効率的で弾力性の高いシステム設計や、利便性の高いサービスの実現が可能となっていくしょう。
しかし、環境によってはマルチキャストやエニーキャストなどはまだまだ現実的ではない側面があります。
前回の記事では、クラウド環境やグローバル環境におけるマルチキャスト代替策として、“IPv4 によるマルチユニキャスト” を投稿しましたが、今回の記事では更にこれを発展させ、マルチユニキャスト1 、ニアキャスト2 、グループキャスト3 を組み込んだ自律型分散ネットワークシステム構築のための Golang サンプルコードを投稿します。
これにより、通信制限の多い環境の影響を受けず、あるいは非汎用的プロトコルや通信機器などに依存せずに、セッション層の立場からトランスポート層やネットワーク層などの通信技術を擬似的に実現することが可能となります。
また、以前の記事で投稿した “IPv4 マルチキャストによる自律ネットワークシステムの構築” などを組み合わせることにより、より生産性の高い自律型分散ネットワークシステムの構築が可能になりえるかもしれません。
これらのユースケースとしては、ドローン、車などのモビリティ系から、ロボット、スマート家電などの IoT 系、あるいは AI 系、 DB 系、ソーシャル系など、あらゆる用途の自律型分散ネットワークシステムへ応用することが可能となります。
3. 環境
- RHEL-7 系
- Go 1.9
4. 構成
- node x 3
- node1: 192.0.2.1
- node2: 192.0.2.2
- node3: 192.0.2.3
5. 設計
5-1. マルチユニキャスト1
5-2. ニアキャスト2
5-3. グループキャスト3
6. Golang サンプルコード
サンプルコードでは、ほぼ正常系処理のみを実装しています。
実用には、適宜状況に応じて異常系処理を実装する必要があります。
6-1. 自律型分散ネットワークシステム
以下のサンプルコードでは、自 IP アドレスを静的に設定してますが、もし、ノードが 1 つの IP アドレスしか持たない場合には、“6-2. 自 IP アドレスを自動取得” にもサンプルコードを投稿しておきますので、このように自 IP アドレスを自動取得するのも手かもしれません。
また今回、サンプルには投稿しませんが、ノードが複数の IP アドレスを持つ場合でも、デフォルト IP アドレスを取得することは可能です。
@ node1, 2, 3
$ sudo nvim go ~/go/adcast_ipv4.go
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
"net"
"os"
"os/exec"
"sort"
"strconv"
"strings"
"time"
)
var DefaultIp string = "192.0.2.1" // node2 は 192.0.2.2、node3 は 192.0.2.3
var DefaultPort string = "56789"
var DefaultAddr string = DefaultIp + ":" + DefaultPort
var ReceptorPort string = "56788"
var ReceptorAddr string = DefaultIp + ":" + ReceptorPort
var ApplyingToIp string = ""
var ApplyingToAddr string = ""
var ApplyingMessage string = "applying"
var EntryPrefix string = "entry:"
var BufferByte int = 1024
var NearcastCheckInterval int = 10
var Addresses AddressList
type AddressList []AddressMaps
type AddressMaps struct {
Address string
Neary float64
Group int
}
func main() {
// Start applying receptor
receptor_addr_byte, err := net.ResolveUDPAddr("udp", ReceptorAddr)
Error(err)
// Listen applying receptor
receptor, err := net.ListenUDP("udp", receptor_addr_byte)
Error(err)
defer receptor.Close()
fmt.Printf("Listened applying receptor *:* > %s\n", ReceptorAddr)
receptor_buffer := make([]byte, BufferByte)
go func() {
for {
// Receive applying message
length, applying_addr_byte, err := receptor.ReadFrom(receptor_buffer)
Error(err)
applying_message := string(receptor_buffer[:length])
applying_addr := applying_addr_byte.(*net.UDPAddr).String()
applying_addr_parts := strings.Split(applying_addr, ":")
entry_ip := applying_addr_parts[0]
entry_addr := entry_ip + ":" + DefaultPort
if applying_message != ApplyingMessage {
continue
}
fmt.Printf("Reveived applying %s > %s as “%s”\n", applying_addr, ReceptorAddr, applying_message)
// Check duplicated addresses
entered_flag := false
for _, addr := range Addresses {
if entry_addr == addr.Address {
entered_flag = true
fmt.Printf("Duplicated entry address in multi-unicast addresses: %s\n", entry_addr)
break
}
}
if !entered_flag {
// Append entry address
entry := AddressMaps{Address: entry_addr, Neary: 0, Group: 0}
Addresses = append(Addresses, entry)
fmt.Printf("Appended entry address into multi-unicast addresses: %s\n", entry_addr)
// Send entry message
addresses_json, err := json.Marshal(Addresses)
Error(err)
entry_message := EntryPrefix + string(addresses_json)
MultiUnicast(entry_message)
}
}
}()
// Apply own address to address group
flag.Parse()
ApplyingToIp = flag.Arg(0)
ApplyingToAddr = ApplyingToIp + ":" + ReceptorPort
Apply()
// Check nearcast
go CheckNearcast()
// Start standard input
go Stdin()
// Start inbound
inbound_to_addr_byte, err := net.ResolveUDPAddr("udp", DefaultAddr)
Error(err)
// Listen inbound
inbound, err := net.ListenUDP("udp", inbound_to_addr_byte)
Error(err)
defer inbound.Close()
fmt.Printf("Listened inbound *:* > %s\n", DefaultAddr)
inbound_buffer := make([]byte, BufferByte)
for {
// Receive inbound message
length, inbound_from_addr, err := inbound.ReadFrom(inbound_buffer)
Error(err)
inbound_message := string(inbound_buffer[:length])
// Receive entry message
if 0 == strings.Index(inbound_message, EntryPrefix) {
fmt.Printf("Reveived entry message %s > %s as “%s”\n", inbound_from_addr, DefaultAddr, inbound_message)
// Extract entry addresses
entry_messages := strings.Split(inbound_message, EntryPrefix)
entry_addresses_byte := []byte(entry_messages[1])
var entry_addresses AddressList
err := json.Unmarshal(entry_addresses_byte, &entry_addresses)
Error(err)
for _, entry_addr_map := range entry_addresses {
entered_flag := false
for _, addr_map := range Addresses {
if addr_map.Address != entry_addr_map.Address {
continue
}
entered_flag = true
fmt.Printf("Duplicated entry address in multi-unicast addresses: %s\n", entry_addr_map.Address)
break
}
if !entered_flag {
entry := AddressMaps{Address: entry_addr_map.Address, Neary: 0, Group: 0}
Addresses = append(Addresses, entry)
fmt.Printf("Appended entry address into multi-unicast addresses: %s\n", entry_addr_map.Address)
}
}
// Receive nearcast message
} else if 0 == strings.Index(inbound_message, "multi-unicast:") {
fmt.Printf("Reveived multi-unicast message %s > %s as “%s”\n", inbound_from_addr, DefaultAddr, inbound_message)
// Receive nearcast message
} else if 0 == strings.Index(inbound_message, "nearcast:") {
fmt.Printf("Reveived nearcast message %s > %s as “%s”\n", inbound_from_addr, DefaultAddr, inbound_message)
// Receive groupcast message
} else if 0 == strings.Index(inbound_message, "groupcast:") {
fmt.Printf("Reveived group message %s > %s as “%s”\n", inbound_from_addr, DefaultAddr, inbound_message)
// others
} else {
// snip...
}
}
}
// Error
func Error(_err error) {
if _err != nil {
panic(_err)
}
}
// Apply
func Apply() {
applying_to_addr, err := net.ResolveUDPAddr("udp", ApplyingToAddr)
applying, err := net.DialUDP("udp", nil, applying_to_addr)
Error(err)
defer applying.Close()
fmt.Printf("Connected applying > %s\n", ApplyingToAddr)
// Outbound applying message
applying.Write([]byte(ApplyingMessage))
fmt.Printf("Outbound applying > %s as “%s”\n", ApplyingToAddr, ApplyingMessage)
}
// Multi-unicast
func MultiUnicast(_message string) {
for _, addr := range Addresses {
go func(_addr AddressMaps) {
outbound_to_addr, err := net.ResolveUDPAddr("udp", _addr.Address)
multi_unicast, err := net.DialUDP("udp", nil, outbound_to_addr)
Error(err)
defer multi_unicast.Close()
fmt.Printf("Connected multi-unicast > %s\n", outbound_to_addr)
// Multi-unicast message
multi_unicast.Write([]byte(_message))
fmt.Printf("Multi-unicast > %v as “%s”\n", outbound_to_addr, _message)
}(addr)
}
}
// Check nearcast
func CheckNearcast() {
for {
time.Sleep(time.Duration(NearcastCheckInterval) * time.Second)
if 0 == len(Addresses) {
continue
}
for i, addr := range Addresses {
go func(_i int, _addr AddressMaps) {
addr_parts := strings.Split(_addr.Address, ":")
stdout, err := exec.Command("traceroute", "-n", "-T", "-w", "1", "-q", "1", "-p", DefaultPort, addr_parts[0]).Output()
Error(err)
lines := strings.Split(string(stdout), "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
line = strings.Replace(line, " ", " ", -1)
if line == "" {
continue
}
fmt.Printf("Check nearcast: %v\n", line)
columns := strings.Split(line, " ")
if columns[1] != addr_parts[0] {
continue
}
Addresses[_i].Neary, err = strconv.ParseFloat(columns[2], 64)
Error(err)
break
}
}(i, addr)
}
}
}
// Nearcast
func Nearcast(_message string) {
// Reslice time for nearcast
times := make([]float64, len(Addresses))
for i, address := range Addresses {
times[i] = address.Neary
}
// Sort time for nearcast
sort.Slice(times, func(i, j int) bool {
return times[i] < times[j]
})
// Extract fastest time for nearcast
nearcast_addr := ""
ForNearcastBreak:
for _, time := range times {
for _, addr_map := range Addresses {
if 1 < len(times) && addr_map.Address == DefaultIp {
continue
}
if time == addr_map.Neary {
nearcast_addr = addr_map.Address
break ForNearcastBreak
}
}
}
if nearcast_addr != "" {
// Connect nearcastic address
outbound_to_addr, err := net.ResolveUDPAddr("udp", nearcast_addr)
nearcast, err := net.DialUDP("udp", nil, outbound_to_addr)
Error(err)
defer nearcast.Close()
fmt.Printf("Connected nearcast address > %v\n", outbound_to_addr)
// Nearcast message
nearcast.Write([]byte(_message))
fmt.Printf("Nearcast > %v as “%s”\n", outbound_to_addr, _message)
} else {
// error
}
}
// Groupcast
func Groupcast(_message string, _group int) {
for _, addr_map := range Addresses {
// Select groupcast
if _group != addr_map.Group {
continue
}
go func(_addr string) {
// Connect groupcast address
outbound_to_addr, err := net.ResolveUDPAddr("udp", _addr)
groupcast, err := net.DialUDP("udp", nil, outbound_to_addr)
Error(err)
defer groupcast.Close()
fmt.Printf("Connected groupcast > %s\n", outbound_to_addr)
// Groupcast message
groupcast.Write([]byte(_message))
fmt.Printf("Groupcast > %v as “%s”\n", outbound_to_addr, _message)
}(addr_map.Address)
}
}
// Stdin
func Stdin() {
for {
//
stdin := bufio.NewScanner(os.Stdin)
for stdin.Scan() {
// Extract command
text := stdin.Text()
text = strings.TrimSpace(text)
text = strings.Replace(text, " ", " ", -1)
args := strings.Split(text, " ")
// Check group command
if args[0] == "group" {
for i, addrmap := range Addresses {
addr_parts := strings.Split(addrmap.Address, ":")
ip_addr := addr_parts[0]
// Check address
if args[2] != ip_addr {
continue
}
// Update groupcast
if args[1] == "update" {
var err error
Addresses[i].Group, err = strconv.Atoi(args[3])
Error(err)
fmt.Printf("Update group %s to %s\n", ip_addr, args[3])
break
// Empty groupcast
} else if args[1] == "empty" {
Addresses[i].Group = 0
fmt.Printf("Update empty %s to 0\n", ip_addr)
break
}
}
// Multi-unicast
} else if args[0] == "multi-unicast" {
// Multi-unicast message
MultiUnicast("multi-unicast:" + args[1])
// Nearcast
} else if args[0] == "nearcast" {
// Nearcast message
Nearcast("nearcast:" + args[1])
// Groupcast
} else if args[0] == "groupcast" {
group, err := strconv.Atoi(args[2])
Error(err)
// Groupcast message
Groupcast("groupcast:"+args[1], group)
}
}
}
}
6-2. 自 IP アドレスを自動取得
自 IP アドレスを自動取得するサンプルコード
$ sudo gu run ~/go/ipv4_list.go
package main
import (
"fmt"
"net"
)
func main() {
addresses, err := net.InterfaceAddrs()
Error(err)
for _, address := range addresses {
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
fmt.Printf("%s\n", ipnet.IP.String())
}
}
}
}
func Error(_err error) {
if _err != nil {
panic(_err)
}
}
7. Golang サンプル実行
7-1. 起動
1 台目のノードを起動します。
@ node1
$ sudo go run ~/go/adcast_ipv4.go 192.0.2.1
[user@node1 ~/go]$ go run ipv4.go 192.0.2.1
Listened applying receptor *:* > 192.0.2.1:56788
Connected applying > 192.0.2.1:56788
Outbound applying > 192.0.2.1:56788 as “applying”
Listened inbound *:* > 192.0.2.1:56789
Reveived applying 192.0.2.1:40790 > 192.0.2.1:56788 as “applying”
Appended entry address into multi-unicast addresses: 192.0.2.1:56789
Connected multi-unicast > 192.0.2.1:56789
Multi-unicast > 192.0.2.1:56789 as “entry:[{"Address":"192.0.2.1:56789","Neary":0,"Group":0}]”
Reveived entry message 192.0.2.1:48914 > 192.0.2.1:56789 as “entry:[{"Address":"192.0.2.1.1:56789","Neary":0,"Group":0}]”
Duplicated entry address in multi-unicast addresses: 192.0.2.1:56789
7-2. 申請、参加
自律型分散ネットワークシステムへ申請・参加します。
このとき、コマンド引数に指定する IP アドレスは、自律型分散ネットワークシステムに参加済みノードの IP アドレスであれば何でも構いません。
@ node2
$ sugo go run ~/go/adcast_ipv4.go 192.0.2.1
[user@node2 ~/go]$ go run adcast_ipv4.go 192.0.2.1
Listened receptor *:* > 192.0.2.2:56788
Connected applying > 192.0.2.1:56788
Outbound applying > 192.0.2.1:56788 as “applying”
Listened inbound *:* > 192.0.2.2:56789
Reveived entry message 192.0.2.1:49277 > 192.0.2.2:56789 as “entry:[{"Address":"192.0.2.1:56789","Neary":0,"Group":0},{"Address":"192.0.2.2:56789","Neary":0,"Group":0}]”
Appended entry address into multi-unicast addresses: 192.0.2.1:56789
Appended entry address into multi-unicast addresses: 192.0.2.2:56789
@ node3
$ sugo go run ~/go/adcast_ipv4.go 192.0.2.1
[user@node3 ~/go]$ go run adcast_ipv4.go 192.0.2.3
Listened receptor *:* > 192.0.2.3:56788
Connected applying > 192.0.2.1:56788
Outbound applying > 192.0.2.1:56788 as “applying”
Listened inbound *:* > 192.0.2.3:56789
Reveived entry message 192.0.2.1:49277 > 192.0.2.3:56789 as “entry:[{"Address":"192.0.2.1:56789","Neary":0,"Group":0},{"Address":"192.0.2.2:56789","Neary":0,"Group":0},{"Address":"192.0.2.3:56789","Neary":0,"Group":0}]”
Appended entry address into multi-unicast addresses: 192.0.2.1:56789
Appended entry address into multi-unicast addresses: 192.0.2.2:56789
Appended entry address into multi-unicast addresses: 192.0.2.3:56789
7-3. ニアキャストアドレス自動チェック
ニアキャスト2 を実現するために、各ノードとの距離(レスポンス時間)を予め取得しておきます。
ファイアウォール等で ICMP が drop されていたり、echo reply を切っていたり、UDP ソケットが解放されていない場合を考慮し、サンプルコードでは TCP 56789 ポートへダイレクトに traceroute を発行しています。
このへんは環境によって、適宜最適な内容に変更が必要でしょう。
コマンド例
$ sudo traceroute -n -T -w 1 -q 1 -p 56789 192.0.2.2
レスポンス例
traceroute to 192.0.2.2 (192.0.2.2), 30 hops max, 60 byte packets
1 192.0.2.100 0.123 ms // <- Gateway
2 192.0.2.2 1.453 ms // <- Direct traceroute
サンプルコードでは、10 秒ごとに各ノードへダイレクト traceroute を発行し、その所要時間を更新しています。
@ node1 (node2, 3)
Check nearcast time: traceroute to 192.0.2.1 (192.0.2.1), 30 hops max, 60 byte packets
Check nearcast time: 1 192.0.2.1 0.017 ms
Check nearcast time: traceroute to 192.0.2.2 (192.0.2.2), 30 hops max, 60 byte packets
Check nearcast time: 1 192.0.2.2 0.111 ms
Check nearcast time: traceroute to 192.0.2.3 (192.0.2.3), 30 hops max, 60 byte packets
Check nearcast time: 1 192.0.2.3 0.113 ms
7-4. マルチユニキャスト1
UDP のマルチキャストに似た概念です。
自律型分散ネットワークシステムの全ノードへまとめてデータを転送します。
これを更に応用すれば、TCP でもマルチキャストに似た処理が可能となりえます。
@ node1
multi-unicast TEST1 // <- 標準入力
Connected multi-unicast > 192.0.2.1:56789
Multi-unicast > 192.0.2.1:56789 as “multi-unicast:TEST1”
Connected multi-unicast > 192.0.2.2:56789
Multi-unicast > 192.0.2.2:56789 as “multi-unicast:TEST1”
Connected multi-unicast > 192.0.2.3:56789
Multi-unicast > 192.0.2.3:56789 as “multi-unicast:TEST1”
Reveived multi-unicast message 192.0.2.1:34105 > 192.0.2.1:56789 as “multi-unicast:TEST1”
7-5. ニアキャスト2
IPv6 のエニーキャストに似た概念です。
一番近い(レスポンスの早い)ノードを自動検出してそのノードへデータを転送します。
これにより、不特定の 1 台のノードへデータを転送することが IPv4 通信で可能になります。
その結果、マルチマスター構成や、マスターノードを意識させないアプリケーションの構築が実現可能になるかもしれません。
設計によっては、この転送先ノードにサーバントの役割を担わせるなど、いろいろ応用は可能でしょう。
@ node1
nearcast TEST2 // <- 標準入力
Connected nearcast address > 192.0.2.2:56789
Nearcast > 192.0.2.2:56789 as “nearcast:TEST2”
@ node2
Reveived nearcast message 192.0.2.1:51569 > 192.0.2.2:56789 as “nearcast:TEST2”
@ node3
7-6. グループキャスト3
任意のノード群をグルーピングし、グループへまとめてデータを転送します。
例えば、マルチユニキャストが非効率な場合や、情報をグループ単位でサイロ化したい場合などに有用です。
まずはじめに、任意のノード群をグルーピングしています。
(削除の場合は group remove コマンドとなります。)
@ node1, 2, 3
group update 192.0.2.1 1 // <- 標準入力
Update group 192.0.2.1 to 1
group update 192.0.2.2 2 // <- 標準入力
Update group 192.0.2.2 to 2
group update 192.0.2.3 1 // <- 標準入力
Update group 192.0.2.3 to 1
任意のグループへまとめてデータを送信します。
以下の実行例では、グループ 1 に対して TEST3 というデータを送信しています。
@ node1
groupcast TEST3 1 // <- 標準入力
Connected groupcast address > 192.0.2.1:56789
Groupcast > 192.0.2.1:56789 as “groupcast:TEST3”
Connected groupcast address > 192.0.2.3:56789
Groupcast > 192.0.2.3:56789 as “groupcast:TEST3”
@ node2
@ node3
Reveived groupcast message 192.0.2.1:51569 > 192.0.2.3:56789 as “groupcast:TEST3”
8. まとめ
前回の記事では、クラウド環境やグローバル環境対策として、“IPv4 によるマルチユニキャスト” を投稿しましたが、今回の記事では、更に応用し、マルチユニキャスト1 、ニアキャスト2 、グループキャスト3 を組み込んだ自律型分散ネットワークシステム構築のための Golang サンプルコードを投稿しました。
これらのユースケースとしては、ドローン、車などのモビリティ系から、ロボット、スマート家電などの IoT 系、あるいは AI 系、 DB 系、ソーシャル系など、あらゆる用途の自律型分散ネットワークシステムへ応用することが可能です。
また、以前の記事で投稿した “IPv4 マルチキャストによる自律ネットワークシステムの構築” などを組み合わせることにより、より生産性の高い自律型分散ネットワークシステムの構築が可能になりえるかもしれません。
-
マルチユニキャスト(v-multicast / Virtual Multicast でもいいかもしれません。):独自定義です。UDP の IP マルチキャストに似た概念です。全参加ノードへまとめてデータを転送します。これを更に応用すれば、TCP でもマルチキャストに似た処理が可能となりえます。 ↩
-
ニアキャスト(v-anycast / Virtual Anycast でもいいかもしれません。):独自定義です。IPv6 のエニーキャストに似た概念です。一番近い(レスポンスの早い)ノードを自動検出してそのノードへデータを転送します。これにより、不特定の 1 台のノードへデータを転送することが IPv4 通信で可能になります。その結果、マルチマスター構成や、マスターノードを意識させないアプリケーションの構築が実現可能になるかもしれません。設計によっては、この転送先ノードにサーバントの役割を担わせるなど、いろいろ応用は可能でしょう。 ↩
-
グループキャスト(※ NTT ドコモさんの昔の同名サービスとは一切関係ありません。):独自定義です。任意のノード群をグルーピングし、グループへまとめてデータを転送します。例えば、マルチユニキャストが非効率な場合や、情報をグループ単位でサイロ化したい場合などに有用です。 ↩