0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

NMOS IS-04 Node開発ハンズオン

Last updated at Posted at 2025-04-04

目次

  1. はじめに
  2. NMOSとは
  3. IS-04とは
  4. Node開発の概要
  5. 開発環境の準備
  6. RDSの探索実装
  7. Registration API連携
  8. Node APIサーバー実装
  9. まとめ

はじめに

NMOS(Networked Media Open Specifications)は、Advanced Media Workflow Association(AMWA)によって開発された、ネットワークメディア向けのオープン仕様群です。特にIS-04は、ネットワーク上のリソースの登録と発見を担当する重要な仕様です。

本記事では、Go言語を使用してNMOS IS-04に準拠したNodeを開発する方法を、ハンズオン形式で解説します。開発フローに沿って、RDSの探索、Registration APIの利用、Node APIサーバーの実装までを段階的に進めていきます。

NMOSとは

NMOSは、プロフェッショナル向けネットワークメディアのための仕様群です。異なるベンダーの機器が相互運用できるようにするためのオープン仕様として、AMWAによって開発されています。

NMOSは以下のような特徴を持っています:

  • 異なるベンダーの機器間での相互運用性を確保
  • IPネットワーク上でのメディア機器の発見と接続を自動化
  • RESTful APIを使用した標準化されたインターフェース
  • JSONベースのデータモデル

NMOSは複数の仕様(IS-04, IS-05など)から構成されており、それぞれが特定の機能を担当しています。

IS-04とは

IS-04は「Discovery and Registration Specification」と呼ばれ、ネットワーク上のリソース(Node, Device, Sender, Receiver, Source, Flowなど)を登録・発見するための仕様です。

IS-04の主な目的は:

  • ネットワーク上のリソースを自動的に発見する仕組みを提供
  • リソース間の接続を容易にするためのメタデータを提供
  • システム構築の手動作業を削減し、自動化を促進
  • 動的なデプロイメントを可能にする

IS-04は3つの主要なAPIから構成されています:

  1. Registration API: Nodeがリソースを登録するためのAPI
  2. Query API: 登録されたリソースを検索するためのAPI
  3. Node API: Node上のリソースを見つけるためのAPI

Node開発の概要

NMOS IS-04 Nodeの開発は主に以下の3つのステップで構成されます:

  1. RDSの探索: DNS-SDを使用してRegistration & Discovery System (RDS)を探索
  2. Registration APIの利用: 発見したRDSのRegistration APIを使用してリソースを登録
  3. Node APIサーバーの実装: 自身のリソース情報を提供するAPIサーバーを実装

それでは、各ステップを詳しく見ていきましょう。

開発環境の準備

まず、開発環境を準備します。Go言語のインストールと必要なライブラリのセットアップを行います。

# Goのインストール(既にインストール済みの場合はスキップ)
# Ubuntuの場合
sudo apt-get update
sudo apt-get install golang-go

# プロジェクトディレクトリの作成
mkdir -p nmos-node
cd nmos-node

# Go Modulesの初期化
go mod init github.com/{ユーザー名}/nmos-node

# 必要なライブラリのインストール
go get github.com/brutella/dnssd
go get github.com/google/uuid

RDSの探索実装

最初のステップは、DNS-SDを使用してRegistration & Discovery System (RDS)を探索することです。IS-04仕様では、RDSは_nmos-register._tcpサービスタイプでアドバタイズされています。

以下のコードでは、brutella/dnssdライブラリを使用してRDSを探索する実装例を示します。

まず、discovery.goファイルを作成し、以下のコードを記述します:

package main

import (
	"context"
	"fmt"
	"log"
	"sort"
	"strings"
	"time"

	"github.com/brutella/dnssd"
)

// RDSService はRDSサービスの情報を保持する構造体
type RDSService struct {
	Name     string
	Type     string
	Domain   string
	Host     string
	Port     int
	IPv4     []string
	IPv6     []string
	TXT      map[string]string
	Priority int // TXT priフィールドから取得
}

// DiscoverRDS はDNS-SDを使用してRDSを探索する関数
func DiscoverRDS(ctx context.Context, timeout time.Duration) ([]RDSService, error) {
	// 結果を格納するスライス
	var services []RDSService

	// 探索完了を通知するチャネル
	discoveryDone := make(chan struct{})

	// サービス追加時のコールバック
	addCallback := func(service dnssd.Service) {
		// TXTレコードからpriorityを取得
		priority := 100 // デフォルト値
		if pri, ok := service.Text["pri"]; ok {
			fmt.Sscanf(pri, "%d", &priority)
		}

		// APIバージョンを確認 - 今回は最新のv1.3のみサポート
		apiVer := service.Text["api_ver"]
		if !strings.Contains(apiVer, "v1.3") {
			log.Printf("Unsupported API version: %s", apiVer)
			return
		}

		// RDSServiceオブジェクトを作成
		rdsService := RDSService{
			Name:     service.Name,
			Type:     service.Type,
			Domain:   service.Domain,
			Host:     service.Host,
			Port:     service.Port,
			IPv4:     service.IPs,
			TXT:      service.Text,
			Priority: priority,
		}

		log.Printf("Found RDS: %s at %s:%d (priority: %d)", 
			service.Name, service.Host, service.Port, priority)
		
		services = append(services, rdsService)
	}

	// ブラウザの設定
	config := dnssd.Config{
		Context: ctx,
	}

	// ブラウザの作成
	browser, err := dnssd.NewBrowser(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create browser: %v", err)
	}

	// サービス追加時のコールバックを設定
	browser.AddHandler(dnssd.ServiceAddFunc(addCallback))

	// ブラウズを開始
	go func() {
		// _nmos-register._tcpサービスを探索
		err := browser.Browse("_nmos-register._tcp", "local.")
		if err != nil {
			log.Printf("Browse error: %v", err)
		}

		// タイムアウト後に探索を終了
		time.Sleep(timeout)
		discoveryDone <- struct{}{}
	}()

	// 探索完了を待機
	<-discoveryDone

	// 優先度でソート
	sort.Slice(services, func(i, j int) bool {
		return services[i].Priority < services[j].Priority
	})

	return services, nil
}

// SelectRDS は発見したRDSから最適なものを選択する関数
func SelectRDS(services []RDSService) (*RDSService, error) {
	if len(services) == 0 {
		return nil, fmt.Errorf("no RDS services found")
	}

	// 優先度が最も高いRDSを選択
	// 既にDiscoverRDS内でソート済みなので、最初の要素を返す
	return &services[0], nil
}

// GetRegistrationAPIURL はRDSからRegistration APIのURLを構築する関数
func GetRegistrationAPIURL(service *RDSService) string {
	protocol := "http"
	if proto, ok := service.TXT["api_proto"]; ok && proto == "https" {
		protocol = "https"
	}

	// IPv4アドレスがある場合は最初のものを使用
	if len(service.IPv4) > 0 {
		return fmt.Sprintf("%s://%s:%d/x-nmos/registration/v1.3", 
			protocol, service.IPv4[0], service.Port)
	}

	// ホスト名を使用
	return fmt.Sprintf("%s://%s:%d/x-nmos/registration/v1.3", 
		protocol, service.Host, service.Port)
}

次に、この探索機能をテストするためのmain.goファイルを作成します:

package main

import (
	"context"
	"log"
	"time"
)

func main() {
	log.Println("Starting RDS discovery...")

	// コンテキストの作成
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// RDSの探索(5秒間)
	services, err := DiscoverRDS(ctx, 5*time.Second)
	if err != nil {
		log.Fatalf("Error discovering RDS: %v", err)
	}

	log.Printf("Found %d RDS services", len(services))

	// RDSが見つからなかった場合
	if len(services) == 0 {
		log.Println("No RDS found, would operate in peer-to-peer mode")
		return
	}

	// 最適なRDSを選択
	selectedRDS, err := SelectRDS(services)
	if err != nil {
		log.Fatalf("Error selecting RDS: %v", err)
	}

	// Registration APIのURLを取得
	registrationURL := GetRegistrationAPIURL(selectedRDS)
	log.Printf("Selected Registration API: %s", registrationURL)

	// ここからRegistration APIを使用してリソースを登録する処理に進む
	// (次のセクションで実装)
}

このコードを実行すると、ネットワーク上のRDSを探索し、見つかったRDSの中から最適なものを選択します。選択基準は主にTXTレコードのpriフィールド(優先度)に基づいています。

実行方法

go run main.go discovery.go

動作説明

  1. DiscoverRDS関数は、DNS-SDを使用して_nmos-register._tcpおよび_nmos-registration._tcpサービスを探索します。
  2. 見つかったサービスごとに、TXTレコードから優先度(pri)やAPIバージョン(api_ver)などの情報を取得します。
  3. サービスは優先度でソートされ、SelectRDS関数によって最適なRDSが選択されます。
  4. 選択されたRDSの情報から、GetRegistrationAPIURL関数によってRegistration APIのURLが構築されます。

エラーハンドリング

  • RDSが見つからない場合は、ピアツーピアモードで動作するようにフォールバックします。
  • APIバージョンが互換性のないものは無視されます。
  • 複数のRDSが同じ優先度を持つ場合は、最初に見つかったものが選択されます(実際の実装ではランダム選択が推奨されます)。

Registration API連携

前のセクションでは、DNS-SDを使用してRDSを探索する方法を実装しました。このセクションでは、発見したRDSのRegistration APIを使用してノードとそのリソースを登録する方法を解説します。

Registration APIの概要

Registration APIは、NMOSノードがリソース情報をRDSに登録するためのインターフェースです。主な機能は以下の通りです:

  1. ノードとその関連リソース(デバイス、ソース、フロー、送信者、受信者)の登録
  2. 定期的なハートビート送信によるリソースの有効性維持
  3. リソースの更新と削除

IS-04仕様では、Registration APIは以下の主要エンドポイントを提供しています:

  • /resource - リソースの登録
  • /resource/{resourceType}/{resourceId} - 特定リソースの操作
  • /health/nodes/{nodeId} - ノードのヘルスチェック(ハートビート)

リソースモデルの実装

まず、IS-04仕様に準拠したリソースモデルを実装します。models.goファイルを作成し、以下のコードを記述します:

package main

import (
	"github.com/google/uuid"
)

// NMOSリソースの共通フィールド
type Resource struct {
	ID          string    `json:"id"`
	Version     string    `json:"version"`
	Label       string    `json:"label"`
	Description string    `json:"description"`
	Tags        Tags      `json:"tags"`
}

// タグのリスト
type Tags struct {
	Values []string `json:"values"`
}

// ノードリソース
type Node struct {
	Resource
	Hostname    string   `json:"hostname"`
	Href        string   `json:"href"`
	Caps        Caps     `json:"caps"`
	Api         Api      `json:"api"`
	Services    []Service `json:"services"`
	Interfaces  []Interface `json:"interfaces"`
}

// ノードが提供するAPI
type Api struct {
	Versinos  []string   `json:"versions"`
	Endpoints []Endpoint `json:"endpoints"`
}

// ノードが提供するAPIのエンドポイント
type Endpoint struct {
	Host     string `json:"host"`
	Port     string `json:"port"`
	Protocol string `json:"protocol"`
}

// ノードの機能
type Caps struct {
	Values []string `json:"values"`
}

// ノードが提供するサービス
type Service struct {
	Type string `json:"type"`
	Href string `json:"href"`
}

// ネットワークインターフェース
type Interface struct {
	Name      string   `json:"name"`
	ChassisID string   `json:"chassis_id"`
	PortID    string   `json:"port_id"`
	Attached  []string `json:"attached"`
}

// デバイスリソース
type Device struct {
	Resource
	NodeID      string   `json:"node_id"`
	Type        string   `json:"type"`
	Controls    []Control `json:"controls"`
	Senders     []string `json:"senders"`
	Receivers   []string `json:"receivers"`
}

// デバイスのコントロールエンドポイント
type Control struct {
	Type string `json:"type"`
	Href string `json:"href"`
}

// 新しいノードリソースを作成する関数
func NewNode(hostname, port, protocol, apiEndpoint string) *Node {
	id := uuid.New().String()

	return &Node{
		Resource: Resource{
			ID:          id,
			Version:     "1738858561:000000000",
			Label:       "Go NMOS Node",
			Description: "NMOS Node implemented in Go",
			Tags: Tags{
				Values: []string{"nmos", "go"},
			},
		},
		Hostname:    hostname,
		Href:        apiEndpoint,
		Caps:        Caps{Values: []string{}},
		Api:         Api{
			Versinos: []string{"v1.3"},
			Endpoints: []Endpoint{
				{
					Host:     hostname,
					Port:     port,
					Protocol: protocol,
				},
			},
		},
		Services:    []Service{},
		Interfaces:  []Interface{},
	}
}

// 新しいデバイスリソースを作成する関数
func NewDevice(nodeID string) *Device {
	id := uuid.New().String()

	return &Device{
		Resource: Resource{
			ID:          id,
			Version:     "1738858561:000000000",
			Label:       "Go NMOS Device",
			Description: "NMOS Device implemented in Go",
			Tags: Tags{
				Values: []string{"nmos", "go"},
			},
		},
		NodeID:      nodeID,
		Type:        "urn:x-nmos:device:generic",
		Controls:    []Control{},
		Senders:     []string{},
		Receivers:   []string{},
	}
}

Registration APIクライアントの実装

次に、Registration APIと通信するためのクライアントを実装します。registration.goファイルを作成し、以下のコードを記述します:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"
)

// RegistrationClient はRegistration APIと通信するためのクライアント
type RegistrationClient struct {
	BaseURL    string
	HTTPClient *http.Client
}

// RegistartionBody はRegistration APIへ送信するリクエストボディの構造体
type RegistartionBody struct {
    Type string      `json:"type"`
    Data interface{} `json:"data"`
}

// NewRegistrationClient は新しいRegistrationClientを作成する関数
func NewRegistrationClient(baseURL string) *RegistrationClient {
	return &RegistrationClient{
		BaseURL: baseURL,
		HTTPClient: &http.Client{
			Timeout: 10 * time.Second,
		},
	}
}

// RegisterResource はリソースを登録する関数
func (c *RegistrationClient) RegisterResource(resourceType string, resource interface{}) error {
	url := fmt.Sprintf("%s/resource", c.BaseURL)
	
	// リソースをJSONに変換
	jsonData, err := json.Marshal(RegistartionBody{
		Type: resourceType,
		Data: resource,
	})
	if err != nil {
		return fmt.Errorf("failed to marshal resource: %v", err)
	}
	
	// POSTリクエストを作成
	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
	if err != nil {
		return fmt.Errorf("failed to create request: %v", err)
	}
	
	// ヘッダーを設定
	req.Header.Set("Content-Type", "application/json")
	
	// リクエストを送信
	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send request: %v", err)
	}
	defer resp.Body.Close()
	
	// レスポンスを確認
	if resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("failed to register resource, status: %s", resp.Status)
	}
	
	log.Printf("Successfully registered %s resource", resourceType)
	return nil
}

// SendHeartbeat はノードのハートビートを送信する関数
func (c *RegistrationClient) SendHeartbeat(nodeID string) error {
	url := fmt.Sprintf("%s/health/nodes/%s", c.BaseURL, nodeID)
	
	// POSTリクエストを作成
	req, err := http.NewRequest("POST", url, nil)
	if err != nil {
		return fmt.Errorf("failed to create request: %v", err)
	}
	
	// リクエストを送信
	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send heartbeat: %v", err)
	}
	defer resp.Body.Close()
	
	// レスポンスを確認
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to send heartbeat, status: %s", resp.Status)
	}
	
	return nil
}

// StartHeartbeat はハートビートを定期的に送信するゴルーチンを開始する関数
func (c *RegistrationClient) StartHeartbeat(nodeID string, interval time.Duration, ctx context.Context) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	
	for {
		select {
		case <-ticker.C:
			err := c.SendHeartbeat(nodeID)
			if err != nil {
				log.Printf("Heartbeat error: %v", err)
			} else {
				log.Printf("Heartbeat sent for node %s", nodeID)
			}
		case <-ctx.Done():
			log.Println("Stopping heartbeat")
			return
		}
	}
}

メイン処理の実装

最後に、RDS探索とリソース登録を組み合わせたメイン処理を実装します。main.goファイルを更新します:

package main

import (
	"context"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	log.Println("Starting NMOS Node...")

	// コンテキストの作成
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// RDSの探索(5秒間)
	services, err := DiscoverRDS(ctx, 5*time.Second)
	if err != nil {
		log.Fatalf("Error discovering RDS: %v", err)
	}

	log.Printf("Found %d RDS services", len(services))

	// RDSが見つからなかった場合
	if len(services) == 0 {
		log.Println("No RDS found, operating in peer-to-peer mode")
		// ピアツーピアモードの処理(このハンズオンではでは省略)
		return
	}

	// 最適なRDSを選択
	selectedRDS, err := SelectRDS(services)
	if err != nil {
		log.Fatalf("Error selecting RDS: %v", err)
	}

	// Registration APIのURLを取得
	registrationURL := GetRegistrationAPIURL(selectedRDS)
	log.Printf("Selected Registration API: %s", registrationURL)

	// Registration APIクライアントの作成
	client := NewRegistrationClient(registrationURL)

	// ホスト名の取得
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown-host"
		log.Printf("Failed to get hostname: %v, using default: %s", err, hostname)
	}

	// IPアドレスの取得
	ipAddr, err := getOutboundIP()
	if err != nil {
		log.Fatalf("Failed to get IP address: %v", err)
	}

	// Node APIのエンドポイント
	nodeAPIEndpoint := fmt.Sprintf("http://%s:8080/x-nmos/node/v1.3", ipAddr)

	// ノードリソースの作成
	node := NewNode(hostname, nodeAPIEndpoint)

	// ノードの登録
	err = client.RegisterResource("node", node)
	if err != nil {
		log.Fatalf("Failed to register node: %v", err)
	}

	// デバイスリソースの作成
	device := NewDevice(node.ID)

	// デバイスの登録
	err = client.RegisterResource("device", device)
	if err != nil {
		log.Printf("Failed to register device: %v", err)
	}

	// ハートビートの開始(5秒間隔)
	go client.StartHeartbeat(node.ID, 5*time.Second, ctx)

	// シグナル処理
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// 終了シグナルを待機
	<-sigChan
	log.Println("Shutting down...")
	cancel() // コンテキストをキャンセルしてハートビートを停止
	time.Sleep(1 * time.Second) // ハートビート停止を待機
}

// getOutboundIP は外部通信用のIPアドレスを取得する関数
func getOutboundIP() (string, error) {
	conn, err := net.Dial("udp", "8.8.8.8:80")
	if err != nil {
		return "", err
	}
	defer conn.Close()

	localAddr := conn.LocalAddr().(*net.UDPAddr)
	return localAddr.IP.String(), nil
}

実行方法

以下のコマンドでプログラムを実行します:

go run main.go discovery.go registration.go models.go

動作説明

  1. プログラムはまずRDSを探索し、最適なRDSを選択します。
  2. 選択したRDSのRegistration APIに接続するクライアントを作成します。
  3. ホスト名とIPアドレスを取得し、ノードリソースを作成します。
  4. ノードリソースをRegistration APIに登録します。
  5. デバイスリソースを作成し、Registration APIに登録します。
  6. 定期的なハートビートを送信するゴルーチンを開始します。
  7. プログラムは終了シグナル(Ctrl+C)を受け取るまで実行を続けます。

エラーハンドリング

  • RDSが見つからない場合は、ピアツーピアモードで動作します。
  • リソース登録に失敗した場合はエラーメッセージを表示します。
  • ハートビート送信に失敗した場合もエラーメッセージを表示しますが、プログラムは実行を続けます。

Node APIサーバー実装

前のセクションでは、RDSの探索とRegistration APIを使用したリソース登録の方法を実装しました。このセクションでは、Node APIサーバーを実装する方法を解説します。Node APIは、他のノードやコントローラーがこのノードのリソース情報を取得するためのインターフェースです。

Node APIの概要

Node APIは各NMOSノードによって公開され、そのノードのリソース情報を提供します。IS-04仕様では、Node APIは以下の主要エンドポイントを提供しています:

  • / - ベースエンドポイント(APIのバージョン情報)
  • /self - ノード自身の情報
  • /sources - ソースリソース
  • /flows - フローリソース
  • /devices - デバイスリソース
  • /senders - 送信者リソース
  • /receivers - 受信者リソース

Node APIサーバーは、前のセクションで登録したリソース情報と同じ情報を提供する必要があります。

リソースストアの実装

まず、ノードのリソース情報を管理するためのストアを実装します。store.goファイルを作成し、以下のコードを記述します:

package main

import (
	"sync"
)

// ResourceStore はノードのリソース情報を管理するストア
type ResourceStore struct {
	Node      *Node
	Devices   map[string]*Device
	Sources   map[string]interface{} // 簡略化のため、具体的な型は省略
	Flows     map[string]interface{}
	Senders   map[string]interface{}
	Receivers map[string]interface{}
	mutex     sync.RWMutex
}

// NewResourceStore は新しいResourceStoreを作成する関数
func NewResourceStore() *ResourceStore {
	return &ResourceStore{
		Node:      nil,
		Devices:   make(map[string]*Device),
		Sources:   make(map[string]interface{}),
		Flows:     make(map[string]interface{}),
		Senders:   make(map[string]interface{}),
		Receivers: make(map[string]interface{}),
	}
}

// SetNode はノード情報を設定する関数
func (s *ResourceStore) SetNode(node *Node) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.Node = node
}

// AddDevice はデバイスを追加する関数
func (s *ResourceStore) AddDevice(device *Device) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.Devices[device.ID] = device
}

// GetNode はノード情報を取得する関数
func (s *ResourceStore) GetNode() *Node {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.Node
}

// GetDevices はすべてのデバイスを取得する関数
func (s *ResourceStore) GetDevices() []*Device {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	
	devices := make([]*Device, 0, len(s.Devices))
	for _, device := range s.Devices {
		devices = append(devices, device)
	}
	return devices
}

// GetDevice は指定されたIDのデバイスを取得する関数
func (s *ResourceStore) GetDevice(id string) *Device {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.Devices[id]
}

// 他のリソースタイプ(Sources, Flows, Senders, Receivers)に対する
// 同様のメソッドも実装できますが、今回のハンズオンでは省略しています

Node APIサーバーの実装

次に、Node APIサーバーを実装します。api.goファイルを作成し、以下のコードを記述します:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"
)

// APIServer はNode APIサーバーの構造体
type APIServer struct {
	Store    *ResourceStore
	Port     int
	Version  string
	BasePath string
}

// NewAPIServer は新しいAPIServerを作成する関数
func NewAPIServer(store *ResourceStore, port int) *APIServer {
	return &APIServer{
		Store:    store,
		Port:     port,
		Version:  "v1.3",
		BasePath: "/x-nmos/node/v1.3",
	}
}

// Start はAPIサーバーを起動する関数
func (s *APIServer) Start() error {
	// ルートハンドラーの登録
	http.HandleFunc(s.BasePath+"/", s.handleRoot)
	http.HandleFunc(s.BasePath+"/self", s.handleSelf)
	http.HandleFunc(s.BasePath+"/devices", s.handleDevices)
	http.HandleFunc(s.BasePath+"/sources", s.handleSources)
	http.HandleFunc(s.BasePath+"/flows", s.handleFlows)
	http.HandleFunc(s.BasePath+"/senders", s.handleSenders)
	http.HandleFunc(s.BasePath+"/receivers", s.handleReceivers)
	
	// 個別リソースのハンドラー登録
	http.HandleFunc(s.BasePath+"/devices/", s.handleDeviceByID)
	// 他のリソースタイプに対する同様のハンドラーも登録できます
	
	// サーバーの起動
	addr := fmt.Sprintf(":%d", s.Port)
	log.Printf("Starting Node API server on %s", addr)
	return http.ListenAndServe(addr, nil)
}

// レスポンスを返す共通関数
func (s *APIServer) sendResponse(w http.ResponseWriter, data interface{}) {
	w.Header().Set("Content-Type", "application/json")
	
	jsonData, err := json.Marshal(data)
	if err != nil {
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		log.Printf("Error marshaling JSON: %v", err)
		return
	}
	
	w.Write(jsonData)
}

// ルートハンドラー
func (s *APIServer) handleRoot(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != s.BasePath+"/" && r.URL.Path != s.BasePath {
		http.NotFound(w, r)
		return
	}
	
	// APIのバージョン情報を返す
	response := map[string]string{
		"version": s.Version,
	}
	
	s.sendResponse(w, response)
}

// selfハンドラー
func (s *APIServer) handleSelf(w http.ResponseWriter, r *http.Request) {
	node := s.Store.GetNode()
	if node == nil {
		http.Error(w, "Node not found", http.StatusNotFound)
		return
	}
	
	s.sendResponse(w, node)
}

// devicesハンドラー
func (s *APIServer) handleDevices(w http.ResponseWriter, r *http.Request) {
	devices := s.Store.GetDevices()
	s.sendResponse(w, devices)
}

// 個別デバイスハンドラー
func (s *APIServer) handleDeviceByID(w http.ResponseWriter, r *http.Request) {
	// URLからデバイスIDを抽出
	parts := strings.Split(r.URL.Path, "/")
	if len(parts) < 5 {
		http.NotFound(w, r)
		return
	}
	
	deviceID := parts[len(parts)-1]
	device := s.Store.GetDevice(deviceID)
	
	if device == nil {
		http.Error(w, "Device not found", http.StatusNotFound)
		return
	}
	
	s.sendResponse(w, device)
}

// 他のリソースタイプのハンドラー
// 簡略化のため、これらは空のリストを返すだけの実装としています
func (s *APIServer) handleSources(w http.ResponseWriter, r *http.Request) {
	s.sendResponse(w, []interface{}{})
}

func (s *APIServer) handleFlows(w http.ResponseWriter, r *http.Request) {
	s.sendResponse(w, []interface{}{})
}

func (s *APIServer) handleSenders(w http.ResponseWriter, r *http.Request) {
	s.sendResponse(w, []interface{}{})
}

func (s *APIServer) handleReceivers(w http.ResponseWriter, r *http.Request) {
	s.sendResponse(w, []interface{}{})
}

メイン処理の更新

最後に、Node APIサーバーを起動するためにメイン処理を更新します。main.goファイルを更新します:

package main

import (
	"context"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	log.Println("Starting NMOS Node...")

	// コンテキストの作成
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// リソースストアの作成
	store := NewResourceStore()

	// IPアドレスの取得
	ipAddr, err := getOutboundIP()
	if err != nil {
		log.Fatalf("Failed to get IP address: %v", err)
	}

	// ホスト名の取得
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown-host"
		log.Printf("Failed to get hostname: %v, using default: %s", err, hostname)
	}

	// Node APIのエンドポイント
	nodeAPIPort := 8080
	nodeAPIEndpoint := fmt.Sprintf("http://%s:%d/x-nmos/node/v1.3", ipAddr, nodeAPIPort)

	// ノードリソースの作成
	node := NewNode(hostname, nodeAPIEndpoint)
	store.SetNode(node)

	// デバイスリソースの作成
	device := NewDevice(node.ID)
	store.AddDevice(device)

	// Node APIサーバーの作成と起動
	apiServer := NewAPIServer(store, nodeAPIPort)
	go func() {
		if err := apiServer.Start(); err != nil {
			log.Fatalf("Failed to start API server: %v", err)
		}
	}()

	// RDSの探索(5秒間)
	services, err := DiscoverRDS(ctx, 5*time.Second)
	if err != nil {
		log.Fatalf("Error discovering RDS: %v", err)
	}

	log.Printf("Found %d RDS services", len(services))

	// RDSが見つからなかった場合
	if len(services) == 0 {
		log.Println("No RDS found, operating in peer-to-peer mode")
		// ピアツーピアモードの処理
		// この場合でもNode APIサーバーは起動したままにする
	} else {
		// 最適なRDSを選択
		selectedRDS, err := SelectRDS(services)
		if err != nil {
			log.Fatalf("Error selecting RDS: %v", err)
		}

		// Registration APIのURLを取得
		registrationURL := GetRegistrationAPIURL(selectedRDS)
		log.Printf("Selected Registration API: %s", registrationURL)

		// Registration APIクライアントの作成
		client := NewRegistrationClient(registrationURL)

		// ノードの登録
		err = client.RegisterResource("node", node)
		if err != nil {
			log.Fatalf("Failed to register node: %v", err)
		}

		// デバイスの登録
		err = client.RegisterResource("device", device)
		if err != nil {
			log.Printf("Failed to register device: %v", err)
		}

		// ハートビートの開始(5秒間隔)
		go client.StartHeartbeat(node.ID, 5*time.Second, ctx)
	}

	// シグナル処理
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// 終了シグナルを待機
	<-sigChan
	log.Println("Shutting down...")
	cancel() // コンテキストをキャンセルしてハートビートを停止
	time.Sleep(1 * time.Second) // ハートビート停止を待機
}

実行方法

以下のコマンドでプログラムを実行します:

go run main.go discovery.go registration.go models.go store.go api.go

動作説明

  1. プログラムはまずリソースストアを作成し、ノードとデバイスのリソースを作成します。
  2. Node APIサーバーを起動し、リソース情報を提供します。
  3. RDSを探索し、見つかった場合はRegistration APIにリソースを登録します。
  4. RDSが見つからない場合は、ピアツーピアモードで動作します(Node APIサーバーのみ起動)。
  5. プログラムは終了シグナル(Ctrl+C)を受け取るまで実行を続けます。

APIのテスト

Node APIサーバーが起動したら、以下のようにcurlコマンドでAPIをテストできます:

# ルートエンドポイント
curl http://localhost:8080/x-nmos/node/v1.3/

# ノード情報
curl http://localhost:8080/x-nmos/node/v1.3/self

# デバイス一覧
curl http://localhost:8080/x-nmos/node/v1.3/devices

# 特定のデバイス情報(実際のデバイスIDに置き換えてください)
curl http://localhost:8080/x-nmos/node/v1.3/devices/your-device-id

CORS対応

実際の運用環境では、ブラウザからのクロスオリジンリクエストを許可するためにCORS(Cross-Origin Resource Sharing)対応が必要になる場合があります。以下のようにCORSヘッダーを追加できます:

// CORSミドルウェア
func corsMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
		w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
		
		if r.Method == "OPTIONS" {
			w.WriteHeader(http.StatusOK)
			return
		}
		
		next.ServeHTTP(w, r)
	})
}

// APIサーバーの起動時にCORSミドルウェアを適用
func (s *APIServer) Start() error {
	// ハンドラーの登録
	mux := http.NewServeMux()
	mux.HandleFunc(s.BasePath+"/", s.handleRoot)
	// 他のハンドラーも同様に登録
	
	// CORSミドルウェアを適用
	handler := corsMiddleware(mux)
	
	// サーバーの起動
	addr := fmt.Sprintf(":%d", s.Port)
	log.Printf("Starting Node API server on %s", addr)
	return http.ListenAndServe(addr, handler)
}

まとめ

この記事では、Go言語を使用してNMOS IS-04に準拠したNodeを開発する方法を解説しました。主な実装ポイントは以下の通りです:

  1. RDSの探索: DNS-SDを使用してRegistration & Discovery System (RDS)を探索する方法
  2. Registration APIの利用: 発見したRDSのRegistration APIを使用してリソースを登録する方法
  3. Node APIサーバーの実装: 自身のリソース情報を提供するAPIサーバーを実装する方法

これらの機能を組み合わせることで、NMOS IS-04に準拠したNodeを実装することができます。実際の運用環境では、より多くのリソースタイプや機能を追加することで、より完全なNMOS Nodeを構築できます。

NMOS IS-04は、ネットワークメディア機器の相互運用性を確保するための重要な仕様です。この記事で解説した実装例を基に、独自のNMOS対応機器やアプリケーションの開発に取り組んでみてください。

拡張ポイント

このコードは基本的な実装を示していますが、以下のような拡張が可能です:

  1. より多くのリソースタイプ(Source, Flow, Sender, Receiver)の完全な実装
  2. WebSocketを使用したクエリAPIのサブスクリプション機能
  3. TLS対応(HTTPSサポート)
  4. 認証機能の追加
  5. より堅牢なエラーハンドリングとロギング
  6. リソースの検証機能
  7. ピアツーピアモードの完全な実装

参考リンク

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?