Edited at

WebRTCの勉強でむりやりGo使ってみた!

More than 3 years have passed since last update.


はじめに

今回WebRTCの学習にあたりこちらのサイトを参考にさせていただきました。大変参考になりました。

https://html5experts.jp/mganeko/5438/

私の成果物はこちら。

https://github.com/hogehoge-banana/webrtc-test

chromeでしか動作確認しておりません。

あくまでも学習過程での試作ですので、いろいろ不安定だったり、もう使ってないコードが残ってたりすると思いますがご愛嬌ください。


なんでそんな事したの?


経緯


  • もともとWebRTCには興味があったんだけど、OpenSTFっていうプロジェクトに携わてて、この技術が転用できそうだと思ったから。詳細はこちら

  • 単純にGoやってみたかったw


WebRTCって何


  • Web Real Time Communication の頭文字

  • ブラウザ間でp2p通信する仕組み

  • プロトコルはUDPを使用

  • 映像音声を流すためのメディアチャネルと何でもありのデータチャネルなるものがある

  • 接続確立するまえにお互いの紹介所(シグナリングサーバー)が必要

  • NAT超えにはSTUN/TURNサーバーが必要


WebRTCの実装


P2P確立するまでの流れ

P2P接続するまえに接続するための情報(SDP)を通信相手に送らなくてはいけません。この処理は実装者に委ねられるため今回は以下のような仕様で実装しました。

シチュエーションとして東京の会議室で待ってるボブに沖縄のアリスがテレビ会議を開始しようとしていると仮定します。


  • アリスが東京会議室にPorkします。

  • Porkを聞いたボブはpeerコネクションを作成しアリスにofferSDPを送信します

  • オファーを受信したアリスはpeerコネクションを作成し、ボブにanswerSDPを送信します

  • answer SDPを受信したボブは先に作成したpeerコネクションにアリスのSDPを設定してコネクションを確立します

※今回STUN/TURNサーバーは使用してません。なのでまだNAT超えはまだできてません


アリスがPork!


  • enterメッセージをブロードキャスト

communication.png

index.htmlにアクセスするとすぐにlocal videoストリームを開始して会議室へPorkするように実装してます。

front/index.js#L245

  startVideo().then(function(){  // ローカルビデオストリームの開始

enterRoom(); // 会議室入室処理
});

front/index.js#L45

  // ローカルビデオストリームの開始

function startVideo() {
return new Promise(function (resolve, reject) {
navigator.webkitGetUserMedia( // ブラウザにカメラ及びマイクの利用リクエスを送ってます
{video: true, audio: true},
function (stream) { // success
localStream = stream;
:
:(省略)

// pork to all
function enterRoom(){
var roomname = getRoomName(); // url queryから取ってます。
ws.enter(roomname);
}

front/socket.js#L58

 socket.prototype.enter = function(roomname) {

var self = this;
self.roomname = roomname;
return this._enter();
};

//front/socket.js#L101
socket.prototype._enter = function() {
var self = this;
return this.connect().then(function() {
return new Promise(function(resolve, reject){
self.ws.onmessage = function(evt) {
var msg = eval("(" + evt.data + ")");
if (msg.Type === 'enter' && msg.From === self.channel) {
console.log('room ready');
resolve();
}
};

self.ws.onerror = function(evt) {
self._onerror(evt);
reject();
};
self._send('enter', self.roomname, 'bro', self.roomname);
}).then(function(){
self.ready = true;
self.ws.onmessage = function(evt) {
var msg = eval("(" + evt.data + ")");
if (self.channel !== msg.From) {
self.callback(msg.Type, msg)
} else {
console.debug('reject self message')
}
}
});
});
};

会議室にPorkする前にwebsocketの接続処理が必要ですが、接続から会議室への入室まで他のメッセージを受け付けないようにその都度websocketのonmessageを上書きするように実装しております。この辺りはサーバーサイドの仕様も絡むので、下の方にサーバーサイドの処理と合わせて書きます。


ボブがアリスにオファー送信

offersdp.png


  • アリスからenterメッセージを受信したボブはをpeerコネクションを作成しアリスにoffer SDPをユニキャストします

front/index.js#L22

  var ws = new Socket(url, function(type, msg) {

if (type === 'enter') {
sendOffer(msg);
:
:(省略)

//front/index.js#L73

// start the connection upon user request
function sendOffer(data) {
var conn = prepareNewConnection(data.From);
conn.createOffer(function (sessionDescription) {
// in case of success
conn.peer.setLocalDescription(sessionDescription);
sendSDP(data.From, sessionDescription); // 送信元にボブのSDPを返信する
:
:(省略)

//front/index.js#L125
// peer connection
function prepareNewConnection(id) {
:
:(省略)
var pc_config = {"iceServers":[]}; // 今回STUN/TURNサーバーはなし
try {
peer = new webkitRTCPeerConnection(pc_config); //
// send any ice candidates to the other peer
peer.onicecandidate = function (evt) {
:
:(省略)

function sendSDP(id, sdp) {
var text = JSON.stringify(sdp);
textForSendSDP.value = text; // 画面確認用
ws.unicast(id, sdp.type, sdp)
}

※RTCPeerConnection は各ブラウザごとにprefixがあります。以下を参照ください。

https://developer.mozilla.org/ja/docs/Web/API/RTCPeerConnection

※SocketはWebSocketをラッパーした感じに実装してます。

front/socket.js

※ ConnectionはRTCPeerConnectionを管理するための属性を追加したラッパークラスです。

front/connection.js


アリスがボブにアンサー返信

answersdp.png


  • ボブからのofferを受け取ったアリスがpeerコネクションを作成しボブのSDPをセットします。

  • ボブにanswer SDPを返信します

front/index.js

  // Line 22

var ws = new Socket(url, function(type, msg) {
:
:(省略)
} else if (type === 'offer') {
onOffer(msg);
:
:(省略)

// Line 87
function onOffer(msg) {
var sdp = JSON.parse(msg.Msg);
var connection = prepareNewConnection(msg.From);
connection.peer.setRemoteDescription(new RTCSessionDescription(sdp));
addConnection(connection);
sendAnswer(msg);
}

function sendAnswer(evt) {
var conn = getConnection(evt.From) // 通信相手ごとにコネクションを管理
var peer = conn.peer;
peer.createAnswer(function (sessionDescription) { // in case of success
peer.setLocalDescription(sessionDescription);
sendSDP(evt.From, sessionDescription);
:
:(省略)


ボブとアリスとの通信開始

startp2p.png


  • アリスからのSDPを先に作ったPeerコネクションに設定し接続確立

front/index.js

  // Line 22

var ws = new Socket(url, function(type, msg) {
:
:(省略)
} else if (type === 'answer') {
onAnswer(msg);
:
:(省略)

// Line 106
function onAnswer(evt) {
var data = eval("(" + evt.Msg + ')');

var conn = getConnection(evt.From);
conn.peer.setRemoteDescription(new RTCSessionDescription(data));
}

参考サイトではnode.jsの socket.io 使っていましたが、今回サーバーサイドGoでやりたかったので、WebSocket周りでいろいろ独自実装が入ってます。



シグナリング(紹介所)の実装


シグナリングの仕様

先に書いたアリストボブの接続が確立するまでの間、接続に必要なメッセージのやり取りがシグナリングサーバーと呼ばれる紹介所のようなサーバーでなされます。この紹介所の仕様は実装に委ねられており、今回は以下の機能を実装します。


  • 複数人でビデオ会議ができること

  • 複数のビデオ会議室を確保できること

  • node.js使えば簡単そうだけどGoやってみたいから意地でもGoでやること

繰り返しになりますが、参考サイトではnode.jsの socket.io 使ってたのでいろいろオレオレ仕様が入ってます。


紹介所の登場人物


back/signaling/connection.go


  • websocketのコネクションのラッパーみたいな位置づけ


back/signaling/message.go


  • ボブとアリスがやり取りするメッセージ


back/signaling/message_frame.go


  • メッセージの送信に必要な情報とメッセージを管理する


back/signaling/server.go


  • メインメソッドみたいなの(こいつだけ構造体ポインタ使ってない)


仕様


  • メインメソッドでwebsocketのハンドラーを登録してサーバー実行

  • webSocket接続が来たら、Connectionを生成して一意キーを割り振りマップで確保する

  • メッセージ送信用チャネルを用意してハンドラーを実行する

  • ループを作成して、websocketからのメッセージフレームを処理する

  • 入室要求が来たらコネクションに会議室を割り当てる



メインメソッドでwebsocketのハンドラーを登録してサーバー実行

back/signaling/server.go


var conns map[string]*Connection

func Run() {
conns = make(map[string]*Connection)
http.HandleFunc("/ws", func(w http.ResponseWriter, req *http.Request) {
s := websocket.Server{Handler: websocket.Handler(wsHandler)}
s.ServeHTTP(w, req)
})
if err := http.ListenAndServe(SERVE_WS, nil); err != nil {
panic("ListenAndServe: " + err.Error())
}
}


  • ほとんどwebに転がってるソースのコピペなので細かい説明は割愛させていただきます。 オリジナルはコネクションを var conns map[string]*Connection で管理してるとこくらいかな。Connectionはポインタを管理してます。


webSocketハンドラーの実装


  • コネクション作成

  • マップに保存

  • メッセージハンドラー実行


func wsHandler(ws *websocket.Conn) {
conn := NewConnection(ws)
conns[conn.Channel] = conn
wsMsgHandler(conn)
}


  • goにはコンストラクタの概念がなく New + "型名" の名前の関数で初期化するのが慣習らしいです。

  • conn.Channelはコンストラクタの中でUUID発行してます。

  • wsMsgHandlerの中は後に書きます


コネクション作成の中(NewConnection)の処理


  • メッセージ送信チャネル作成

  • チャネルハンドラー実行

back/signaling/connection.go

// 構造体を定義

type Connection struct {
Channel string // コネクションの識別子
Conn *websocket.Conn // websocketのコネクション
Room string // 会議室名
sendChannel chan Messagee // チャネル
}

func NewConnection(ws *websocket.Conn) *Connection {
channel := uuid.NewV4().String()
connection := &Connection{ // "&" をつけてポインタを返す
Channel: channel,
Conn: ws,
sendChannel: make(chan Message)} //チャネル作成
go channelHandler(connection) // チャネルハンドラー実行
msg := Message{
Type: TYPE_CONNECTED,
Msg: channel}
connection.SendMessage(msg, connection) // コネクションの識別子を通知
return connection
}


  • ややこしくて恐縮ですが、コネクションの識別にChannel(チャネル)って名前をつけちゃいました。

  • 他のコネクションからのメッセージを受信するためのチャネル(sendChannel chan Messagee)を生成してます。名前がややこしくてすみませんw


    • 私はこのチャネルこそがGoの醍醐味ではないかと思っております。(他がまだ見えてないだけかも)



ここでポイントなのが、チャネルのハンドラーの実行とコネクション識別子の通知処理の順序です。チャネルはスレッド間の通信の仕組みで非同期っぽく動きますが、チャネルのハンドラーのackをまっているのか、ハンドラーが実行されてないチャネルにメッセージを投げるとデッドロックになってしまいます。(当初デッドロックになってて少しハマりました。)


websocketメッセージの処理


  • クライアントからのメッセージはMessageFrameの構造でやり取りしており、MessageFrameの送信先種別に応じて、会議室へのブロードキャストと個人宛へのユニキャストを切り替えてます

back/signaling/message_frame.go

// 送信先種別 

const (
DestTypeUnicast = "uni" // ユニキャスト
DestTypeBroadcast = "bro" // 本当のブロードキャスト(使ってないw)
DestTypeRoom = "room" // 会議室内のブロードキャスト (Porkするときに使う)
)

type MessageFrame struct {
Type string // 送信先種別
Dest string
Message Message
}

// MessageFrameはMessageを包有しております。
// back/signaling/message.go
type Message struct {
Type string // メッセージ種別 (offerだったりanswerだったり)
Msg string // メッセージ(jsonだったりplanだったり
From string // 送信元(返信先)
}

back/signaling/server.go

func wsMsgHandler(conn *Connection) {

for {
var frame MessageFrame
err := websocket.JSON.Receive(conn.Conn, &frame)
if err != nil {
fmt.Println(err)
onDisconnected(conn) // 基本接続切れたらここに来る。そうでない場合はバグ
return
}

fmt.Printf("Received: %s from[%s]\n", frame.Type, conn.Channel)

if frame.Message.Type == TYPE_ENTER {
onEnter(conn, frame) // アリスからのPork
} else {
frame.Message.From = conn.Channel
switch frame.Type {
case DestTypeUnicast:
// ユニキャスト
conns[frame.Dest].SendMessage(frame.Message, conn)
default:
// その他は会議室指定でブロードキャスト
broadcastRoom(frame.Message, frame.Dest, conn)
}
}
}
}

// 会議室ブロードキャスト

func broadcastRoom(msg Message, room string, from *Connection) {
fmt.Printf("------ broadcast room:[%s] from:[%s] type[%s] --------\n", room, from.Channel, msg.Type)
// goのループの書き方シンプルでステキ
for _, conn := range conns {
if conn.IsJoiningRoom(room) {
conn.SendMessage(msg, from)
}
}
}

受信したメッセージの送信処理はConnectionのチャネルを使用して処理しております。別スレッドに型セーフでメッセージが送受信できる仕組みが個人的にお気に入りです。

back/signaling/connection.go

// メッセージ送信処理

// ポインタや構造体に関数定義するときは `func (対象ポインタのalias *型) 関数名() { `
// Line 50
func (self *Connection) SendMessage(msg Message, from *Connection) {
msg.From = from.Channel
self.sendChannel <- msg // チャネルに放り込む
}

// Connectionのチャネルの処理
// Line 32
func channelHandler(self *Connection) {
for msg := range self.sendChannel { // チャネルからメッセージ取り出しループ
fmt.Printf("Sending %s channel:%s\n", msg.Type, self.Channel)
websocket.JSON.Send(self.Conn, msg)
}
}


会議室入室まで(ボブにPork送るまで)の流れ


まとめ


  • WebRTC、思ってたより簡単でした (NAT超えやってないから?)


  • goのchannelの送受信は半非同期。便利だけどデッドロックに注意が必要(チャネル作成と同時にハンドラー実行する癖つければ問題ないので慣れの問題かなと) 詰まってるだけでした。詳細は下記

  • 初めてGo書いたけど、コンパイラがとても厳しくてSな品質管理者にもMな開発者にピッタリだと感じました!w


2016-03-21 twitterでデッドロックについて指摘があったので追記と修正

twitterでこんな指摘がありました。

そういえば Tour of Go の Buffered Channels のとこにそんな記述がありましたね。


バッファが詰まった時は、チャネルへの送信をブロックします。 バッファが空の時には、チャネルの受信をブロックします。


ということで順番を入れ替えて再実行してみました

まずはデッドロックが起きるパターン

func NewConnection(ws *websocket.Conn) *Connection {

channel := uuid.NewV4().String()
connection := &Connection{
Channel: channel,
Conn: ws,
sendChannel: make(chan Message)} // バッファ設定してない
msg := Message{
Type: TYPE_CONNECTED,
Msg: channel}
fmt.Println("チャネルにPUT実行")
connection.SendMessage(msg, connection)
fmt.Println("チャネルにPUT完了")
fmt.Println("ゴルーチン実行")
go channelHandler(connection)
return connection
}

実行結果

$ go run server.go 

チャネルにPUT実行

デッドロックしてます。

チャネル作成時にバッファを設定するようにしました。

func NewConnection(ws *websocket.Conn) *Connection {

channel := uuid.NewV4().String()
connection := &Connection{
Channel: channel,
Conn: ws,
sendChannel: make(chan Message, 1)} // バッファ設定
msg := Message{
Type: TYPE_CONNECTED,
Msg: channel}
fmt.Println("チャネルにPUT実行")
connection.SendMessage(msg, connection)
fmt.Println("チャネルにPUT完了")
fmt.Println("ゴルーチン実行")
go channelHandler(connection)
return connection
}

実行結果

$ go run server.go 

チャネルにPUT実行
チャネルにPUT完了
ゴルーチン実行
:
:

いい感じになりました!

@_atton さんご指摘ありがとうございました!

勉強会で発表するってホントいい勉強になりますね。発表してなければこんな指摘受けれなかったでしょう。

次も楽しみです!

https://okinawa-go.doorkeeper.jp/