9
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

[ストリーミング技術]RustでGStreamerチュートリアル 3: Dynamic Pipelines

Last updated at Posted at 2020-06-23

第一回: https://qiita.com/kyasbal_1994/items/a1a7d1bd5c4832947a8a

第二回:https://qiita.com/kyasbal_1994/items/a7f2ab0b05f47d96ded3

次回: https://qiita.com/kyasbal_1994/items/82939a756e0f602bbfa2

コード: https://github.com/kyasbal-1994/qiita-gstreamer-rust-tutorial

ゴール

このチュートリアルでは、GStreamerを用いるのに必要な残りの基本的なコンセプトを解説します。アプリケーションの開始時に決められたパイプラインを構築するのではなく、メディアデータの情報が利用可能になり次第パイプラインを動的に構築します。

  • 要素をつなげる際にどのようにしたらきめ細かいコントロールをすることができるか
  • イベントを購読するにはどうしたらよいか
  • 要素がなりうる様々なステート

導入

このチュートリアルではパイプラインがplayingステートになる前に完全に完成していません。これ自体は問題なく、もし適切なアクションをしなければデータはパイプラインの終端に達し、パイプラインがエラーメッセージを創出してストップします。
しかし、ここでは適切なアクションを行ってみます。

このサンプルではmuxedなファイルを取り扱います、この中ではオーディオやビデオがコンテナファイルに一緒に含まれています。このようなコンテナファイルを開くために必要な要素がdemuxerと呼ばれます。
代表的なコンテナファイルはMKV、QT,MOV,OggあるいはASF,WMV,WMAなどの発展的なシステムフォーマットです。

もし、コンテナファイルが複数のストリームを含んでいるとき(例えば一つのビデオに2つのオーディオトラックなど)、demuxerはこれらを別々の出力ポートに分けます。このようにして複数の枝がパイプラインの中に含まれ、別々の型を扱うことが可能です。

これらポートをGStreamerではpadと呼んでいます。sink padから要素にデータが入り、source padからデータが抜けていきます。つまり、source要素ではsource padsしか持たず、sink要素ではsink padしか持ちません。一方でfilter要素はこれら両方を持ちます。

image.png

image.png

image.png

source,filter,sinkのそれぞれのイメージ

demuxerは一つのsink padに対して複数のsource padを保持しています。

image.png

source padを2つ持つdemuxerの例

このチュートリアルで作成するパイプラインではないですが、ここにdemuxerを用いて構成するパイプラインの例を示します。

image.png

2つの枝分かれがあるパイプラインの例

demuxerを扱う際の複雑な点は、メディアデータがdemuxerに到達しコンテナの中を開けるまで何が入っているかわからない点です。これによってdemuxerには初期状態で接続できるsource padがありません。

demuxerは十分な情報を得た際にsource padを生成します。この際にパイプラインを構築することができます。

簡単にするため、ここではオーディオだけを扱ってみます。
まず、今回扱うパイプラインを前回までと同様にgst-launch-1.0を使って再現してみます。

$ gst-launch-1.0 uridecodebin uri="https://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm" ! audioconvert ! autoaudiosink

オーディオが再生されたでしょうか。再生されなかった場合にはLinuxであればPulseaudioがインストールされているかなどをチェックしてください。

上記のパイプラインでは、uridecodebinがdemuxerの役割を果たしています。uridecodebinは指定されたuriの表す先からデータを取得し、適切なdemuxを行って後段の要素に流します。今までの説明通り、このuridecodebinも最初はsrc padが存在しません。そのため、コードから扱う場合には適切に気をつけてあげる必要があります。

Dynamic Hello World

今回用いるコードは以下のようなコードになります。

extern crate gstreamer as gst;
use gst::prelude::*;

fn main(){
    gst::init().unwrap();

    // Instanciate elements in pipeline
    let source = gst::ElementFactory::make("uridecodebin", Some("source")).expect("Could not instanciate uridecodebin");
    let convert = gst::ElementFactory::make("audioconvert", Some("convert")).expect("Could not instanciate audioconvert");
    let sink = gst::ElementFactory::make("autoaudiosink", Some("sink")).expect("Could not instanciate audiosink");

    // Instanciate pipeline
    let pipeline = gst::Pipeline::new(Some("test-pipeline"));

    // Add all elements inside of the pipeline
    pipeline.add_many(&[&source,&convert,&sink]).unwrap();
    convert.link(&sink).expect("element could not be linked");
    // It is impossible to link with source and convert here.

    // Set uri property in uridecodebin
    let uri = "https://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm";
    source.set_property("uri",&uri).expect("Couldn't set uri property on uridecodebin");

    // Initiate weak pointer to be used in different thread
    let pipeline_weak = pipeline.downgrade();
    let convert_weak = convert.downgrade();
    // Add event listener
    source.connect_pad_added(move |_,src_pad|{
        // Getting actual reference from weak reference if it was not discarded
        let pipeline = match pipeline_weak.upgrade() {
            Some(pipeline)=>pipeline,
            None=>return,
        };
        let convert = match convert_weak.upgrade() {
            Some(convert)=>convert,
            None=>return,
        };
        println!("Received new pad {} from {}",src_pad.get_name(),pipeline.get_name());

        // Obtain the sink_pad from audioconvert element
        let sink_pad = convert.get_static_pad("sink").expect("Failed to get static sink pad from convert");
        if sink_pad.is_linked() {
            println!("We are already linked. Ignoreing");
            return;
        }
        
        // Retrive capability of the elements
        let new_pad_caps = src_pad.get_current_caps().expect("Failed to get caps of new pad");
        let new_pad_struct  = new_pad_caps.get_structure(0).expect("Failed to get first structure of caps");
        let new_pad_type = new_pad_struct.get_name();
        
        // Check this pad is for audio, otherwise, it should be for video
        let is_audio = new_pad_type.starts_with("audio/x-raw");
        if !is_audio{
            println!("It has type {} which is not a raw audio.Ignoreing",new_pad_type);
            return;
        }

        // Link the src pad to sink pad
        let res = src_pad.link(&sink_pad);
        if res.is_err() {
            println!("Type is {} but link failed",new_pad_type);
        }else{
            println!("Link succeeded type {}",new_pad_type)
        }
    });

    // Make pipeline state Playing
    pipeline.set_state(gst::State::Playing).expect("Failed to set the pipeline to the playing state");

    // Obtain the bus and loop while monitor the messages
    let bus = pipeline.get_bus().unwrap();
    for msg in bus.iter_timed(gst::CLOCK_TIME_NONE){
        use gst::MessageView;
        match msg.view(){
            gst::MessageView::Error(err)=>{
                eprintln!("Error received from element {:?} {}",err.get_src().map(|s| s.get_path_string()),err.get_error());
                eprintln!("Debugging information {:?}",err.get_debug());
                break;
            }
            gst::MessageView::StateChanged(state_changed)=>{ // If pipeline state was changed
                if state_changed.get_src().map(|s| s == pipeline).unwrap_or(false){
                    println!("Pipeline state was changed from {:?}| to {:?}",state_changed.get_old(),state_changed.get_current());
                }
            }
            gst::MessageView::Eos(..)=>break,
            _=>()
        }
    }

    //Cleaning up
    pipeline.set_state(gst::State::Null).expect("Failed to set the pipeline state to null");
}

Workthrough

Creating elements

    // Instanciate elements in pipeline
    let source = gst::ElementFactory::make("uridecodebin", Some("source")).expect("Could not instanciate uridecodebin");
    let convert = gst::ElementFactory::make("audioconvert", Some("convert")).expect("Could not instanciate audioconvert");
    let sink = gst::ElementFactory::make("autoaudiosink", Some("sink")).expect("Could not instanciate audiosink");

今までどおりに要素を生成します。uridecodebinは内部的に必要な要素(sources,demuxer,decoderなど)を生成し、URIをraw audioもしくはraw videoのストリーミングに変換する要素です。playbinがやっていたことの手前までを行っていると考えて問題ありません。
この要素はdemuxerを含んでいるため、source padば初期状態では利用できないため、実行時につなげる必要があります。

audioconvertは異なるオーディオフォーマット間で変換するときに便利な要素です。これはautoaudiosinkが想定するオーディオ形式として、すべてのOSではraw audioが利用可能であるとは限らないため挟まれています。

    // Instanciate pipeline
    let pipeline = gst::Pipeline::new(Some("test-pipeline"));

    // Add all elements inside of the pipeline
    pipeline.add_many(&[&source,&convert,&sink]).unwrap();
    convert.link(&sink).expect("element could not be linked");
    // It is impossible to link with source and convert here.

    // Set uri property in uridecodebin
    let uri = "https://www.freedesktop.org/software/gstreamer-sdk/data/media/sintel_trailer-480p.webm";
    source.set_property("uri",&uri).expect("Couldn't set uri property on uridecodebin");

ここでは、パイプラインを作りすべての要素をつなげていますが、convertとsinkをつなげていないことに注意してください
さらに、uridecodebinは読み取るメディアデータの対象をuriで指定できるため、これを用いて取得するメディアデータのURLを指定しています。

Signals

GStreamerの中でSignalは重要な点です。これによって、コールバックによりなにか興味深いことが起きた際にイベントを受け取ることができます。Rustでは、signalに対応したイベントハンドラを登録する関数が生えています。


    // Initiate weak pointer to be used in different thread
    let pipeline_weak = pipeline.downgrade();
    let convert_weak = convert.downgrade();
    
    // Add event listener
    source.connect_pad_added(move |_,src_pad|{
        // Getting actual reference from weak reference if it was not discarded
        let pipeline = match pipeline_weak.upgrade() {
            Some(pipeline)=>pipeline,
            None=>return,
        };
        let convert = match convert_weak.upgrade() {
            Some(convert)=>convert,
            None=>return,
        };
        println!("Received new pad {} from {}",src_pad.get_name(),pipeline.get_name());

        // Obtain the sink_pad from audioconvert element
        let sink_pad = convert.get_static_pad("sink").expect("Failed to get static sink pad from convert");
        if sink_pad.is_linked() {
            println!("We are already linked. Ignoreing");
            return;
        }
        
        // Retrive capability of the elements
        let new_pad_caps = src_pad.get_current_caps().expect("Failed to get caps of new pad");
        let new_pad_struct  = new_pad_caps.get_structure(0).expect("Failed to get first structure of caps");
        let new_pad_type = new_pad_struct.get_name();
        
        // Check this pad is for audio, otherwise, it should be for video
        let is_audio = new_pad_type.starts_with("audio/x-raw");
        if !is_audio{
            println!("It has type {} which is not a raw audio.Ignoreing",new_pad_type);
            return;
        }

        // Link the src pad to sink pad
        let res = src_pad.link(&sink_pad);
        if res.is_err() {
            println!("Type is {} but link failed",new_pad_type);
        }else{
            println!("Link succeeded type {}",new_pad_type)
        }
    });

ここでは、source要素のpadが増えた際のイベントをconnect_pad_added関数を用いて監視しています。この際、この関数にpipelineやconvertを直接渡した場合には、所有権の移動が起こり、その後パイプラインのスタートやメッセージバスの取得などができません。
そこで、glib自身に弱参照の機能が含まれているため利用します。pipelineconvertの弱参照を取得し、イベントの最初に本物の参照に戻します。この際、Noneが帰ってきたら本当の参照が破棄されていることを示します。

        // Getting actual reference from weak reference if it was not discarded
        let pipeline = match pipeline_weak.upgrade() {
            Some(pipeline)=>pipeline,
            None=>return,
        };
        let convert = match convert_weak.upgrade() {
            Some(convert)=>convert,
            None=>return,
        };

さらに、sink padを持つ方のconvert側からsink padへの参照を取得します。動的なpad出ない場合にはstatic padとして名前で取得することが可能です。この際、すでにリンクされていないか確認してリンクをしています。

        let sink_pad = convert.get_static_pad("sink").expect("Failed to get static sink pad from convert");
        if sink_pad.is_linked() {
            println!("We are already linked. Ignoreing");
            return;
        }

さらにsrc padが実際にどのような型のストリームを出力するかチェックします。ここでは、sink_padはaudioconvertのものであるために、videoを出力するsrc padをsink padにつなげることはできません。src_pad.get_current_caps()を用いてpadのcapabilityを取得します。こうして得られたケイパビリティとは現在出力しているデータの型を示しています。
他に出力可能な型を探したい場合にはquery_capsを用いてケイパビリティを探すことができます。一つのpadは複数のケイパビリティを提供することがあるため、new_pad_capsは複数のケイパビリティを含んでいることがあります。ただし、get_current_caps()で得られるstructureは常にひとつのメディアフォーマットだけを表します。

今回は、オーディオに関わるpadだけを取得したいのでまず、gst_pad_structからget_name()を用いてフォーマットのメディアタイプを取得します。

もし、このメディアタイプがaudio/x-rawであるならば、デコードされたオーディオのpadであり、そうでなければここでは無視します。

        // Retrive capability of the elements
        let new_pad_caps = src_pad.get_current_caps().expect("Failed to get caps of new pad");
        let new_pad_struct  = new_pad_caps.get_structure(0).expect("Failed to get first structure of caps");
        let new_pad_type = new_pad_struct.get_name();

        // Check this pad is for audio, otherwise, it should be for video
        let is_audio = new_pad_type.starts_with("audio/x-raw");
        if !is_audio{
            println!("It has type {} which is not a raw audio.Ignoreing",new_pad_type);
            return;
        }

こうして得られたオーディオのpadと、convert要素から取得したsink padをつなげて、pad_addedのsignalへの対処が終了です。

        // Link the src pad to sink pad
        let res = src_pad.link(&sink_pad);
        if res.is_err() {
            println!("Type is {} but link failed",new_pad_type);
        }else{
            println!("Link succeeded type {}",new_pad_type)
        }

GStreamer States

すでにステートについてはPlayingとNullの状態があると述べました。GStreamerには4つのステートが存在します。

State Description
NULL 要素の初期状態
READY PAUSEDに行く準備ができている状態
PAUSED 要素がデータを受け取る、もしくは処理する準備ができている状態
PLAYING クロックが有効であり、データが流れている状態

これらの状態は2つ以上またぐことはできません。もし、NULLから PLAYINGにした場合にはGStreamerが自動的に中間の状態への遷移を追加します。

実際、バスの監視コードの中で以下のコードを追加してステートの遷移を監視しています。

            gst::MessageView::StateChanged(state_changed)=>{ // If pipeline state was changed
                if state_changed.get_src().map(|s| s == pipeline).unwrap_or(false){
                    println!("Pipeline state was changed from {:?}| to {:?}",state_changed.get_old(),state_changed.get_current());
                }
            }

Screenshot from 2020-06-23 22-25-58.png

実行時のログ

まず、ステートがREADYに変化し、padなどが追加され、ReadyからPausedに変化して、最後にPlayingになっているのがわかります。

Conclusion

以下のことが習得できたと思います。

  • どのようにしてイベントをハンドリングするか
  • 要素をつなげるのではなく、特定のPadをどのようにつなげるか
  • GStreamerの要素の様々な状態

次回: https://qiita.com/kyasbal_1994/items/82939a756e0f602bbfa2

9
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?