1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

Rust 100 Ex 🏃【36/37】 ブロッキング・非同期用の実装・キャンセル ~ラストスパート!~

Last updated at Posted at 2024-07-17

前の記事

全記事一覧

100 Exercise To Learn Rust 演習第36回になります、ついにラス前で、解説に対応した演習は今回が最後です!

今回は非同期特有の考え方や操作が中心となっているみたいですね、サクッと行きたいと思います!(事情につき3問あります)

8章の非同期について、全体的に筆者の理解度が低いです...他の本とかも読んでる最中...マサカリ大歓迎です!

記事は執筆時点での理解という感じです

今回の関連ページ

[08_futures/05_blocking] ブロッキングを回避すべし

問題はこちらです。

lib.rs
// TODO: the `echo` server uses non-async primitives.
//  When running the tests, you should observe that it hangs, due to a
//  deadlock between the caller and the server.
//  Use `spawn_blocking` inside `echo` to resolve the issue.
use std::io::{Read, Write};
use tokio::net::TcpListener;

pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
    loop {
        let (socket, _) = listener.accept().await?;
        let mut socket = socket.into_std()?;
        socket.set_nonblocking(false)?;
        let mut buffer = Vec::new();
        socket.read_to_end(&mut buffer)?;
        socket.write_all(&buffer)?;
    }
}
テストを含めた全体
lib.rs
// TODO: the `echo` server uses non-async primitives.
//  When running the tests, you should observe that it hangs, due to a
//  deadlock between the caller and the server.
//  Use `spawn_blocking` inside `echo` to resolve the issue.
use std::io::{Read, Write};
use tokio::net::TcpListener;

pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
    loop {
        let (socket, _) = listener.accept().await?;
        let mut socket = socket.into_std()?;
        socket.set_nonblocking(false)?;
        let mut buffer = Vec::new();
        socket.read_to_end(&mut buffer)?;
        socket.write_all(&buffer)?;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;
    use std::panic;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::task::JoinSet;

    async fn bind_random() -> (TcpListener, SocketAddr) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        (listener, addr)
    }

    #[tokio::test]
    async fn test_echo() {
        let (listener, addr) = bind_random().await;
        tokio::spawn(echo(listener));

        let requests = vec![
            "hello here we go with a long message",
            "world",
            "foo",
            "bar",
        ];
        let mut join_set = JoinSet::new();

        for request in requests {
            join_set.spawn(async move {
                let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
                let (mut reader, mut writer) = socket.split();

                // Send the request
                writer.write_all(request.as_bytes()).await.unwrap();
                // Close the write side of the socket
                writer.shutdown().await.unwrap();

                // Read the response
                let mut buf = Vec::with_capacity(request.len());
                reader.read_to_end(&mut buf).await.unwrap();
                assert_eq!(&buf, request.as_bytes());
            });
        }

        while let Some(outcome) = join_set.join_next().await {
            if let Err(e) = outcome {
                if let Ok(reason) = e.try_into_panic() {
                    panic::resume_unwind(reason);
                }
            }
        }
    }
}

「ブロッキングI/Oを使用するとデッドロックすることを確かめて、 spawn_blocking を使って解消してください」という問題です。

解説

というわけで、 spawn_blocking を使うように書き換えましょう。

lib.rs
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
    loop {
        let (socket, _) = listener.accept().await?;

+        tokio::task::spawn_blocking(|| -> anyhow::Result<()> {
            let mut socket = socket.into_std()?;
            socket.set_nonblocking(false)?;
            let mut buffer = Vec::new();
            socket.read_to_end(&mut buffer)?;
            socket.write_all(&buffer)?;

+            Ok(())
+        });
    }
}

どう変わったのでしょうか...?ブロッキングとノンブロッキングバージョンでどう違うのでしょう...?解説を試みてみます。

非同期処理でブロックしないとは、「 .await でこまめに主導権をランタイムに返す」ということです。もし長時間ブロックするスレッドがあり、ずっと .await してくれない場合、tokioランタイムにはそのタスクをどうにかする術はありません!

この問題うまいなぁと思うのは、 #[tokio::test] マクロ以下なのでシングルスレッド処理になっているんですよね...よって問題では次のようにしてデッドロックが起こっています。

  • tokioランタイムはシングルスレッドしか扱えない状況
    • .await が入るたびにタスクを お手玉・・・ している
  • ノンブロッキングI/Oの場合は、 read_to_end 呼び出し時に .await できるから、そこで write_all 側のタスクに切り替わるので、デッドロックしない
  • しかし、ブロッキングI/Oの場合は、 read_to_end でtokioランタイムにスレッド主導権が返らないから、ずっと待ちになってしまう、デッドロック!

※ 以下の図は概略です!

全部非同期の場合

PoCソースコード

Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=de4d03d4204a1474c50cb855fed05a4c

Rust
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
    loop {
        let (mut socket, _) = listener.accept().await?;
        let mut buffer = Vec::new();
        socket.read_to_end(&mut buffer).await?;
        socket.write_all(&buffer).await?;
    }
}
省略なし全体
Rust
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
    loop {
        let (mut socket, _) = listener.accept().await?;
        let mut buffer = Vec::new();
        socket.read_to_end(&mut buffer).await?;
        socket.write_all(&buffer).await?;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;
    use std::panic;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::task::JoinSet;

    async fn bind_random() -> (TcpListener, SocketAddr) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        (listener, addr)
    }

    #[tokio::test]
    async fn test_echo() {
        let (listener, addr) = bind_random().await;
        tokio::spawn(echo(listener));

        let requests = vec![
            "hello here we go with a long message",
            "world",
            "foo",
            "bar",
        ];
        let mut join_set = JoinSet::new();

        for request in requests {
            join_set.spawn(async move {
                let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
                let (mut reader, mut writer) = socket.split();

                // Send the request
                writer.write_all(request.as_bytes()).await.unwrap();
                // Close the write side of the socket
                writer.shutdown().await.unwrap();

                // Read the response
                let mut buf = Vec::with_capacity(request.len());
                reader.read_to_end(&mut buf).await.unwrap();
                assert_eq!(&buf, request.as_bytes());
            });
        }

        while let Some(outcome) = join_set.join_next().await {
            if let Err(e) = outcome {
                if let Ok(reason) = e.try_into_panic() {
                    panic::resume_unwind(reason);
                }
            }
        }
    }
}

今回のブロッキングI/O混入バージョン

では spawn_blocking でどう解決するのか...?このメソッドを使うことで、 重たいブロッキング処理専用のスレッドを建ててそちらで実行 されます!

演習回答バージョン

暗号計算等重い処理やブロッキングAPIしか提供されておらずそれを使わなければならない場合などに対しては、この spawn_blocking 専用スレッドを用いると良さそうです。

[08_futures/06_async_aware_primitives] 非同期プリミティブ (非同期用実装)

問題はこちらです。

lib.rs
/// TODO: the code below will deadlock because it's using std's channels,
///  which are not async-aware.
///  Rewrite it to use `tokio`'s channels primitive (you'll have to touch
///  the testing code too, yes).
///
/// Can you understand the sequence of events that can lead to a deadlock?
use std::sync::mpsc;

pub struct Message {
    payload: String,
    response_channel: mpsc::Sender<Message>,
}

/// Replies with `pong` to any message it receives, setting up a new
/// channel to continue communicating with the caller.
pub async fn pong(mut receiver: mpsc::Receiver<Message>) {
    loop {
        if let Ok(msg) = receiver.recv() {
            println!("Pong received: {}", msg.payload);
            let (sender, new_receiver) = mpsc::channel();
            msg.response_channel
                .send(Message {
                    payload: "pong".into(),
                    response_channel: sender,
                })
                .unwrap();
            receiver = new_receiver;
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::{pong, Message};
    use std::sync::mpsc;

    #[tokio::test]
    async fn ping() {
        let (sender, receiver) = mpsc::channel();
        let (response_sender, response_receiver) = mpsc::channel();
        sender
            .send(Message {
                payload: "pong".into(),
                response_channel: response_sender,
            })
            .unwrap();

        tokio::spawn(pong(receiver));

        let answer = response_receiver.recv().unwrap().payload;
        assert_eq!(answer, "pong");
    }
}

ブロッキングして到着を待つ std::sync::mpsc; だとデッドロックが起きるので、非同期用の tokio::sync::mpsc を使うように書き直そうという問題です。

解説

回答としては、 .await が使用可能な tokio::sync::mpsc に置き換えるだけです!

lib.rs
// use std::sync::mpsc;
+use tokio::sync::mpsc;

pub struct Message {
    payload: String,
    response_channel: mpsc::Sender<Message>,
}

/// Replies with `pong` to any message it receives, setting up a new
/// channel to continue communicating with the caller.
pub async fn pong(mut receiver: mpsc::Receiver<Message>) {
    loop {
+        if let Some(msg) = receiver.recv().await {
            println!("Pong received: {}", msg.payload);
            let (sender, new_receiver) = mpsc::channel(1);
            msg.response_channel
                .send(Message {
                    payload: "pong".into(),
                    response_channel: sender,
                })
+                .await
                .unwrap();
            receiver = new_receiver;
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::{pong, Message};
    // use std::sync::mpsc;
+    use tokio::sync::mpsc;

    #[tokio::test]
    async fn ping() {
        let (sender, receiver) = mpsc::channel(1);
        let (response_sender, mut response_receiver) = mpsc::channel(1);
        sender
            .send(Message {
                payload: "pong".into(),
                response_channel: response_sender,
            })
+            .await
            .unwrap();

        tokio::spawn(pong(receiver));

+        let answer = response_receiver.recv().await.unwrap().payload;
        assert_eq!(answer, "pong");
    }
}

「なぜデッドロックするかわかる?」と問われているので、先ほどと同様にシーケンス図に描き起こしてみようと思います。

std::sync::mpsc バージョン

即効でデッドロックしましたね... recv().await したいです!

tokioに置き換えましょう。

tokio::sync::mpsc バージョン

無事にピンポン :ping_pong: できています。

ちなみに本エクササイズの Book の方の解説では std::sync::Mutextokio::sync::Mutex が取り上げられており、さらにstdの方を用いた場合のデッドロックの仕組みまで解説されています1

解説側は Mutex 、 エクササイズ側は mpsc を題材にしたみたいですね。この2つにとどまらず、前の問題に出てきた非同期I/O (tokio::io) やTCPソケット等 (tokio::net) 等、tokioクレートには非同期用プリミティブがたくさん用意されており、また大体が標準ライブラリが持つAPIに似ていてそこに .await を付けられるようになっています。tokioクレート以外でも、例えばreqwestクレートはfeatureフラグで非同期・ブロッキングを切り替えられたりできて興味深いですね。

コストの面を考えると、何でもかんでも非同期プリミティブを使うべきではない (ブロッキングプリミティブを使えという意味ではありません)という言説2もあり、考えることが多い非同期ですが、クレートの対応具合や対応の仕方を見てみると面白そうです(Bookに忖度した感想)。

[08_futures/07_cancellation] タスクのキャンセル

問題はこちらです。主要な行をハイライトしておきます。

lib.rs
// TODO: fix the `assert_eq` at the end of the tests.
//  Do you understand why that's the resulting output?
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;

pub async fn run(listener: TcpListener, n_messages: usize, timeout: Duration) -> Vec<u8> {
    let mut buffer = Vec::new();
    for _ in 0..n_messages {
        let (mut stream, _) = listener.accept().await.unwrap();
        let _ = tokio::time::timeout(timeout, async {
            stream.read_to_end(&mut buffer).await.unwrap();
        })
        .await;
    }
    buffer
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncWriteExt;

    #[tokio::test]
    async fn ping() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let messages = vec!["hello", "from", "this", "task"];
        let timeout = Duration::from_millis(20);
        let handle = tokio::spawn(run(listener, messages.len(), timeout.clone()));

        for message in messages {
            let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
            let (_, mut writer) = socket.split();

            let (beginning, end) = message.split_at(message.len() / 2);

            // Send first half
            writer.write_all(beginning.as_bytes()).await.unwrap();
            tokio::time::sleep(timeout * 2).await;
            writer.write_all(end.as_bytes()).await.unwrap();

            // Close the write side of the socket
            let _ = writer.shutdown().await;
        }

        let buffered = handle.await.unwrap();
        let buffered = std::str::from_utf8(&buffered).unwrap();
+        assert_eq!(buffered, "");
    }
}

最終回手前にして久々の「実行結果どうなる...?」問題ですね...!

解説

Book で解説されている通り、 .await がある場所でのみ tokio::time::timeout 等を利用して外から停止させることができます。裏を返すと前の問題のような .await せずブロッキングしてしまうようなタスクは停止できない ということですね。こまめな .await が大切です。

.await は入れ子構造になっています。タイムアウト時にランタイムにコントロールを返した .await の時点で停止されるため、それを考慮して回答します!

lib.rs
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;

pub async fn run(listener: TcpListener, n_messages: usize, timeout: Duration) -> Vec<u8> {
    let mut buffer = Vec::new();
    for _ in 0..n_messages {
        let (mut stream, _) = listener.accept().await.unwrap();
        let _ = tokio::time::timeout(timeout, async {
            // 最初を受け取ったところでread_to_end実装内部のawaitがyieldする
            // そしてタイムアウトするので、前半だけ残る
            stream.read_to_end(&mut buffer).await.unwrap();
        })
        .await;
    }
    buffer
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncWriteExt;

    #[tokio::test]
    async fn ping() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let messages = vec!["hello", "from", "this", "task"];
        let timeout = Duration::from_millis(20);
        let handle = tokio::spawn(run(listener, messages.len(), timeout.clone()));

+        let mut answer = String::new();

        for message in messages {
            let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
            let (_, mut writer) = socket.split();

            let (beginning, end) = message.split_at(message.len() / 2);

+            answer = format!("{}{}", answer, beginning);

            // Send first half
            writer.write_all(beginning.as_bytes()).await.unwrap();
            tokio::time::sleep(timeout * 2).await;
            writer.write_all(end.as_bytes()).await.unwrap();

            // Close the write side of the socket
            let _ = writer.shutdown().await;
        }

        let buffered = handle.await.unwrap();
        let buffered = std::str::from_utf8(&buffered).unwrap();
-        assert_eq!(buffered, "");
+        assert_eq!(buffered, &answer);
        // 答えは、各文字列の前半分を足し合わせた
        // hefrthta
    }
}

read_to_end メソッドでの .await 中にタイムアウトが来るように設定されています。 read_to_end の内部でも .await が呼ばれてバッファに書き込みを行っていると予想でき、前半部分だけが書き込まれているだろうと踏まえて回答すればそれが正解です。ハードコードせずその意図を組み入れた回答をしてみました!

AsyncDropがほしい...!(ない)

Book にてキャンセルの話のついでにクリーンアップの話題が出ています。そして「不幸なことに、Rustは非同期クリーンアップ操作に関して明確な手段を提供していません」みたいなことが書かれています。

クリーンアップと言えば、そう、我々には Drop トレイトがあります(参考: 第12回)。しかしそのシグネチャには、 当然非同期的な要素は含まれません

Rust
pub trait Drop {
    // Required method
    fn drop(&mut self);
}

例えば tokio::runtime::Handle::current を使うことで無理やり非同期処理を実行しようとすることは可能ですが、クリーンアップの確実性の担保が難しいです。

PoCソースコード

Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=cab4561a2ae8af4ce62ef8585a7ed9e6

Rust
use tokio::time::{sleep, Duration};
use tokio::runtime::Handle;

#[derive(Clone)]
struct Hoge {
    // 所有するリソースには、所有権を奪える等スレッドを跨げるような細工をしておく
    resource: Option<String>,
}

impl Hoge {
    fn new(resource: String) -> Self {
        Self {
            resource: Some(resource),
        }
    }

    async fn clean_up(resource: String) {
        println!("Wait...");
        sleep(Duration::from_millis(100)).await;
        println!("Done: {}", resource);
    }
}

impl Drop for Hoge {
    fn drop(&mut self) {
        let Some(resource) = self.resource.take() else {
            return;
        };
        let handle = Handle::current();
        handle.spawn(async move {
            Hoge::clean_up(resource).await;
        });
    }
}

#[tokio::main]
async fn main() {
    let h = Hoge::new("Beep".to_string());
    
    drop(h);
    
    sleep(Duration::from_millis(1000)).await;
}

Bookでは Drop について、ケースに合わせて例えば次のように実装すると良いと書かれています。

  • ランタイムで新しいタスクを作成する (挙げたPoCはこの方法)
  • mpsc 等を使ってメッセージを送り、別タスクにクリーンアップを任せる (良さそう...?)
  • 裏で別スレッドを立ち上げる

Bookの例にも挙げられているSQLトランザクションのabort等、確実に実行してほしくて、かつ非同期ではない、いつものRustなら Drop トレイトにまかせておけばおkな処理でも、上記に挙げたような手法を取るしかないのが現状です。

このような要望に答えてくれそうな AsynDrop を考案しているWG もあるらしいですが、一方で「 AsyncDrop みたいなものが要らないような実装を心がけろ (おそらく上述したような対処をしようの意)」という言説もあります...umm

直感的に、 drop メソッドは軽い処理であってほしいような気がするので、"重め"なことをするであろう AsyncDrop というのはその思想に反していそうではあります3。一方、Pythonの with や Goの defer のような機能が Drop に求められているのも事実です。

そのため筆者的には withdefer により近い、 Drop とは異なるが、 Drop と同様スコープが外れる時にRAIIライクなクリーンアップをしてくれるトレイトがほしいなぁとか思ったりします。 drop 実行前に実行されるイメージです。もしそんな感じの CleanUp トレイト(筆者命名)があれば、 AsyncCleanUp トレイトみたいなのはありなんじゃないでしょうか?(妄想)

さて、無事にここまでこれました。次回、いよいよ最終回です...!

次の記事: 【37】 Axumでクラサバ! ~最終回~

登場したPlayground

(実際に無効化したことはないですが、)Rust Playground上のデータが喪失する可能性を鑑みて、一応記事にもソースコードを掲載することとしました。

URL: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=9f27affc25ba4c63f8f61ea54d92dbae

Rust
use tokio;
use std::sync::{Arc, Mutex};

async fn run(m: Arc<Mutex<Vec<u64>>>) {
    let guard = m.lock().unwrap();
    http_call(&guard).await;
    println!("Sent {:?} to the server", &guard);
    // `guard` is dropped here
}

async fn http_call(_v: &[u64]) {
  // [...]
}

#[tokio::main]
async fn main() {
    let amvu = Arc::new(Mutex::new(vec![0, 1, 2]));
    
    run(amvu).await;
}

URL: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=f17679b58250e94e30774cc7b1369d8b

Rust
use std::sync::{Arc, Mutex};

async fn run(m: Arc<Mutex<Vec<u64>>>) {
    let guard = m.lock().unwrap();
    http_call(&guard).await;
    println!("Sent {:?} to the server", &guard);
    // `guard` is dropped here
}

/// Use `v` as the body of an HTTP call.
async fn http_call(v: &[u64]) {
  // [...]
}

#[tokio::main]
async fn main() {
    let m = Arc::new(Mutex::new(vec![0, 1, 2]));
    tokio::spawn(run(m));
}
  1. 解説で登場するソースコード、 MutexGuard!Send なため一見するとコンパイルが通らなそうに見えるのですが、 tokio::spawn とかで包まない場合コンパイル通りますね(包む場合はコンパイルエラー)...block_onSend を要求しないからでしょうか...?

  2. まぁこれも Mutex に限った話の可能性はありますが。

  3. そのため逆に言えば mpsc で外にクリーンアップを任せるのが筋の良い方法に見えます。ただ結局非同期 mpsc を使う都合上、考えることは多そうです。 spawn_blocking で別スレッドを建てて blocking_send とか...?

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?