RTMPとは
RTMPは、Adobe が開発している Adobe Flash プレーヤーとサーバーの間で、音声・動画・データをやりとりするストリーミングのプロトコルです。
RTMP の主要な利用法は Flash Video を再生することでしたが、低遅延でのストリーミングを実現できることから、ストリーミングサーバーへの映像伝送に使われたりもします。
ネットで調べると大体のLive配信アプリはRTMPで配信し、HLSで視聴しているものが多いようです。
RTMP Handshakeとは
RTMPはTCP上で動きます。
TCPによる接続が確立すると、RTMPの接続をハンドシェイクから行っていきます。
RTMPのハンドシェイクは他のプロトコルとは異なります。
クライアント(接続を開始したエンドポイント)とサーバーはそれぞれ3つの固定長のチャンクを送信します。
これらのチャンクは、クライアントから送信されるものはC0、C1、C2、サーバーから送信されるものは0、S1、S2と呼ばれています。
ハンドシェイクは、クライアントがC0およびC1チャンクを送信することから始まります。
クライアントは、C2を送信する前にS1が受信されるまで待つ必要があります。
また、その他のデータ(音声や映像など)を送信するためにはS2の受信を待つ必要があります。
逆にサーバーはS0、S1を送る前にC0が受け取られる必要があります。(C1の受信まで待機してもOK)
S2を送信するにはC1が受信されるまで待つ必要があります。
そしてクライアントと同様その他のデータを送信するにはC2の受信を待つ必要があります。
図にするとこんな感じです。
しかし、C0とかS0とか言われても正直良くわかりません。
実際の通信を見ると、なるほど、ほんとにC0とかが送られているんだなとわかります。
ほんとにそういう名前なんですねー。
これらのチャンクには決まったフォーマットがありますが、フォーマットに関する説明は 仕様書 の「5.2.2. C0 and S0 Format」「5.2.3. C1 and S1 Format」「5.2.4. C2 and S2 Format」に詳しく書いてあるので割愛します。
こちらの記事も仕様書の日本語版くらいわかりやすかったので参考にしてみてください。
https://developers.cyberagent.co.jp/blog/archives/13739/#handshake
これも実際の通信を見てみると当たり前ですがフォーマット通りのチャンクが送受信されていることがわかります。
ハンドシェークの実装
では実装していきます。
ソケット接続自体は NSStream を使えば簡単です。
let inputQueue = DispatchQueue(label: "NetSocket.input")
let outputQueue = DispatchQueue(label: "NetSocket.output")
var inputStream: InputStream?
var outputStream: OutputStream?
func open(hostname: String, port: Int) {
Stream.getStreamsToHost(
withName: hostname,
port: port,
inputStream: &self.inputStream,
outputStream: &self.outputStream
)
inputQueue.async {
self.inputStream?.delegate = self
self.inputStream?.schedule(in: .main, forMode: .default)
self.outputStream?.delegate = self
self.outputStream?.schedule(in: .main, forMode: .default)
self.inputStream?.open()
self.outputStream?.open()
}
}
読み込みと書き込みについてはそれぞれ InputStream#read
と OutputStream#write
を使って以下のように書けます。
func read() {
outputQueue.async {
let maxLength = Int(UInt16.max)
let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: maxLength)
let count = self.inputStream?.read(buffer, maxLength: maxLength) ?? 0
var data = Data()
if 0 < count {
data.append(buffer, count: count)
self.listen(data) // listen(_:)ついては後述
}
}
}
func write(data: Data) {
DispatchQueue.main.async {
let length = data.count
var leftLength = length
data.withUnsafeBytes { (buffer: UnsafePointer<UInt8>) -> Void in
while leftLength > 0 {
let count = self.outputStream?.write(buffer + (length - leftLength), maxLength: length) ?? 0
leftLength -= count
}
}
}
}
Input/OutputのStreamの状態が変化するとStreamDelegate#stream(Stream, handle: Stream.Event)
が呼ばれます。
これで通信可能になったとき( .openCompleted
)と何かを受信したとき( .hasBytesAvailable
)がわかるのでここからチャンクの送受信をおこなっていきます。
ソケットをオープンして通信が可能になったらクライアントからC0とC1を送信します。
C1まで送信するとサーバーからS0、S1、S2が送られてくるので先程の read()
メソッドを使って受信したチャンクを読みます。
func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
switch eventCode {
case .openCompleted:
guard inputStream?.streamStatus == .open, outputStream?.streamStatus == .open else { break }
if aStream == inputStream {
sendC0C1packet()
}
case .hasBytesAvailable:
if aStream == inputStream {
read()
}
case .errorOccurred:
print("errorOccurred")
default:
break
}
}
まずはC0とC1の送信です。
※整数のバイト配列への変換については HaishinKit.swift の ExpressibleByIntegerLiteral+Extension.swift を使わせてもらってます。
func sendC0C1packet() {
var data = Data()
// C0
let protocolVersion: UInt8 = 3
data.append(protocolVersion.data)
// C1
let time = UInt32(0)
data.append(time.bigEndian.data)
let zero = Data([0x00, 0x00, 0x00, 0x00])
data.append(zero)
let size = 1536 - 8 // C1の固定長 - time+zero
var random = Data(count: size)
var _ = random.withUnsafeMutableBytes { bytes in
SecRandomCopyBytes(kSecRandomDefault, size, bytes)
}
data.append(random)
write(data: data)
handshakeStatus = .c0c1
}
次にS0、S1、S2の読み取りです。
先程 read()
メソッドの中に登場した listen(_:)
を実装していきます。
※ HaishinKit.swift を参考にしています。
ハンドシェイクのステータスをenumで管理しています。
先程の sendC0C1packet()
を実行した時点でステータスは .c0c1
になっています。
ハンドシェイクは上述の通りルールが決まっているので、C0とC1を送ったならサーバからはS0、S1が来るはずです。
というわけで、 .c0c1
の状態でサーバーからチャンクを受け取ったらそれはS0、S1だろうということで次はC2を送ります(ステータスは .c2
)。
.c2
の状態でサーバーからチャンクを受け取ったらそれはS1だろうということでこれでハンドシェイクは完了です。
enum HandshakeStatus {
case none
case c0c1
case c2
case done
}
func listen(_ data: Data) {
switch handshakeStatus {
case .none:
break
case .c0c1:
if data.count <= 1536 {
break
}
sendC2packet(data)
handshakeStatus = .c2
var data = data
data.removeSubrange(0... 1536) // S0、S1、S2はいっぺんに送られてくることもあるのでS0、S1を取り除いてS2を渡して再起実行します
if 1536 <= data.count {
listen(data)
}
case .c2:
handshakeStatus = .done // ハンドシェイク完了
case .done:
break
}
}
func sendC2packet(_ s0s1packet: Data) {
var data = Data()
let time = s0s1packet.subdata(in: 1..<5)
data.append(time)
let time2 = UInt32(Date().timeIntervalSince1970)
data.append(time2.bigEndian.data)
let random = s0s1packet.subdata(in: 9..<1536+1)
data.append(random)
write(data: data)
handshakeStatus = .c2
}
まとめ
RTMP配信の序章を実装してみましたが、ハンドシェイクの仕様を理解するのが一番のハードルでした。
通信処理自体は NSStream
を使えば簡単に書けてしまうので実装自体はそう難しくなかったと思います。
ハンドシェイクの段階ではまだ受信したデータのパース処理が無いので(比較的)楽ですね。
しかしハンドシェイクが終わっても、このあとクライアントからconnectコマンドメッセージを送信してサーバーからresultを受信して今度はクライアントから...みたいな感じで音声や映像を送信するまではまだ道が長いです。。
自前で実装するのはかなりめんどくさいですが、仕様書や実際の通信をキャプチャして仕様を理解していくのは面白いなと思いました。
参考
- Real-Time Messaging Protocol (RTMP) specification (Version 1.0)
http://wwwimages.adobe.com/www.adobe.com/content/dam/acom/en/devnet/rtmp/pdf/rtmp_specification_1.0.pdf - RTMP 1.0 準拠のサーバーをGo言語で実装する
https://developers.cyberagent.co.jp/blog/archives/13739/#c0s0format - shogo4405/HaishinKit.swift
https://github.com/shogo4405/HaishinKit.swift