目次
はじめに
NMOS(Networked Media Open Specifications)は、Advanced Media Workflow Association(AMWA)によって開発された、ネットワークメディア向けのオープン仕様群です。特にIS-04は、ネットワーク上のリソースの登録と発見を担当する重要な仕様です。
本記事では、Go言語を使用してNMOS IS-04に準拠したNodeを開発する方法を、ハンズオン形式で解説します。開発フローに沿って、RDSの探索、Registration APIの利用、Node APIサーバーの実装までを段階的に進めていきます。
NMOSとは
NMOSは、プロフェッショナル向けネットワークメディアのための仕様群です。異なるベンダーの機器が相互運用できるようにするためのオープン仕様として、AMWAによって開発されています。
NMOSは以下のような特徴を持っています:
- 異なるベンダーの機器間での相互運用性を確保
- IPネットワーク上でのメディア機器の発見と接続を自動化
- RESTful APIを使用した標準化されたインターフェース
- JSONベースのデータモデル
NMOSは複数の仕様(IS-04, IS-05など)から構成されており、それぞれが特定の機能を担当しています。
IS-04とは
IS-04は「Discovery and Registration Specification」と呼ばれ、ネットワーク上のリソース(Node, Device, Sender, Receiver, Source, Flowなど)を登録・発見するための仕様です。
IS-04の主な目的は:
- ネットワーク上のリソースを自動的に発見する仕組みを提供
- リソース間の接続を容易にするためのメタデータを提供
- システム構築の手動作業を削減し、自動化を促進
- 動的なデプロイメントを可能にする
IS-04は3つの主要なAPIから構成されています:
- Registration API: Nodeがリソースを登録するためのAPI
- Query API: 登録されたリソースを検索するためのAPI
- Node API: Node上のリソースを見つけるためのAPI
Node開発の概要
NMOS IS-04 Nodeの開発は主に以下の3つのステップで構成されます:
- RDSの探索: DNS-SDを使用してRegistration & Discovery System (RDS)を探索
- Registration APIの利用: 発見したRDSのRegistration APIを使用してリソースを登録
- Node APIサーバーの実装: 自身のリソース情報を提供するAPIサーバーを実装
それでは、各ステップを詳しく見ていきましょう。
開発環境の準備
まず、開発環境を準備します。Go言語のインストールと必要なライブラリのセットアップを行います。
# Goのインストール(既にインストール済みの場合はスキップ)
# Ubuntuの場合
sudo apt-get update
sudo apt-get install golang-go
# プロジェクトディレクトリの作成
mkdir -p nmos-node
cd nmos-node
# Go Modulesの初期化
go mod init github.com/{ユーザー名}/nmos-node
# 必要なライブラリのインストール
go get github.com/brutella/dnssd
go get github.com/google/uuid
RDSの探索実装
最初のステップは、DNS-SDを使用してRegistration & Discovery System (RDS)を探索することです。IS-04仕様では、RDSは_nmos-register._tcp
サービスタイプでアドバタイズされています。
以下のコードでは、brutella/dnssdライブラリを使用してRDSを探索する実装例を示します。
まず、discovery.go
ファイルを作成し、以下のコードを記述します:
package main
import (
"context"
"fmt"
"log"
"sort"
"strings"
"time"
"github.com/brutella/dnssd"
)
// RDSService はRDSサービスの情報を保持する構造体
type RDSService struct {
Name string
Type string
Domain string
Host string
Port int
IPv4 []string
IPv6 []string
TXT map[string]string
Priority int // TXT priフィールドから取得
}
// DiscoverRDS はDNS-SDを使用してRDSを探索する関数
func DiscoverRDS(ctx context.Context, timeout time.Duration) ([]RDSService, error) {
// 結果を格納するスライス
var services []RDSService
// 探索完了を通知するチャネル
discoveryDone := make(chan struct{})
// サービス追加時のコールバック
addCallback := func(service dnssd.Service) {
// TXTレコードからpriorityを取得
priority := 100 // デフォルト値
if pri, ok := service.Text["pri"]; ok {
fmt.Sscanf(pri, "%d", &priority)
}
// APIバージョンを確認 - 今回は最新のv1.3のみサポート
apiVer := service.Text["api_ver"]
if !strings.Contains(apiVer, "v1.3") {
log.Printf("Unsupported API version: %s", apiVer)
return
}
// RDSServiceオブジェクトを作成
rdsService := RDSService{
Name: service.Name,
Type: service.Type,
Domain: service.Domain,
Host: service.Host,
Port: service.Port,
IPv4: service.IPs,
TXT: service.Text,
Priority: priority,
}
log.Printf("Found RDS: %s at %s:%d (priority: %d)",
service.Name, service.Host, service.Port, priority)
services = append(services, rdsService)
}
// ブラウザの設定
config := dnssd.Config{
Context: ctx,
}
// ブラウザの作成
browser, err := dnssd.NewBrowser(config)
if err != nil {
return nil, fmt.Errorf("failed to create browser: %v", err)
}
// サービス追加時のコールバックを設定
browser.AddHandler(dnssd.ServiceAddFunc(addCallback))
// ブラウズを開始
go func() {
// _nmos-register._tcpサービスを探索
err := browser.Browse("_nmos-register._tcp", "local.")
if err != nil {
log.Printf("Browse error: %v", err)
}
// タイムアウト後に探索を終了
time.Sleep(timeout)
discoveryDone <- struct{}{}
}()
// 探索完了を待機
<-discoveryDone
// 優先度でソート
sort.Slice(services, func(i, j int) bool {
return services[i].Priority < services[j].Priority
})
return services, nil
}
// SelectRDS は発見したRDSから最適なものを選択する関数
func SelectRDS(services []RDSService) (*RDSService, error) {
if len(services) == 0 {
return nil, fmt.Errorf("no RDS services found")
}
// 優先度が最も高いRDSを選択
// 既にDiscoverRDS内でソート済みなので、最初の要素を返す
return &services[0], nil
}
// GetRegistrationAPIURL はRDSからRegistration APIのURLを構築する関数
func GetRegistrationAPIURL(service *RDSService) string {
protocol := "http"
if proto, ok := service.TXT["api_proto"]; ok && proto == "https" {
protocol = "https"
}
// IPv4アドレスがある場合は最初のものを使用
if len(service.IPv4) > 0 {
return fmt.Sprintf("%s://%s:%d/x-nmos/registration/v1.3",
protocol, service.IPv4[0], service.Port)
}
// ホスト名を使用
return fmt.Sprintf("%s://%s:%d/x-nmos/registration/v1.3",
protocol, service.Host, service.Port)
}
次に、この探索機能をテストするためのmain.go
ファイルを作成します:
package main
import (
"context"
"log"
"time"
)
func main() {
log.Println("Starting RDS discovery...")
// コンテキストの作成
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// RDSの探索(5秒間)
services, err := DiscoverRDS(ctx, 5*time.Second)
if err != nil {
log.Fatalf("Error discovering RDS: %v", err)
}
log.Printf("Found %d RDS services", len(services))
// RDSが見つからなかった場合
if len(services) == 0 {
log.Println("No RDS found, would operate in peer-to-peer mode")
return
}
// 最適なRDSを選択
selectedRDS, err := SelectRDS(services)
if err != nil {
log.Fatalf("Error selecting RDS: %v", err)
}
// Registration APIのURLを取得
registrationURL := GetRegistrationAPIURL(selectedRDS)
log.Printf("Selected Registration API: %s", registrationURL)
// ここからRegistration APIを使用してリソースを登録する処理に進む
// (次のセクションで実装)
}
このコードを実行すると、ネットワーク上のRDSを探索し、見つかったRDSの中から最適なものを選択します。選択基準は主にTXTレコードのpri
フィールド(優先度)に基づいています。
実行方法
go run main.go discovery.go
動作説明
-
DiscoverRDS
関数は、DNS-SDを使用して_nmos-register._tcp
および_nmos-registration._tcp
サービスを探索します。 - 見つかったサービスごとに、TXTレコードから優先度(
pri
)やAPIバージョン(api_ver
)などの情報を取得します。 - サービスは優先度でソートされ、
SelectRDS
関数によって最適なRDSが選択されます。 - 選択されたRDSの情報から、
GetRegistrationAPIURL
関数によってRegistration APIのURLが構築されます。
エラーハンドリング
- RDSが見つからない場合は、ピアツーピアモードで動作するようにフォールバックします。
- APIバージョンが互換性のないものは無視されます。
- 複数のRDSが同じ優先度を持つ場合は、最初に見つかったものが選択されます(実際の実装ではランダム選択が推奨されます)。
Registration API連携
前のセクションでは、DNS-SDを使用してRDSを探索する方法を実装しました。このセクションでは、発見したRDSのRegistration APIを使用してノードとそのリソースを登録する方法を解説します。
Registration APIの概要
Registration APIは、NMOSノードがリソース情報をRDSに登録するためのインターフェースです。主な機能は以下の通りです:
- ノードとその関連リソース(デバイス、ソース、フロー、送信者、受信者)の登録
- 定期的なハートビート送信によるリソースの有効性維持
- リソースの更新と削除
IS-04仕様では、Registration APIは以下の主要エンドポイントを提供しています:
-
/resource
- リソースの登録 -
/resource/{resourceType}/{resourceId}
- 特定リソースの操作 -
/health/nodes/{nodeId}
- ノードのヘルスチェック(ハートビート)
リソースモデルの実装
まず、IS-04仕様に準拠したリソースモデルを実装します。models.go
ファイルを作成し、以下のコードを記述します:
package main
import (
"github.com/google/uuid"
)
// NMOSリソースの共通フィールド
type Resource struct {
ID string `json:"id"`
Version string `json:"version"`
Label string `json:"label"`
Description string `json:"description"`
Tags Tags `json:"tags"`
}
// タグのリスト
type Tags struct {
Values []string `json:"values"`
}
// ノードリソース
type Node struct {
Resource
Hostname string `json:"hostname"`
Href string `json:"href"`
Caps Caps `json:"caps"`
Api Api `json:"api"`
Services []Service `json:"services"`
Interfaces []Interface `json:"interfaces"`
}
// ノードが提供するAPI
type Api struct {
Versinos []string `json:"versions"`
Endpoints []Endpoint `json:"endpoints"`
}
// ノードが提供するAPIのエンドポイント
type Endpoint struct {
Host string `json:"host"`
Port string `json:"port"`
Protocol string `json:"protocol"`
}
// ノードの機能
type Caps struct {
Values []string `json:"values"`
}
// ノードが提供するサービス
type Service struct {
Type string `json:"type"`
Href string `json:"href"`
}
// ネットワークインターフェース
type Interface struct {
Name string `json:"name"`
ChassisID string `json:"chassis_id"`
PortID string `json:"port_id"`
Attached []string `json:"attached"`
}
// デバイスリソース
type Device struct {
Resource
NodeID string `json:"node_id"`
Type string `json:"type"`
Controls []Control `json:"controls"`
Senders []string `json:"senders"`
Receivers []string `json:"receivers"`
}
// デバイスのコントロールエンドポイント
type Control struct {
Type string `json:"type"`
Href string `json:"href"`
}
// 新しいノードリソースを作成する関数
func NewNode(hostname, port, protocol, apiEndpoint string) *Node {
id := uuid.New().String()
return &Node{
Resource: Resource{
ID: id,
Version: "1738858561:000000000",
Label: "Go NMOS Node",
Description: "NMOS Node implemented in Go",
Tags: Tags{
Values: []string{"nmos", "go"},
},
},
Hostname: hostname,
Href: apiEndpoint,
Caps: Caps{Values: []string{}},
Api: Api{
Versinos: []string{"v1.3"},
Endpoints: []Endpoint{
{
Host: hostname,
Port: port,
Protocol: protocol,
},
},
},
Services: []Service{},
Interfaces: []Interface{},
}
}
// 新しいデバイスリソースを作成する関数
func NewDevice(nodeID string) *Device {
id := uuid.New().String()
return &Device{
Resource: Resource{
ID: id,
Version: "1738858561:000000000",
Label: "Go NMOS Device",
Description: "NMOS Device implemented in Go",
Tags: Tags{
Values: []string{"nmos", "go"},
},
},
NodeID: nodeID,
Type: "urn:x-nmos:device:generic",
Controls: []Control{},
Senders: []string{},
Receivers: []string{},
}
}
Registration APIクライアントの実装
次に、Registration APIと通信するためのクライアントを実装します。registration.go
ファイルを作成し、以下のコードを記述します:
package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
)
// RegistrationClient はRegistration APIと通信するためのクライアント
type RegistrationClient struct {
BaseURL string
HTTPClient *http.Client
}
// RegistartionBody はRegistration APIへ送信するリクエストボディの構造体
type RegistartionBody struct {
Type string `json:"type"`
Data interface{} `json:"data"`
}
// NewRegistrationClient は新しいRegistrationClientを作成する関数
func NewRegistrationClient(baseURL string) *RegistrationClient {
return &RegistrationClient{
BaseURL: baseURL,
HTTPClient: &http.Client{
Timeout: 10 * time.Second,
},
}
}
// RegisterResource はリソースを登録する関数
func (c *RegistrationClient) RegisterResource(resourceType string, resource interface{}) error {
url := fmt.Sprintf("%s/resource", c.BaseURL)
// リソースをJSONに変換
jsonData, err := json.Marshal(RegistartionBody{
Type: resourceType,
Data: resource,
})
if err != nil {
return fmt.Errorf("failed to marshal resource: %v", err)
}
// POSTリクエストを作成
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create request: %v", err)
}
// ヘッダーを設定
req.Header.Set("Content-Type", "application/json")
// リクエストを送信
resp, err := c.HTTPClient.Do(req)
if err != nil {
return fmt.Errorf("failed to send request: %v", err)
}
defer resp.Body.Close()
// レスポンスを確認
if resp.StatusCode != http.StatusCreated {
return fmt.Errorf("failed to register resource, status: %s", resp.Status)
}
log.Printf("Successfully registered %s resource", resourceType)
return nil
}
// SendHeartbeat はノードのハートビートを送信する関数
func (c *RegistrationClient) SendHeartbeat(nodeID string) error {
url := fmt.Sprintf("%s/health/nodes/%s", c.BaseURL, nodeID)
// POSTリクエストを作成
req, err := http.NewRequest("POST", url, nil)
if err != nil {
return fmt.Errorf("failed to create request: %v", err)
}
// リクエストを送信
resp, err := c.HTTPClient.Do(req)
if err != nil {
return fmt.Errorf("failed to send heartbeat: %v", err)
}
defer resp.Body.Close()
// レスポンスを確認
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to send heartbeat, status: %s", resp.Status)
}
return nil
}
// StartHeartbeat はハートビートを定期的に送信するゴルーチンを開始する関数
func (c *RegistrationClient) StartHeartbeat(nodeID string, interval time.Duration, ctx context.Context) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := c.SendHeartbeat(nodeID)
if err != nil {
log.Printf("Heartbeat error: %v", err)
} else {
log.Printf("Heartbeat sent for node %s", nodeID)
}
case <-ctx.Done():
log.Println("Stopping heartbeat")
return
}
}
}
メイン処理の実装
最後に、RDS探索とリソース登録を組み合わせたメイン処理を実装します。main.go
ファイルを更新します:
package main
import (
"context"
"log"
"net"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
log.Println("Starting NMOS Node...")
// コンテキストの作成
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// RDSの探索(5秒間)
services, err := DiscoverRDS(ctx, 5*time.Second)
if err != nil {
log.Fatalf("Error discovering RDS: %v", err)
}
log.Printf("Found %d RDS services", len(services))
// RDSが見つからなかった場合
if len(services) == 0 {
log.Println("No RDS found, operating in peer-to-peer mode")
// ピアツーピアモードの処理(このハンズオンではでは省略)
return
}
// 最適なRDSを選択
selectedRDS, err := SelectRDS(services)
if err != nil {
log.Fatalf("Error selecting RDS: %v", err)
}
// Registration APIのURLを取得
registrationURL := GetRegistrationAPIURL(selectedRDS)
log.Printf("Selected Registration API: %s", registrationURL)
// Registration APIクライアントの作成
client := NewRegistrationClient(registrationURL)
// ホスト名の取得
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown-host"
log.Printf("Failed to get hostname: %v, using default: %s", err, hostname)
}
// IPアドレスの取得
ipAddr, err := getOutboundIP()
if err != nil {
log.Fatalf("Failed to get IP address: %v", err)
}
// Node APIのエンドポイント
nodeAPIEndpoint := fmt.Sprintf("http://%s:8080/x-nmos/node/v1.3", ipAddr)
// ノードリソースの作成
node := NewNode(hostname, nodeAPIEndpoint)
// ノードの登録
err = client.RegisterResource("node", node)
if err != nil {
log.Fatalf("Failed to register node: %v", err)
}
// デバイスリソースの作成
device := NewDevice(node.ID)
// デバイスの登録
err = client.RegisterResource("device", device)
if err != nil {
log.Printf("Failed to register device: %v", err)
}
// ハートビートの開始(5秒間隔)
go client.StartHeartbeat(node.ID, 5*time.Second, ctx)
// シグナル処理
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// 終了シグナルを待機
<-sigChan
log.Println("Shutting down...")
cancel() // コンテキストをキャンセルしてハートビートを停止
time.Sleep(1 * time.Second) // ハートビート停止を待機
}
// getOutboundIP は外部通信用のIPアドレスを取得する関数
func getOutboundIP() (string, error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return "", err
}
defer conn.Close()
localAddr := conn.LocalAddr().(*net.UDPAddr)
return localAddr.IP.String(), nil
}
実行方法
以下のコマンドでプログラムを実行します:
go run main.go discovery.go registration.go models.go
動作説明
- プログラムはまずRDSを探索し、最適なRDSを選択します。
- 選択したRDSのRegistration APIに接続するクライアントを作成します。
- ホスト名とIPアドレスを取得し、ノードリソースを作成します。
- ノードリソースをRegistration APIに登録します。
- デバイスリソースを作成し、Registration APIに登録します。
- 定期的なハートビートを送信するゴルーチンを開始します。
- プログラムは終了シグナル(Ctrl+C)を受け取るまで実行を続けます。
エラーハンドリング
- RDSが見つからない場合は、ピアツーピアモードで動作します。
- リソース登録に失敗した場合はエラーメッセージを表示します。
- ハートビート送信に失敗した場合もエラーメッセージを表示しますが、プログラムは実行を続けます。
Node APIサーバー実装
前のセクションでは、RDSの探索とRegistration APIを使用したリソース登録の方法を実装しました。このセクションでは、Node APIサーバーを実装する方法を解説します。Node APIは、他のノードやコントローラーがこのノードのリソース情報を取得するためのインターフェースです。
Node APIの概要
Node APIは各NMOSノードによって公開され、そのノードのリソース情報を提供します。IS-04仕様では、Node APIは以下の主要エンドポイントを提供しています:
-
/
- ベースエンドポイント(APIのバージョン情報) -
/self
- ノード自身の情報 -
/sources
- ソースリソース -
/flows
- フローリソース -
/devices
- デバイスリソース -
/senders
- 送信者リソース -
/receivers
- 受信者リソース
Node APIサーバーは、前のセクションで登録したリソース情報と同じ情報を提供する必要があります。
リソースストアの実装
まず、ノードのリソース情報を管理するためのストアを実装します。store.go
ファイルを作成し、以下のコードを記述します:
package main
import (
"sync"
)
// ResourceStore はノードのリソース情報を管理するストア
type ResourceStore struct {
Node *Node
Devices map[string]*Device
Sources map[string]interface{} // 簡略化のため、具体的な型は省略
Flows map[string]interface{}
Senders map[string]interface{}
Receivers map[string]interface{}
mutex sync.RWMutex
}
// NewResourceStore は新しいResourceStoreを作成する関数
func NewResourceStore() *ResourceStore {
return &ResourceStore{
Node: nil,
Devices: make(map[string]*Device),
Sources: make(map[string]interface{}),
Flows: make(map[string]interface{}),
Senders: make(map[string]interface{}),
Receivers: make(map[string]interface{}),
}
}
// SetNode はノード情報を設定する関数
func (s *ResourceStore) SetNode(node *Node) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.Node = node
}
// AddDevice はデバイスを追加する関数
func (s *ResourceStore) AddDevice(device *Device) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.Devices[device.ID] = device
}
// GetNode はノード情報を取得する関数
func (s *ResourceStore) GetNode() *Node {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.Node
}
// GetDevices はすべてのデバイスを取得する関数
func (s *ResourceStore) GetDevices() []*Device {
s.mutex.RLock()
defer s.mutex.RUnlock()
devices := make([]*Device, 0, len(s.Devices))
for _, device := range s.Devices {
devices = append(devices, device)
}
return devices
}
// GetDevice は指定されたIDのデバイスを取得する関数
func (s *ResourceStore) GetDevice(id string) *Device {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.Devices[id]
}
// 他のリソースタイプ(Sources, Flows, Senders, Receivers)に対する
// 同様のメソッドも実装できますが、今回のハンズオンでは省略しています
Node APIサーバーの実装
次に、Node APIサーバーを実装します。api.go
ファイルを作成し、以下のコードを記述します:
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
)
// APIServer はNode APIサーバーの構造体
type APIServer struct {
Store *ResourceStore
Port int
Version string
BasePath string
}
// NewAPIServer は新しいAPIServerを作成する関数
func NewAPIServer(store *ResourceStore, port int) *APIServer {
return &APIServer{
Store: store,
Port: port,
Version: "v1.3",
BasePath: "/x-nmos/node/v1.3",
}
}
// Start はAPIサーバーを起動する関数
func (s *APIServer) Start() error {
// ルートハンドラーの登録
http.HandleFunc(s.BasePath+"/", s.handleRoot)
http.HandleFunc(s.BasePath+"/self", s.handleSelf)
http.HandleFunc(s.BasePath+"/devices", s.handleDevices)
http.HandleFunc(s.BasePath+"/sources", s.handleSources)
http.HandleFunc(s.BasePath+"/flows", s.handleFlows)
http.HandleFunc(s.BasePath+"/senders", s.handleSenders)
http.HandleFunc(s.BasePath+"/receivers", s.handleReceivers)
// 個別リソースのハンドラー登録
http.HandleFunc(s.BasePath+"/devices/", s.handleDeviceByID)
// 他のリソースタイプに対する同様のハンドラーも登録できます
// サーバーの起動
addr := fmt.Sprintf(":%d", s.Port)
log.Printf("Starting Node API server on %s", addr)
return http.ListenAndServe(addr, nil)
}
// レスポンスを返す共通関数
func (s *APIServer) sendResponse(w http.ResponseWriter, data interface{}) {
w.Header().Set("Content-Type", "application/json")
jsonData, err := json.Marshal(data)
if err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
log.Printf("Error marshaling JSON: %v", err)
return
}
w.Write(jsonData)
}
// ルートハンドラー
func (s *APIServer) handleRoot(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != s.BasePath+"/" && r.URL.Path != s.BasePath {
http.NotFound(w, r)
return
}
// APIのバージョン情報を返す
response := map[string]string{
"version": s.Version,
}
s.sendResponse(w, response)
}
// selfハンドラー
func (s *APIServer) handleSelf(w http.ResponseWriter, r *http.Request) {
node := s.Store.GetNode()
if node == nil {
http.Error(w, "Node not found", http.StatusNotFound)
return
}
s.sendResponse(w, node)
}
// devicesハンドラー
func (s *APIServer) handleDevices(w http.ResponseWriter, r *http.Request) {
devices := s.Store.GetDevices()
s.sendResponse(w, devices)
}
// 個別デバイスハンドラー
func (s *APIServer) handleDeviceByID(w http.ResponseWriter, r *http.Request) {
// URLからデバイスIDを抽出
parts := strings.Split(r.URL.Path, "/")
if len(parts) < 5 {
http.NotFound(w, r)
return
}
deviceID := parts[len(parts)-1]
device := s.Store.GetDevice(deviceID)
if device == nil {
http.Error(w, "Device not found", http.StatusNotFound)
return
}
s.sendResponse(w, device)
}
// 他のリソースタイプのハンドラー
// 簡略化のため、これらは空のリストを返すだけの実装としています
func (s *APIServer) handleSources(w http.ResponseWriter, r *http.Request) {
s.sendResponse(w, []interface{}{})
}
func (s *APIServer) handleFlows(w http.ResponseWriter, r *http.Request) {
s.sendResponse(w, []interface{}{})
}
func (s *APIServer) handleSenders(w http.ResponseWriter, r *http.Request) {
s.sendResponse(w, []interface{}{})
}
func (s *APIServer) handleReceivers(w http.ResponseWriter, r *http.Request) {
s.sendResponse(w, []interface{}{})
}
メイン処理の更新
最後に、Node APIサーバーを起動するためにメイン処理を更新します。main.go
ファイルを更新します:
package main
import (
"context"
"log"
"net"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
log.Println("Starting NMOS Node...")
// コンテキストの作成
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// リソースストアの作成
store := NewResourceStore()
// IPアドレスの取得
ipAddr, err := getOutboundIP()
if err != nil {
log.Fatalf("Failed to get IP address: %v", err)
}
// ホスト名の取得
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown-host"
log.Printf("Failed to get hostname: %v, using default: %s", err, hostname)
}
// Node APIのエンドポイント
nodeAPIPort := 8080
nodeAPIEndpoint := fmt.Sprintf("http://%s:%d/x-nmos/node/v1.3", ipAddr, nodeAPIPort)
// ノードリソースの作成
node := NewNode(hostname, nodeAPIEndpoint)
store.SetNode(node)
// デバイスリソースの作成
device := NewDevice(node.ID)
store.AddDevice(device)
// Node APIサーバーの作成と起動
apiServer := NewAPIServer(store, nodeAPIPort)
go func() {
if err := apiServer.Start(); err != nil {
log.Fatalf("Failed to start API server: %v", err)
}
}()
// RDSの探索(5秒間)
services, err := DiscoverRDS(ctx, 5*time.Second)
if err != nil {
log.Fatalf("Error discovering RDS: %v", err)
}
log.Printf("Found %d RDS services", len(services))
// RDSが見つからなかった場合
if len(services) == 0 {
log.Println("No RDS found, operating in peer-to-peer mode")
// ピアツーピアモードの処理
// この場合でもNode APIサーバーは起動したままにする
} else {
// 最適なRDSを選択
selectedRDS, err := SelectRDS(services)
if err != nil {
log.Fatalf("Error selecting RDS: %v", err)
}
// Registration APIのURLを取得
registrationURL := GetRegistrationAPIURL(selectedRDS)
log.Printf("Selected Registration API: %s", registrationURL)
// Registration APIクライアントの作成
client := NewRegistrationClient(registrationURL)
// ノードの登録
err = client.RegisterResource("node", node)
if err != nil {
log.Fatalf("Failed to register node: %v", err)
}
// デバイスの登録
err = client.RegisterResource("device", device)
if err != nil {
log.Printf("Failed to register device: %v", err)
}
// ハートビートの開始(5秒間隔)
go client.StartHeartbeat(node.ID, 5*time.Second, ctx)
}
// シグナル処理
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// 終了シグナルを待機
<-sigChan
log.Println("Shutting down...")
cancel() // コンテキストをキャンセルしてハートビートを停止
time.Sleep(1 * time.Second) // ハートビート停止を待機
}
実行方法
以下のコマンドでプログラムを実行します:
go run main.go discovery.go registration.go models.go store.go api.go
動作説明
- プログラムはまずリソースストアを作成し、ノードとデバイスのリソースを作成します。
- Node APIサーバーを起動し、リソース情報を提供します。
- RDSを探索し、見つかった場合はRegistration APIにリソースを登録します。
- RDSが見つからない場合は、ピアツーピアモードで動作します(Node APIサーバーのみ起動)。
- プログラムは終了シグナル(Ctrl+C)を受け取るまで実行を続けます。
APIのテスト
Node APIサーバーが起動したら、以下のようにcurlコマンドでAPIをテストできます:
# ルートエンドポイント
curl http://localhost:8080/x-nmos/node/v1.3/
# ノード情報
curl http://localhost:8080/x-nmos/node/v1.3/self
# デバイス一覧
curl http://localhost:8080/x-nmos/node/v1.3/devices
# 特定のデバイス情報(実際のデバイスIDに置き換えてください)
curl http://localhost:8080/x-nmos/node/v1.3/devices/your-device-id
CORS対応
実際の運用環境では、ブラウザからのクロスオリジンリクエストを許可するためにCORS(Cross-Origin Resource Sharing)対応が必要になる場合があります。以下のようにCORSヘッダーを追加できます:
// CORSミドルウェア
func corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
if r.Method == "OPTIONS" {
w.WriteHeader(http.StatusOK)
return
}
next.ServeHTTP(w, r)
})
}
// APIサーバーの起動時にCORSミドルウェアを適用
func (s *APIServer) Start() error {
// ハンドラーの登録
mux := http.NewServeMux()
mux.HandleFunc(s.BasePath+"/", s.handleRoot)
// 他のハンドラーも同様に登録
// CORSミドルウェアを適用
handler := corsMiddleware(mux)
// サーバーの起動
addr := fmt.Sprintf(":%d", s.Port)
log.Printf("Starting Node API server on %s", addr)
return http.ListenAndServe(addr, handler)
}
まとめ
この記事では、Go言語を使用してNMOS IS-04に準拠したNodeを開発する方法を解説しました。主な実装ポイントは以下の通りです:
- RDSの探索: DNS-SDを使用してRegistration & Discovery System (RDS)を探索する方法
- Registration APIの利用: 発見したRDSのRegistration APIを使用してリソースを登録する方法
- Node APIサーバーの実装: 自身のリソース情報を提供するAPIサーバーを実装する方法
これらの機能を組み合わせることで、NMOS IS-04に準拠したNodeを実装することができます。実際の運用環境では、より多くのリソースタイプや機能を追加することで、より完全なNMOS Nodeを構築できます。
NMOS IS-04は、ネットワークメディア機器の相互運用性を確保するための重要な仕様です。この記事で解説した実装例を基に、独自のNMOS対応機器やアプリケーションの開発に取り組んでみてください。
拡張ポイント
このコードは基本的な実装を示していますが、以下のような拡張が可能です:
- より多くのリソースタイプ(Source, Flow, Sender, Receiver)の完全な実装
- WebSocketを使用したクエリAPIのサブスクリプション機能
- TLS対応(HTTPSサポート)
- 認証機能の追加
- より堅牢なエラーハンドリングとロギング
- リソースの検証機能
- ピアツーピアモードの完全な実装