LoginSignup
9
4

More than 3 years have passed since last update.

TCPサーバーをRustで作成したときの記録

Last updated at Posted at 2020-03-21

RTMPサーバーを作成する過程でTCPサーバーを作成する

RTMPサーバーを作っている途中の記録を残そうと思って書いています。
注意:これらのリポジトリを参考にして作っています。

https://github.com/KallDrexx/rust-media-libs/tree/master/examples/mio_rtmp_server
https://github.com/nareix/joy4

RTMPとは?

RTMPは,Real-Time Messaging Protocolのことで,映像や音声,またはそれ以外のデータをストリーム形式で送受信するときに用いるプロトコルのことです。
TCPベースで作られているので,1対1で通信を行います。
TCPベースなので,とりあえずTCPサーバーだけ立ててみようと思います。

プロトコル

プロトコルは,ざっくりいうと通信を行う際に決められている手順や規格のことです。

RTMPの処理順序

  • Handshake
  • 接続
  • publish(配信を行う場合)
  • play(視聴を行う場合)

つまり,この処理順序で通信を行うサーバーを作ればいいってことですかね!

Handshakeとは

まずHandshakeというのは,TCP通信を行う際に,接続の前に応答確認やタイムスタンプの交換などを行うことを指します。
今回はThree way handshakeというものを最初に行います。これは図のように,
image.png
クライアントとサーバー間で3回通信のやり取りを行ってHandshakeを行います。
クライアントからC0, C1を送信し,サーバーからS0, S1, S2を送信し,最後にまたクライアントからC2を送信して完了となります。
このHandshakeを行ってはじめて接続をすることができます。

では,とりあえずディレクトリを作成します。

mkdir inferno
cd inferno

ディレクトリの中にCargo.tomlを作成し,ワークスペースを作成します。
とりあえずamf0は使うことがわかっているのでライブラリとしてプロジェクトを作成するためにワークスペースに登録しておきます。

Cargo.toml
[workspace]

members = [
  "amf0",
  "inferno-rtmp-engine"
]
cargo new inferno-rtmp-engine --bin
cargo new amf0 --lib

これで最初の準備ができました。

Handshake部分の実装

それではHandshakeの部分を実装していこうと思います。
inferno-rtmp-engine/の方のプロジェクトのsrc/の中に新しくhandshake/フォルダを作成します。その中にerrors.rsmod.rsを作成します。
すると現在のinferno-rtmp-engineのディレクトリ構成は

.
├── Cargo.toml
└── src
    ├── handshake
    │   ├── errors.rs
    │   └── mod.rs
    └── main.rs

このようになっています。

次は中身の部分を書いていきます。

Stage構造体

では,Handshakeの際に自身が実行するべき処理を把握するための構造体Stageを定義します。
※Rustのインデントはスペース4つらしいです。

handshake/mod.rs
mod errors;

#[derive(Eq, PartialEq, Debug, Clone)]
enum Stage {
    WaitingForC0,
    WaitingForC1,
    NeedToSendS2,
    Complete,
}

サーバーを立てる際に使うフォルダserversを作成します。そしてその中にまたmod.rsを作成しておきます。それと一緒に,errors.rsresult.rsも作っちゃいます。
さて,RTMPはTCPベースになっているので,TCPサーバーが立たないといけません。
なのでまずはRustでTCPサーバーを立ててみます。

TCPサーバーの準備

servers/mod.rs
mod errors;
mod result;

use std::io::{Error, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

pub struct Server {
    addr: String,
}

impl Server {
    // Some methods to handle connection.
    pub fn listen_and_serve() -> Result<(ServerSession, Vec<ServerSessionResult>), ServerSessionError> {
        // Build a server
    }
}

このようにlisten_and_serve()メソッドを使ってTCPサーバーが立つようにしたいと思います。
ServerSessionなどはまだ定義されていないので,result.rsたちの中に書いていきます。(あとで掲載します)
すると,

servers/mod.rs
mod errors;
mod result;

use std::io::{Error, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

pub use self::errors::ServerSessionError;
pub use self::result::ServerSessionResult;

pub struct Server {
    addr: String,
}

impl Server {
    // Some methods to handle connection.
    pub fn listen_and_serve() -> Result<(Server, Vec<ServerSessionResult>), ServerSessionError> {
        // Build a server
        let server = Server {
          addr: "somewhere".to_string(),
        };

        let mut results = Vec::with_capacity(4);

        Ok((server, results))
    }
}

とりあえずこんな感じでlisten_and_serve()メソッドを作ることができました。
メソッドの中身はとりあえずテストが通るように書いただけみたいな感じです。

では,次はメソッドの中身を埋めていきます。
まずはTCPサーバーを立てる必要があるので,その処理を書いていきます。

server/mod.rs
mod errors;
mod result;

#[cfg(test)]
mod tests;

use std::io::{Error, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

pub use self::errors::ServerSessionError;
pub use self::result::ServerSessionResult;

pub struct Server {
    addr: Option<String>,
}

impl Server {
    pub fn new() -> Result<(Server, Vec<ServerSessionResult>), ServerSessionError> {
        let server = Server {
            addr: Some("somewhere".to_string()),
        };

        let mut results = Vec::with_capacity(4);

        Ok((server, results))
    }

    pub fn listen_and_serve(self) {
        // Build a server
        let listener = TcpListener::bind("127.0.0.1:1935").expect("Error: Failed to bind");
        println!("Listening...");
        for streams in listener.incoming() {
            match streams {
                Err(e) => { eprintln!("error: {}", e) },
                Ok(stream) => {
                    thread::spawn(move || {
                        handler(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
                    });
                }
            }
        }
    }
}

fn handler(mut stream: TcpStream) -> Result<(), Error> {
    println!("Connection from {}", stream.peer_addr()?);
    let mut buffer = [0; 1024];
    loop {
        let nbytes = stream.read(&mut buffer)?;
        if nbytes == 0 {
            return Ok(());
        }

        stream.write(&buffer[..nbytes])?;
        stream.flush()?;
    }
}

これでTCPサーバーが立つようになったので,PythonのスクリプトでTCPサーバーとして動くかどうかを試してみます。

connection.ini
[server]
ip = 127.0.0.1
port = 1935

[packet]
# [bytes]
header_size = 4
# [pixels]
image_width = 64
image_height = 64
stream.py
import socket
import configparser
import logging
import time

logging.basicConfig(level=logging.DEBUG)

config = configparser.ConfigParser()
config.read('./connection.ini', 'UTF-8')

# Connectino Settings
SERVER_IP = config.get('server', 'ip')
SERVER_PORT = int(config.get('server', 'port'))

IMAGE_WIDTH = int(config.get('packet', 'image_width'))
IMAGE_HEIGHT = int(config.get('packet', 'image_height'))
IMAGE_SIZE = IMAGE_WIDTH * IMAGE_HEIGHT

if __name__ == '__main__':
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((SERVER_IP, SERVER_PORT))
        i = 0

        while True:
            s.send(i.to_bytes(IMAGE_SIZE, 'big'))
            logging.info(" Send: " + str(i))
            time.sleep(1)
            i = i + 1

このスクリプトと,Rustのサーバーを同時に起動すると,

Hello, world!
Listening...
Connection from 127.0.0.1:50247

一応テスト用のコードも出しておくと,

server/test.rs
use super::*;

use std::thread;

#[test]
fn bridge_tcp_server() {
    let (server, _result) = Server::new().unwrap();

    thread::spawn(move || {
        server.listen_and_serve();
    });
}

新しくスレッドを作成してlisten_and_serve()を行うことで,テストでたてたTCPサーバーが正常に終了するようにしています。

running 1 test
test tests::it_works ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

     Running target/debug/deps/inferno_rtmp_engine-d3654c5273df6f45

running 1 test
Listening...
test servers::tests::bridge_tcp_server ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

   Doc-tests amf0

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

まとめ

とりあえず今回はTCPサーバーをたてるところまで進めました。
RTMPサーバーを立てる途中の記録なので,TCPサーバーのコードとしては要らない部分がかなり多いと思います。
この開発の目的はRTMPサーバーを作成することなので,また少しずつ進めていきます。

改良版

今回のサーバーの改良: 改良版

参考リンク

http://blog.hirokikana.com/dev/rtmp-client/
https://www.adobe.com/jp/devnet/rtmp.html
https://www.otsuka-shokai.co.jp/words/protocol.html
http://e-words.jp/w/%E3%83%8F%E3%83%B3%E3%83%89%E3%82%B7%E3%82%A7%E3%82%A4%E3%82%AF.html
https://developers.cyberagent.co.jp/blog/archives/13739/
https://cha-shu00.hatenablog.com/entry/2019/03/02/174532
https://users.rust-lang.org/t/how-to-close-tcpconnection-with-a-function/39367/3

参考にしたコード

https://github.com/KallDrexx/rust-media-libs/tree/master/examples/mio_rtmp_server
https://github.com/nareix/joy4/blob/master/format/rtmp/rtmp.go

9
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
4