エラトステネスの篩 async版
下の記事のつづき。
Threadで書いていたものをCoroutineというか、async/awaitで書き直す。
まずは tokio
と futures
を使うので、Cargo.toml
に下記を追加。
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
ロジックは同じ。async/await の動作機構は、スレッドとはまるで違うはずなのだが、tokioを使うとかなり普通に使える。異常だ。
チャンネルを使う際に tokio::sync::mpsc::channel
を使うこと。こちらの Receiver
は iter()
がないので、1つずつrecv
してやらなければならないところが、違うといえば違う。
use tokio::sync::mpsc;
async fn async_prime(n: i32) {
let (numbers_in, to_sieves) = mpsc::channel(1);
let (result_stream, mut result_out) = mpsc::channel(1);
tokio::spawn(async move {
async_sieve(to_sieves, result_stream).await;
});
tokio::spawn(async move {
for i in 2..n {
numbers_in.send(i).await.unwrap()
}
});
let mut res: Vec<i32> = Vec::new();
while let Some(one) = result_out.recv().await {
res.push(one);
}
println!("Received {:?}", res);
}
#[tokio::main]
async fn main() {
async_prime(1000).await;
}
再帰関数のasync化
篩のほうの関数はちょっと面倒。普通に async fn
では書けない。async
で直接再帰関数を定義すると、関数の型が無限にネストしてしまうかららしい。
以下の記事に詳しい。
https://rust-lang.github.io/async-book/07_workarounds/04_recursion.html
回避策としては、async fn
としてではなく、Box
された Future
を返す、通常の関数として定義すればいい。
use futures::future::{BoxFuture, FutureExt};
fn async_sieve(mut in_stream: mpsc::Receiver<i32>, result_stream: mpsc::Sender<i32>)
-> BoxFuture<'static, ()> {
async move {
if let Some(prime) = in_stream.recv().await {
result_stream.send(prime).await.unwrap();
let (tx, rx) = mpsc::channel(1);
tokio::spawn(async move {
async_sieve(rx, result_stream).await;
});
while let Some(i) = in_stream.recv().await {
if i % prime != 0 {
tx.send(i).await.unwrap();
}
}
}
}.boxed()
}
あちこちに.await
が挟まってちょっとうざいぐらいで、基本的には同じ。
不思議
std::sync::mpsc::Receiver
の recv
は、
fn recv(&self) -> Result<T, RecvError>
と定義されていて、引数が不変参照で、返り値Result
なのに、
tokio::sync::mpsc::Receiver
の recv
は
async fn recv(&mut self) -> Option<T>
となっていて、 引数が可変参照で、返り値もOption
。
なぜ?
Async フィボナッチ
テスト用に書いたフィボナッチもついでに掲載しておく。これであってるのかなあ。
use tokio::sync::mpsc;
use futures::future::{BoxFuture, FutureExt};
fn async_fib(n: i64) -> BoxFuture<'static, i64> {
async move {
if n > 2 {
let f1 = tokio::spawn(async move {async_fib(n-1)}).await.unwrap();
let f2 = tokio::spawn(async move {async_fib(n-2)}).await.unwrap();
f1.await + f2.await
} else {
1
}
}.boxed()
}
#[tokio::main]
async fn main() {
let n = 30;
println!("fib {:} {:?}", n, tokio::spawn(async move {async_fib(n)}).await.unwrap().await);
}