前の記事
- 【0】 準備 ← 初回
- ...
- 【35】 非同期ランタイム・Futureトレイト ~非同期のお作法~ ← 前回
- 【36】 ブロッキング・非同期用の実装・キャンセル ~ラストスパート!~ ← 今回
全記事一覧
- 【0】 準備
- 【1】 構文・整数・変数
- 【2】 if・パニック・演習
- 【3】 可変・ループ・オーバーフロー
- 【4】 キャスト・構造体 (たまにUFCS)
- 【5】 バリデーション・モジュールの公開範囲 ~ → カプセル化!~
- 【6】 カプセル化の続きと所有権とセッター ~そして不変参照と可変参照!~
- 【7】 スタック・ヒープと参照のサイズ ~メモリの話~
- 【8】 デストラクタ(変数の終わり)・トレイト ~終わりと始まり~
- 【9】 Orphan rule (孤児ルール)・演算子オーバーロード・derive ~Empowerment 💪 ~
- 【10】 トレイト境界・文字列・Derefトレイト ~トレイトのアレコレ~
- 【11】 Sized トレイト・From トレイト・関連型 ~おもしろトレイトと関連型~
- 【12】 Clone・Copy・Dropトレイト ~覚えるべき主要トレイトたち~
- 【13】 トレイトまとめ・列挙型・match式 ~最強のトレイトの次は、最強の列挙型~
- 【14】 フィールド付き列挙型とOption型 ~チョクワガタ~
- 【15】 Result型 ~Rust流エラーハンドリング術~
- 【16】 Errorトレイトと外部クレート ~依存はCargo.tomlに全部お任せ!~
- 【17】 thiserror・TryFrom ~トレイトもResultも自由自在!~
- 【18】 Errorのネスト・慣例的な書き方 ~Rustらしさの目醒め~
- 【19】 配列・動的配列 ~スタックが使われる配列と、ヒープに保存できる動的配列~
- 【20】 動的配列のリサイズ・イテレータ ~またまたトレイト登場!~
- 【21】 イテレータ・ライフタイム ~ライフタイム注釈ようやく登場!~
- 【22】 コンビネータ・RPIT ~ 「
Iterator
トレイトを実装してるやつ」~ - 【23】
impl Trait
・スライス ~配列の欠片~ - 【24】 可変スライス・下書き構造体 ~構造体で状態表現~
- 【25】 インデックス・可変インデックス ~インデックスもトレイト!~
- 【26】 HashMap・順序・BTreeMap ~Rustの辞書型~
- 【27】 スレッド・'staticライフタイム ~並列処理に見るRustの恩恵~
- 【28】 リーク・スコープ付きスレッド ~ライフタイムに技あり!~
- 【29】 チャネル・参照の内部可変性 ~Rustの虎の子、mpscと
Rc<RefCell<T>>
~ - 【30】 双方向通信・リファクタリング ~返信用封筒を入れよう!~
- 【31】 上限付きチャネル・PATCH機能 ~パンクしないように制御!~
- 【32】
Send
・排他的ロック(Mutex
)・非対称排他的ロック(RwLock
) ~真打Arc<Mutex<T>>
登場~ - 【33】 チャネルなしで実装・Syncの話 ~考察回です~
- 【34】
async fn
・非同期タスク生成 ~Rustの非同期入門~ - 【35】 非同期ランタイム・Futureトレイト ~非同期のお作法~
- 【36】 ブロッキング・非同期用の実装・キャンセル ~ラストスパート!~
- 【37】 Axumでクラサバ! ~最終回~
- 【おまけ1】 Rustで勘違いしていたこと3選 🏄🌴 【100 Exercises To Learn Rust 🦀 完走記事 🏃】
- 【おまけ2】 【🙇 懺悔 🙇】Qiitanグッズ欲しさに1日に33記事投稿した話またはQiita CLIとcargo scriptを布教する的な何か
100 Exercise To Learn Rust 演習第36回になります、ついにラス前で、解説に対応した演習は今回が最後です!
今回は非同期特有の考え方や操作が中心となっているみたいですね、サクッと行きたいと思います!(事情につき3問あります)
8章の非同期について、全体的に筆者の理解度が低いです...他の本とかも読んでる最中...マサカリ大歓迎です!
記事は執筆時点での理解という感じです
今回の関連ページ
[08_futures/05_blocking] ブロッキングを回避すべし
問題はこちらです。
// TODO: the `echo` server uses non-async primitives.
// When running the tests, you should observe that it hangs, due to a
// deadlock between the caller and the server.
// Use `spawn_blocking` inside `echo` to resolve the issue.
use std::io::{Read, Write};
use tokio::net::TcpListener;
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
loop {
let (socket, _) = listener.accept().await?;
let mut socket = socket.into_std()?;
socket.set_nonblocking(false)?;
let mut buffer = Vec::new();
socket.read_to_end(&mut buffer)?;
socket.write_all(&buffer)?;
}
}
テストを含めた全体
// TODO: the `echo` server uses non-async primitives.
// When running the tests, you should observe that it hangs, due to a
// deadlock between the caller and the server.
// Use `spawn_blocking` inside `echo` to resolve the issue.
use std::io::{Read, Write};
use tokio::net::TcpListener;
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
loop {
let (socket, _) = listener.accept().await?;
let mut socket = socket.into_std()?;
socket.set_nonblocking(false)?;
let mut buffer = Vec::new();
socket.read_to_end(&mut buffer)?;
socket.write_all(&buffer)?;
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::SocketAddr;
use std::panic;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::task::JoinSet;
async fn bind_random() -> (TcpListener, SocketAddr) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
(listener, addr)
}
#[tokio::test]
async fn test_echo() {
let (listener, addr) = bind_random().await;
tokio::spawn(echo(listener));
let requests = vec![
"hello here we go with a long message",
"world",
"foo",
"bar",
];
let mut join_set = JoinSet::new();
for request in requests {
join_set.spawn(async move {
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
let (mut reader, mut writer) = socket.split();
// Send the request
writer.write_all(request.as_bytes()).await.unwrap();
// Close the write side of the socket
writer.shutdown().await.unwrap();
// Read the response
let mut buf = Vec::with_capacity(request.len());
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(&buf, request.as_bytes());
});
}
while let Some(outcome) = join_set.join_next().await {
if let Err(e) = outcome {
if let Ok(reason) = e.try_into_panic() {
panic::resume_unwind(reason);
}
}
}
}
}
「ブロッキングI/Oを使用するとデッドロックすることを確かめて、 spawn_blocking
を使って解消してください」という問題です。
解説
というわけで、 spawn_blocking
を使うように書き換えましょう。
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
loop {
let (socket, _) = listener.accept().await?;
+ tokio::task::spawn_blocking(|| -> anyhow::Result<()> {
let mut socket = socket.into_std()?;
socket.set_nonblocking(false)?;
let mut buffer = Vec::new();
socket.read_to_end(&mut buffer)?;
socket.write_all(&buffer)?;
+ Ok(())
+ });
}
}
どう変わったのでしょうか...?ブロッキングとノンブロッキングバージョンでどう違うのでしょう...?解説を試みてみます。
非同期処理でブロックしないとは、「 .await
でこまめに主導権をランタイムに返す」ということです。もし長時間ブロックするスレッドがあり、ずっと .await
してくれない場合、tokioランタイムにはそのタスクをどうにかする術はありません!
この問題うまいなぁと思うのは、 #[tokio::test]
マクロ以下なのでシングルスレッド処理になっているんですよね...よって問題では次のようにしてデッドロックが起こっています。
- tokioランタイムはシングルスレッドしか扱えない状況
-
.await
が入るたびにタスクを お手玉 している
-
- ノンブロッキングI/Oの場合は、
read_to_end
呼び出し時に.await
できるから、そこでwrite_all
側のタスクに切り替わるので、デッドロックしない - しかし、ブロッキングI/Oの場合は、
read_to_end
でtokioランタイムにスレッド主導権が返らないから、ずっと待ちになってしまう、デッドロック!
※ 以下の図は概略です!
全部非同期の場合
PoCソースコード
Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=de4d03d4204a1474c50cb855fed05a4c
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
loop {
let (mut socket, _) = listener.accept().await?;
let mut buffer = Vec::new();
socket.read_to_end(&mut buffer).await?;
socket.write_all(&buffer).await?;
}
}
省略なし全体
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
loop {
let (mut socket, _) = listener.accept().await?;
let mut buffer = Vec::new();
socket.read_to_end(&mut buffer).await?;
socket.write_all(&buffer).await?;
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::SocketAddr;
use std::panic;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::task::JoinSet;
async fn bind_random() -> (TcpListener, SocketAddr) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
(listener, addr)
}
#[tokio::test]
async fn test_echo() {
let (listener, addr) = bind_random().await;
tokio::spawn(echo(listener));
let requests = vec![
"hello here we go with a long message",
"world",
"foo",
"bar",
];
let mut join_set = JoinSet::new();
for request in requests {
join_set.spawn(async move {
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
let (mut reader, mut writer) = socket.split();
// Send the request
writer.write_all(request.as_bytes()).await.unwrap();
// Close the write side of the socket
writer.shutdown().await.unwrap();
// Read the response
let mut buf = Vec::with_capacity(request.len());
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(&buf, request.as_bytes());
});
}
while let Some(outcome) = join_set.join_next().await {
if let Err(e) = outcome {
if let Ok(reason) = e.try_into_panic() {
panic::resume_unwind(reason);
}
}
}
}
}
今回のブロッキングI/O混入バージョン
では spawn_blocking
でどう解決するのか...?このメソッドを使うことで、 重たいブロッキング処理専用のスレッドを建ててそちらで実行 されます!
演習回答バージョン
暗号計算等重い処理やブロッキングAPIしか提供されておらずそれを使わなければならない場合などに対しては、この spawn_blocking
専用スレッドを用いると良さそうです。
[08_futures/06_async_aware_primitives] 非同期プリミティブ (非同期用実装)
問題はこちらです。
/// TODO: the code below will deadlock because it's using std's channels,
/// which are not async-aware.
/// Rewrite it to use `tokio`'s channels primitive (you'll have to touch
/// the testing code too, yes).
///
/// Can you understand the sequence of events that can lead to a deadlock?
use std::sync::mpsc;
pub struct Message {
payload: String,
response_channel: mpsc::Sender<Message>,
}
/// Replies with `pong` to any message it receives, setting up a new
/// channel to continue communicating with the caller.
pub async fn pong(mut receiver: mpsc::Receiver<Message>) {
loop {
if let Ok(msg) = receiver.recv() {
println!("Pong received: {}", msg.payload);
let (sender, new_receiver) = mpsc::channel();
msg.response_channel
.send(Message {
payload: "pong".into(),
response_channel: sender,
})
.unwrap();
receiver = new_receiver;
}
}
}
#[cfg(test)]
mod tests {
use crate::{pong, Message};
use std::sync::mpsc;
#[tokio::test]
async fn ping() {
let (sender, receiver) = mpsc::channel();
let (response_sender, response_receiver) = mpsc::channel();
sender
.send(Message {
payload: "pong".into(),
response_channel: response_sender,
})
.unwrap();
tokio::spawn(pong(receiver));
let answer = response_receiver.recv().unwrap().payload;
assert_eq!(answer, "pong");
}
}
ブロッキングして到着を待つ std::sync::mpsc;
だとデッドロックが起きるので、非同期用の tokio::sync::mpsc
を使うように書き直そうという問題です。
解説
回答としては、 .await
が使用可能な tokio::sync::mpsc
に置き換えるだけです!
// use std::sync::mpsc;
+use tokio::sync::mpsc;
pub struct Message {
payload: String,
response_channel: mpsc::Sender<Message>,
}
/// Replies with `pong` to any message it receives, setting up a new
/// channel to continue communicating with the caller.
pub async fn pong(mut receiver: mpsc::Receiver<Message>) {
loop {
+ if let Some(msg) = receiver.recv().await {
println!("Pong received: {}", msg.payload);
let (sender, new_receiver) = mpsc::channel(1);
msg.response_channel
.send(Message {
payload: "pong".into(),
response_channel: sender,
})
+ .await
.unwrap();
receiver = new_receiver;
}
}
}
#[cfg(test)]
mod tests {
use crate::{pong, Message};
// use std::sync::mpsc;
+ use tokio::sync::mpsc;
#[tokio::test]
async fn ping() {
let (sender, receiver) = mpsc::channel(1);
let (response_sender, mut response_receiver) = mpsc::channel(1);
sender
.send(Message {
payload: "pong".into(),
response_channel: response_sender,
})
+ .await
.unwrap();
tokio::spawn(pong(receiver));
+ let answer = response_receiver.recv().await.unwrap().payload;
assert_eq!(answer, "pong");
}
}
「なぜデッドロックするかわかる?」と問われているので、先ほどと同様にシーケンス図に描き起こしてみようと思います。
std::sync::mpsc
バージョン
即効でデッドロックしましたね... recv()
は .await
したいです!
tokioに置き換えましょう。
tokio::sync::mpsc
バージョン
無事にピンポン できています。
ちなみに本エクササイズの Book の方の解説では std::sync::Mutex
と tokio::sync::Mutex
が取り上げられており、さらにstdの方を用いた場合のデッドロックの仕組みまで解説されています1。
解説側は Mutex
、 エクササイズ側は mpsc
を題材にしたみたいですね。この2つにとどまらず、前の問題に出てきた非同期I/O (tokio::io
) やTCPソケット等 (tokio::net
) 等、tokioクレートには非同期用プリミティブがたくさん用意されており、また大体が標準ライブラリが持つAPIに似ていてそこに .await
を付けられるようになっています。tokioクレート以外でも、例えばreqwestクレートはfeatureフラグで非同期・ブロッキングを切り替えられたりできて興味深いですね。
コストの面を考えると、何でもかんでも非同期プリミティブを使うべきではない (ブロッキングプリミティブを使えという意味ではありません)という言説2もあり、考えることが多い非同期ですが、クレートの対応具合や対応の仕方を見てみると面白そうです(Bookに忖度した感想)。
[08_futures/07_cancellation] タスクのキャンセル
問題はこちらです。主要な行をハイライトしておきます。
// TODO: fix the `assert_eq` at the end of the tests.
// Do you understand why that's the resulting output?
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
pub async fn run(listener: TcpListener, n_messages: usize, timeout: Duration) -> Vec<u8> {
let mut buffer = Vec::new();
for _ in 0..n_messages {
let (mut stream, _) = listener.accept().await.unwrap();
let _ = tokio::time::timeout(timeout, async {
stream.read_to_end(&mut buffer).await.unwrap();
})
.await;
}
buffer
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncWriteExt;
#[tokio::test]
async fn ping() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let messages = vec!["hello", "from", "this", "task"];
let timeout = Duration::from_millis(20);
let handle = tokio::spawn(run(listener, messages.len(), timeout.clone()));
for message in messages {
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
let (_, mut writer) = socket.split();
let (beginning, end) = message.split_at(message.len() / 2);
// Send first half
writer.write_all(beginning.as_bytes()).await.unwrap();
tokio::time::sleep(timeout * 2).await;
writer.write_all(end.as_bytes()).await.unwrap();
// Close the write side of the socket
let _ = writer.shutdown().await;
}
let buffered = handle.await.unwrap();
let buffered = std::str::from_utf8(&buffered).unwrap();
+ assert_eq!(buffered, "");
}
}
最終回手前にして久々の「実行結果どうなる...?」問題ですね...!
解説
Book で解説されている通り、 .await
がある場所でのみ tokio::time::timeout
等を利用して外から停止させることができます。裏を返すと前の問題のような .await
せずブロッキングしてしまうようなタスクは停止できない ということですね。こまめな .await
が大切です。
.await
は入れ子構造になっています。タイムアウト時にランタイムにコントロールを返した .await
の時点で停止されるため、それを考慮して回答します!
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
pub async fn run(listener: TcpListener, n_messages: usize, timeout: Duration) -> Vec<u8> {
let mut buffer = Vec::new();
for _ in 0..n_messages {
let (mut stream, _) = listener.accept().await.unwrap();
let _ = tokio::time::timeout(timeout, async {
// 最初を受け取ったところでread_to_end実装内部のawaitがyieldする
// そしてタイムアウトするので、前半だけ残る
stream.read_to_end(&mut buffer).await.unwrap();
})
.await;
}
buffer
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncWriteExt;
#[tokio::test]
async fn ping() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let messages = vec!["hello", "from", "this", "task"];
let timeout = Duration::from_millis(20);
let handle = tokio::spawn(run(listener, messages.len(), timeout.clone()));
+ let mut answer = String::new();
for message in messages {
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
let (_, mut writer) = socket.split();
let (beginning, end) = message.split_at(message.len() / 2);
+ answer = format!("{}{}", answer, beginning);
// Send first half
writer.write_all(beginning.as_bytes()).await.unwrap();
tokio::time::sleep(timeout * 2).await;
writer.write_all(end.as_bytes()).await.unwrap();
// Close the write side of the socket
let _ = writer.shutdown().await;
}
let buffered = handle.await.unwrap();
let buffered = std::str::from_utf8(&buffered).unwrap();
- assert_eq!(buffered, "");
+ assert_eq!(buffered, &answer);
// 答えは、各文字列の前半分を足し合わせた
// hefrthta
}
}
read_to_end
メソッドでの .await
中にタイムアウトが来るように設定されています。 read_to_end
の内部でも .await
が呼ばれてバッファに書き込みを行っていると予想でき、前半部分だけが書き込まれているだろうと踏まえて回答すればそれが正解です。ハードコードせずその意図を組み入れた回答をしてみました!
AsyncDropがほしい...!(ない)
Book にてキャンセルの話のついでにクリーンアップの話題が出ています。そして「不幸なことに、Rustは非同期クリーンアップ操作に関して明確な手段を提供していません」みたいなことが書かれています。
クリーンアップと言えば、そう、我々には Drop
トレイトがあります(参考: 第12回)。しかしそのシグネチャには、 当然非同期的な要素は含まれません 。
pub trait Drop {
// Required method
fn drop(&mut self);
}
例えば tokio::runtime::Handle::current
を使うことで無理やり非同期処理を実行しようとすることは可能ですが、クリーンアップの確実性の担保が難しいです。
PoCソースコード
Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=cab4561a2ae8af4ce62ef8585a7ed9e6
use tokio::time::{sleep, Duration};
use tokio::runtime::Handle;
#[derive(Clone)]
struct Hoge {
// 所有するリソースには、所有権を奪える等スレッドを跨げるような細工をしておく
resource: Option<String>,
}
impl Hoge {
fn new(resource: String) -> Self {
Self {
resource: Some(resource),
}
}
async fn clean_up(resource: String) {
println!("Wait...");
sleep(Duration::from_millis(100)).await;
println!("Done: {}", resource);
}
}
impl Drop for Hoge {
fn drop(&mut self) {
let Some(resource) = self.resource.take() else {
return;
};
let handle = Handle::current();
handle.spawn(async move {
Hoge::clean_up(resource).await;
});
}
}
#[tokio::main]
async fn main() {
let h = Hoge::new("Beep".to_string());
drop(h);
sleep(Duration::from_millis(1000)).await;
}
Bookでは Drop
について、ケースに合わせて例えば次のように実装すると良いと書かれています。
- ランタイムで新しいタスクを作成する (挙げたPoCはこの方法)
-
mpsc
等を使ってメッセージを送り、別タスクにクリーンアップを任せる (良さそう...?) - 裏で別スレッドを立ち上げる
Bookの例にも挙げられているSQLトランザクションのabort等、確実に実行してほしくて、かつ非同期ではない、いつものRustなら Drop
トレイトにまかせておけばおkな処理でも、上記に挙げたような手法を取るしかないのが現状です。
このような要望に答えてくれそうな AsynDrop
を考案しているWG もあるらしいですが、一方で「 AsyncDrop
みたいなものが要らないような実装を心がけろ (おそらく上述したような対処をしようの意)」という言説もあります...umm
直感的に、 drop
メソッドは軽い処理であってほしいような気がするので、"重め"なことをするであろう AsyncDrop
というのはその思想に反していそうではあります3。一方、Pythonの with
や Goの defer
のような機能が Drop
に求められているのも事実です。
そのため筆者的には with
や defer
により近い、 Drop
とは異なるが、 Drop
と同様スコープが外れる時にRAIIライクなクリーンアップをしてくれるトレイトがほしいなぁとか思ったりします。 drop
実行前に実行されるイメージです。もしそんな感じの CleanUp
トレイト(筆者命名)があれば、 AsyncCleanUp
トレイトみたいなのはありなんじゃないでしょうか?(妄想)
さて、無事にここまでこれました。次回、いよいよ最終回です...!
次の記事: 【37】 Axumでクラサバ! ~最終回~
登場したPlayground
(実際に無効化したことはないですが、)Rust Playground上のデータが喪失する可能性を鑑みて、一応記事にもソースコードを掲載することとしました。
use tokio;
use std::sync::{Arc, Mutex};
async fn run(m: Arc<Mutex<Vec<u64>>>) {
let guard = m.lock().unwrap();
http_call(&guard).await;
println!("Sent {:?} to the server", &guard);
// `guard` is dropped here
}
async fn http_call(_v: &[u64]) {
// [...]
}
#[tokio::main]
async fn main() {
let amvu = Arc::new(Mutex::new(vec![0, 1, 2]));
run(amvu).await;
}
use std::sync::{Arc, Mutex};
async fn run(m: Arc<Mutex<Vec<u64>>>) {
let guard = m.lock().unwrap();
http_call(&guard).await;
println!("Sent {:?} to the server", &guard);
// `guard` is dropped here
}
/// Use `v` as the body of an HTTP call.
async fn http_call(v: &[u64]) {
// [...]
}
#[tokio::main]
async fn main() {
let m = Arc::new(Mutex::new(vec![0, 1, 2]));
tokio::spawn(run(m));
}
-
解説で登場するソースコード、
MutexGuard
は!Send
なため一見するとコンパイルが通らなそうに見えるのですが、tokio::spawn
とかで包まない場合コンパイル通りますね(包む場合はコンパイルエラー)...block_on
はSend
を要求しないからでしょうか...? ↩ -
そのため逆に言えば
mpsc
で外にクリーンアップを任せるのが筋の良い方法に見えます。ただ結局非同期mpsc
を使う都合上、考えることは多そうです。spawn_blocking
で別スレッドを建ててblocking_send
とか...? ↩