1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

Rust 100 Ex 🏃【34/37】 `async fn`・非同期タスク生成 ~Rustの非同期入門~

Last updated at Posted at 2024-07-17

前の記事

全記事一覧

100 Exercise To Learn Rust 演習第34回になります、今回から8章、「非同期処理」に入っていきます!

8章の非同期について、全体的に筆者の理解度が低いです...他の本とかも読んでる最中...マサカリ大歓迎です!

記事は執筆時点での理解という感じです

今回の関連ページ

[08_futures/01_async_fn] async fn

問題はこちらです。

lib.rs
use tokio::net::TcpListener;

// TODO: write an echo server that accepts incoming TCP connections and
//  echoes the received data back to the client.
//  `echo` should not return when it finishes processing a connection, but should
//  continue to accept new connections.
//
// Hint: you should rely on `tokio`'s structs and methods to implement the echo server.
// In particular:
// - `tokio::net::TcpListener::accept` to process the next incoming connection
// - `tokio::net::TcpStream::split` to obtain a reader and a writer from the socket
// - `tokio::io::copy` to copy data from the reader to the writer
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
    todo!()
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    #[tokio::test]
    async fn test_echo() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn(echo(listener));

        let requests = vec!["hello", "world", "foo", "bar"];

        for request in requests {
            let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
            let (mut reader, mut writer) = socket.split();

            // Send the request
            writer.write_all(request.as_bytes()).await.unwrap();
            // Close the write side of the socket
            writer.shutdown().await.unwrap();

            // Read the response
            let mut buf = Vec::with_capacity(request.len());
            reader.read_to_end(&mut buf).await.unwrap();
            assert_eq!(&buf, request.as_bytes());
        }
    }
}

問題を要約すると次のような感じです。

  • やってくるTCPコネクションを受け入れ、受け取ったデータをそのまま返信するエコーサーバーを書いてください
    • 返信後もリターンはせず、新たにやってくるコネクションを受け入れ続けてください
  • ヒント: 以下のtokioのドキュメントを見よう!

非同期初回からTCPリスナーを活用してエコーサーバーを作るというトバした内容になっていますね...「並行処理を突破した我々なら非同期は怖くないだろう!」みたいなノリで取り組めばよいのでしょうか?

解説

サクッと答えです!

lib.rs
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
    loop {
        let (mut socket, _) = listener.accept().await?;
        let (mut reader, mut writer) = socket.split();
        tokio::io::copy(&mut reader, &mut writer).await?;
    }
}

コネクションは無限にやってくる可能性があるので loop で回しています。 echo 関数は test_echo テスト関数から tokio::spawn でタスクとして起動されます。

説明に 次回範囲 も少しがかなり入りますが、 echo 関数は impl Future なハンドル(以降これを簡単に"タスク"と呼ぶことにします)を返します。普通タスクは、 .await が呼ばれるまでその中身は一切実行されません!(要はイテレータと同じ遅延評価) そんな中、 tokio::spawnimpl Future を返しながらも渡されたタスクを即実行してくれるランタイム提供の珍しい機能と言えます。(もとい、これがないと色々面倒そうです。)

tokio::spawn(echo(listener)); が返すハンドルはメインスレッドにて .await ( std::thread::spawn で言う join ) をしていないためメインスレッドには終わりが訪れます。よってこの無限ループによる記法をしても問題ありません!

各行についてです。返すエラー型はバラバラですが、面倒なのでなぜか今回依存に入れてくれている anyhow::Error に甘えてすべて ? で中身を取り出しています。

1行目で接続を待ち、2行目で受けたソケットをreaderとwriterに分け、3行目で受け取った入力を返してます。 .await が間に入っていたり、 asyncfn の前についているだけで、同期プログラミングと比べて特段難しかったり読みにくかったりする部分はありません!

.await で何をしているのか?

まだ例が簡単なうちに、 .await で何が起こっているのか軽く触れておきます。

ズバリ、(おそらく) タスクが スレッドを跨いでいます !(おそらく)と書いたのはtokio非同期ランタイムが実際にどういう実装をしているのかちゃんと確かめたわけではないからです。

今までは比較的手動でスレッドを管理していたのに対して、 非同期処理では、ランタイム(tokio)がスレッドを管理しています

tokioはタスクとスレッドを管理し、空いているスレッドにタスクを割り当てて処理をさせては、 .await 等が入るたびにまた引き上げて別なタスクを割り当てて...ということを行っています!手動でスレッドを管理するよりは、きめ細かく効率よくタスクを処理できる方式になっているわけです1

概略図 (mermaid製のシーケンス図において、リソースを participant とするためスレッドは actor で表していきます)

listener.accept().await?; された後等について、さらにこの内側のタスクが完了しているか?を調べてはまた待避させてを繰り返したりをするので、実際の図はもっと複雑です!

この図はおそらく正確に非同期タスクの状態変化を表せてはいないですが、要は何が言いたいかというと「空いたスレッドに処理を割り当てる」言い換えると「同じスコープで書いているように見せてその実 別なスレッドに値が移る 」という部分がミソです!

...そう、すなわち、「 .await で一旦値が待避され、別スレッドに転送されて再開される」ので、値が .await を跨ぐには SendSync が必要 になってくるわけです。前節までを振り返ると、少なくとも Send は必要でしょう。

逆に言うとそれさえ抑えておけば非同期だからと言って特段身構える部分は少ないんじゃないかなと思っています。なぜって...?筆者がこの程度の理解で非同期処理を書けているからです! :upside_down:

[08_futures/02_spawn] 非同期タスク生成

問題はこちらです。

lib.rs
use tokio::net::TcpListener;

// TODO: write an echo server that accepts TCP connections on two listeners, concurrently.
//  Multiple connections (on the same listeners) should be processed concurrently.
//  The received data should be echoed back to the client.
pub async fn echoes(first: TcpListener, second: TcpListener) -> Result<(), anyhow::Error> {
    todo!()
}
テストを含めた全体
lib.rs
use tokio::net::TcpListener;

// TODO: write an echo server that accepts TCP connections on two listeners, concurrently.
//  Multiple connections (on the same listeners) should be processed concurrently.
//  The received data should be echoed back to the client.
pub async fn echoes(first: TcpListener, second: TcpListener) -> Result<(), anyhow::Error> {
    todo!()
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;
    use std::panic;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::task::JoinSet;

    async fn bind_random() -> (TcpListener, SocketAddr) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        (listener, addr)
    }

    #[tokio::test]
    async fn test_echo() {
        let (first_listener, first_addr) = bind_random().await;
        let (second_listener, second_addr) = bind_random().await;
        tokio::spawn(echoes(first_listener, second_listener));

        let requests = vec!["hello", "world", "foo", "bar"];
        let mut join_set = JoinSet::new();

        for request in requests.clone() {
            for addr in [first_addr, second_addr] {
                join_set.spawn(async move {
                    let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
                    let (mut reader, mut writer) = socket.split();

                    // Send the request
                    writer.write_all(request.as_bytes()).await.unwrap();
                    // Close the write side of the socket
                    writer.shutdown().await.unwrap();

                    // Read the response
                    let mut buf = Vec::with_capacity(request.len());
                    reader.read_to_end(&mut buf).await.unwrap();
                    assert_eq!(&buf, request.as_bytes());
                });
            }
        }

        while let Some(outcome) = join_set.join_next().await {
            if let Err(e) = outcome {
                if let Ok(reason) = e.try_into_panic() {
                    panic::resume_unwind(reason);
                }
            }
        }
    }
}

「エコーズ、act2!!」というわけで、前節でも少し解説した tokio::spawn を使ってタスクを生成しよう!という問題です。

解説

指示通り tokio::spawn で前節の echo 関数により生成されるタスクを2つ生成すればおkです!

lib.rs
pub async fn echoes(first: TcpListener, second: TcpListener) -> Result<(), anyhow::Error> {
    tokio::spawn(echo(first));
    tokio::spawn(echo(second));

    Ok(())
}

async fn echo(listener: TcpListener) {
    loop {
        let (mut socket, _) = listener.accept().await.unwrap();
        tokio::spawn(async move {
            let (mut reader, mut writer) = socket.split();
            tokio::io::copy(&mut reader, &mut writer).await.unwrap();
        });
    }
}

tokio::spawn を使用するメリットについてです。同じ非同期関数内で実行すると処理順が直列になってしまいますが、 tokio::spawn としてタスクを切り出すことで、tokioランタイムが別スレッドでいい感じに処理してくれるようになります。 そのタスクの完了を待たずに 、新たなコネクションを受け入れることができています。

図を描いて説明しようかとも思ったのですが、7章で扱ったマルチスレッド処理( std::thread::spawn )と恩恵は似ており今更かと思うので省略します。あちらは(例えば足し算処理など)似たような処理を並列化したり、サーバー・クライアント間でスレッドを分けたりしたのでした。非同期タスクでも同じようなことができます。Book の最後に言及されている通り、この管理をOSスケジューラに任せるのか、ソフトウェア(プログラム)側で制御するのかというのが本質的な違いでしょう。

では次の問題に行きましょう!

次の記事: 【35】 非同期ランタイム・Futureトレイト ~非同期のお作法~

  1. 本当はノンブロッキングI/O非同期I/O とか C10K問題 とかに言及するべきなのでしょうが、本気でマサカリで殺されてしまうので将来の自分に託したいと思います...まぁ非同期I/Oに注目した文脈での「非同期にしておいた方がスレッドを効率よく使えますよ」という説明なら嘘ではないはずなのでとりあえずこの理解でいます。(執筆時点のつっかかりメモ: 非同期I/Oはシグナルやコールバックで完了を通知するというが、シグナルの送信やコールバックの実行の責務を負うスレッドはいないのか...?結局どこかではスレッドがブロックされてはいないか(それでも幾分かは同期処理よりマシになるかもしれないが...)?例えばキーボードからの入力を想定したとして、結局はハードウェアレベルでの挙動まで理解しないと、この質問に回答することは叶わなさそう )

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?