注意事項(2019.8.19追記)
本記事で使っている Future は、futures-rs( https://github.com/rust-lang-nursery/futures-rs )の0.1です。
今は Future もstdに取り込まれ、async/await も実装間近ですので、この記事の内容は参考程度に考えてもらうと良いかもしれません。
前回( https://qiita.com/mas-yo/items/17848c0bc35474a7db5d )から、随分時間が経ってしまいました・・・。
前回Futureをtokioから呼んでみるところまで行きましたが、今回は、ただ呼ぶだけでなくて、時間がかかる処理を実行して、その結果を受け取るというのをやってみましょう。
前回のソース
use tokio::prelude::*;
struct FutureTest {
}
impl Future for FutureTest {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
println!("poll called!");
Ok(Async::Ready(()))
}
}
fn main() {
let f = FutureTest{};
tokio::run(f);
}
Futureを実装するstructを作って、tokioに渡すと、poll()を呼んでくれるのでした。
さて、このFutureは何を意味するのか?
tokioのチュートリアルの、この辺りに書いてあります。
簡単に説明すると、Futureは、時間のかかる処理(DBクエリなど)や、何かを待つ処理(ソケットからのデータ受信など)を表すのに使うということです。
そして、poll()メソッドが返す値が、処理の状態を表します。
リファレンスによると
https://docs.rs/futures/0.1.27/futures/type.Poll.html
- Ok(Async::Ready(結果)) --- 結果でた
- Ok(Async::NotReady) --- まだ結果出てない
- Error(エラー内容) --- なんか失敗した
ですね。
前回、Async::Readyを返して試してみましたので、今度は、NotReadyを返してみましょう。
use tokio::prelude::*;
struct FutureTest {
}
impl Future for FutureTest {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
println!("poll called!");
Ok(Async::NotReady) //まだ結果でてない!を表す
}
}
fn main() {
let f = FutureTest{};
tokio::run(f);
}
これを実行してみると、poll()は呼ばれるのですが、プログラムは停止したままで、終わることがありませんでした。
NotReadyを返したので、結果が出るのを待ってるんですね!
では、結果が出たよ、ということを伝えるには、どうしたら良いのでしょうか?
そのためには、Task
https://docs.rs/futures/0.1.27/futures/task/struct.Task.html
これを使います。
use tokio::prelude::*;
use std::thread;
use std::time;
struct FutureTest {
poll_count: i32, //pollが呼ばれた回数
}
impl Future for FutureTest {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
println!("poll called!");
self.poll_count += 1;
let task = task::current(); //現在のTaskを取得
//別スレッドで時間のかかる処理を実行
thread::spawn(move || {
thread::sleep(time::Duration::from_millis(1000));
task.notify(); //処理が終わったことを通知
});
if self.poll_count <= 1 {
println!("not ready");
Ok(Async::NotReady) //一回目呼ばれた時は NotReady
}
else {
println!("ready!");
Ok(Async::Ready(())) //次に呼ばれたら、Ready
}
}
}
fn main() {
let f = FutureTest{poll_count:0};
tokio::run(f);
}
実行結果
poll called!
not ready
[1秒後]
poll called!
ready!
実行結果を見てわかるとおり、notify() を呼ぶと、poll()が呼ばれることがわかります。
このように、メインスレッド(pollした方)は、待ち状態となっていて、別のスレッドで時間のかかる処理を実行し、結果が出たら、再びメインスレッドからpoll()が呼ばれる、という仕組みになっています。
tokioが何をしたいのか、だんだんわかってきた・・・!
その3につづきます。
今回作ったソース:
https://github.com/mas-yo/learn-tokio-step-by-step/tree/master/learn-tokio-2