LoginSignup
1
1

More than 5 years have passed since last update.

IPv4 マルチユニキャスト、ニアキャスト、グループキャストによる自律型分散ネットワークシステムの構築

Last updated at Posted at 2019-01-11

1. 概要

この記事では、IPv4 マルチユニキャスト1 、ニアキャスト2 、グループキャスト3 による自律型分散ネットワークシステム構築のための Golang サンプルコードを投稿します。

2. はじめに

自律型の分散システムにおいては、IPv4 や TCP ユニキャストだけでなく、UDP ユニキャストや、マルチキャスト、IPv6 エニーキャストなどを目的に応じて適宜組み込むことによって、より効率的で弾力性の高いシステム設計や、利便性の高いサービスの実現が可能となっていくしょう。
しかし、環境によってはマルチキャストやエニーキャストなどはまだまだ現実的ではない側面があります。
前回の記事では、クラウド環境やグローバル環境におけるマルチキャスト代替策として、“IPv4 によるマルチユニキャスト” を投稿しましたが、今回の記事では更にこれを発展させ、マルチユニキャスト1 、ニアキャスト2 、グループキャスト3 を組み込んだ自律型分散ネットワークシステム構築のための Golang サンプルコードを投稿します。
これにより、通信制限の多い環境の影響を受けず、あるいは非汎用的プロトコルや通信機器などに依存せずに、セッション層の立場からトランスポート層やネットワーク層などの通信技術を擬似的に実現することが可能となります。
また、以前の記事で投稿した “IPv4 マルチキャストによる自律ネットワークシステムの構築” などを組み合わせることにより、より生産性の高い自律型分散ネットワークシステムの構築が可能になりえるかもしれません。

これらのユースケースとしては、ドローンなどのモビリティ系から、ロボットスマート家電などの IoT 系、あるいは AI 系DB 系ソーシャル系など、あらゆる用途の自律型分散ネットワークシステムへ応用することが可能となります。

adcast.png

3. 環境

  • RHEL-7 系
  • Go 1.9

4. 構成

  • node x 3
    • node1: 192.0.2.1
    • node2: 192.0.2.2
    • node3: 192.0.2.3

5. 設計

5-1. マルチユニキャスト1

multi-unicast_2.png

5-2. ニアキャスト2

nearcast.png

5-3. グループキャスト3

groupcast.png

6. Golang サンプルコード

サンプルコードでは、ほぼ正常系処理のみを実装しています。
実用には、適宜状況に応じて異常系処理を実装する必要があります。

6-1. 自律型分散ネットワークシステム

以下のサンプルコードでは、自 IP アドレスを静的に設定してますが、もし、ノードが 1 つの IP アドレスしか持たない場合には、“6-2. 自 IP アドレスを自動取得” にもサンプルコードを投稿しておきますので、このように自 IP アドレスを自動取得するのも手かもしれません。
また今回、サンプルには投稿しませんが、ノードが複数の IP アドレスを持つ場合でも、デフォルト IP アドレスを取得することは可能です。

@ node1, 2, 3

$ sudo nvim go ~/go/adcast_ipv4.go

~/go/adcast_ipv4.go
package main

import (
    "bufio"
    "encoding/json"
    "flag"
    "fmt"
    "net"
    "os"
    "os/exec"
    "sort"
    "strconv"
    "strings"
    "time"
)

var DefaultIp string = "192.0.2.1" // node2 は 192.0.2.2、node3 は 192.0.2.3
var DefaultPort string = "56789"
var DefaultAddr string = DefaultIp + ":" + DefaultPort
var ReceptorPort string = "56788"
var ReceptorAddr string = DefaultIp + ":" + ReceptorPort
var ApplyingToIp string = ""
var ApplyingToAddr string = ""
var ApplyingMessage string = "applying"
var EntryPrefix string = "entry:"
var BufferByte int = 1024
var NearcastCheckInterval int = 10
var Addresses AddressList

type AddressList []AddressMaps
type AddressMaps struct {
    Address string
    Neary   float64
    Group   int
}

func main() {

    // Start applying receptor
    receptor_addr_byte, err := net.ResolveUDPAddr("udp", ReceptorAddr)
    Error(err)

    // Listen applying receptor
    receptor, err := net.ListenUDP("udp", receptor_addr_byte)
    Error(err)
    defer receptor.Close()
    fmt.Printf("Listened applying receptor *:* > %s\n", ReceptorAddr)

    receptor_buffer := make([]byte, BufferByte)
    go func() {
        for {
            // Receive applying message
            length, applying_addr_byte, err := receptor.ReadFrom(receptor_buffer)
            Error(err)
            applying_message := string(receptor_buffer[:length])
            applying_addr := applying_addr_byte.(*net.UDPAddr).String()

            applying_addr_parts := strings.Split(applying_addr, ":")
            entry_ip := applying_addr_parts[0]
            entry_addr := entry_ip + ":" + DefaultPort

            if applying_message != ApplyingMessage {
                continue
            }
            fmt.Printf("Reveived applying %s > %s as “%s”\n", applying_addr, ReceptorAddr, applying_message)

            // Check duplicated addresses
            entered_flag := false
            for _, addr := range Addresses {
                if entry_addr == addr.Address {
                    entered_flag = true
                    fmt.Printf("Duplicated entry address in multi-unicast addresses: %s\n", entry_addr)
                    break
                }
            }

            if !entered_flag {
                // Append entry address
                entry := AddressMaps{Address: entry_addr, Neary: 0, Group: 0}
                Addresses = append(Addresses, entry)
                fmt.Printf("Appended entry address into multi-unicast addresses: %s\n", entry_addr)

                // Send entry message
                addresses_json, err := json.Marshal(Addresses)
                Error(err)
                entry_message := EntryPrefix + string(addresses_json)
                MultiUnicast(entry_message)
            }
        }
    }()

    // Apply own address to address group
    flag.Parse()
    ApplyingToIp = flag.Arg(0)
    ApplyingToAddr = ApplyingToIp + ":" + ReceptorPort
    Apply()

    // Check nearcast
    go CheckNearcast()

    // Start standard input
    go Stdin()

    // Start inbound
    inbound_to_addr_byte, err := net.ResolveUDPAddr("udp", DefaultAddr)
    Error(err)

    // Listen inbound
    inbound, err := net.ListenUDP("udp", inbound_to_addr_byte)
    Error(err)
    defer inbound.Close()
    fmt.Printf("Listened inbound *:* > %s\n", DefaultAddr)

    inbound_buffer := make([]byte, BufferByte)
    for {
        // Receive inbound message
        length, inbound_from_addr, err := inbound.ReadFrom(inbound_buffer)
        Error(err)
        inbound_message := string(inbound_buffer[:length])

        // Receive entry message
        if 0 == strings.Index(inbound_message, EntryPrefix) {
            fmt.Printf("Reveived entry message %s > %s as “%s”\n", inbound_from_addr, DefaultAddr, inbound_message)

            // Extract entry addresses
            entry_messages := strings.Split(inbound_message, EntryPrefix)
            entry_addresses_byte := []byte(entry_messages[1])
            var entry_addresses AddressList
            err := json.Unmarshal(entry_addresses_byte, &entry_addresses)
            Error(err)

            for _, entry_addr_map := range entry_addresses {
                entered_flag := false
                for _, addr_map := range Addresses {
                    if addr_map.Address != entry_addr_map.Address {
                        continue
                    }

                    entered_flag = true
                    fmt.Printf("Duplicated entry address in multi-unicast addresses: %s\n", entry_addr_map.Address)
                    break
                }

                if !entered_flag {
                    entry := AddressMaps{Address: entry_addr_map.Address, Neary: 0, Group: 0}
                    Addresses = append(Addresses, entry)
                    fmt.Printf("Appended entry address into multi-unicast addresses: %s\n", entry_addr_map.Address)
                }
            }
            // Receive nearcast message
        } else if 0 == strings.Index(inbound_message, "multi-unicast:") {
            fmt.Printf("Reveived multi-unicast message %s > %s as “%s”\n", inbound_from_addr, DefaultAddr, inbound_message)
            // Receive nearcast message
        } else if 0 == strings.Index(inbound_message, "nearcast:") {
            fmt.Printf("Reveived nearcast message %s > %s as “%s”\n", inbound_from_addr, DefaultAddr, inbound_message)
            // Receive groupcast message
        } else if 0 == strings.Index(inbound_message, "groupcast:") {
            fmt.Printf("Reveived group message %s > %s as “%s”\n", inbound_from_addr, DefaultAddr, inbound_message)
            // others
        } else {
            // snip...
        }
    }
}

// Error
func Error(_err error) {
    if _err != nil {
        panic(_err)
    }
}

// Apply
func Apply() {
    applying_to_addr, err := net.ResolveUDPAddr("udp", ApplyingToAddr)
    applying, err := net.DialUDP("udp", nil, applying_to_addr)
    Error(err)
    defer applying.Close()
    fmt.Printf("Connected applying > %s\n", ApplyingToAddr)

    // Outbound applying message
    applying.Write([]byte(ApplyingMessage))
    fmt.Printf("Outbound applying > %s as “%s”\n", ApplyingToAddr, ApplyingMessage)
}

// Multi-unicast
func MultiUnicast(_message string) {
    for _, addr := range Addresses {

        go func(_addr AddressMaps) {
            outbound_to_addr, err := net.ResolveUDPAddr("udp", _addr.Address)
            multi_unicast, err := net.DialUDP("udp", nil, outbound_to_addr)
            Error(err)

            defer multi_unicast.Close()
            fmt.Printf("Connected multi-unicast > %s\n", outbound_to_addr)

            // Multi-unicast message
            multi_unicast.Write([]byte(_message))
            fmt.Printf("Multi-unicast > %v as “%s”\n", outbound_to_addr, _message)
        }(addr)
    }
}

// Check nearcast
func CheckNearcast() {
    for {
        time.Sleep(time.Duration(NearcastCheckInterval) * time.Second)

        if 0 == len(Addresses) {
            continue
        }

        for i, addr := range Addresses {
            go func(_i int, _addr AddressMaps) {
                addr_parts := strings.Split(_addr.Address, ":")
                stdout, err := exec.Command("traceroute", "-n", "-T", "-w", "1", "-q", "1", "-p", DefaultPort, addr_parts[0]).Output()
                Error(err)

                lines := strings.Split(string(stdout), "\n")
                for _, line := range lines {
                    line = strings.TrimSpace(line)
                    line = strings.Replace(line, "  ", " ", -1)
                    if line == "" {
                        continue
                    }
                    fmt.Printf("Check nearcast: %v\n", line)

                    columns := strings.Split(line, " ")
                    if columns[1] != addr_parts[0] {
                        continue
                    }

                    Addresses[_i].Neary, err = strconv.ParseFloat(columns[2], 64)
                    Error(err)
                    break
                }
            }(i, addr)
        }
    }
}

// Nearcast
func Nearcast(_message string) {
    // Reslice time for nearcast
    times := make([]float64, len(Addresses))
    for i, address := range Addresses {
        times[i] = address.Neary
    }

    // Sort time for nearcast
    sort.Slice(times, func(i, j int) bool {
        return times[i] < times[j]
    })

    // Extract fastest time for nearcast
    nearcast_addr := ""
ForNearcastBreak:
    for _, time := range times {
        for _, addr_map := range Addresses {
            if 1 < len(times) && addr_map.Address == DefaultIp {
                continue
            }

            if time == addr_map.Neary {
                nearcast_addr = addr_map.Address
                break ForNearcastBreak
            }
        }
    }

    if nearcast_addr != "" {
        // Connect nearcastic address
        outbound_to_addr, err := net.ResolveUDPAddr("udp", nearcast_addr)
        nearcast, err := net.DialUDP("udp", nil, outbound_to_addr)
        Error(err)

        defer nearcast.Close()
        fmt.Printf("Connected nearcast address > %v\n", outbound_to_addr)

        // Nearcast message
        nearcast.Write([]byte(_message))
        fmt.Printf("Nearcast > %v as “%s”\n", outbound_to_addr, _message)
    } else {
        // error
    }
}

// Groupcast
func Groupcast(_message string, _group int) {
    for _, addr_map := range Addresses {
        // Select groupcast
        if _group != addr_map.Group {
            continue
        }

        go func(_addr string) {
            // Connect groupcast address
            outbound_to_addr, err := net.ResolveUDPAddr("udp", _addr)
            groupcast, err := net.DialUDP("udp", nil, outbound_to_addr)
            Error(err)

            defer groupcast.Close()
            fmt.Printf("Connected groupcast > %s\n", outbound_to_addr)

            // Groupcast message
            groupcast.Write([]byte(_message))
            fmt.Printf("Groupcast > %v as “%s”\n", outbound_to_addr, _message)
        }(addr_map.Address)
    }
}

// Stdin
func Stdin() {
    for {
        //
        stdin := bufio.NewScanner(os.Stdin)
        for stdin.Scan() {
            // Extract command
            text := stdin.Text()
            text = strings.TrimSpace(text)
            text = strings.Replace(text, "  ", " ", -1)
            args := strings.Split(text, " ")

            // Check group command
            if args[0] == "group" {
                for i, addrmap := range Addresses {
                    addr_parts := strings.Split(addrmap.Address, ":")
                    ip_addr := addr_parts[0]

                    // Check address
                    if args[2] != ip_addr {
                        continue
                    }

                    // Update groupcast
                    if args[1] == "update" {
                        var err error
                        Addresses[i].Group, err = strconv.Atoi(args[3])
                        Error(err)
                        fmt.Printf("Update group %s to %s\n", ip_addr, args[3])
                        break
                        // Empty groupcast
                    } else if args[1] == "empty" {
                        Addresses[i].Group = 0
                        fmt.Printf("Update empty %s to 0\n", ip_addr)
                        break
                    }
                }
                // Multi-unicast
            } else if args[0] == "multi-unicast" {
                // Multi-unicast message
                MultiUnicast("multi-unicast:" + args[1])
                // Nearcast
            } else if args[0] == "nearcast" {
                // Nearcast message
                Nearcast("nearcast:" + args[1])
                // Groupcast
            } else if args[0] == "groupcast" {
                group, err := strconv.Atoi(args[2])
                Error(err)
                // Groupcast message
                Groupcast("groupcast:"+args[1], group)
            }
        }
    }
}

6-2. 自 IP アドレスを自動取得

自 IP アドレスを自動取得するサンプルコード

$ sudo gu run ~/go/ipv4_list.go

~/go/ipv4_list.go
package main

import (
    "fmt"
    "net"
)

func main() {
    addresses, err := net.InterfaceAddrs()
    Error(err)

    for _, address := range addresses {
        if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
            if ipnet.IP.To4() != nil {
                fmt.Printf("%s\n", ipnet.IP.String())
            }
        }
    }
}

func Error(_err error) {
    if _err != nil {
        panic(_err)
    }
}

7. Golang サンプル実行

7-1. 起動

1 台目のノードを起動します。

@ node1
$ sudo go run ~/go/adcast_ipv4.go 192.0.2.1

[user@node1 ~/go]$ go run ipv4.go 192.0.2.1
Listened applying receptor *:* > 192.0.2.1:56788
Connected applying > 192.0.2.1:56788
Outbound applying > 192.0.2.1:56788 as “applying”
Listened inbound *:* > 192.0.2.1:56789
Reveived applying 192.0.2.1:40790 > 192.0.2.1:56788 as “applying”
Appended entry address into multi-unicast addresses: 192.0.2.1:56789
Connected multi-unicast > 192.0.2.1:56789
Multi-unicast > 192.0.2.1:56789 as “entry:[{"Address":"192.0.2.1:56789","Neary":0,"Group":0}]”
Reveived entry message 192.0.2.1:48914 > 192.0.2.1:56789 as “entry:[{"Address":"192.0.2.1.1:56789","Neary":0,"Group":0}]”
Duplicated entry address in multi-unicast addresses: 192.0.2.1:56789

7-2. 申請、参加

自律型分散ネットワークシステムへ申請・参加します。
このとき、コマンド引数に指定する IP アドレスは、自律型分散ネットワークシステムに参加済みノードの IP アドレスであれば何でも構いません。

@ node2
$ sugo go run ~/go/adcast_ipv4.go 192.0.2.1

[user@node2 ~/go]$ go run adcast_ipv4.go 192.0.2.1
Listened receptor *:* > 192.0.2.2:56788
Connected applying > 192.0.2.1:56788
Outbound applying > 192.0.2.1:56788 as “applying”
Listened inbound *:* > 192.0.2.2:56789
Reveived entry message 192.0.2.1:49277 > 192.0.2.2:56789 as “entry:[{"Address":"192.0.2.1:56789","Neary":0,"Group":0},{"Address":"192.0.2.2:56789","Neary":0,"Group":0}]”
Appended entry address into multi-unicast addresses: 192.0.2.1:56789
Appended entry address into multi-unicast addresses: 192.0.2.2:56789

@ node3
$ sugo go run ~/go/adcast_ipv4.go 192.0.2.1

[user@node3 ~/go]$ go run adcast_ipv4.go 192.0.2.3
Listened receptor *:* > 192.0.2.3:56788
Connected applying > 192.0.2.1:56788
Outbound applying > 192.0.2.1:56788 as “applying”
Listened inbound *:* > 192.0.2.3:56789
Reveived entry message 192.0.2.1:49277 > 192.0.2.3:56789 as “entry:[{"Address":"192.0.2.1:56789","Neary":0,"Group":0},{"Address":"192.0.2.2:56789","Neary":0,"Group":0},{"Address":"192.0.2.3:56789","Neary":0,"Group":0}]”
Appended entry address into multi-unicast addresses: 192.0.2.1:56789
Appended entry address into multi-unicast addresses: 192.0.2.2:56789
Appended entry address into multi-unicast addresses: 192.0.2.3:56789

7-3. ニアキャストアドレス自動チェック

ニアキャスト2 を実現するために、各ノードとの距離(レスポンス時間)を予め取得しておきます。
ファイアウォール等で ICMP が drop されていたり、echo reply を切っていたり、UDP ソケットが解放されていない場合を考慮し、サンプルコードでは TCP 56789 ポートへダイレクトに traceroute を発行しています。
このへんは環境によって、適宜最適な内容に変更が必要でしょう。

コマンド例
$ sudo traceroute -n -T -w 1 -q 1 -p 56789 192.0.2.2

レスポンス例

traceroute to 192.0.2.2 (192.0.2.2), 30 hops max, 60 byte packets
 1  192.0.2.100  0.123 ms // <- Gateway
 2  192.0.2.2  1.453 ms // <- Direct traceroute

サンプルコードでは、10 秒ごとに各ノードへダイレクト traceroute を発行し、その所要時間を更新しています。

@ node1 (node2, 3)

Check nearcast time: traceroute to 192.0.2.1 (192.0.2.1), 30 hops max, 60 byte packets
Check nearcast time: 1 192.0.2.1 0.017 ms
Check nearcast time: traceroute to 192.0.2.2 (192.0.2.2), 30 hops max, 60 byte packets
Check nearcast time: 1 192.0.2.2 0.111 ms
Check nearcast time: traceroute to 192.0.2.3 (192.0.2.3), 30 hops max, 60 byte packets
Check nearcast time: 1 192.0.2.3 0.113 ms

7-4. マルチユニキャスト1

UDP のマルチキャストに似た概念です。
自律型分散ネットワークシステムの全ノードへまとめてデータを転送します。
これを更に応用すれば、TCP でもマルチキャストに似た処理が可能となりえます。

@ node1

multi-unicast TEST1 // <- 標準入力
Connected multi-unicast > 192.0.2.1:56789
Multi-unicast > 192.0.2.1:56789 as “multi-unicast:TEST1”
Connected multi-unicast > 192.0.2.2:56789
Multi-unicast > 192.0.2.2:56789 as “multi-unicast:TEST1”
Connected multi-unicast > 192.0.2.3:56789
Multi-unicast > 192.0.2.3:56789 as “multi-unicast:TEST1”
Reveived multi-unicast message 192.0.2.1:34105 > 192.0.2.1:56789 as “multi-unicast:TEST1”

7-5. ニアキャスト2

IPv6 のエニーキャストに似た概念です。
一番近い(レスポンスの早い)ノードを自動検出してそのノードへデータを転送します。
これにより、不特定の 1 台のノードへデータを転送することが IPv4 通信で可能になります。
その結果、マルチマスター構成や、マスターノードを意識させないアプリケーションの構築が実現可能になるかもしれません。
設計によっては、この転送先ノードにサーバントの役割を担わせるなど、いろいろ応用は可能でしょう。

@ node1

nearcast TEST2 // <- 標準入力
Connected nearcast address >  192.0.2.2:56789
Nearcast >  192.0.2.2:56789 as “nearcast:TEST2”

@ node2

Reveived nearcast message 192.0.2.1:51569 > 192.0.2.2:56789 as “nearcast:TEST2”

@ node3

7-6. グループキャスト3

任意のノード群をグルーピングし、グループへまとめてデータを転送します。
例えば、マルチユニキャストが非効率な場合や、情報をグループ単位でサイロ化したい場合などに有用です。

まずはじめに、任意のノード群をグルーピングしています。
(削除の場合は group remove コマンドとなります。)

@ node1, 2, 3

group update 192.0.2.1 1 // <- 標準入力
Update group 192.0.2.1 to 1
group update 192.0.2.2 2 // <- 標準入力
Update group 192.0.2.2 to 2
group update 192.0.2.3 1 // <- 標準入力
Update group 192.0.2.3 to 1

任意のグループへまとめてデータを送信します。
以下の実行例では、グループ 1 に対して TEST3 というデータを送信しています。

@ node1

groupcast TEST3 1 // <- 標準入力
Connected groupcast address >  192.0.2.1:56789
Groupcast >  192.0.2.1:56789 as “groupcast:TEST3”
Connected groupcast address >  192.0.2.3:56789
Groupcast >  192.0.2.3:56789 as “groupcast:TEST3”

@ node2

@ node3

Reveived groupcast message 192.0.2.1:51569 > 192.0.2.3:56789 as “groupcast:TEST3”

8. まとめ

前回の記事では、クラウド環境やグローバル環境対策として、“IPv4 によるマルチユニキャスト” を投稿しましたが、今回の記事では、更に応用し、マルチユニキャスト1 、ニアキャスト2 、グループキャスト3 を組み込んだ自律型分散ネットワークシステム構築のための Golang サンプルコードを投稿しました。
これらのユースケースとしては、ドローンなどのモビリティ系から、ロボットスマート家電などの IoT 系、あるいは AI 系DB 系ソーシャル系など、あらゆる用途の自律型分散ネットワークシステムへ応用することが可能です。
また、以前の記事で投稿した “IPv4 マルチキャストによる自律ネットワークシステムの構築” などを組み合わせることにより、より生産性の高い自律型分散ネットワークシステムの構築が可能になりえるかもしれません。


  1. マルチユニキャスト(v-multicast / Virtual Multicast でもいいかもしれません。):独自定義です。UDP の IP マルチキャストに似た概念です。全参加ノードへまとめてデータを転送します。これを更に応用すれば、TCP でもマルチキャストに似た処理が可能となりえます。 

  2. ニアキャスト(v-anycast / Virtual Anycast でもいいかもしれません。):独自定義です。IPv6 のエニーキャストに似た概念です。一番近い(レスポンスの早い)ノードを自動検出してそのノードへデータを転送します。これにより、不特定の 1 台のノードへデータを転送することが IPv4 通信で可能になります。その結果、マルチマスター構成や、マスターノードを意識させないアプリケーションの構築が実現可能になるかもしれません。設計によっては、この転送先ノードにサーバントの役割を担わせるなど、いろいろ応用は可能でしょう。 

  3. グループキャスト(※ NTT ドコモさんの昔の同名サービスとは一切関係ありません。):独自定義です。任意のノード群をグルーピングし、グループへまとめてデータを転送します。例えば、マルチユニキャストが非効率な場合や、情報をグループ単位でサイロ化したい場合などに有用です。 

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1