LoginSignup
0
0

Rustのチャンネルでエラトステネスの篩、async版

Last updated at Posted at 2024-05-05

エラトステネスの篩 async版

下の記事のつづき。

Threadで書いていたものをCoroutineというか、async/awaitで書き直す。

まずは tokiofutures を使うので、Cargo.toml に下記を追加。

[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"

ロジックは同じ。async/await の動作機構は、スレッドとはまるで違うはずなのだが、tokioを使うとかなり普通に使える。異常だ。
チャンネルを使う際に tokio::sync::mpsc::channel を使うこと。こちらの Receiveriter() がないので、1つずつrecv してやらなければならないところが、違うといえば違う。

use tokio::sync::mpsc;
async fn async_prime(n: i32) {
    let (numbers_in, to_sieves) = mpsc::channel(1);
    let (result_stream, mut result_out) = mpsc::channel(1);
    tokio::spawn(async move {
        async_sieve(to_sieves, result_stream).await;
    });
    tokio::spawn(async move {
        for i in 2..n {
            numbers_in.send(i).await.unwrap()
        }
    });

    let mut res: Vec<i32> = Vec::new();
    while let Some(one) = result_out.recv().await {
        res.push(one);
    }
    println!("Received {:?}", res);
}

#[tokio::main]
async fn main() {
    async_prime(1000).await;
}

再帰関数のasync化

篩のほうの関数はちょっと面倒。普通に async fn では書けない。async で直接再帰関数を定義すると、関数の型が無限にネストしてしまうかららしい。

以下の記事に詳しい。
https://rust-lang.github.io/async-book/07_workarounds/04_recursion.html

回避策としては、async fn としてではなく、Box された Futureを返す、通常の関数として定義すればいい。

use futures::future::{BoxFuture, FutureExt};

fn async_sieve(mut in_stream: mpsc::Receiver<i32>, result_stream: mpsc::Sender<i32>) 
-> BoxFuture<'static, ()> {
    async move {
        if let Some(prime) = in_stream.recv().await {
            result_stream.send(prime).await.unwrap();
            let (tx, rx) = mpsc::channel(1);
            tokio::spawn(async move {
                async_sieve(rx, result_stream).await;
            });

            while let Some(i) = in_stream.recv().await {
                if i % prime != 0 {
                    tx.send(i).await.unwrap();
                }
            }
        }
    }.boxed()
}

あちこちに.await が挟まってちょっとうざいぐらいで、基本的には同じ。

不思議

std::sync::mpsc::Receiverrecv は、

fn recv(&self) -> Result<T, RecvError>

と定義されていて、引数が不変参照で、返り値Resultなのに、

tokio::sync::mpsc::Receiverrecv

async fn recv(&mut self) -> Option<T>

となっていて、 引数が可変参照で、返り値もOption

なぜ?

Async フィボナッチ

テスト用に書いたフィボナッチもついでに掲載しておく。これであってるのかなあ。

use tokio::sync::mpsc;
use futures::future::{BoxFuture, FutureExt};

fn async_fib(n: i64) -> BoxFuture<'static, i64> {
    async move {
        if n > 2 {
            let f1 = tokio::spawn(async move {async_fib(n-1)}).await.unwrap();
            let f2 = tokio::spawn(async move {async_fib(n-2)}).await.unwrap();
            f1.await + f2.await
        } else {
            1
        }
    }.boxed()
}

#[tokio::main]
async fn main() {
    let n = 30;
    println!("fib {:} {:?}", n, tokio::spawn(async move {async_fib(n)}).await.unwrap().await);
}
0
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0