Help us understand the problem. What is going on with this article?

Rustの非同期フレームワークtokioを少しずつ試しながら勉強する その2

More than 1 year has passed since last update.

注意事項(2019.8.19追記)
本記事で使っている Future は、futures-rs( https://github.com/rust-lang-nursery/futures-rs )の0.1です。
今は Future もstdに取り込まれ、async/await も実装間近ですので、この記事の内容は参考程度に考えてもらうと良いかもしれません。

前回( https://qiita.com/mas-yo/items/17848c0bc35474a7db5d )から、随分時間が経ってしまいました・・・。

前回Futureをtokioから呼んでみるところまで行きましたが、今回は、ただ呼ぶだけでなくて、時間がかかる処理を実行して、その結果を受け取るというのをやってみましょう。

前回のソース

use tokio::prelude::*;

struct FutureTest {

}

impl Future for FutureTest {
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        println!("poll called!");
        Ok(Async::Ready(()))
    }
}

fn main() {
    let f = FutureTest{};

    tokio::run(f);
}

Futureを実装するstructを作って、tokioに渡すと、poll()を呼んでくれるのでした。
さて、このFutureは何を意味するのか?
tokioのチュートリアルの、この辺りに書いてあります。

https://tokio.rs/docs/getting-started/futures/

簡単に説明すると、Futureは、時間のかかる処理(DBクエリなど)や、何かを待つ処理(ソケットからのデータ受信など)を表すのに使うということです。

そして、poll()メソッドが返す値が、処理の状態を表します。
リファレンスによると
https://docs.rs/futures/0.1.27/futures/type.Poll.html

  • Ok(Async::Ready(結果)) --- 結果でた
  • Ok(Async::NotReady) --- まだ結果出てない
  • Error(エラー内容) --- なんか失敗した

ですね。
前回、Async::Readyを返して試してみましたので、今度は、NotReadyを返してみましょう。

use tokio::prelude::*;

struct FutureTest {

}

impl Future for FutureTest {
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        println!("poll called!");
        Ok(Async::NotReady)   //まだ結果でてない!を表す
    }
}

fn main() {
    let f = FutureTest{};

    tokio::run(f);
}

これを実行してみると、poll()は呼ばれるのですが、プログラムは停止したままで、終わることがありませんでした。
NotReadyを返したので、結果が出るのを待ってるんですね!

では、結果が出たよ、ということを伝えるには、どうしたら良いのでしょうか?
そのためには、Task
https://docs.rs/futures/0.1.27/futures/task/struct.Task.html
これを使います。

use tokio::prelude::*;
use std::thread;
use std::time;

struct FutureTest {
    poll_count: i32, //pollが呼ばれた回数
}

impl Future for FutureTest {
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        println!("poll called!");
        self.poll_count += 1;

        let task = task::current(); //現在のTaskを取得

        //別スレッドで時間のかかる処理を実行
        thread::spawn(move || {
            thread::sleep(time::Duration::from_millis(1000));
            task.notify(); //処理が終わったことを通知
        });

        if self.poll_count <= 1 {
            println!("not ready");
            Ok(Async::NotReady)   //一回目呼ばれた時は NotReady
        }
        else {
            println!("ready!");
            Ok(Async::Ready(()))  //次に呼ばれたら、Ready
        }
    }
}

fn main() {
    let f = FutureTest{poll_count:0};

    tokio::run(f);
}

実行結果

poll called!
not ready
[1秒後]
poll called!
ready!

実行結果を見てわかるとおり、notify() を呼ぶと、poll()が呼ばれることがわかります。

このように、メインスレッド(pollした方)は、待ち状態となっていて、別のスレッドで時間のかかる処理を実行し、結果が出たら、再びメインスレッドからpoll()が呼ばれる、という仕組みになっています。

tokioが何をしたいのか、だんだんわかってきた・・・!
その3につづきます。

今回作ったソース:
https://github.com/mas-yo/learn-tokio-step-by-step/tree/master/learn-tokio-2

mas-yo
オンラインゲームのサーバーをC++で作っています。 Rust勉強中。
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away