LoginSignup
14
11

More than 3 years have passed since last update.

Rust+async/awaitでノンブロッキングGUI ~ 頓挫編

Last updated at Posted at 2019-08-23

導入

安定化を間近に控えた Rust のasync/await。Rust と言えばサーバーのイメージで、主にこちらの方面での活用が期待されていますが、async/awaitと言えばもう 1 つ代名詞となっている分野がありますね?

そう、GUI です。

意外と知られていませんが、GUI フレームワークGTKの Rust バインディグであるgtk-rsは既に futures 0.3 に対応した非同期タスクの処理機構を備えており、現時点でノンブロッキング I/O を実行可能です。当記事ではこちらを使い、入力された URL から画像をダウンロードして表示する GUI アプリの作成手順をご紹介します。

お断り

Future の処理方式などといった理論的な部分に関しては掘り下げず、具体的な利用法について追っていきます。動作原理を知りたいという方は他記事をご参照ください。

また gtk-rs についても同様に詳細な解説は省きます。以下の記事が基礎から解説されており参考になりますので、先にそちらに目を通されることをおすすめします。
いまさらGTKとRustでLinuxデスクトップアプリ入門

当記事では全体的によりイディマオティックな記法を採用していますが、処理の内容は変わりません。対応する部分を比較しながらご覧ください。

環境

rustc 1.39.0-nightly (760226733 2019-08-22)
async_await フィーチャの利用には nightly ビルドが必要です。

コーディング

プロジェクト作成

まずはお約束のcargo newから始めましょう。

cargo new async-image-downloader

続いてCargo.tomlに必要なクレートを追加します。

Cargo.toml
[package]
name = "async-image-downloader"
version = "0.1.0"
authors = ["Your Name <you@example.com>"]
edition = "2018"

[dependencies]
gtk = "0.7.0"
glib = "0.8.1"
gio = "0.7.0"

ビューの構築

アプリケーション全体を表すApplicationのインスタンスを生成し、準備が完了したタイミングで発火するactivateシグナルにビューを構築するコールバックを登録します。

src/main.rs
use gio::{ApplicationExt, ApplicationExtManual};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // アプリケーションの生成
    let app = gtk::Application::new(None, Default::default())?;
    // activate シグナルでビューを構築
    app.connect_activate(build_ui);
    // 実行開始
    app.run(&[]);

    Ok(())
}

このコールバックはApplicationの参照を引数に取ります。この参照に対して実際の画面を表すApplicationWindowを登録することで標準的なアプリケーションの土台が完成します。

ということでそのコールバックであるbuild_ui関数の作成に移ります。

src/main.rs
use {
    gio::{ApplicationExt, ApplicationExtManual},
    // 追加
    glib::Cast,
    // 追加
    gtk::{ContainerExt, WidgetExt},
};

fn build_ui(app: &gtk::Application) {
    // 各ウィジェットの生成

    // イメージ
    let layout = gtk::LayoutBuilder::new().expand(true).build();
    let image = gtk::ImageBuilder::new().parent(layout.upcast_ref()).build();

    // ブロッキングを視覚化するスピナー
    let spinner = gtk::SpinnerBuilder::new()
        .height_request(32)
        .active(true)
        .build();

    // プログレスバー
    let progress = gtk::ProgressBarBuilder::new()
        .text("0%")
        .show_text(true)
        .build();

    // URL 入力ボックス
    let entry = gtk::EntryBuilder::new()
        .placeholder_text("Enter image url")
        .build();

    // ダウンロード開始ボタン
    let get_button = gtk::ButtonBuilder::new()
        .label("GET")
        .sensitive(false)
        .build();

    // ルートコンテナ
    let root = gtk::Box::new(gtk::Orientation::Vertical, 0);
    // 各ウィジェットを追加
    root.add(&layout);
    root.add(&spinner);
    root.add(&progress);
    root.add(&entry);
    root.add(&get_button);

    // メインウィンドウ
    gtk::ApplicationWindowBuilder::new()
        // Application をセット
        .application(app)
        // ルートコンテナをセット
        .child(&root.upcast())
        // 各プロパティをセット
        .window_position(gtk::WindowPosition::Center)
        .default_width(400)
        .default_height(400)
        .title("Async Download Test")
        .build()
        // 全体を表示
        .show_all();
}

*Builderは各ウィジェットのビルダークラスです。newの後に各プロパティをセットし、buildを呼び出すことでウィジェットを生成します。

let spinner = gtk::SpinnerBuilder::new()
    .height_request(32)
    .active(true)
    .build();

これは

let spinner = gtk::Spinner::new();
spinner.set_property_height_request(32);
spinner.set_property_active(true);

のように直接ウィジェットを生成した後、各プロパティのセッタを呼び出したのと同じ意味になります。

しかし前者の方が変数に対するアクセスが少なく(gtk-rs ではオブジェクトを内部可変性を持った参照カウンタで扱うので型としてはミュータブルではありませんが)、またオートコンプリートが利くのでコーディングが楽です。

余談ですが gtk-rs では継承関係を表現するためにトリッキーな方法で構造体を定義しており、その代償としてオートコンプリートがほぼ効きません。(つらい)

面白いのは最後のApplicationWindowの扱いでしょうか。変数束縛する必要がないのでbuildで生成した後、戻り値に対して直接show_allを呼び出しています。

これでビューは完成です。

view.png

ロジックの実装

それではいよいよ非同期 I/O を実行する部分を記述していきましょう。

Rust の非同期処理モデルはExecutorと呼ばれる処理機構が、タスク(Executor に直接保持される Future、トップレベル Future)の中断と再開を司ることで実現されています。

このタスクの追加は Executor のspawn~メソッドに Future を渡すことで行われます。Executor を表すトレイトとして futures クレートにはSpawnLocalSpawnが定義されています。

gtk-rs では Executor の責務はMainContextが担っており、Spawn と LocalSpawn を実装していますが、固有メソッドとしてspawn_localという直接 Future の実装型を受け取るものも定義されています。
使い勝手がいいので今回はこちらを使いましょう。

実装のために依存関係にいくつか変更を加えます。

  • 実際にダウンロードを行う web クライアント、reqwestの追加

  • Executor API を使用するために glib の futures フィーチャを有効化

  • Future/Stream にユーティリティメソッドを追加する futures-preview の追加

  • 画像のバッファリングを行う gdk-pixbuf の追加

Cargo.toml
...
[dependencies]
gtk = "0.7.0"
glib = { version = "0.8.1", features = ["futures"] }
gio = { version = "0.7.0", features = ["futures", "v2_44"] }
gdk-pixbuf = "0.7.0"
reqwest = "0.9.20"
futures-preview = { version = "0.3.0-alpha.18", features = ["compat"] }

まずは準備としてエントリのシグナルハンドラを記述します。
文字が入力されていたらダウンロードボタンがクリックできるようにしてみましょう。
入力されているテキストの長さが変化するたびに発火するproperty_text_length_notifyシグナルを監視し、ハンドラ内ではget_text_lengthを呼び出して値が 0 でなかったらボタンが有効化するようにします。

src/main.rs
use {
    gio::{ApplicationExt, ApplicationExtManual},
    glib::Cast,
    // EntryExt を追加
    gtk::{ContainerExt, EntryExt, WidgetExt},
};
...
    // ルートコンテナ
    let root = gtk::Box::new(gtk::Orientation::Vertical, 0);
    // 各ウィジェットを追加
    ...

    // シグナルハンドラの登録

    // エントリが空欄でなければボタンを有効化
    entry.connect_property_text_length_notify({
        let get_button = get_button.clone();
        move |entry| {
            get_button.set_sensitive(entry.get_text_length() != 0);
        }
    });

    // メインウィンドウ
    gtk::ApplicationWindowBuilder::new()
    ...

あるウィジェットのシグナルハンドラ内で別のウィジェットにアクセスしたい場合、そのウィジェットをクロージャにキャプチャさせます。
このウィジェットをまた後で使いたい場合にはcloneでコピーします。前述した通り gtk-rs のオブジェクトは参照カウンタで扱われるので、行われるのは実体ではなく参照のコピーです。
他にもビジュアルツリーを辿ってお目当てのウィジェットを探し当てるなどいくつか方法がありますが、これが最もシンプルです。

上記のコードは

    let clos = {
        let get_button = get_button.clone();
        move |entry| get_button.set_sensitive(entry.get_text_length() != 0)
    };

    entry.connect_property_text_length_notify(clos);

と書いても同様の意味合いになります。Rust のブロックは式なので、最後の行のクロージャが戻り値になっている訳ですね。JavaScirpt でスコープの中からローカル変数をキャプチャした関数を返すのと同じ要領です。
この記法のメリットはcloneしたウィジェットの名前がシャドーイングされず、ブロックが閉じた後も同じ名前で変数にアクセスできることです。

それではいよいよ核心部の記述に移りましょう。
ボタンのclickedハンドラに Future の生成とタスクの追加を割り当てます。

src/main.rs
// async_await フィーチャを有効化(1.39以降では不要)
#![feature(async_await)]

use {
    // 追加
    futures::{FutureExt, TryStreamExt},
    // 追加
    gdk_pixbuf::PixbufLoaderExt,
    ...
    // ButtonExt、ImageExt、ProgressBarExtを追加
    gtk::{ButtonExt, ContainerExt, EntryExt, ImageExt, ProgressBarExt, WidgetExt},
};
...
    // シグナルハンドラの登録
    ...

    // ボタンのクリックでダウンロードを開始する
    get_button.connect_clicked(move |button| {
        // ダウンロード中はウィジェットを無効化
        entry.set_sensitive(false);
        button.set_sensitive(false);

        // ボタンがクリックされている == 有効な時点でエントリは空欄ではないので unwrap して取り出す
        let uri = entry.get_text().unwrap();

        let progress = progress.clone();
        let image = image.clone();
        // Future の生成
        let future = async move {
            // クライアントを生成
            let res = reqwest::r#async::Client::new()
                .get(uri.as_str())
                .send()
                .await;

            // Content-Length を取得
            let content_length = res.content_length().map(|cl| cl as f64);
            // ボディを返す Stream を取得
            let mut decoder = res.into_body();

            // Stream からボディを継続受信
            let mut body = Vec::new();
            while let Some(chunk) = decoder.try_next().await? {
                body.extend(chunk);

                // 進捗に応じてプログレスバーを更新
                if let Some(content_length) = content_length {
                    let percent = body.len() as f64 / content_length;
                    progress.set_fraction(percent);
                    progress.set_text(Some(&format!("{:>6.02}%", percent * 100.0)));
                }
            }

            // ダウンロード完了

            progress.set_fraction(100.0);
            progress.set_text(Some("100.00%"));

            // Pixbuf にバイト列を流し込んでイメージにセット
            let loader = gdk_pixbuf::PixbufLoader::new();
            loader.write(&body)?;
            loader.close()?;
            image.set_from_pixbuf(loader.get_pixbuf().as_ref());

            Result::<_, Box<dyn std::error::Error>>::Ok(())
        }
            // ダウンロード後の処理
            .map({
                let entry = entry.clone();
                let button = button.clone();
                move |res| {
                    // 標準エラー出力にエラーの内容を表示
                    if let Err(err) = res {
                        eprintln!("Download failed: {:?}", err);
                    }
                    // 無効化したウィジェットを復元
                    entry.set_sensitive(true);
                    button.set_sensitive(true);
                }
            });

        // Executor にタスクを追加
        glib::MainContext::default().spawn_local(future);
    });
    ...

長くなりましたが要点はコメントの通りです。ですが 2 点補足しておきたい部分があります。

        ...
        let progress = progress.clone();
        let image = image.clone();
        // Future の生成
        let future = async move {
            // クライアントを生成
        ...

これ以降ウィジェットにはアクセスしないので、シグナルハンドラ自身はcloneでコピーしたものではなく、見えている外側の変数そのものの所有権を奪っています。
そしてシグナルハンドラの中では、その所有権を奪ったウィジェットを更にcloneして async move ブロックで Future 内にコピーの所有権を移しています。
冗長に見えますが、Future はタスクとなり、元のコンテキストから切り離されて実行されることになるのでこれは必須の手順です。
スレッドがデタッチされるイメージに近いでしょう。
驚くべきことにこれらの振る舞いは、シグナルハンドラのトレイト(FnMut)とspawn_localのシグニチャ(引数の Future に'staticを要求)によって型システム上で表現されており、このcloneの呼び出しをコメントアウトしただけでコンパイルエラーになります。

            ...
            let future = async move {
                ...
                Result::<_, Box<dyn std::error::Error>>::Ok(())
            }
            ...

さり気なく重要な一文です。
見ての通り async ブロック内でも ? オペレータを使用した早期リターンが可能なのですが、それを受ける async ブロック(から作られる Future の存在型)は最終的に解決される値の型であるOutput関連型を理解できず、エラーになってしまいます。
ブロック内で直接返される値を明示的に修飾することで、推定の補助を行っています。

以上です。
これにて呼び出しからは速やかに制御が返り、晴れて非同期処理が開始される運びとなります。おめでとうございます 🎉

Future の相互運用

上記のコードはコンパイルすら通りません。あれあれ……おかしいな。何がいけないんだろう🤔
はい。
Qiita 内のいくつかの記事でも言及されている通り、現時点では多くのクレートはバージョン 0.1 系列の futures に依存しており、標準ライブラリが準拠する 0.3 系列の Future には対応していません。しかしご安心ください。futures クレートには 0.1 系の Future を 0.3 系のものへと変換する便利トレイトが定義されています。これを使用するために compat フィーチャを有効化します。

Cargo.toml
[dependencies]
...
- futures-preview = "0.3.0-alpha.18"
+ futures-preview = { version = "0.3.0-alpha.18", features = ["compat"] }

そして実際の変換ですが、該当トレイトをインポートし、特に何も考えずcompatメソッドを呼ぶだけです。お手軽ですね。

src/main.rs
use {
    futures::{
        // 追加
        compat::{Future01CompatExt, Stream01CompatExt},
        FutureExt, TryStreamExt,
    ...
            // クライアントを生成
            let res = reqwest::r#async::Client::new()
                .get(uri.as_str())
                .send()
                .compat() // Future01CompatExt::compat
                .await;

            // Content-Length を取得
            let content_length = res.content_length().map(|cl| cl as f64);
            // ボディを返す Stream を取得
            let mut decoder = res.into_body().compat(); // Stream01CompatExt::compat
            ...

コンパイルが通りました。やったね!
では早速動かしてみましょう。

not_work.gif

……おや?

残念ながら……

このコードはコンパイルこそ通るものの、実際に動作はしません。なぜでしょうか? エラーメッセージを見てみます。

error: Error(Hyper(Error(User(Execute), "tokio::spawn failed (is a tokio runtime running this future?)")), "...")

Executor が tokio でないことが問題だと言っているようです。reqwest は hyper をバックエンドにしていて、その hyper は tokio に依存しています。つまり tokio でしか動かないようハードコードされている部分があるので API が適合していても結局は動かないわけですね……無念。

コード全文は以下のようになります。

詳細
Cargo.toml
[package]
name = "async-image-downloader"
version = "0.1.0"
authors = ["Your Name <you@example.com>"]
edition = "2018"

[dependencies]
gtk = "0.7.0"
glib = { version = "0.8.1", features = ["futures"] }
gio = "0.7.0"
gdk-pixbuf = "0.7.0"
reqwest = "0.9.20"
futures-preview = { version = "0.3.0-alpha.18", features = ["compat"] }
src/main.rs
#![feature(async_await)]

mod client;

use {
    futures::{
        // compat::{Future01CompatExt, Stream01CompatExt},
        FutureExt,
        TryStreamExt,
    },
    gdk_pixbuf::PixbufLoaderExt,
    gio::{ApplicationExt, ApplicationExtManual},
    glib::Cast,
    gtk::{ButtonExt, ContainerExt, EntryExt, ImageExt, ProgressBarExt, WidgetExt},
};

fn build_ui(app: &gtk::Application) {
    // 各ウィジェットの生成

    // イメージ
    let layout = gtk::LayoutBuilder::new().expand(true).build();
    let image = gtk::ImageBuilder::new().parent(layout.upcast_ref()).build();

    // ブロッキングを視覚化するスピナー
    let spinner = gtk::SpinnerBuilder::new()
        .height_request(32)
        .active(true)
        .build();

    // プログレスバー
    let progress = gtk::ProgressBarBuilder::new()
        .text("0%")
        .show_text(true)
        .build();

    // URL 入力ボックス
    let entry = gtk::EntryBuilder::new()
        .placeholder_text("Enter image url")
        .build();

    // ダウンロード開始ボタン
    let get_button = gtk::ButtonBuilder::new()
        .label("GET")
        .sensitive(false)
        .build();

    // ルートコンテナ
    let root = gtk::Box::new(gtk::Orientation::Vertical, 0);
    // 各ウィジェットを追加
    root.add(&layout);
    root.add(&spinner);
    root.add(&progress);
    root.add(&entry);
    root.add(&get_button);

    // シグナルハンドラの登録

    // エントリが空欄でなければボタンを有効化
    entry.connect_property_text_length_notify({
        let get_button = get_button.clone();
        move |entry| {
            get_button.set_sensitive(entry.get_text_length() != 0);
        }
    });

    // ボタンのクリックでダウンロードを開始する
    get_button.connect_clicked(move |button| {
        // ダウンロード中はウィジェットを無効化
        entry.set_sensitive(false);
        button.set_sensitive(false);

        // ボタンがクリックされている == 有効な時点でエントリは空欄ではないので unwrap して取り出す
        let uri = entry.get_text().unwrap();

        let progress = progress.clone();
        let image = image.clone();
        // Future の生成
        let future = async move {
            // クライアントを生成
            let res = reqwest::r#async::Client::new()
                .get(uri.as_str())?
                .send()
                .compat()
                .await?;

            // Content-Length を取得
            let content_length = res.content_length().map(|cl| cl as f64);
            // ボディを返す Stream を取得
            let mut decoder = res.into_body().compat();

            // Stream からボディを継続受信
            let mut body = Vec::new();
            while let Some(chunk) = decoder.try_next().await? {
                body.extend(chunk);

                // 進捗に応じてプログレスバーを更新
                if let Some(content_length) = content_length {
                    let percent = decoder.downloaded() as f64 / content_length;
                    progress.set_fraction(percent);
                    progress.set_text(Some(&format!("{:>6.02}%", percent * 100.0)));
                }
            }

            // ダウンロード完了

            progress.set_fraction(100.0);
            progress.set_text(Some("100.00%"));

            // Pixbuf にバイト列を流し込んでイメージにセット
            let loader = gdk_pixbuf::PixbufLoader::new();
            loader.write(&body)?;
            loader.close()?;
            image.set_from_pixbuf(loader.get_pixbuf().as_ref());

            Result::<_, Box<dyn std::error::Error>>::Ok(())
        }
            // ダウンロード後の処理
            .map({
                let entry = entry.clone();
                let button = button.clone();
                move |res| {
                    // 標準エラー出力にエラーの内容を表示
                    if let Err(err) = res {
                        eprintln!("Download failed: {:?}", err);
                    }
                    // 無効化したウィジェットを復元
                    entry.set_sensitive(true);
                    button.set_sensitive(true);
                }
            });

        // Executor にタスクを追加
        glib::MainContext::default().spawn_local(future);
    });

    // メインウィンドウ
    gtk::ApplicationWindowBuilder::new()
        // Application をセット
        .application(app)
        // ルートコンテナをセット
        .child(&root.upcast())
        // 各プロパティをセット
        .window_position(gtk::WindowPosition::Center)
        .default_width(400)
        .default_height(400)
        .title("Async Download Test")
        .build()
        // 全体を表示
        .show_all();
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // アプリケーションの生成
    let app = gtk::Application::new(None, Default::default())?;
    // activate シグナルでビューを構築
    app.connect_activate(build_ui);
    // 実行開始
    app.run(&[]);

    Ok(())
}

まとめ

web クライアントのライブラリは他にもあるのですが、actix-webなども同様に hyper をバックエンドにしているので動作しません。
最近は curl をバックエンドにしたisahcsurfといったライブラリも登場し、こちらは tokio に依存してはいないのですが、バグがあってまだ安定的に動作するとは言えないようです。

しかしそれはそれとして感じたのは、Rust(というかこの非同期実行モデル)と GUI の意外な相性の良さです。
ウィジェット間の連携などでどうにも野暮ったさが拭えない印象の gtk-rs ですが、こと非同期に関しては「タスク内からウィジェットを触る」「メインスレッド外からのウィジェットに対するアクセスを禁止する」といった、C#ですら油断するとバグを埋め込んでしまう部分を実にスマートに解決しています。
これは Rust がポーリングモデルを採用し、タスクに Send などの境界を課すことができるからこその利点ですね。
Rust+GUI の可能性を感じさせる体験でした。

という訳でタイトル詐欺になってしまい申し訳ないのですが、Rust+async/awaitでノンブロッキングGUIという夢は未だ実現不可能という結果となってしまいました。
しかしエコシステムが futures 0.3 に対応した暁にはこれらのコードはそのまま動作するはずです。
頃合いを見てまた挑戦してみたいと思います。

実はある

futures 0.3 に対応した非同期 I/O ライブラリは既に存在します。
この記事の冒頭で追加した gio が実はそうでした。
関連する API を抜粋します。

// SocketClientExt
fn connect_to_host_async_future(
    &self,
    host_and_port: &str,
    default_port: u16
) -> Box_<dyn Future<Output = Result<SocketConnection, Error>> + Unpin>;

// OutputStreamExtManual
fn write_all_async_future<'a, B: AsRef<[u8]> + Send + 'static>(
    &self,
    buffer: B,
    io_priority: Priority
) -> Box<dyn Future<Output = Result<(B, usize, Option<Error>), (B, Error)>> + Unpin>;

// InputStreamExtManual
fn read_all_async_future<'a, B: AsMut<[u8]> + Send + 'static>(
    &self,
    buffer: B,
    io_priority: Priority
) -> Box<dyn Future<Output = Result<(B, usize, Option<Error>), (B, Error)>> + Unpin>;

大分ローレベルな感じがしますね……シグニチャも奇っ怪で使い勝手があまり良くなさそうです。
しかし動くことは動きます。ということでこれを用いてStreamを実装する構造体を自ら定義していきたいと思います。

長くなりましたので続きは別の記事として投稿します。

14
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
11