前の記事
- 【0】 準備 ← 初回
- ...
-
【34】
async fn
・非同期タスク生成 ~Rustの非同期入門~ ← 前回 - 【35】 非同期ランタイム・Futureトレイト ~非同期のお作法~ ← 今回
全記事一覧
- 【0】 準備
- 【1】 構文・整数・変数
- 【2】 if・パニック・演習
- 【3】 可変・ループ・オーバーフロー
- 【4】 キャスト・構造体 (たまにUFCS)
- 【5】 バリデーション・モジュールの公開範囲 ~ → カプセル化!~
- 【6】 カプセル化の続きと所有権とセッター ~そして不変参照と可変参照!~
- 【7】 スタック・ヒープと参照のサイズ ~メモリの話~
- 【8】 デストラクタ(変数の終わり)・トレイト ~終わりと始まり~
- 【9】 Orphan rule (孤児ルール)・演算子オーバーロード・derive ~Empowerment 💪 ~
- 【10】 トレイト境界・文字列・Derefトレイト ~トレイトのアレコレ~
- 【11】 Sized トレイト・From トレイト・関連型 ~おもしろトレイトと関連型~
- 【12】 Clone・Copy・Dropトレイト ~覚えるべき主要トレイトたち~
- 【13】 トレイトまとめ・列挙型・match式 ~最強のトレイトの次は、最強の列挙型~
- 【14】 フィールド付き列挙型とOption型 ~チョクワガタ~
- 【15】 Result型 ~Rust流エラーハンドリング術~
- 【16】 Errorトレイトと外部クレート ~依存はCargo.tomlに全部お任せ!~
- 【17】 thiserror・TryFrom ~トレイトもResultも自由自在!~
- 【18】 Errorのネスト・慣例的な書き方 ~Rustらしさの目醒め~
- 【19】 配列・動的配列 ~スタックが使われる配列と、ヒープに保存できる動的配列~
- 【20】 動的配列のリサイズ・イテレータ ~またまたトレイト登場!~
- 【21】 イテレータ・ライフタイム ~ライフタイム注釈ようやく登場!~
- 【22】 コンビネータ・RPIT ~ 「
Iterator
トレイトを実装してるやつ」~ - 【23】
impl Trait
・スライス ~配列の欠片~ - 【24】 可変スライス・下書き構造体 ~構造体で状態表現~
- 【25】 インデックス・可変インデックス ~インデックスもトレイト!~
- 【26】 HashMap・順序・BTreeMap ~Rustの辞書型~
- 【27】 スレッド・'staticライフタイム ~並列処理に見るRustの恩恵~
- 【28】 リーク・スコープ付きスレッド ~ライフタイムに技あり!~
- 【29】 チャネル・参照の内部可変性 ~Rustの虎の子、mpscと
Rc<RefCell<T>>
~ - 【30】 双方向通信・リファクタリング ~返信用封筒を入れよう!~
- 【31】 上限付きチャネル・PATCH機能 ~パンクしないように制御!~
- 【32】
Send
・排他的ロック(Mutex
)・非対称排他的ロック(RwLock
) ~真打Arc<Mutex<T>>
登場~ - 【33】 チャネルなしで実装・Syncの話 ~考察回です~
- 【34】
async fn
・非同期タスク生成 ~Rustの非同期入門~ - 【35】 非同期ランタイム・Futureトレイト ~非同期のお作法~
- 【36】 ブロッキング・非同期用の実装・キャンセル ~ラストスパート!~
- 【37】 Axumでクラサバ! ~最終回~
- 【おまけ1】 Rustで勘違いしていたこと3選 🏄🌴 【100 Exercises To Learn Rust 🦀 完走記事 🏃】
- 【おまけ2】 【🙇 懺悔 🙇】Qiitanグッズ欲しさに1日に33記事投稿した話またはQiita CLIとcargo scriptを布教する的な何か
100 Exercise To Learn Rust 演習第35回になります!
8章の非同期について、全体的に筆者の理解度が低いです...他の本とかも読んでる最中...マサカリ大歓迎です!
記事は執筆時点での理解という感じです
今回の関連ページ
[08_futures/03_runtime] 非同期ランタイム tokio (もとい演習自体は Arc
の使用)
問題はこちらです。
// TODO: Implement the `fixed_reply` function. It should accept two `TcpListener` instances,
// accept connections on both of them concurrently, and always reply clients by sending
// the `Display` representation of the `reply` argument as a response.
use std::fmt::Display;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
pub async fn fixed_reply<T>(first: TcpListener, second: TcpListener, reply: T)
where
// `T` cannot be cloned. How do you share it between the two server tasks?
T: Display + Send + Sync + 'static,
{
todo!()
}
テストを含めた全体
// TODO: Implement the `fixed_reply` function. It should accept two `TcpListener` instances,
// accept connections on both of them concurrently, and always reply clients by sending
// the `Display` representation of the `reply` argument as a response.
use std::fmt::Display;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
pub async fn fixed_reply<T>(first: TcpListener, second: TcpListener, reply: T)
where
// `T` cannot be cloned. How do you share it between the two server tasks?
T: Display + Send + Sync + 'static,
{
todo!()
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::SocketAddr;
use std::panic;
use tokio::io::AsyncReadExt;
use tokio::task::JoinSet;
async fn bind_random() -> (TcpListener, SocketAddr) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
(listener, addr)
}
#[tokio::test]
async fn test_echo() {
let (first_listener, first_addr) = bind_random().await;
let (second_listener, second_addr) = bind_random().await;
let reply = "Yo";
tokio::spawn(fixed_reply(first_listener, second_listener, reply));
let mut join_set = JoinSet::new();
for _ in 0..3 {
for addr in [first_addr, second_addr] {
join_set.spawn(async move {
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
let (mut reader, _) = socket.split();
// Read the response
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(&buf, reply.as_bytes());
});
}
}
while let Some(outcome) = join_set.join_next().await {
if let Err(e) = outcome {
if let Ok(reason) = e.try_into_panic() {
panic::resume_unwind(reason);
}
}
}
}
}
前回 解説した通り、 非同期処理は Send
を要求 します!そして、 reply: T
を複製せずに 複数タスク(複製スレッド)で所有したい とくれば...?
解説
そうですね、 Arc
の出番です!というわけで、非同期処理でもお世話になりましょう。
( 次回 にて非同期専用の Mutex
がある話等をするつもりですが、)このように、非同期処理も並行処理の一種で、 Send
の制約がある仲間ですから、まるまる7章で学んだ知識が活かせます!
use std::sync::Arc;
pub async fn fixed_reply<T>(first: TcpListener, second: TcpListener, reply: T)
where
// `T` cannot be cloned. How do you share it between the two server tasks?
T: Display + Send + Sync + 'static,
{
let reply = Arc::new(reply);
let reply2 = Arc::clone(&reply);
tokio::spawn(listen(first, reply));
tokio::spawn(listen(second, reply2));
}
async fn listen<T>(listener: TcpListener, reply: Arc<T>)
where
T: Display + Send + Sync + 'static,
{
loop {
let (mut socket, _) = listener.accept().await.unwrap();
let reply = Arc::clone(&reply);
tokio::spawn(async move {
let reply = reply.to_string();
socket.write_all(reply.as_bytes()).await.unwrap();
socket.flush().await.unwrap();
});
}
}
前回自分は「tokioは複数スレッドを管理する」と言ったのですが、今回の Book には「マルチスレッドとシングルスレッドの2つのフレーバーがあり選べて、 #[tokio::main]
マクロを使った場合はマルチスレッド、 #[tokio::test]
マクロを使用した場合はシングルスレッドになる」と解説されていたりします。
デッドロックってマルチスレッド版だと気づきにくかったりしますし、テストがシングルスレッドなのは中々ありがたいですね。
その他、「 tokio::spawn
は(スコープ async {}
を跨ぐ値について1) 'static
を要求する」とありますが、こちらは std::thread::spawn
と同様ですね。タスクがいつまで生きているかはわかりません!
[08_futures/04_future] FutureトレイトとSendトレイト
問題はこちらです。久々のコンパイルを通してください問題になります。
//! TODO: get the code to compile by **re-ordering** the statements
//! in the `example` function. You're not allowed to change the
//! `spawner` function nor what each line does in `example`.
//! You can wrap existing statements in blocks `{}` if needed.
use std::rc::Rc;
use tokio::task::yield_now;
fn spawner() {
tokio::spawn(example());
}
async fn example() {
let non_send = Rc::new(1);
yield_now().await;
println!("{}", non_send);
}
しまった...こっちも前回解説してしまいました... Send
ではない値は .await
を跨げません 。
解説
Send
を実装していない型であったところで、 .await
より前にドロップしてしまえば問題ないので、そのように書いちゃりましょう!
async fn example() {
{
let non_send = Rc::new(1);
println!("{}", non_send);
}
yield_now().await;
}
{}
内に収めることで Rc
を .await
前にドロップさせています。 drop
関数等を使っても良いかもしれません。
その他 Book で取り上げられている大きめなトピックとして、 async fn
の正体、 Future
トレイトの話があります。Rustでは非同期もトレイトで説明されるのです!
async fn xxx(...) -> T {
// ...
}
↓
fn xxx(...) -> impl Future<Output = T> {
// ...
}
※ 手書きの場合は + Send + 'static
みたいな形でさらに追加で色々付くことも...?また脱糖についての正確性は調べられてないです( cargo expand
でわかるのかな...?)
「 Future
トレイトを実装した型は ステートマシンである 」という内容を始めとし、かなり興味深い説明が書かれているので是非読んでほしいです!(筆者に説明の力量がない定期)
前回「 .await
を跨ぐには Send
が要る(キリッ」と筆者はぼかしながらテキトーに話していましたが、ちゃんとこれに対する理由も Future
トレイトを実装した列挙体を例に挙げ説明されていたりします。
Futureって200通りあんねん...
上記の impl Future
脱糖について、たまに型を手書きする必要があったりして、その時に筆者がよくお世話になっているQiita記事様のリンクを掲載させていただきます
https://qiita.com/legokichi/items/4f2c09330f90626600a6
いや、パターン多すぎるだろ...非同期への理解もかなり深まりそうです
では次の問題に行きましょう!
次の記事: 【36】 ブロッキング・非同期用の実装・キャンセル ~ラストスパート!~
登場したPlayground
(実際に無効化したことはないですが、)Rust Playground上のデータが喪失する可能性を鑑みて、一応記事にもソースコードを掲載することとしました。
use tokio;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
tokio::spawn(async move {
let s = 10_u32;
let r = &s;
sleep(Duration::from_millis(100)).await;
println!("{}", r);
}).await.ok();
}
-
ローカルで生成される値への参照等であれば
'static
でなくとも.await
は跨げるようです。 Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=9ca535b6f390d2ae81cdc7b3c3cd39a6 ↩