前の記事
- 【0】 準備 ← 初回
- ...
- 【33】 チャネルなしで実装・Syncの話 ~考察回です~ ← 前回
- 【34】
async fn
・非同期タスク生成 ~Rustの非同期入門~ ← 今回
全記事一覧
- 【0】 準備
- 【1】 構文・整数・変数
- 【2】 if・パニック・演習
- 【3】 可変・ループ・オーバーフロー
- 【4】 キャスト・構造体 (たまにUFCS)
- 【5】 バリデーション・モジュールの公開範囲 ~ → カプセル化!~
- 【6】 カプセル化の続きと所有権とセッター ~そして不変参照と可変参照!~
- 【7】 スタック・ヒープと参照のサイズ ~メモリの話~
- 【8】 デストラクタ(変数の終わり)・トレイト ~終わりと始まり~
- 【9】 Orphan rule (孤児ルール)・演算子オーバーロード・derive ~Empowerment 💪 ~
- 【10】 トレイト境界・文字列・Derefトレイト ~トレイトのアレコレ~
- 【11】 Sized トレイト・From トレイト・関連型 ~おもしろトレイトと関連型~
- 【12】 Clone・Copy・Dropトレイト ~覚えるべき主要トレイトたち~
- 【13】 トレイトまとめ・列挙型・match式 ~最強のトレイトの次は、最強の列挙型~
- 【14】 フィールド付き列挙型とOption型 ~チョクワガタ~
- 【15】 Result型 ~Rust流エラーハンドリング術~
- 【16】 Errorトレイトと外部クレート ~依存はCargo.tomlに全部お任せ!~
- 【17】 thiserror・TryFrom ~トレイトもResultも自由自在!~
- 【18】 Errorのネスト・慣例的な書き方 ~Rustらしさの目醒め~
- 【19】 配列・動的配列 ~スタックが使われる配列と、ヒープに保存できる動的配列~
- 【20】 動的配列のリサイズ・イテレータ ~またまたトレイト登場!~
- 【21】 イテレータ・ライフタイム ~ライフタイム注釈ようやく登場!~
- 【22】 コンビネータ・RPIT ~ 「
Iterator
トレイトを実装してるやつ」~ - 【23】
impl Trait
・スライス ~配列の欠片~ - 【24】 可変スライス・下書き構造体 ~構造体で状態表現~
- 【25】 インデックス・可変インデックス ~インデックスもトレイト!~
- 【26】 HashMap・順序・BTreeMap ~Rustの辞書型~
- 【27】 スレッド・'staticライフタイム ~並列処理に見るRustの恩恵~
- 【28】 リーク・スコープ付きスレッド ~ライフタイムに技あり!~
- 【29】 チャネル・参照の内部可変性 ~Rustの虎の子、mpscと
Rc<RefCell<T>>
~ - 【30】 双方向通信・リファクタリング ~返信用封筒を入れよう!~
- 【31】 上限付きチャネル・PATCH機能 ~パンクしないように制御!~
- 【32】
Send
・排他的ロック(Mutex
)・非対称排他的ロック(RwLock
) ~真打Arc<Mutex<T>>
登場~ - 【33】 チャネルなしで実装・Syncの話 ~考察回です~
- 【34】
async fn
・非同期タスク生成 ~Rustの非同期入門~ - 【35】 非同期ランタイム・Futureトレイト ~非同期のお作法~
- 【36】 ブロッキング・非同期用の実装・キャンセル ~ラストスパート!~
- 【37】 Axumでクラサバ! ~最終回~
- 【おまけ1】 Rustで勘違いしていたこと3選 🏄🌴 【100 Exercises To Learn Rust 🦀 完走記事 🏃】
- 【おまけ2】 【🙇 懺悔 🙇】Qiitanグッズ欲しさに1日に33記事投稿した話またはQiita CLIとcargo scriptを布教する的な何か
100 Exercise To Learn Rust 演習第34回になります、今回から8章、「非同期処理」に入っていきます!
8章の非同期について、全体的に筆者の理解度が低いです...他の本とかも読んでる最中...マサカリ大歓迎です!
記事は執筆時点での理解という感じです
今回の関連ページ
[08_futures/01_async_fn] async fn
問題はこちらです。
use tokio::net::TcpListener;
// TODO: write an echo server that accepts incoming TCP connections and
// echoes the received data back to the client.
// `echo` should not return when it finishes processing a connection, but should
// continue to accept new connections.
//
// Hint: you should rely on `tokio`'s structs and methods to implement the echo server.
// In particular:
// - `tokio::net::TcpListener::accept` to process the next incoming connection
// - `tokio::net::TcpStream::split` to obtain a reader and a writer from the socket
// - `tokio::io::copy` to copy data from the reader to the writer
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
todo!()
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::test]
async fn test_echo() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(echo(listener));
let requests = vec!["hello", "world", "foo", "bar"];
for request in requests {
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
let (mut reader, mut writer) = socket.split();
// Send the request
writer.write_all(request.as_bytes()).await.unwrap();
// Close the write side of the socket
writer.shutdown().await.unwrap();
// Read the response
let mut buf = Vec::with_capacity(request.len());
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(&buf, request.as_bytes());
}
}
}
問題を要約すると次のような感じです。
- やってくるTCPコネクションを受け入れ、受け取ったデータをそのまま返信するエコーサーバーを書いてください
- 返信後もリターンはせず、新たにやってくるコネクションを受け入れ続けてください
- ヒント: 以下のtokioのドキュメントを見よう!
-
tokio::net::TcpListener::accept
: 新たなTCPコネクションを受け入れるのに使用 -
tokio::net::TcpStream::split
:socket
をreader, writerに分けるために使用 -
tokio::io::copy
: readerからwriterにデータを移動するのに使用
-
非同期初回からTCPリスナーを活用してエコーサーバーを作るというトバした内容になっていますね...「並行処理を突破した我々なら非同期は怖くないだろう!」みたいなノリで取り組めばよいのでしょうか?
解説
サクッと答えです!
pub async fn echo(listener: TcpListener) -> Result<(), anyhow::Error> {
loop {
let (mut socket, _) = listener.accept().await?;
let (mut reader, mut writer) = socket.split();
tokio::io::copy(&mut reader, &mut writer).await?;
}
}
コネクションは無限にやってくる可能性があるので loop
で回しています。 echo
関数は test_echo
テスト関数から tokio::spawn
でタスクとして起動されます。
説明に 次回範囲 も少しがかなり入りますが、 echo
関数は impl Future
なハンドル(以降これを簡単に"タスク"と呼ぶことにします)を返します。普通タスクは、 .await
が呼ばれるまでその中身は一切実行されません!(要はイテレータと同じ遅延評価) そんな中、 tokio::spawn
は impl Future
を返しながらも渡されたタスクを即実行してくれるランタイム提供の珍しい機能と言えます。(もとい、これがないと色々面倒そうです。)
tokio::spawn(echo(listener));
が返すハンドルはメインスレッドにて .await
( std::thread::spawn
で言う join
) をしていないためメインスレッドには終わりが訪れます。よってこの無限ループによる記法をしても問題ありません!
各行についてです。返すエラー型はバラバラですが、面倒なのでなぜか今回依存に入れてくれている anyhow::Error
に甘えてすべて ?
で中身を取り出しています。
1行目で接続を待ち、2行目で受けたソケットをreaderとwriterに分け、3行目で受け取った入力を返してます。 .await
が間に入っていたり、 async
が fn
の前についているだけで、同期プログラミングと比べて特段難しかったり読みにくかったりする部分はありません!
.await
で何をしているのか?
まだ例が簡単なうちに、 .await
で何が起こっているのか軽く触れておきます。
ズバリ、(おそらく) タスクが スレッドを跨いでいます !(おそらく)と書いたのはtokio非同期ランタイムが実際にどういう実装をしているのかちゃんと確かめたわけではないからです。
今までは比較的手動でスレッドを管理していたのに対して、 非同期処理では、ランタイム(tokio)がスレッドを管理しています。
tokioはタスクとスレッドを管理し、空いているスレッドにタスクを割り当てて処理をさせては、 .await
等が入るたびにまた引き上げて別なタスクを割り当てて...ということを行っています!手動でスレッドを管理するよりは、きめ細かく効率よくタスクを処理できる方式になっているわけです1。
概略図 (mermaid製のシーケンス図において、リソースを participant
とするためスレッドは actor
で表していきます)
※ listener.accept().await?;
された後等について、さらにこの内側のタスクが完了しているか?を調べてはまた待避させてを繰り返したりをするので、実際の図はもっと複雑です!
この図はおそらく正確に非同期タスクの状態変化を表せてはいないですが、要は何が言いたいかというと「空いたスレッドに処理を割り当てる」言い換えると「同じスコープで書いているように見せてその実 別なスレッドに値が移る 」という部分がミソです!
...そう、すなわち、「 .await
で一旦値が待避され、別スレッドに転送されて再開される」ので、値が .await
を跨ぐには Send
や Sync
が必要 になってくるわけです。前節までを振り返ると、少なくとも Send
は必要でしょう。
逆に言うとそれさえ抑えておけば非同期だからと言って特段身構える部分は少ないんじゃないかなと思っています。なぜって...?筆者がこの程度の理解で非同期処理を書けているからです!
[08_futures/02_spawn] 非同期タスク生成
問題はこちらです。
use tokio::net::TcpListener;
// TODO: write an echo server that accepts TCP connections on two listeners, concurrently.
// Multiple connections (on the same listeners) should be processed concurrently.
// The received data should be echoed back to the client.
pub async fn echoes(first: TcpListener, second: TcpListener) -> Result<(), anyhow::Error> {
todo!()
}
テストを含めた全体
use tokio::net::TcpListener;
// TODO: write an echo server that accepts TCP connections on two listeners, concurrently.
// Multiple connections (on the same listeners) should be processed concurrently.
// The received data should be echoed back to the client.
pub async fn echoes(first: TcpListener, second: TcpListener) -> Result<(), anyhow::Error> {
todo!()
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::SocketAddr;
use std::panic;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::task::JoinSet;
async fn bind_random() -> (TcpListener, SocketAddr) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
(listener, addr)
}
#[tokio::test]
async fn test_echo() {
let (first_listener, first_addr) = bind_random().await;
let (second_listener, second_addr) = bind_random().await;
tokio::spawn(echoes(first_listener, second_listener));
let requests = vec!["hello", "world", "foo", "bar"];
let mut join_set = JoinSet::new();
for request in requests.clone() {
for addr in [first_addr, second_addr] {
join_set.spawn(async move {
let mut socket = tokio::net::TcpStream::connect(addr).await.unwrap();
let (mut reader, mut writer) = socket.split();
// Send the request
writer.write_all(request.as_bytes()).await.unwrap();
// Close the write side of the socket
writer.shutdown().await.unwrap();
// Read the response
let mut buf = Vec::with_capacity(request.len());
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(&buf, request.as_bytes());
});
}
}
while let Some(outcome) = join_set.join_next().await {
if let Err(e) = outcome {
if let Ok(reason) = e.try_into_panic() {
panic::resume_unwind(reason);
}
}
}
}
}
「エコーズ、act2!!」というわけで、前節でも少し解説した tokio::spawn
を使ってタスクを生成しよう!という問題です。
解説
指示通り tokio::spawn
で前節の echo
関数により生成されるタスクを2つ生成すればおkです!
pub async fn echoes(first: TcpListener, second: TcpListener) -> Result<(), anyhow::Error> {
tokio::spawn(echo(first));
tokio::spawn(echo(second));
Ok(())
}
async fn echo(listener: TcpListener) {
loop {
let (mut socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
let (mut reader, mut writer) = socket.split();
tokio::io::copy(&mut reader, &mut writer).await.unwrap();
});
}
}
tokio::spawn
を使用するメリットについてです。同じ非同期関数内で実行すると処理順が直列になってしまいますが、 tokio::spawn
としてタスクを切り出すことで、tokioランタイムが別スレッドでいい感じに処理してくれるようになります。 そのタスクの完了を待たずに 、新たなコネクションを受け入れることができています。
図を描いて説明しようかとも思ったのですが、7章で扱ったマルチスレッド処理( std::thread::spawn
)と恩恵は似ており今更かと思うので省略します。あちらは(例えば足し算処理など)似たような処理を並列化したり、サーバー・クライアント間でスレッドを分けたりしたのでした。非同期タスクでも同じようなことができます。Book の最後に言及されている通り、この管理をOSスケジューラに任せるのか、ソフトウェア(プログラム)側で制御するのかというのが本質的な違いでしょう。
では次の問題に行きましょう!
次の記事: 【35】 非同期ランタイム・Futureトレイト ~非同期のお作法~
-
本当はノンブロッキングI/O・非同期I/O とか C10K問題 とかに言及するべきなのでしょうが、本気でマサカリで殺されてしまうので将来の自分に託したいと思います...まぁ非同期I/Oに注目した文脈での「非同期にしておいた方がスレッドを効率よく使えますよ」という説明なら嘘ではないはずなのでとりあえずこの理解でいます。(執筆時点のつっかかりメモ: 非同期I/Oはシグナルやコールバックで完了を通知するというが、シグナルの送信やコールバックの実行の責務を負うスレッドはいないのか...?結局どこかではスレッドがブロックされてはいないか(それでも幾分かは同期処理よりマシになるかもしれないが...)?例えばキーボードからの入力を想定したとして、結局はハードウェアレベルでの挙動まで理解しないと、この質問に回答することは叶わなさそう ) ↩