RTMPサーバーを作成する過程でTCPサーバーを作成する
RTMPサーバーを作っている途中の記録を残そうと思って書いています。
**注意:**これらのリポジトリを参考にして作っています。
https://github.com/KallDrexx/rust-media-libs/tree/master/examples/mio_rtmp_server
https://github.com/nareix/joy4
RTMPとは?
RTMPは,Real-Time Messaging Protocol
のことで,映像や音声,またはそれ以外のデータをストリーム形式で送受信するときに用いるプロトコルのことです。
TCPベースで作られているので,1対1で通信を行います。
TCPベースなので,とりあえずTCPサーバーだけ立ててみようと思います。
プロトコル
プロトコルは,ざっくりいうと通信を行う際に決められている手順や規格のことです。
RTMPの処理順序
- Handshake
- 接続
- publish(配信を行う場合)
- play(視聴を行う場合)
つまり,この処理順序で通信を行うサーバーを作ればいいってことですかね!
Handshakeとは
まずHandshake
というのは,TCP通信を行う際に,接続の前に応答確認やタイムスタンプの交換などを行うことを指します。
今回はThree way handshake
というものを最初に行います。これは図のように,
クライアントとサーバー間で3回通信のやり取りを行ってHandshakeを行います。
クライアントからC0, C1を送信し,サーバーからS0, S1, S2を送信し,最後にまたクライアントからC2を送信して完了となります。
このHandshakeを行ってはじめて接続をすることができます。
では,とりあえずディレクトリを作成します。
mkdir inferno
cd inferno
ディレクトリの中にCargo.toml
を作成し,ワークスペースを作成します。
とりあえずamf0
は使うことがわかっているのでライブラリとしてプロジェクトを作成するためにワークスペースに登録しておきます。
[workspace]
members = [
"amf0",
"inferno-rtmp-engine"
]
cargo new inferno-rtmp-engine --bin
cargo new amf0 --lib
これで最初の準備ができました。
Handshake部分の実装
それではHandshakeの部分を実装していこうと思います。
inferno-rtmp-engine/
の方のプロジェクトのsrc/
の中に新しくhandshake/
フォルダを作成します。その中にerrors.rs
とmod.rs
を作成します。
すると現在のinferno-rtmp-engine
のディレクトリ構成は
.
├── Cargo.toml
└── src
├── handshake
│ ├── errors.rs
│ └── mod.rs
└── main.rs
このようになっています。
次は中身の部分を書いていきます。
Stage構造体
では,Handshakeの際に自身が実行するべき処理を把握するための構造体Stage
を定義します。
※Rustのインデントはスペース4つらしいです。
mod errors;
# [derive(Eq, PartialEq, Debug, Clone)]
enum Stage {
WaitingForC0,
WaitingForC1,
NeedToSendS2,
Complete,
}
サーバーを立てる際に使うフォルダservers
を作成します。そしてその中にまたmod.rs
を作成しておきます。それと一緒に,errors.rs
とresult.rs
も作っちゃいます。
さて,RTMPはTCPベースになっているので,TCPサーバーが立たないといけません。
なのでまずはRustでTCPサーバーを立ててみます。
TCPサーバーの準備
mod errors;
mod result;
use std::io::{Error, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
pub struct Server {
addr: String,
}
impl Server {
// Some methods to handle connection.
pub fn listen_and_serve() -> Result<(ServerSession, Vec<ServerSessionResult>), ServerSessionError> {
// Build a server
}
}
このようにlisten_and_serve()
メソッドを使ってTCPサーバーが立つようにしたいと思います。
ServerSession
などはまだ定義されていないので,result.rs
たちの中に書いていきます。(あとで掲載します)
すると,
mod errors;
mod result;
use std::io::{Error, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
pub use self::errors::ServerSessionError;
pub use self::result::ServerSessionResult;
pub struct Server {
addr: String,
}
impl Server {
// Some methods to handle connection.
pub fn listen_and_serve() -> Result<(Server, Vec<ServerSessionResult>), ServerSessionError> {
// Build a server
let server = Server {
addr: "somewhere".to_string(),
};
let mut results = Vec::with_capacity(4);
Ok((server, results))
}
}
とりあえずこんな感じでlisten_and_serve()
メソッドを作ることができました。
メソッドの中身はとりあえずテストが通るように書いただけみたいな感じです。
では,次はメソッドの中身を埋めていきます。
まずはTCPサーバーを立てる必要があるので,その処理を書いていきます。
mod errors;
mod result;
# [cfg(test)]
mod tests;
use std::io::{Error, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
pub use self::errors::ServerSessionError;
pub use self::result::ServerSessionResult;
pub struct Server {
addr: Option<String>,
}
impl Server {
pub fn new() -> Result<(Server, Vec<ServerSessionResult>), ServerSessionError> {
let server = Server {
addr: Some("somewhere".to_string()),
};
let mut results = Vec::with_capacity(4);
Ok((server, results))
}
pub fn listen_and_serve(self) {
// Build a server
let listener = TcpListener::bind("127.0.0.1:1935").expect("Error: Failed to bind");
println!("Listening...");
for streams in listener.incoming() {
match streams {
Err(e) => { eprintln!("error: {}", e) },
Ok(stream) => {
thread::spawn(move || {
handler(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
});
}
}
}
}
}
fn handler(mut stream: TcpStream) -> Result<(), Error> {
println!("Connection from {}", stream.peer_addr()?);
let mut buffer = [0; 1024];
loop {
let nbytes = stream.read(&mut buffer)?;
if nbytes == 0 {
return Ok(());
}
stream.write(&buffer[..nbytes])?;
stream.flush()?;
}
}
これでTCPサーバーが立つようになったので,PythonのスクリプトでTCPサーバーとして動くかどうかを試してみます。
[server]
ip = 127.0.0.1
port = 1935
[packet]
# [bytes]
header_size = 4
# [pixels]
image_width = 64
image_height = 64
import socket
import configparser
import logging
import time
logging.basicConfig(level=logging.DEBUG)
config = configparser.ConfigParser()
config.read('./connection.ini', 'UTF-8')
# Connectino Settings
SERVER_IP = config.get('server', 'ip')
SERVER_PORT = int(config.get('server', 'port'))
IMAGE_WIDTH = int(config.get('packet', 'image_width'))
IMAGE_HEIGHT = int(config.get('packet', 'image_height'))
IMAGE_SIZE = IMAGE_WIDTH * IMAGE_HEIGHT
if __name__ == '__main__':
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((SERVER_IP, SERVER_PORT))
i = 0
while True:
s.send(i.to_bytes(IMAGE_SIZE, 'big'))
logging.info(" Send: " + str(i))
time.sleep(1)
i = i + 1
このスクリプトと,Rustのサーバーを同時に起動すると,
Hello, world!
Listening...
Connection from 127.0.0.1:50247
一応テスト用のコードも出しておくと,
use super::*;
use std::thread;
# [test]
fn bridge_tcp_server() {
let (server, _result) = Server::new().unwrap();
thread::spawn(move || {
server.listen_and_serve();
});
}
新しくスレッドを作成してlisten_and_serve()
を行うことで,テストでたてたTCPサーバーが正常に終了するようにしています。
running 1 test
test tests::it_works ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
Running target/debug/deps/inferno_rtmp_engine-d3654c5273df6f45
running 1 test
Listening...
test servers::tests::bridge_tcp_server ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
Doc-tests amf0
running 0 tests
test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
まとめ
とりあえず今回はTCPサーバーをたてるところまで進めました。
RTMPサーバーを立てる途中の記録なので,TCPサーバーのコードとしては要らない部分がかなり多いと思います。
この開発の目的はRTMPサーバーを作成することなので,また少しずつ進めていきます。
改良版
今回のサーバーの改良: 改良版
参考リンク
http://blog.hirokikana.com/dev/rtmp-client/
https://www.adobe.com/jp/devnet/rtmp.html
https://www.otsuka-shokai.co.jp/words/protocol.html
http://e-words.jp/w/%E3%83%8F%E3%83%B3%E3%83%89%E3%82%B7%E3%82%A7%E3%82%A4%E3%82%AF.html
https://developers.cyberagent.co.jp/blog/archives/13739/
https://cha-shu00.hatenablog.com/entry/2019/03/02/174532
https://users.rust-lang.org/t/how-to-close-tcpconnection-with-a-function/39367/3
参考にしたコード
https://github.com/KallDrexx/rust-media-libs/tree/master/examples/mio_rtmp_server
https://github.com/nareix/joy4/blob/master/format/rtmp/rtmp.go