はじめに
今回は、Go言語とUDP(User Datagram Protocol)を使用して、Webカメラからリアルタイムでフレームをキャプチャし、ネットワーク越しにストリーミングする簡単なアプリケーションを作成します。
前提条件
この記事では以下の前提知識が必要です
- Go言語の基本的な文法
- ネットワークの基本的な概念
実装するもの
サーバー (server.go)
- Webカメラからフレームをキャプチャ
- 画像データをUDPで送信
クライアント (client.go)
- サーバーからUDPパケットを受信
- 受信した画像を表示
実装
ディレクトリ構成
.
├── client
│ └── client.go
└── server
└── server.go
必要なライブラリ
プロジェクトを進めるために、以下のライブラリをインストールします
brew install opencv
server.go
コネクションの確立
handleClientRegistration
関数を実装します。この関数は、新しいクライアントを追跡し、登録します。
type Client struct {
addr *net.UDPAddr
}
var (
clients = make(map[string]*Client)
clientsMu sync.Mutex
currentImageID uint32
)
func handleClientRegistration(conn *net.UDPConn) {
buffer := make([]byte, 65507) // 最大UDPパケットサイズに合わせる
for {
_, remoteAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Println("Read error:", err)
continue
}
clientsMu.Lock()
clientKey := remoteAddr.String()
clients[clientKey] = &Client{addr: remoteAddr}
clientsMu.Unlock()
log.Printf("New client registered: %s", clientKey)
}
}
1. バッファの準備
UDPでは、最大ペイロードサイズが65,507バイトに制限されているため、このサイズに設定しています
buffer := make([]byte, 65507) // 最大UDPパケットサイズに合わせる
2. 無限ループでクライアント接続を待機
ReadFromUDP
は、受信したデータのサイズ _
と、送信元のアドレス情報 remoteAddr
を返します
for {
_, remoteAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Println("Read error:", err)
continue
}
}
3. クライアント情報の登録
clientsMu.Lock()
, clientsMu.Unlock()
で排他制御を行い、remoteAddr.String()
(クライアントのIPアドレスとポートの文字列)をclient情報として保存します
clientsMu.Lock()
clientKey := remoteAddr.String()
clients[clientKey] = &Client{addr: remoteAddr}
clientsMu.Unlock()
ストリーミング
streamImages
関数では、WebカメラからキャプチャしたフレームをJPGとしてbyteにエンコードします
func streamImages(webcam *gocv.VideoCapture, conn *net.UDPConn) {
img := gocv.NewMat()
defer img.Close()
for {
if ok := webcam.Read(&img); !ok {
log.Println("Error capturing frame")
continue
}
// 画像が空でないことを確認
if img.Empty() {
log.Println("Captured frame is empty")
continue
}
// デバッグ情報の出力
size := img.Size()
log.Printf("Captured frame size: %dx%d", size[0], size[1])
// JPEG形式でエンコード
buf, err := gocv.IMEncode(gocv.JPEGFileExt, img)
if err != nil {
log.Println("Encoding error:", err)
continue
}
// 画像をbyteにする
imageData := buf.GetBytes()
// 各クライアントに送信
clientsMu.Lock()
for _, client := range clients {
sendImage(conn, imageData, client)
}
clientsMu.Unlock()
time.Sleep(33 * time.Millisecond)
}
}
1. フレームキャプチャ
webcam.Read(&img)
メソッドを使用して、Webカメラから連続的にフレームをキャプチャします。
if ok := webcam.Read(&img); !ok {
log.Println("Error capturing frame")
continue
}
Read()
メソッドは、成功時にtrue
、失敗時にfalse
を返します
キャプチャに失敗した場合は、ログを出力し、次に進みます
2. 画像のエンコード
OpenCVのIMEncode
関数を使用して、キャプチャしたフレームをJPEG形式に変換します。
buf, err := gocv.IMEncode(".jpg", img)
.jpg
拡張子を指定することで、JPEG形式でエンコードします
エンコードに失敗した場合は、エラーをログに出力します
3. クライアントへの送信
登録されているすべてのクライアントにパケットを送信します。
clientsMu.Lock()
for _, client := range clients {
sendImage(conn, imageData, client)
}
clientsMu.Unlock()
スレッドセーフなclientsマップへのアクセス
各クライアントにパケットを送信
送信エラーをログに出力
4. フレームレート制御
time.Sleep()を使用して、約30FPSに制限します。
time.Sleep(33 * time.Millisecond) // 約30FPSに制限
1秒間に30フレーム表示するための待ち時間を設定
画像をパケットで送信する
type PacketHeader struct {
ImageID uint32
SequenceNum uint16
TotalPackets uint16
PayloadSize uint16
Checksum uint32
}
func sendImage(conn *net.UDPConn, imageData []byte, client *Client) {
packetSize := 1024
imageID := atomic.AddUint32(¤tImageID, 1)
totalPackets := uint16((len(imageData) + packetSize - 1) / packetSize)
log.Printf("imageID: %d", imageID)
log.Printf("totalPackets: %d", totalPackets)
log.Printf("packetSize: %d", len(imageData))
for seqNum := uint16(0); seqNum < totalPackets; seqNum++ {
start := int(seqNum) * packetSize
end := start + packetSize
if end > len(imageData) {
end = len(imageData)
}
if start == 0 {
log.Printf("First 16 bytes of encoded image: % x", imageData[:16])
}
payload := imageData[start:end]
header := PacketHeader{
ImageID: imageID,
SequenceNum: seqNum,
TotalPackets: totalPackets,
PayloadSize: uint16(len(payload)),
Checksum: crc32.ChecksumIEEE(payload),
}
// ヘッダーとペイロードをシリアライズ
var packet bytes.Buffer
binary.Write(&packet, binary.BigEndian, header)
packet.Write(payload)
_, err := conn.WriteToUDP(packet.Bytes(), client.addr)
if err != nil {
log.Printf("Error sending to client %v: %v", client.addr, err)
}
}
}
パケット構造の設計
-
ImageID
: 異なる画像フレームを識別 -
SequenceNum
: パケットの順序を保証 -
TotalPackets
: クライアントが全パケットを受信したか確認 -
PayloadSize
: 実際のデータサイズを把握 -
Checksum
: データの破損を検出
パケット分割と送信プロセス
1. パケットサイズの決定
- 1024バイトのペイロードサイズを使用
- UDPの最大パケットサイズ(65,507バイト)より十分小さく設定
2. 画像の分割処理
start := int(seqNum) * packetSize
end := start + packetSize
if end > len(imageData) {
end = len(imageData)
}
- 画像データを均等なサイズに分割
- 最後のパケットは残りのデータサイズに合わせる
3. パケットの構築
var packet bytes.Buffer
binary.Write(&packet, binary.BigEndian, header)
packet.Write(payload)
- ヘッダーとペイロードを1つのバッファに結合
- バイトオーダーはBigEndianを使用
メイン関数
メイン関数では、Webカメラを開き、UDPサーバーをセットアップし、クライアント登録とイメージストリーミングのゴルーチンを起動します。
func main() {
// OpenCVでWebカメラをオープン
fmt.Println("Starting server...")
webcam, err := gocv.OpenVideoCapture(0)
fmt.Println("webcam = ", webcam)
if err != nil {
log.Fatal("Error opening webcam:", err)
}
defer webcam.Close()
fmt.Println("Connecting to server")
// UDPサーバーセットアップ
addr, err := net.ResolveUDPAddr("udp", ":8000")
if err != nil {
log.Fatal(err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// クライアント登録ゴルーチン
go handleClientRegistration(conn)
// 画像ストリーミングゴルーチン
go streamImages(webcam, conn)
select {}
}
1. Webカメラのオープン
gocv.OpenVideoCapture()
を使用してWebカメラデバイスを開きます。
webcam, err := gocv.OpenVideoCapture(0)
引数0
は、最初のWebカメラデバイスを意味します
エラーが発生した場合は、log.Fatal()で処理を中断
2. UDPサーバーのセットアップ
net.ResolveUDPAddr()
とnet.ListenUDP()
を使用してUDPサーバーを作成します。
addr, err := net.ResolveUDPAddr("udp", ":8000")
conn, err := net.ListenUDP("udp", addr)
:8000
は、すべてのネットワークインターフェースの8000番ポートでリッスン
ListenUDP()
は、UDP接続のリスナーを作成
3. ゴルーチンの起動
並行処理のためにゴルーチンを使用します。
go handleClientRegistration(conn)
go streamImages(webcam, conn)
handleClientRegistration
:クライアント登録を非同期に処理
streamImages
:画像ストリーミングを非同期に実行
4. メインスレッドのブロック
select {}
は、メインゴルーチンを無期限にブロックします。
select {}
プログラムを終了させず、他のゴルーチンの実行を継続
client.go
クライアント側では、サーバーからUDPパケットを受信し、受け取った画像データをデコードして表示します。
メイン関数とサーバー接続
func main() {
// サーバーのアドレス解決
serverAddr, err := net.ResolveUDPAddr("udp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
// ローカルアドレス解決
localAddr, err := net.ResolveUDPAddr("udp", ":0")
if err != nil {
log.Fatal(err)
}
// UDP接続
conn, err := net.DialUDP("udp", localAddr, serverAddr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// サーバーに登録
_, err = conn.Write([]byte("register"))
if err != nil {
log.Fatal("Registration failed:", err)
}
// 画像表示用のウィンドウを作成
window := gocv.NewWindow("UDP Stream")
defer window.Close()
assembler := NewImageAssembler()
receiveAndDisplay(conn, assembler, window)
}
ImageAssemblerの実装
まず、受信したパケットを組み立てるためのImageAssemblerの構造体と初期化関数を説明します。
type ImageAssembler struct {
currentImage map[uint16][]byte
receivedPackets map[uint16]struct{}
imageID uint32
totalPackets uint16
mu sync.Mutex
}
func NewImageAssembler() *ImageAssembler {
return &ImageAssembler{
currentImage: make(map[uint16][]byte),
receivedPackets: make(map[uint16]struct{}),
}
}
func (ia *ImageAssembler) reset() {
ia.currentImage = make(map[uint16][]byte)
ia.receivedPackets = make(map[uint16]struct{})
}
パケット受信と画像表示
func receiveAndDisplay(conn *net.UDPConn, assembler *ImageAssembler, window *gocv.Window) {
buffer := make([]byte, 65507)
headerSize := binary.Size(PacketHeader{})
for {
n, _, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Println("Read error:", err)
continue
}
// ヘッダーとペイロードの分離
var header PacketHeader
headerBuf := bytes.NewReader(buffer[:headerSize])
if err := binary.Read(headerBuf, binary.BigEndian, &header); err != nil {
log.Printf("Header decode error: %v", err)
continue
}
payload := buffer[headerSize:n]
// 画像の組み立てと表示
if completeImage, ok := assembler.addPacket(header, payload); ok {
mat, err := gocv.IMDecode(completeImage, gocv.IMReadColor)
if err != nil {
log.Printf("Image decode error: %v", err)
continue
}
window.IMShow(mat)
if window.WaitKey(1) >= 0 {
mat.Close()
return
}
mat.Close()
}
}
}
パケットの追加と画像の組み立て
func (ia *ImageAssembler) addPacket(header PacketHeader, payload []byte) ([]byte, bool) {
ia.mu.Lock()
defer ia.mu.Unlock()
// 新しい画像が始まった場合はリセット
if header.ImageID != ia.imageID {
log.Printf("New image started: ID=%d, Total packets=%d", header.ImageID, header.TotalPackets)
ia.reset()
ia.imageID = header.ImageID
ia.totalPackets = header.TotalPackets
}
// チェックサムの確認
if crc32.ChecksumIEEE(payload) != header.Checksum {
log.Printf("Checksum mismatch for packet %d of image %d", header.SequenceNum, header.ImageID)
return nil, false
}
// パケットの保存と完全性チェック
copiedPayload := make([]byte, len(payload))
copy(copiedPayload, payload)
ia.currentImage[header.SequenceNum] = copiedPayload
ia.receivedPackets[header.SequenceNum] = struct{}{}
// 全パケットが揃ったかチェック
if len(ia.receivedPackets) == int(ia.totalPackets) {
return ia.assembleImage(), true
}
return nil, false
}
コードの全体像
package main
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"log"
"net"
"sync"
"sync/atomic"
"time"
"gocv.io/x/gocv"
)
type Client struct {
addr *net.UDPAddr
}
type PacketHeader struct {
ImageID uint32
SequenceNum uint16
TotalPackets uint16
PayloadSize uint16
Checksum uint32
}
var (
clients = make(map[string]*Client)
clientsMu sync.Mutex
currentImageID uint32
)
func main() {
fmt.Println("Starting server...")
// WebカメラをOpen
webcam, err := gocv.OpenVideoCapture(0)
if err != nil {
log.Fatal("Error opening webcam:", err)
}
defer webcam.Close()
// UDPサーバーの設定
addr, err := net.ResolveUDPAddr("udp", ":8000")
if err != nil {
log.Fatal(err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
fmt.Println("Server started on :8000")
// クライアント登録の処理
go handleClientRegistration(conn)
// 画像のストリーミング処理
streamImages(webcam, conn)
}
func handleClientRegistration(conn *net.UDPConn) {
buffer := make([]byte, 1024)
for {
n, remoteAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Println("Read error:", err)
continue
}
if string(buffer[:n]) == "register" {
clientsMu.Lock()
clientKey := remoteAddr.String()
clients[clientKey] = &Client{addr: remoteAddr}
clientsMu.Unlock()
log.Printf("New client registered: %s", clientKey)
}
}
}
func streamImages(webcam *gocv.VideoCapture, conn *net.UDPConn) {
img := gocv.NewMat()
defer img.Close()
for {
if ok := webcam.Read(&img); !ok {
log.Println("Error capturing frame")
continue
}
// 画像が空でないことを確認
if img.Empty() {
log.Println("Captured frame is empty")
continue
}
// デバッグ情報の出力
size := img.Size()
log.Printf("Captured frame size: %dx%d", size[0], size[1])
// JPEG形式でエンコード
buf, err := gocv.IMEncode(gocv.JPEGFileExt, img)
if err != nil {
log.Println("Encoding error:", err)
continue
}
imageData := buf.GetBytes()
log.Printf("Encoded image size: %d bytes", len(imageData))
// 最初の数バイトをデバッグ出力
if len(imageData) > 16 {
log.Printf("First 16 bytes of encoded image: % x", imageData[:16])
}
// 各クライアントに送信
clientsMu.Lock()
for _, client := range clients {
sendImage(conn, imageData, client)
}
clientsMu.Unlock()
time.Sleep(33 * time.Millisecond)
}
}
func sendImage(conn *net.UDPConn, imageData []byte, client *Client) {
packetSize := 1024
imageID := atomic.AddUint32(¤tImageID, 1)
totalPackets := uint16((len(imageData) + packetSize - 1) / packetSize)
log.Printf("imageID: %d", imageID)
log.Printf("totalPackets: %d", totalPackets)
log.Printf("packetSize: %d", len(imageData))
for seqNum := uint16(0); seqNum < totalPackets; seqNum++ {
start := int(seqNum) * packetSize
end := start + packetSize
if end > len(imageData) {
end = len(imageData)
}
if start == 0 {
log.Printf("First 16 bytes of encoded image: % x", imageData[:16])
}
payload := imageData[start:end]
header := PacketHeader{
ImageID: imageID,
SequenceNum: seqNum,
TotalPackets: totalPackets,
PayloadSize: uint16(len(payload)),
Checksum: crc32.ChecksumIEEE(payload),
}
// ヘッダーとペイロードをシリアライズ
var packet bytes.Buffer
binary.Write(&packet, binary.BigEndian, header)
packet.Write(payload)
_, err := conn.WriteToUDP(packet.Bytes(), client.addr)
if err != nil {
log.Printf("Error sending to client %v: %v", client.addr, err)
}
}
}
package main
import (
"bytes"
"encoding/binary"
"fmt"
"gocv.io/x/gocv"
"hash/crc32"
"log"
"net"
"sync"
)
type PacketHeader struct {
ImageID uint32
SequenceNum uint16
TotalPackets uint16
PayloadSize uint16
Checksum uint32
}
type ImageAssembler struct {
currentImage map[uint16][]byte
receivedPackets map[uint16]struct{}
imageID uint32
totalPackets uint16
mu sync.Mutex
}
func NewImageAssembler() *ImageAssembler {
return &ImageAssembler{
currentImage: make(map[uint16][]byte),
receivedPackets: make(map[uint16]struct{}),
}
}
func (ia *ImageAssembler) reset() {
ia.currentImage = make(map[uint16][]byte)
ia.receivedPackets = make(map[uint16]struct{})
}
func main() {
// サーバーのアドレス解決
serverAddr, err := net.ResolveUDPAddr("udp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
// ローカルアドレス解決
localAddr, err := net.ResolveUDPAddr("udp", ":0")
if err != nil {
log.Fatal(err)
}
// UDP接続
conn, err := net.DialUDP("udp", localAddr, serverAddr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
fmt.Println("Connected to server")
// サーバーに登録
_, err = conn.Write([]byte("register"))
if err != nil {
log.Fatal("Registration failed:", err)
}
// 画像表示用のウィンドウを作成
window := gocv.NewWindow("UDP Stream")
defer window.Close()
assembler := NewImageAssembler()
receiveAndDisplay(conn, assembler, window)
}
func (ia *ImageAssembler) addPacket(header PacketHeader, payload []byte) ([]byte, bool) {
ia.mu.Lock()
defer ia.mu.Unlock()
// 新しい画像が始まった場合はリセット
if header.ImageID != ia.imageID {
log.Printf("New image started: ID=%d, Total packets=%d", header.ImageID, header.TotalPackets)
ia.reset()
ia.imageID = header.ImageID
ia.totalPackets = header.TotalPackets
}
// チェックサムの確認
if crc32.ChecksumIEEE(payload) != header.Checksum {
log.Printf("Checksum mismatch for packet %d of image %d", header.SequenceNum, header.ImageID)
return nil, false
}
log.Printf("Sequence number: %d, Payload First 16 bytes: % x", header.SequenceNum, payload[:16])
// パケットの保存
copiedPayload := make([]byte, len(payload))
copy(copiedPayload, payload)
// `ia.currentImage` にコピーを格納
ia.currentImage[header.SequenceNum] = copiedPayload
if header.SequenceNum >= 1 {
log.Printf("First 16 bytes of encoded image: % x", ia.currentImage[1][:16])
}
ia.receivedPackets[header.SequenceNum] = struct{}{}
log.Printf("Received packet %d/%d for image %d", header.SequenceNum, header.TotalPackets, header.ImageID)
// 全パケットが揃ったかチェック
if len(ia.receivedPackets) == int(ia.totalPackets) {
log.Printf("All packets received for image %d, assembling...", header.ImageID)
var completeImage []byte
// パケットを順番に結合
for i := uint16(0); i < ia.totalPackets; i++ {
if data, ok := ia.currentImage[i]; ok {
completeImage = append(completeImage, data...)
} else {
log.Printf("Missing packet %d in sequence for image %d", i, header.ImageID)
return nil, false
}
}
log.Printf("Image %d assembled, total size: %d bytes", header.ImageID, len(completeImage))
ia.reset()
return completeImage, true
}
return nil, false
}
// クライアント側の receiveAndDisplay 関数の修正
func receiveAndDisplay(conn *net.UDPConn, assembler *ImageAssembler, window *gocv.Window) {
buffer := make([]byte, 65507)
headerSize := binary.Size(PacketHeader{})
for {
n, _, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Println("Read error:", err)
continue
}
if n <= headerSize {
log.Printf("Received packet too small: %d bytes", n)
continue
}
// ヘッダーのデシリアライズ
var header PacketHeader
headerBuf := bytes.NewReader(buffer[:headerSize])
if err := binary.Read(headerBuf, binary.BigEndian, &header); err != nil {
log.Printf("Header decode error: %v", err)
continue
}
// ペイロードの抽出
payload := buffer[headerSize:n]
// 画像の組み立てを試行
if completeImage, ok := assembler.addPacket(header, payload); ok {
// 最初の数バイトをデバッグ出力
if len(completeImage) > 16 {
log.Printf("First 16 bytes of complete image: % x", completeImage[:16])
}
// JPEGファイルヘッダーの確認
if len(completeImage) < 2 || completeImage[0] != 0xFF || completeImage[1] != 0xD8 {
log.Printf("Invalid JPEG header")
continue
}
// 画像のデコード
mat, err := gocv.IMDecode(completeImage, gocv.IMReadColor)
if err != nil {
log.Printf("Image decode error: %v", err)
continue
}
if mat.Empty() {
log.Printf("Decoded image is empty")
mat.Close()
continue
}
size := mat.Size()
log.Printf("Successfully decoded image: %dx%d", size[0], size[1])
window.IMShow(mat)
if window.WaitKey(1) >= 0 {
mat.Close()
return
}
mat.Close()
}
}
}
動作確認
依存関係を初期化する
go mod init demo
go mod tidy
サーバーとクライアントを別々のターミナルで起動します
cd server
go run server.go
cd client
go run client.go