非同期処理
Rustにおける非同期処理の歴史
Rustの非同期処理は様々な変遷を経て現在の状態に落ち着いています。
様々な変遷が生まれた理由としては、Rustが「言語仕様は最小限に保ち、外部クレート(ライブラリ)で機能を拡張させていく」方針であるためです。
これはRust自体が持つ言語仕様に柔軟な拡張性をもたらす反面、言語自体が高度な機能を提供しないという点で時に面倒な問題を引き起こします。
非同期処理はまさにその典型で、当初は様々な非同期処理用のクレートが乱立していました。
- async-std
- smol
- runtime
- fuchsia async
- Tokio
- etc...
最終的にはRust言語機能としてasync/awaitが正式に導入され、Tokioというクレートが事実上のデファクトスタンダードとなっています(様々なクレートやフレームワークで前提機能として使われており、自然体でTokioに依存する形となるため)。
2019年にRust1.39(async/await安定版)が発表されるまで、Rustの非同期処理は非常に難解で手軽には実装できない代物でしたが、現在は他言語に近い感覚で書くことができるようになっています。
Tokio
Tokioクレートは下記のような機能・特徴を持つ非同期ランタイムです。
- イベントループベースの非同期実行
- async/await構文のサポート
- 非同期I/O、タイマー、非同期用チャネル等を提供
- ゼロコスト抽象化による高いパフォーマンス
下記コマンド実行で依存関係を追加できますが、注意点として、Tokioは非常にボリュームの大きいクレートです。
一部機能だけを使いたい場合は特定機能の指定を、そうでない場合はfullを指定すると良いです(今回はわかりやすいのでfullにします)。
cargo add tokio --features "full"
async・await
async・awaitを使うことで、処理をタスク分割し、非同期実行することができます。
他の高級言語を触ったことがあればあまり違和感はないかと思います(せいぜいawaitの記述箇所が末尾なんだな〜ぐらいの感じでしょうか)。
use tokio::time::{sleep, Duration};
// この記述の実態はマクロで、main関数を非同期処理化します
#[tokio::main]
async fn main() {
println!("Hello");
// 処理をasync { }で囲むか、
let task1 = async {
sleep(Duration::from_millis(1000)).await;
println!("Task 1 completed");
};
// asyncをつけた関数を呼び出すことで非同期タスクを定義することが可能
let task2 = wait_five_hands();
// 並行実行
tokio::join!(task1, task2);
println!("All tasks completed");
}
async fn wait_five_hands() {
sleep(Duration::from_millis(500)).await;
println!("Waited for 500 milliseconds");
}
同期処理からの非同期処理実行方法
同期処理から非同期処理を呼び出す場合、ランタイムを生成し、同期的に処理したい場面でブロック指示を出します。
use tokio::time::{sleep, Duration};
use tokio::runtime::Runtime;
async fn fetch_data(no: u32) -> String {
sleep(Duration::from_millis(500)).await;
println!("データを取得しました{}", no);
"data".to_string()
}
fn main() {
println!("Hello");
let rt = Runtime::new().unwrap();
// 単一の非同期関数を実行
let result = rt.block_on(fetch_data(1));
println!("結果: {}", result);
// 複数の非同期タスクを並行実行
let result = rt.block_on(async {
let task1 = fetch_data(2);
let task2 = fetch_data(3);
let (r1, r2) = tokio::join!(task1, task2);
format!("{} & {}", r1, r2)
});
println!("並行実行結果: {}", result);
}
なお、block_onは現在のスレッドをブロックするため、非同期ランタイム内で呼び出すと処理が死にます。あくまで同期処理から呼び出すよう、ご注意ください。
非同期チャネル
非同期処理においても、異なるタスク間でデータをやり取りしたい場面はあるかと思います。
しかし非同期処理は必ずしもマルチスレッドという訳ではありません。また、それぞれのタスク実行タイミングにもばらつきがあります。
そのため、通常のスレッドチャネルと異なり送受信操作が非同期的に行われる、専用の非同期チャネルを使います。
mpsc(Multiple Producer, Single Consumer)
送受信の機能を持つ非同期チャネルです。
通常のチャネルがstd::sync::mpscだったのに対して、非同期チャネルはtokio::sync::mpscになります。
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// バッファサイズ32のチャネルを作成
let (sender, mut receiver) = mpsc::channel(32);
// 送信者を複数のタスクで使用
let sender1 = sender.clone();
tokio::spawn(async move {
for i in 1..=3 {
sleep(Duration::from_millis(100)).await;
sender1.send(format!("タスク1からのメッセージ: {}", i)).await.unwrap();
}
});
let sender2 = sender.clone();
tokio::spawn(async move {
for i in 1..=3 {
sleep(Duration::from_millis(150)).await;
sender2.send(format!("タスク2からのメッセージ: {}", i)).await.unwrap();
}
});
// 元のsenderをドロップして、すべての送信者が終了したことを示す。
// ここでdropしないとreceiver.recv().awaitが永遠の待機状態になってしまう
drop(sender);
// メッセージを受信
while let Some(msg) = receiver.recv().await {
println!("受信: {}", msg);
}
}
oneshot
一度だけ通信を可能にする非同期チャネルです。
通常のmpscよりも簡潔に書けるので、要件を満たせるならこちらも便利です。
use tokio::sync::oneshot;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
sleep(Duration::from_millis(500)).await;
let result = "重要な計算結果".to_string();
tx.send(result).unwrap();
});
// 結果を待機
match rx.await {
Ok(result) => println!("結果を受信: {}", result),
Err(_) => println!("送信者がドロップされました"),
}
}
エラーハンドリング
非同期関数でもResult型と?演算子を使用してエラーハンドリングが可能です。
use tokio::fs;
use std::io;
async fn read_file_content(path: &str) -> Result<String, io::Error> {
let content = fs::read_to_string(path).await?;
Ok(content.trim().to_string())
}
async fn process_files() -> Result<(), Box<dyn std::error::Error>> {
// 複数のファイルを並行して読み込み
let (file1, file2) = tokio::try_join!(
read_file_content("file1.txt"),
read_file_content("file2.txt")
)?;
println!("ファイル1: {}", file1);
println!("ファイル2: {}", file2);
Ok(())
}
#[tokio::main]
async fn main() {
if let Err(e) = process_files().await {
eprintln!("エラーが発生しました: {}", e);
}
}
select!マクロによる競合実行
Tokioが提供する便利なマクロにselect!があります。
これは複数の非同期操作のうち最初に完了したものを処理できるユニークなマクロです。
この機能を使うと、例えば時間制限付きの入力待機などが簡単に実装できます。
use tokio::signal;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
tokio::select! {
_ = signal::ctrl_c() => {
println!("Ctrl+Cを検知。終了します。");
}
_ = sleep(Duration::from_secs(10)) => {
println!("10秒経過。終了します。");
}
}
}
非同期処理でCPU集約タスクの実行方法
通常の非同期タスク内でCPU集約処理を行ってしまうと、対象のスレッドが実質利用不可となってしまい、他の非同期タスク実行に遅延が発生する可能性があります。
そのため、TokioではCPU集約処理を行うための専用機能が提供されています。
それがspawn_blockingです。
spawn_blockingを使用すると、CPU集約処理を専用の別スレッド(通常の非同期スレッドプール以外)で実行することができます。
例えばrayonクレート(データ並列処理)とTokioクレート(非同期処理)を組み合わせる場合、spawn_blockingを活用することでそれぞれの特性を最大限活かすことができます。
use rayon::prelude::*;
use tokio::task;
#[tokio::main]
async fn main() {
let numbers: Vec<i64> = (1..=1_000_000).collect();
// 逐次処理を spawn_blocking 内で実行
let sequential_handle = task::spawn_blocking({
let numbers = numbers.clone();
move || {
let start = std::time::Instant::now();
let sum_sequential: i64 = numbers.iter().map(|x| x * x).sum();
let elapsed = start.elapsed();
(sum_sequential, elapsed)
}
});
// 並列処理を spawn_blocking 内で実行
let parallel_handle = task::spawn_blocking(move || {
let start = std::time::Instant::now();
let sum_parallel: i64 = numbers.par_iter().map(|x| x * x).sum();
let elapsed = start.elapsed();
(sum_parallel, elapsed)
});
// 結果取得
let (sum_sequential, sequential_time) = sequential_handle.await.unwrap();
let (sum_parallel, parallel_time) = parallel_handle.await.unwrap();
println!("逐次処理: {} (時間: {:?})", sum_sequential, sequential_time);
println!("並列処理: {} (時間: {:?})", sum_parallel, parallel_time);
}
マクロ
前提
最後にマクロについて改めて紹介です。
以前マクロを説明した際は利用方法のみのご紹介でした。
今回は自身でマクロを定義する方法の学習となります。
ただ、ぶっちゃけマクロの定義はわりと難しいというか、他の高級言語でまず意識することがない構文トークンやAST(抽象構文木)の多少の理解が求められる場面があります。
なので、入門講座としてはここはスキップしていただいても全然問題ないです。
自分も正直マクロをばりばり定義した経験はまだありません。
そういう浅い知識 & AI調査レベルでのご紹介になってしまう点はご了承ください。
概要
Rustにおけるマクロは、コンパイル時にコードを生成する仕組みです。頻出する処理の簡略化やトレイトの自動実装など、Rustの実装コスト削減を可能とする機能です。
ただし、一方でRustのフレームワークやクレートでは「マクロ依存度」が問題になるケースがあります。
マクロは便利な一方、その中でエラーや例外が起きた時にどんな処理に組み替えられているか分からず困ってしまう、という問題を秘めています。
これは完全に個人的な意見ですが、自分も「よく分からないけど裏側で何かやっている」はあまり好きではないので、マクロの実装前にまずは下記を検討する方が良いのかなと思っています。
- 関数・メソッドで対処できないか
- エラーや例外が発生するリスクがどの程度あるか
マクロの種別
Rustでは2種類のマクロが定義可能となっています。
- 宣言的マクロ (
macro_rules!): パターンマッチングベース - プロシージャルマクロ: Rustコードを操作する関数として実装
宣言的マクロ
宣言的マクロは、マクロ利用時に記述される式や型などをパターンマッチングで判定し、置き換えていく方式です。
公式提供のもので言うとvec!やprintln!が該当します。
実装例は下記の通りです。
// 基本的な宣言的マクロの定義
macro_rules! say_hello {
() => {
println!("Hello, macro world!");
};
}
// 引数を受け取るマクロ
macro_rules! create_function {
($func_name:ident) => {
fn $func_name() {
println!("You called {:?}()", stringify!($func_name));
}
};
}
// 可変長引数を処理するマクロ
macro_rules! find_min {
($x:expr) => ($x);
($x:expr, $($y:expr),+) => (
std::cmp::min($x, find_min!($($y),+))
);
}
fn main() {
say_hello!();
create_function!(foo);
foo();
println!("Minimum: {}", find_min!(1, 2, 3, 4, 5));
}
パターンマッチングでは下記のようなものが利用可能です。
ここら辺は正規表現に近い感覚なので、正規表現に慣れていればまだ理解しやすいかと思います。
-
$x:expr- 式 -
$x:ident- 識別子 -
$x:ty- 型 -
$x:pat- パターン -
$x:stmt- 文 -
$($x:expr),*- 0回以上の繰り返し -
$($x:expr),+- 1回以上の繰り返し
プロシージャルマクロ
プロシージャルマクロは、Rustコードを入力として受け取り、任意のRustコードを生成して返す方式です。
構文トークンやAST(抽象構文木)の理解が多少求められるため、宣言的マクロに比べると難しいです。
ただ、ユーティリティトレイトで出てきた#[derive(xxx)]のような機能を実現するには、このマクロが必要になります。
なお、このマクロは「別クレートとしてコンパイルされる」という性質上、ライブラリクレートにしか実装できない点に注意です。
実装例(ライブラリクレート側)
#Cargo.toml
[package]
name = "crate_test"
version = "0.1.0"
edition = "2024"
[lib]
proc-macro = true
[dependencies]
syn = "2.0"
quote = "1.0"
// lib.rs
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, DeriveInput};
#[proc_macro_derive(HelloWorld)]
pub fn hello_world_derive(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
let name = input.ident;
let expanded = quote! {
// トレイト定義
pub trait HelloWorld {
fn hello();
}
// マクロ適用時の実装
impl HelloWorld for #name {
fn hello() {
println!("Hello, {}!", stringify!(#name));
}
}
};
TokenStream::from(expanded)
}
実装例(メイン側)
#Cargo.toml
[package]
name = "testes"
version = "0.1.0"
edition = "2024"
[dependencies]
crate_test = { path = "./crate_test" }
use crate_test::HelloWorld;
#[derive(HelloWorld)] // deriveだけで、
struct Point {
x: i32,
y: i32,
}
fn main() {
Point::hello(); // トレイトの関数が実行可能に!
}
お疲れ様でした!
お疲れ様でした。
これにてRust入門講座はおしまいです。
ここまでお付き合いくださった方々、ありがとうございました。
Rustの言語仕様、実践的な使い方、そして魅力が少しでも伝わりましたでしょうか?
個人の感想にはなりますが、Rustは他言語に比べて、必ずしも書きやすい言語ではないとは思っています。結構癖がありますし、コード量も他言語に比べると少し増えがちです。
ただ、きちんと使いこなせれば他の言語よりも高いパフォーマンスと安定性を提供することができます。
そういった意味で、自分の中では「エンジニアの努力でベストを尽くせる言語」だと思っています。頑張れば最大限良い結果をもたらすことができ、そしてそれは自分の努力次第、という点にちょっとワクワクする気持ちがあります。
そのワクワク感こそがRust最大の魅力なんじゃないかなと勝手に思っています。
また、例外に対する設計思想やドキュメントを重視しているスタイルなども自分の好みにマッチしているというのもありますね。
AmazonやMicrosoftなど大企業が採用している実績があり比較的メジャーな言語ではありますが、一方で難易度の問題もあって人口数はめちゃくちゃ多い訳ではないです。
結果的にエコシステムも他言語に比べるとまだまだ発展途中という印象ではありますが、逆に言えばもっと人口が増えていくとより面白い広がりが生まれるのではないかとも思っています。
今回の入門講座が少しでもそこに寄与できると嬉しいなと思いつつ、本入門講座を締めくくりたいと思います。
それでは改めて、本当にお疲れ様でした & ありがとうございました!
Hope you enjoy it!