はじめに
まず、CLIでビデオ会議(GUI)?相反するタイトルですが細かい事は気にしないで下さい。。
実は当初の目的はタイトルのものを作りたかった訳ではなく、在宅勤務下でも社員同士が双方向通信できるツールが作りたかったのと、ツールに組み込みやすいようにCLIで実行できる形で実現したかったという考えがありました。
在宅勤務下なのでネットワークが別という事もあり、NAT超えをする必要もあるのも課題です。
今回はその目的と課題を解決するための技術選定と簡単なサンプル実装を行ったので、紹介したいと思います。
技術選定
NAT越えの双方向通信といえば、すぐに思い浮かぶのはWebRTCです。
WebRTCには動画・音声だけでなくDataChannelという独自のデータを送る機能も用意されているので、この機能を使えばツールでも活用できそうです。
CLI縛りがあるのでブラウザは使えないため、今回はCLIでWebRTCが使えるpion-webrtcというライブラリを使う事にしました。
テキストチャットの実装
pion-webrtcが提供しているサンプル実装に、CLI上で双方向通信を行いDateChannelを使ってテキストを送受信するサンプル実装があったので、こちらを参考にテキストチャットを実装してみたいと思います。
シグナリングサーバー
WebRTCを使うにはシグナリングサーバーという通信相手の情報を相互に交換するためのサーバーが必要ですが、pion-webrtcのサンプル実装ではローカルネットワーク内での通信でしか使えない簡易的なものだったので、外部ネットワークでも使えるものを自作します。
シグナリングサーバーの機能としてはクライントから受信したデータを他に接続しているクライアントにリレーするだけの機能が最低限あれば問題ないです。
今回はシグナリングサーバーが実装しやすいWebSocketを使って実装しました。
実装についてはシンプルなものなのでgithubをご覧下さい。
シグナリングサーバーを起動する
シグナリングサーバーは全てのクライアントが接続できる場所に置く必要があります。
今回はEC2上に配置しました。
tcp:8080
ポートを使用するのでポートの開放を行って下さい。
ビルドと起動方法は以下です。
$ git clone git@github.com:nishina-y/p2p_cli_sample.git
$ cd p2p_cli_sample/signaling_server_sample
$ go build
$ ./signaling_server_sample
TURNサーバー
ネットワーク環境によってはNAT超えできない場合があり、その場合WebRTCではTURNサーバーを経由した通信を行います。
TURNサーバーはいくつかOSSのものがありますが、今回はpion-webrtcと同じプロダクトの一つのpion-turnを使いました。
今回は実装は行わずに、pion-turnのレポジトリにあるサンプル実装をそのまま使用する事にします。
TURNサーバーを起動する
TURNサーバーもシグナリングサーバーと同様に全てのクライアントが接続できる場所に置く必要があります。
3478
,49152-65535
ポートを使用するのでポート開放も行って下さい。
以下がビルドと起動方法です。
コマンドラインパラメータにはサーバーのグローバルIPとTURNサーバーにログインするためのユーザーネームとパスワードを渡して下さい。
$ git clone https://github.com/pion/turn
$ cd turn/examples/turn-server/simple
$ go build
$ ./simple -public-ip {GLOBAL_IP} -users username=password
使用したサンプル実装はhttpでユーザーネームとパスワードを平文で送信していますが、セキュアな構成も可能なので詳しくはpion-turnのgithubをご覧下さい。
テキストチャットのクライアントの実装
まずシグナリングサーバーとの接続及びデータの送受信を行うための簡単なクラスを作成します。
package main
import (
"bytes"
"log"
"math"
"net/url"
"strings"
"github.com/gorilla/websocket"
)
type SignalingClient struct {
conn *websocket.Conn
}
// シグナリングサーバーと接続を行う
func (c *SignalingClient) connection(addr *string, onReceive chan string) {
u := url.URL{Scheme: "ws", Host: *addr, Path: "/ws"}
conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
c.conn = conn
done := make(chan struct{})
// 受信用ゴールーチン
go func() {
defer close(done)
var offset int
var receiveMessage string
for {
_, message, err := c.conn.ReadMessage()
receiveMessage += string(message)
// 雑に改行コードを1つのメッセージの区切りとしているので、改行コード区切りでメッセージを処理する
if bytes.Index(message, []byte("\n")) == -1 {
continue
}
offset = 0
for {
nextOffset := strings.Index(receiveMessage[offset:], "\n")
if nextOffset == -1 {
break
}
nextOffset += offset
// 受信イベントを発火
onReceive <- receiveMessage[offset:nextOffset]
offset = nextOffset + 1
}
receiveMessage = ""
}
}()
}
// シグナリングサーバーにデータを送信
func (c *SignalingClient) textMessage(message string) error {
// メッセージの送信容量制限があるため500byteごとに分割して送信
messageByte := []byte(message)
messageLen := len(messageByte)
frameSize := 500
frameCount := len(messageByte)/frameSize + 1
for i := 0; i < frameCount; i++ {
start := i * frameSize
end := int(math.Min(float64(start+frameSize), float64(messageLen)))
text := message[start:end]
if i == frameCount-1 {
// 末尾に改行コードを追加
text += "\n"
}
// メッセージを送信
if err := c.conn.WriteMessage(websocket.TextMessage, []byte(text)); err != nil {
return err
}
}
return nil
}
// 切断する
func (c *SignalingClient) close() error {
if err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
return err
}
c.conn.Close()
return nil
}
シグナリングサーバーとの接続を行います。
signalingClient = &SignalingClient{}
signalingClient.connection("{シグナリングサーバーのIPアドレス}:8080", onReceive)
defer signalingClient.close()
TURNサーバーの情報をWebRTCの設定に追加します。
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"turn:{TURNサーバーのIPアドレス}:3478"},
Username: "username",
Credential: "password",
CredentialType: webrtc.ICECredentialTypePassword,
},
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
WebRTCの初期化処理を行います。
// PeerConnectionを初期化
peerConnection, err := webrtc.NewPeerConnection(config)
defer peerConnection.Close()
// ICE Candidatesが生成された時に実行されるイベントハンドラ
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
candidatesMux.Lock()
defer candidatesMux.Unlock()
desc := peerConnection.RemoteDescription()
if desc == nil {
pendingCandidates = append(pendingCandidates, c)
} else if onICECandidateErr := signalCandidate(c); onICECandidateErr != nil {
panic(onICECandidateErr)
}
})
接続Offerを送る処理を実装します。
// DataChannelの作成
dataChannel, err := peerConnection.CreateDataChannel("data", nil)
// テキストチャットの送受信処理を設定
setDataChannel(dataChannel)
// Offer SDPを生成します
offer, err := peerConnection.CreateOffer(nil)
// Offer SDPをローカルに記録しておきます
peerConnection.SetLocalDescription(offer)
// Offer SDPをJson化します
payload, err := json.Marshal(offer)
offerJson := string(payload)
// シグナリングサーバーを仲介して接続相手にOffer SDPを送信します。
// SDPとわかるように雑にPrefixを追加して送信します。
signalingClient.textMessage("@SDP:" + offerJson)
シグナリングサーバーからのメッセージを受信する処理を実装します。
OfferSDPの受信処理と、ICE Candidate(通信経路の情報)の受信処理を実装します。
// シグナリングサーバーから受信したメッセージを処理するためのゴールーチン
go func() {
for {
select {
case message := <-onReceive:
if strings.HasPrefix(message, "@SDP:") {
// OfferSDPもしくはAnswerSDPの受信処理
sdpJson := message[5:]
sdp := webrtc.SessionDescription{}
json.Unmarshal([]byte(sdpJson), &sdp)
// 受信したSDPを登録します
sdpErr := peerConnection.SetRemoteDescription(sdp)
if *mode == "answer" {
// Offerを受け取った後にanswerを返す
// Answer SDPを生成
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
panic(err)
}
// Answer ADPをJsonに変換
payload, err := json.Marshal(answer)
// シグナリングサーバーを仲介して接続相手にAnswer SDPを送信します。
// SDPとわかるように雑にPrefixを追加して送信します。
if err := signalingClient.textMessage("@SDP:" + string(payload)); err != nil {
panic(err)
}
if err := peerConnection.SetLocalDescription(answer); err != nil {
panic(err)
}
}
// シグナリングサーバーを仲介して接続相手にICE Candidate(通信経路の情報)を送信します
candidatesMux.Lock()
for _, c := range pendingCandidates {
if onICECandidateErr := signalCandidate(c); onICECandidateErr != nil {
panic(onICECandidateErr)
}
}
candidatesMux.Unlock()
} else if strings.HasPrefix(message, "@CND:") {
// ICE Candidate(通信経路の情報)の受信処理
candidate := message[5:]
// ICE Candidate(通信経路の情報)を登録します
if candidateErr := peerConnection.AddICECandidate(webrtc.ICECandidateInit{Candidate: candidate}); candidateErr != nil {
panic(candidateErr)
}
}
case <-done:
return
}
}
}()
// シグナリングサーバーを仲介して接続相手にICE Candidate(通信経路の情報)を送信します
func signalCandidate(c *webrtc.ICECandidate) error {
payload := c.ToJSON().Candidate
// Candidateとわかるように雑にPrefixを追加して送信します。
if err := signalingClient.textMessage("@CND:" + payload); err != nil {
return err
}
return nil
}
DataChannelを使ったテキストチャットの送受信処理を実装します。
func setDataChannel(d *webrtc.DataChannel) {
// DataChannelの接続タイミングで呼ばれるコールバック
d.OnOpen(func() {
for {
// 標準入力からの入力待ち
text := MustReadStdin()
// 接続相手に入力されたテキストを送信
d.SendText(text)
}
})
// DataChannelでデータを受信した際に呼ばれるコールバック
d.OnMessage(func(msg webrtc.DataChannelMessage) {
// 受信したテキストをprint
fmt.Println("> " + string(msg.Data))
})
}
answer側は接続が確率されてDataChannelの準備が出来た際の呼び出されるコールバックでテキストチャットの処理を設定します。
// DataChannelの準備が出来た際のコールバック関数を登録します
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
// テキストチャットの送受信処理を設定
setDataChannel(d)
})
動かしてみよう
実装したコードをgithubに上げているので実際に動かしてみましょう。
テキストチャットのサンプル実装(github)
①コードを取得してビルドを行う
$ git clone git@github.com:nishina-y/p2p_cli_sample.git
$ cd p2p_cli_sample/text_chat_sample
$ go build
②シグナリングサーバーとTURNサーバーを起動
シグナリングサーバーの説明と、TURNサーバーの説明を参考にサーバーを起動して下さい。
※ ローカルマシン同士で通信する場合はTURNサーバーは不要です
③answer側を起動
$ ./text_chat_sample \
--addr {シグナリングサーバーのIPアドレス}:8080 \
--mode answer
④offer側を起動
新たにターミナルを開くか、別のマシンで起動して下さい。
$ ./text_chat_sample \
--addr {シグナリングサーバーのIPアドレス}:8080 \
--mode offer
数秒待つと接続が完了して、CLI上でテキストチャットを行う事ができます。
動画配信の実装
せっかくWebRTCを使っているので動画配信機能を付けてビデオ会議システムっぽいものを作って見ようと思います。
※ これから紹介する実装はmacでしか確認をとっていません。
pion-webrtcを使った動画の送受信のサンプル実装を見つけたので、これを参考に実装してみます。
初期設定
まずはCLIからカメラ制御や動画再生を行うためのGStreamerというツールをインストールして下さい。
$ brew install gstreamer \
gst-plugins-base gst-plugins-good \
gst-plugins-bad gst-plugins-ugly
ターミナルから以下のコマンドを実行してmacのセキュリティーとプライバシー設定でターミナルからのカメラとマイクへのアクセス許可を設定して下さい。
$ gst-launch-1.0 avfvideosrc ! autovideosink
$ gst-launch-1.0 osxaudiosrc ! autoaudiosink
実装
テキストチャットのコードに動画・音声を配信するための実装追加します。
video trackを生成してpeerConnectionに登録します。
// ビデオトラックを追加
videoTrack, err := webrtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: "video/vp8"},
"video", "pion-video")
peerConnection.AddTrack(videoTrack)
// オーディオトラックを追加
audioTrack, err := webrtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: "audio/opus"},
"audio", "pion-audio")
peerConnection.AddTrack(audioTrack)
接続確立を待ってGStreamerを使ってカメラの映像データとマイクのサウンドデータを接続相手に流し込むゴールーチンを開始します。
go func() {
gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
// 接続待ち
<-gatherComplete
// カメラの映像データを接続相手に流し込む
gstSrc.CreatePipeline("vp8", []*webrtc.TrackLocalStaticSample{videoTrack}, *videoSrc).Start()
// マイクのサウンドデータを接続相手に流し込む
gstSrc.CreatePipeline("opus", []*webrtc.TrackLocalStaticSample{audioTrack}, *audioSrc).Start()
}()
動画・音声データの受信処理を実装します。
受信した映像・音声データはGStreamerを使って再生されます。
// 動画等のTrackが追加された時に呼び出されるコールバック
peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
go func() {
ticker := time.NewTicker(time.Second * 3)
for range ticker.C {
rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}})
}
}()
// 受信した映像をGStreamerを使ってGUIで表示
codecName := strings.Split(track.Codec().RTPCodecCapability.MimeType, "/")[1]
pipeline := gstSink.CreatePipeline(track.PayloadType(), strings.ToLower(codecName))
pipeline.Start()
buf := make([]byte, 1400)
for {
i, _, readErr := track.Read(buf)
pipeline.Push(buf[:i])
}
})
動かしてみよう
実装したコードをgithubに上げているので実際に動かしてみましょう。
動画配信のサンプル実装(github)
①コードを取得してビルドを行う
$ git clone git@github.com:nishina-y/p2p_cli_sample.git
$ cd p2p_cli_sample/video_communication_sample
$ go build
②シグナリングサーバーとTURNサーバーを起動
シグナリングサーバーの説明と、TURNサーバーの説明を参考にサーバーを起動して下さい。
※ ローカルマシン同士で通信する場合はTURNサーバーは不要です
③answer側を起動
$ ./video_communication_sample \
--addr {シグナリングサーバーのIPアドレス}:8080 \
--mode answer \
--video-src 'autovideosrc ! videoconvert' \
--audio-src 'osxaudiosrc ! audioresample ! audio/x-raw, rate=8000'
④offer側を起動
$ ./video_communication_sample \
--addr {シグナリングサーバーのIPアドレス}:8080 \
--mode offer \
--video-src 'autovideosrc ! videoconvert'
--audio-src 'osxaudiosrc ! audioresample ! audio/x-raw, rate=8000'
数秒後に接続相手にカメラ映像と音声が再生されます。
たまに接続したタイミングでエラーになることがあったので、その場合は再度③④の接続を行って下さい。
PCにカメラ・マイクが付いていない場合
PCにカメラ・マイクが付いていない場合は、--video-srcと--audio-srcパラメータを外すとGStreamerが用意しているダミー映像と音声で確認する事ができます。
おわりに
WebRTCのおかげで思ったより少ないコード量で、それっぽいビデオ会議システムが出来上がりました。
今回コードを公開していますが、私はgo初心者なので細かな実装部分は参考にしない方がいいと思います。