LoginSignup
2
1

More than 1 year has passed since last update.

[GStreamer x Rust] gst-plugin-rs でも非同期処理がしたい! 〜 tcpsinkを作ってスケールさせる

Posted at

動機 : 動画配信をスケールさせたい

作ってから少し時間が経ってしまいましたが、GStreamerでSRTでリアルタイムに受けた動画を Warp over WebTransportを使って1対多に超低遅延の動画配信システムの検証をしています。
今回はGStreamerは一つだけとして、多数のユーザが視聴するための Warp over WebTransport の配信エンドポイントを複数作り、2段構成のシステムとしてみます。

なぜマルチレッドではなく非同期処理にするのか

パフォーマンスに顕著な差が出るからですが、GStreamer Conference 2018の資料が詳しいです。

When adding more threads adds more problemsadds more problem

作ったもの : パイプラインとtcpsink

今回はTCPにて fragmented mp4 を配信サーバーにプッシュするため tcpsink というプラグインを作ります。
全体のパイプラインの概略を以下のようになります。SRTで受信したデータを分離・デコード・リサイズして再エンコードしています。
最後に cmafmux にて fmp4 を作ってそれを今回作る tcpsink で送信します。

tcpsink については上位から流れてきたデータをTCPにて渡すだけです。
言葉にして書くととても簡単なことですが、実際はかなり複雑になります。
まず、tcpsink をTCPサーバーにするのかクライアントにするのかを考えなければいけませんが、その先の配信サーバーは常に変化するため、こちらをTCPサーバーにする必要があります。
また、後述しますがGStreamerで非同期処理を行うためにはお作法に従う必要があり、データをやり取りするためのタスクとTCPサーバーのための二つのタスクを実行する必要があります。

タスクには動画データを渡し、またTCPのコネクションリストを共有します。

必要な知識

  • Rustの非同期処理周りの知識
  • gst-plugin-rs での非同期周りの知識

まず、Rustの非同期処理については、Arc<Mutex<Hoge>> や メッセージパッシング、Pin などが理解できていれば後はコンパイラのメッセージとサンプルコードを参考にすればなんとかなるように思います。

リソースを共有する : Arc<Mutext<Hoge>>

複数のスレッドや非同期のタスクから Hoge データにアクセスしますが、一貫性を保つためロックかける必要があります。
そこでまずミューテックスを必要とし、 Mutex<Hoge> となります。
次にこのミューテックスを複数箇所から参照するため、アトミックな参照カウンタのスマートポイントととして Arc<> で囲います。
今回はTCPコネクションリストを共有するためなどに Arc<Mutex<HashMap<SocketAddr, Async<TcpStream>>> のようにデータを持たせています。

データをやり取りする : メッセージパッシング

タスクはデータを非同期に処理するので、データを渡すためにメッセージパッシングを用いています。
データを送るための sender_taskreceiver を持たせ、sink本体からデータを送出しています。

Mutexのロックの範囲に注意する

ミューテックスはロックしたまま await できないためスコープを区切るかロックを明示的に drop() するなど対策が必要です。
はまりどころ1として後述。

所有権

言わずもがなですが、データの持たせ方をよく検討する必要があります。
特に gst:BufferAsync<TcpStream> などそのままではクローンできず所有権をどこで管理するか決めておく必要があります。
今回はデータについては TcpSink本体からタスクにバッファを所有権ごと渡してしまい、TCPコネクションリストについては原則共有することにしつつも、送信処理をするときのみリストをコピーするようにしました。

GStreamerの非同期処理について

そして gst-plugins-rsにて非同期処理をするための方法は下記の threadshare モジュールにまとまっています。
サンプルもいくつかあります。tcpclientsrcudpsink はありますが残念ながら tcpsink はありませんでした。

Rustでは非同期処理に通常は tokio などのランタイムを使いますが、gst-plugins-rs の場合はそのまま使えないようです。
代わりに軽量なRuntimeがこちらにあります。internalになっているのでまるっとコピーして取り込みつつ中身を追っていくととても勉強になります。

Taskについて

gst-plugins-rs で非同期処理を行うためには、runtime::Task という型を使います。

imp.rs
pub struct TcpSink {
    // ...
    send_task: Task, // fmp4 を送信するタスク
    listener_task: Task, // TCP Listener
    sender: Mutex<mpsc::Sender<M4S>>, // これでsend_taskにデータを渡す
}

タスクは TcpImplトレイトを実装する必要があり、prepare() try_next() handle_item() メソッドを主に使います。
try_next() はタスクの実行順番が来たときに呼び出されるため、データ送信処理やコネクション待ち受け処理を実装します。
handle_item()try_next() で処理したアイテムを処理する関数です。

tcp_listener_task : TCPコネクションを受け付ける

TCPサーバを起動してコネクションを待ち受け、TCPコネクションリストに追加します。

imp.rs
struct TcpListenerTask {
    // ...
    listener: Option<Async<TcpListener>>,
    streams: Arc<Mutex<HashMap<SocketAddr, Async<TcpStream>>>>,
}

impl TcpListenerTask {
    // fn new(...) {}
}
impl TaskImpl for TcpListenerTask {
    type Item = ();
    fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
        async move {
            // TcpListenerを起動する
            self.listener = Some(Async::<TcpListener>::bind(self.saddr).map_err(|err| {
                gst::error_msg!(
                    gst::ResourceError::OpenRead,
                    ["failed to listen to {:?}: {:?}", self.saddr, err]
                )
            })?);
            gst::trace!(CAT, obj: self.element, "listening {:?}", self.saddr);
            Ok(())
        }
        .boxed()
    }
    fn handle_action_error() { ... }

    // タスクの実行順番が来たら呼ばれる
    // コネクションを待ち受けて接続されたらメタデータを送信する
    fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
        async move {
            gst::trace!(CAT, obj: self.element, "Start accept.");
            match self.listener.as_ref().unwrap().accept().await {
                Ok((mut stream, saddr)) => {
                    gst::trace!(CAT, obj: self.element, "accept from {:?}", saddr);

            // 必要に応じてメタデータなどを送る
                    stream.write(buf.as_slice()).await.map_err(...);

                    // コネクションリストに追加する
                    let mut streams = self.streams.lock().unwrap();
                    streams.insert(saddr, stream);
                }
                Err(err) => {}
            }
            Ok(())
        }
        .boxed()
    }
    // try_next() で次に処理するデータはないためここでは何もしない
    fn handle_item(&mut self, _: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
        async move { Ok(()) }.boxed()
    }
    fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
        async move { Ok(()) }.boxed()
    }
}

send_task : データを送信する

send_taskは render_list() にて上流から渡されたデータ(fmp4)を全てのTCPコネクションに対して送信します。
そのため、try_next() にてメッセージパッシングでデータを受け取り、その後 handle_item() で送信処理を行います。

imp.rs
struct SendTask {
    // ...
    receiver: mpsc::Receiver<M4S>,
    streams: Arc<Mutex<HashMap<SocketAddr, Async<TcpStream>>>>,
}
impl TcpSinkTask {
    fn new() { ... }
}
impl TaskImpl for TcpSinkTask {
    type Item = FragmentedMP4; // 独自でやり取りするデータ 主にgst::BufferList

    // 特になし
    fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
        async move {
            gst::trace!(CAT, obj: self.element, "Task prepare");
            Ok(())
        }
        .boxed()
     fn handle_action_error() { ... }

    // TcpSink::render_list() からデータを受け取る
    // その後 handle_item() が呼ばれる
    fn try_next(&mut self) -> BoxFuture<'_, Result<M4S, gst::FlowError>> {
        async move {
            gst::trace!(CAT, obj: self.element, "Task try_next");
            Ok(self.receiver.next().await.unwrap())
        }
        .boxed()
    }
    fn handle_item(&mut self, m4s: M4S) -> BoxFuture<'_, Result<(), gst::FlowError>> {
        async move {
            gst::trace!(CAT, obj: self.element, "Task handle_item");

            // 全てのコネクションに対してデータを送信する
            // エラーハンドリングなど煩雑なので一部省略
            let fut1 = join_all(
                           // ↓このままだとコンパイルできない。詳細は後述
                           streams
                            .iter_mut()
                            .map(|(saddr, stream)| async {
                                // データを送信してエラーだったらコネクションリストから削除する
                                if let Err(err) = stream.write_all(buf.as_slice()).await {  
                                    {
                                        let mut s = self.streams.lock().unwrap();
                                        s.remove(saddr);
                                    }
                                };
                            }))
           );
           // タイマーと組み合わせてキャンセル処理などを入れたいが調整中
           fut1.await;
            Ok(())
        }
        .boxed()
    }
}

Taskを準備する

上記で実装したタスクを実際に使用にするには、TcpSink::prepare() にてタスクを準備します。

imp.rs
impl TcpSink {
    fn prepare(&self) -> Result<(), gst::ErrorMessage> {
        // タスクの初期化 (defaultトレイトにて実装)
        //  send_task: Task::default(),
        //  listener_task: Task::default()

        // contextの待ち時間を20msに設定する (0にすると当然CPUを食い潰す)
        let context =
            Context::acquire(&settings.context, std::time::Duration::from_millis(20)).map_err(|err| {
                gst::error_msg!(
                    gst::ResourceError::OpenRead,
                    ["Failed to acquire Context: {}", err]
                )
            })?

        // TCPサーバーを起動する
        ...
        // 共有するTCPコネクション
        let streams = Arc::new(Mutex::new(HashMap::new()));

        let (sender, receiver) = mpsc::channel(10);
        {
            let mut s = self.sender.lock().unwrap();
            *s = sender;
        }
        let _ = self
            .send_task
            .prepare(
                TcpSinkTask::new(self.obj().clone(), receiver, streams.clone()),
                context.clone(),
            )
            .check()?;
        let _ = self
            .listener_task
            .prepare(
                TcpListenerTask::new(self.obj().clone(), saddr, streams),
                context,
            )
            .check()?;

        Ok(()
    }
}

以上で非同期にてデータをTCPで送信できるようになりました。

はまりどころ1 : ロックの範囲に気をつける

SendTask::handle_item() にて全てのTCPコネクションに対してデータを送信するわけですが、ここは一工夫必要です。
なぜかというと、 async move {} でstreamのミューテックのロックをとりますが、ここでは処理が中断される可能性があるためコンパイルが通りません。
言い換えるとミューテックスのロックは他の処理が割り込まないうちに処理を抜ける必要があるようです。
そのため、今回はstreamsを一度コピーしてそれに対して送信処理を行うことにしました。

imp.rs
fn handle_item() ... {
  async move {
            // 一度全てコピーする。ただし Async<TcpStream> はそのままコピーできないので中身をコピーする
            let mut streams = { // <- ここでスコープ区切るかdrop()しないとコンパイルエラーになる。下でawaitしているため。
                self.streams
                    .lock()
                    .unwrap()
                    .iter()
                    .map(|(saddr, stream)| (saddr.clone(), Async::new(stream.get_ref().try_clone().unwrap()).unwrap()))
                    .collect::<HashMap<SocketAddr, Async<TcpStream>>>()
            }
            // 全てのコネクションに対して送信する
            stream.iter_mut().map(|s| {
              let ret = stream.write_all(buf.as_slice()).await;

               // エラー時にここでロックを取ってリストから削除するのはOK
               // awaitで処理を抜ける可能性がないため
               if Err(e) = ret {
                   let mut s = self.streams.lock().unwrap();
                   s.remove(saddr)
               }
            });
 
  }.boxed()
}

はまりどころ2 : write_all() と write()

Rustの io::Writewrite() は全ての書き込みを保証しない。
なので write_all() を使うのだけれどもすっかり忘れていて、「データが途中で破損する!おかしい!」と数日ハマっていました。
他のところとかGoのreadのところとかはちゃんと気をつけていたのに忘れるとすっかり厄介ですね😅

Calls to write are not guaranteed to block waiting for data to be written, and a write which would otherwise block can be indicated through an Err variant.
書き込みの呼び出しは、データが書き込まれるのを待つためにブロックされることは保証されず、そうでなければブロックされる書き込みは、Errバリアントで示すことができます。

The trait also provides convenience methods like write_all, which calls write in a loop until its entire input has been written.
また、write_allのような便利なメソッドも用意されており、入力がすべて書き込まれるまでループでwriteを呼び出します。

その他の部分について

公式のサンプルが充実しているのでそちらを参考にした方が良いとは思いますが、開発にあたっての要点を記載しておきます。

プラグイン側

gst_plugin_version_helperを使う

プラグインをビルドするにはこのモジュールを使うと便利です。

build.rs
fn main() {
    gst_plugin_version_helper::info()
}
Cargo.toml
[package]
name = "tcpsink"
version = "0.2.0"
edition = "2021"
rust-version = "1.63"
description = "tcpsink element only write fragment."
repository = "https://[your repository url]"

[dependencies]
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" , tag = "0.19.7", features = ["v1_18"]}
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" , tag = "0.19.7", features = ["v1_18"]}

futures = "0.3.21"

[lib]
name = "tcpsink"
crate-type = ["cdylib"]
path = "src/lib.rs"

[build-dependencies]
gst-plugin-version-helper = {  git = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs", tag = "0.9.3"}

エントリポイントとなる src/lib.rs でプラグインを複数登録できます。
また、公式のサンプルではアプリケーション本体にバイナリを含めてしまっていますが、コンテナ化していると修正時のリビルドなどが重いため、プラグインは独立したバイナリにするようにしています。

src/lib.rs
mod fmp4mux;
mod tcpsink;

#[allow(dead_code)]
mod runtime;

use gst::glib;

fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
    tcpsink::register(plugin)?;
    fmp4mux::register(plugin)?;
    Ok(())
}

gst::plugin_define!(
    tcpsink,
    env!("CARGO_PKG_DESCRIPTION"),
    plugin_init,
    concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
    "MIT/X11",
    env!("CARGO_PKG_NAME"),
    env!("CARGO_PKG_NAME"),
    env!("CARGO_PKG_REPOSITORY"),
    env!("BUILD_REL_DATE")
);

後は普通に cargo build --release するとビルドできます。

アプリケーション側

アプリケーション側では最初のナビゲートのためにここでもTCPサーバーを立てていますが、ここでは普通に tokio でサーバを立てています。

main.rs
#[tokio::main]
async fn main() -> Result<(), Error> {

  gst::init();

  ... // パイプライン初期化など

 let (tx, rx) = oneshot::channel();
  tokio::spawn(async move {
    tokio::select! {
        _ = async move {
            loop {
                println!("connectiong...");
                match listener.accept().await {
                    Ok((mut stream, saddr)) => {
                        tokio::spawn(async move {
                            // プレイリスト情報とかどのプラグインに繋ぎにいけば良いのかなどのデータを送る
                            println!("connected from {:?}", saddr);
                            if let Err(err) = stream.write(playlist_cloned.as_bytes()).await {
                                println!("error send init data. {:?} {:?}", saddr, err);
                            };
                        });
                    },
                    Err(err) => {
                        println!("err accpet connection. {:?}", err);
                    },
                };
            }
        } => {},
        _ = rx  => {println!("terminate accept loop.");}
    };
    });

    // GStreamerのメインループを実行
    main_loop.run();

    pipeline.set_state(gst::State::Null)?;
    let _ = tx.send(()); // 非同期処理の方を終了させておく

    Ok(())
}

プラグインのつなぎ込みは普通のプラグインと特に変わらず、リンクするだけで使えます。

src/main.rs

       gst:init();
      let pipeline = gst::Pipeline::new(None);

     ... // 必要なエレメントを生成する

       let mux =
            gst::ElementFactory::make_with_name("cmafmux", Some(format!("mux_v_{}", num).as_str()))
                .unwrap();
        let sink = gst::ElementFactory::make_with_name(
            "tcpsink",
            Some(format!("sink_v_{}", num).as_str()),
        )
        .unwrap();

       pipeline.add_many(&[
            ..., // 色々なエレメント
            &mux,
            &sink,
        ])?;
        gst::Element::link_many(&[
            ..., // 色々なエレメント
            &mux,
            &sink,
        ])?;

      ... // padを繋いだりプロパティを設定する

まとめ

GStreamerで非同期処理を行いたい場合、アプリケーション側では tokio などの非同期処理ランタイムを使うことで実現できますが、
プラグイン側では TaskImpl トレイトを実装して threadshare::runtime を使うなど通常とは異なる点がありました。
ただし runtime 自体は数個のファイルなので処理を追って中身を理解しようと試みることはできますし、
ルールに慣れてしまえば不可解な挙動や難しいバグを踏みそうなところもなく、Rustの安全性を生かした開発ができるように思います。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1