1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

Rust 100 Ex 🏃【35/37】 非同期ランタイム・Futureトレイト ~非同期のお作法~

Last updated at Posted at 2024-07-17

前の記事

全記事一覧

100 Exercise To Learn Rust 演習第35回になります!

8章の非同期について、全体的に筆者の理解度が低いです...他の本とかも読んでる最中...マサカリ大歓迎です!

記事は執筆時点での理解という感じです

今回の関連ページ

[08_futures/03_runtime] 非同期ランタイム tokio (もとい演習自体は Arc の使用)

問題はこちらです。

lib.rs
// TODO: Implement the `fixed_reply` function. It should accept two `TcpListener` instances,
//  accept connections on both of them concurrently, and always reply clients by sending
//  the `Display` representation of the `reply` argument as a response.
use std::fmt::Display;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;

pub async fn fixed_reply<T>(first: TcpListener, second: TcpListener, reply: T)
where
    // `T` cannot be cloned. How do you share it between the two server tasks?
    T: Display + Send + Sync + 'static,
{
    todo!()
}
テストを含めた全体
lib.rs
// TODO: Implement the `fixed_reply` function. It should accept two `TcpListener` instances,
//  accept connections on both of them concurrently, and always reply clients by sending
//  the `Display` representation of the `reply` argument as a response.
use std::fmt::Display;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;

pub async fn fixed_reply<T>(first: TcpListener, second: TcpListener, reply: T)
where
    // `T` cannot be cloned. How do you share it between the two server tasks?
    T: Display + Send + Sync + 'static,
{
    todo!()
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;
    use std::panic;
    use tokio::io::AsyncReadExt;
    use tokio::task::JoinSet;

    async fn bind_random() -> (TcpListener, SocketAddr) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        (listener, addr)
    }

    #[tokio::test]
    async fn test_echo() {
        let (first_listener, first_addr) = bind_random().await;
        let (second_listener, second_addr) = bind_random().await;
        let reply = "Yo";
        tokio::spawn(fixed_reply(first_listener, second_listener, reply));

        let mut join_set = JoinSet::new();

        for _ in 0..3 {
            for addr in [first_addr, second_addr] {
                join_set.spawn(async move {
                    let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
                    let (mut reader, _) = socket.split();

                    // Read the response
                    let mut buf = Vec::new();
                    reader.read_to_end(&mut buf).await.unwrap();
                    assert_eq!(&buf, reply.as_bytes());
                });
            }
        }

        while let Some(outcome) = join_set.join_next().await {
            if let Err(e) = outcome {
                if let Ok(reason) = e.try_into_panic() {
                    panic::resume_unwind(reason);
                }
            }
        }
    }
}

前回 解説した通り、 非同期処理は Send を要求 します!そして、 reply: T を複製せずに 複数タスク(複製スレッド)で所有したい とくれば...?

独り言...「「「前回作成した図、今回載せれば良かった...!!!!」」」 というわけで前回の第34回も合わせて読んでほしいです :bow:

第34回に合わせて図を作成してしまい、作り直す体力がないです... :sweat_smile:

解説

そうですね、 Arc の出番です!というわけで、非同期処理でもお世話になりましょう。

( 次回 にて非同期専用の Mutex がある話等をするつもりですが、)このように、非同期処理も並行処理の一種で、 Send の制約がある仲間ですから、まるまる7章で学んだ知識が活かせます!

lib.rs
use std::sync::Arc;

pub async fn fixed_reply<T>(first: TcpListener, second: TcpListener, reply: T)
where
    // `T` cannot be cloned. How do you share it between the two server tasks?
    T: Display + Send + Sync + 'static,
{
    let reply = Arc::new(reply);
    let reply2 = Arc::clone(&reply);
    tokio::spawn(listen(first, reply));
    tokio::spawn(listen(second, reply2));
}

async fn listen<T>(listener: TcpListener, reply: Arc<T>)
where
    T: Display + Send + Sync + 'static,
{
    loop {
        let (mut socket, _) = listener.accept().await.unwrap();
        let reply = Arc::clone(&reply);

        tokio::spawn(async move {
            let reply = reply.to_string();
            socket.write_all(reply.as_bytes()).await.unwrap();
            socket.flush().await.unwrap();
        });
    }
}

前回自分は「tokioは複数スレッドを管理する」と言ったのですが、今回の Book には「マルチスレッドとシングルスレッドの2つのフレーバーがあり選べて、 #[tokio::main] マクロを使った場合はマルチスレッド、 #[tokio::test] マクロを使用した場合はシングルスレッドになる」と解説されていたりします。

デッドロックってマルチスレッド版だと気づきにくかったりしますし、テストがシングルスレッドなのは中々ありがたいですね。

その他、「 tokio::spawn は(スコープ async {} を跨ぐ値について1) 'static を要求する」とありますが、こちらは std::thread::spawn と同様ですね。タスクがいつまで生きているかはわかりません!

[08_futures/04_future] FutureトレイトとSendトレイト

問題はこちらです。久々のコンパイルを通してください問題になります。

lib.rs
//! TODO: get the code to compile by **re-ordering** the statements
//!  in the `example` function. You're not allowed to change the
//!  `spawner` function nor what each line does in `example`.
//!   You can wrap existing statements in blocks `{}` if needed.
use std::rc::Rc;
use tokio::task::yield_now;

fn spawner() {
    tokio::spawn(example());
}

async fn example() {
    let non_send = Rc::new(1);
    yield_now().await;
    println!("{}", non_send);
}

しまった...こっちも前回解説してしまいました... :sweat_smile: Send ではない値は .await を跨げません

解説

Send を実装していない型であったところで、 .await より前にドロップしてしまえば問題ないので、そのように書いちゃりましょう!

lib.rs
async fn example() {
    {
        let non_send = Rc::new(1);
        println!("{}", non_send);
    }
    yield_now().await;
}

{} 内に収めることで Rc.await 前にドロップさせています。 drop 関数等を使っても良いかもしれません。

その他 Book で取り上げられている大きめなトピックとして、 async fn の正体、 Future トレイトの話があります。Rustでは非同期もトレイトで説明されるのです!

Rust (脱糖前)
async fn xxx(...) -> T {
    // ...
}

Rust (脱糖後)
fn xxx(...) -> impl Future<Output = T> {
    // ...
}

※ 手書きの場合は + Send + 'static みたいな形でさらに追加で色々付くことも...?また脱糖についての正確性は調べられてないです( cargo expand でわかるのかな...?)

Future トレイトを実装した型は ステートマシンである 」という内容を始めとし、かなり興味深い説明が書かれているので是非読んでほしいです!(筆者に説明の力量がない定期)

前回.await を跨ぐには Send が要る(キリッ」と筆者はぼかしながらテキトーに話していましたが、ちゃんとこれに対する理由も Future トレイトを実装した列挙体を例に挙げ説明されていたりします。

Future未来って200通りあんねん...

上記の impl Future 脱糖について、たまに型を手書きする必要があったりして、その時に筆者がよくお世話になっているQiita記事様のリンクを掲載させていただきます :bow:

非同期Rustパターン

https://qiita.com/legokichi/items/4f2c09330f90626600a6

いや、パターン多すぎるだろ...非同期への理解もかなり深まりそうです

では次の問題に行きましょう!

次の記事: 【36】 ブロッキング・非同期用の実装・キャンセル ~ラストスパート!~

登場したPlayground

(実際に無効化したことはないですが、)Rust Playground上のデータが喪失する可能性を鑑みて、一応記事にもソースコードを掲載することとしました。

URL: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=9ca535b6f390d2ae81cdc7b3c3cd39a6

Rust
use tokio;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    tokio::spawn(async move {
        let s = 10_u32;
        let r = &s;
        sleep(Duration::from_millis(100)).await;
        println!("{}", r);
    }).await.ok();
}
  1. ローカルで生成される値への参照等であれば 'static でなくとも .await は跨げるようです。 Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=9ca535b6f390d2ae81cdc7b3c3cd39a6

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?