Rust
tokio
reqwest
futures

[Rust] reqwest + futures + tokioで非同期I/Oリクエスト入門

reqwestを使って、大量にAPIを呼びたい場合に結果をまとめてごにょごにょ処理したかったんですが

イマイチ使い方が分からなかったので、メモ。


環境


  • Rust 1.32

  • reqwest 0.9.9

  • tokio 0.1.15

  • futures 0.1.25


やりたいこと


  • APIを大量に呼んで、結果をまとめて別の処理をしたい


    • 1リクエスト1処理ではなく、全結果をみないとできない処理などがある




公式example

reqwest/async_multiple_requests.rs at aa6b1f4184b1093523096c8d9bdc2e36cb8f6e76 · seanmonstar/reqwest

2回くらいな場合は、joinして、tokio::runすればいいらしい


Futureの結果を取り出す


tokio::run

まずは、公式のexampleにあるtokio::run

tokio::run - Rust



  • tokio::runFuture<Item = (), Error = ()> なFutureを受け取って、結果を返さない


    • そりゃそうだ、Item=()でError=()だとしたら返すべき結果が無い




tokio::runtime::Runtime

tokio::runtime::Runtime - Rust


tokio::runtime::Runtime spawn

pub fn spawn<F>(&mut self, future: F) -> &mut Self 

where
F: Future<Item = (), Error = ()> + Send + 'static,


  • spawnはやはり、Futureの型がtokio::runと同じ


tokio::runtime::Runtime block_on

pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E> 

where
F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static,
E: Send + 'static,


  • これは結果を返せる


tokio::runtime::Runtime block_on_all

pub fn block_on_all<F, R, E>(self, future: F) -> Result<R, E> 

where
F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static,
E: Send + 'static,


  • これも、同じか?

  • block_onとなにが違うんだろか

block_on

    pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>

where
F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static,
E: Send + 'static,
{
let mut entered = enter().expect("nested block_on");
let (tx, rx) = futures::sync::oneshot::channel();
self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!())));
entered.block_on(rx).unwrap()
}

block_on_all

    pub fn block_on_all<F, R, E>(mut self, future: F) -> Result<R, E>

where
F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static,
E: Send + 'static,
{
let mut entered = enter().expect("nested block_on_all");
let (tx, rx) = futures::sync::oneshot::channel();
self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!())));
let block = rx
.map_err(|_| unreachable!())
.and_then(move |r| {
self.shutdown_on_idle()
.map(move |()| r)
});
entered.block_on(block).unwrap()
}


  • block_onは &mut self


    • なので、つまりRuntimeを使い回せる?



  • block_on_allは mut self


    • channelのreceiverがなんか受け取ったあとにshutdown_on_idleしてるので、使い回さずにそのままRuntimeを破棄して終了していいなら、block_on_allを使うのか?




tokio_core::reactor::Core

Rust で並列HTTP Request (非同期IO版) - Qiita

こちらにはtokio_core::reactor::Core runを使う方法で書かれている。

join_allして、runする感じか、なるほど。


試してみる

reqwest/async_multiple_requests.rs at master · seanmonstar/reqwest

↑こちらを参考にしました。


Cargo.toml

[package]

name = "reqwest_sample"
version = "0.1.0"
authors = ["yagince <straitwalk@gmail.com>"]
edition = "2018"

[dependencies]
reqwest = "*"
futures = "*"
serde = "*"
serde_derive = "*"
serde_json = "*"
tokio = "*"
tokio-core = "*"
tokio-threadpool = "*"
rayon = "*"
lazy_static = "*"


extern crate futures;

extern crate reqwest;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
#[macro_use]
extern crate lazy_static;
extern crate tokio_threadpool;

use futures::sync::mpsc;
use futures::Future;

lazy_static! {
pub static ref CLIENT: reqwest::r#async::Client = reqwest::r#async::Client::new();
}

#[derive(Deserialize, Debug)]
pub struct Slideshow {
title: String,
author: String,
}

#[derive(Deserialize, Debug)]
pub struct SlideshowContainer {
slideshow: Slideshow,
}

pub fn fetch_async() -> impl Future<Item = SlideshowContainer, Error = reqwest::Error> {
CLIENT
.get("http://xxx.xxx/xxx")
.send()
.and_then(move |mut res| {
res.json()
})
}

pub fn fetch() -> Result<SlideshowContainer, reqwest::Error> {
let client = reqwest::Client::new();

client.get("http://xxx.xxx/xxx").send()?.json()
}

rayonのinto_par_iterでマルチスレッド化した場合と比較する為に、同期呼び出し用のメソッドも定義してます。

実行してみます。

#[cfg(test)]

mod tests {
extern crate rayon;
extern crate tokio;
extern crate tokio_core;
use super::*;
use rayon::prelude::*;
use std::time::Instant;

const N: usize = 10;

fn measure<T, F>(f: F) -> T
where
F: FnOnce() -> T,
{
let start = Instant::now();
let ret = f();
let end = start.elapsed();
println!(
"{}.{:03} secs",
end.as_secs(),
end.subsec_nanos() / 1_000_000
);
ret
}

#[test]
fn test_fetch() {
measure(||{
(0..N)
.into_par_iter()
.map(|i| (i, fetch()))
.collect::<Vec<_>>();
});
}
#[test]
fn test_fetch_async_reactor() {
use futures::future::join_all;
use futures::prelude::*;

let mut core = tokio_core::reactor::Core::new().unwrap();

measure(||{
let all = (0..N)
.into_iter()
.map(|i| fetch_async().map(move |x| (i, x)));

match core.run(join_all(all)) {
Ok(x) => println!("{:?}", x.len()),
Err(e) => eprintln!("{:?}", e),
}
});
}

#[test]
fn test_fetch_async_runtime_block_on() {
use futures::future::join_all;
use futures::prelude::*;

let mut core = tokio::runtime::Runtime::new().unwrap();

measure(||{
let all = (0..N)
.into_iter()
.map(|i| fetch_async().map(move |x| (i, x)));

match core.block_on(join_all(all)) {
Ok(x) => println!("{:?}", x.len()),
Err(e) => eprintln!("{:?}", e),
}
});
}
}

❯ cargo test -- --nocapture --test-threads=1

Compiling reqwest_sample v0.1.0 (/Users/xxxx/workspace/rust/reqwest-sample)
Finished dev [unoptimized + debuginfo] target(s) in 3.82s
Running target/debug/deps/reqwest_sample-bfed71ee3c478524

running 3 tests
test tests::test_fetch ... 10
0.978 secs
ok
test tests::test_fetch_async_reactor ... 10
0.360 secs
ok
test tests::test_fetch_async_runtime_block_on ... 10
0.375 secs
ok

test result: ok. 3 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
...

reactorでもRuntimeでも実行できました。

rayonでiteratorをマルチスレッドにするだけよりはるかに速い。

N=100にしてみると。。。

running 3 tests

test tests::test_fetch ... 100
8.366 secs
ok
test tests::test_fetch_async_reactor ... 100
0.542 secs
ok
test tests::test_fetch_async_runtime_block_on ... 100
0.535 secs
ok

爆速。


問題点

Nの数を少しずつ上げてみたところ、一定数を超えたところで

block_onの方でエラーが発生

Error { kind: Hyper(Error { kind: Connect, cause: Os { code: 24, kind: Other, message: "Too many open files" } }), url: Some("http://xxx.xxx/xxx") }

Too many open filesなので、ulimit上げることもできなくはないけど

Threadpoolでpool数をある程度制限してやりたいところ。

どうするのか?


join_allせずに、1個ずつspawnしたうえで結果を受け取るには?

spawnすると結果を受け取れないので、channelでreceiverから受け取る感じかな?

それってどうやるのかなぁ、と思ってexampleを探してみてもあまりみつからず。

と思っていたら、答えはすぐそばにあった。。。

    pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>

where
F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static,
E: Send + 'static,
{
let mut entered = enter().expect("nested block_on");
let (tx, rx) = futures::sync::oneshot::channel();
self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!())));
entered.block_on(rx).unwrap()
}

futuresのchannelを作って、futureのthenでsendしてる。

なるほど、こういう感じか。


書き直してみる

    #[test]

fn test_fetch_async_rumtine_spawn() {
use futures::prelude::*;

measure(|| {
let mut core = tokio::runtime::Runtime::new().unwrap();

let (tx, rx) = tokio::sync::mpsc::channel(N);

(0..N).into_iter().for_each(|i| {
let tx = tx.clone();
core.spawn(
fetch_async(i)
.then(move |x| {
tx.send((i, x))
})
.map(|_| ())
.map_err(|e| println!("{:?}", e)),
);
});

match rx.take(N as u64).map(|res| {
// println!("{:?}", res);
res
}).collect().wait() {
Ok(x) => println!("{:?}", x.len()),
Err(e) => eprintln!("{:?}", e),
}
});
}

エラーは出なくなった。

ちょっと複雑になった。


問題点2

試してみると、結果が足りないケースがみつかった。

    #[test]

fn test_fetch_async_rumtine_spawn() {
use futures::prelude::*;

measure(|| {
let mut core = tokio::runtime::Runtime::new().unwrap();

let (tx, rx) = tokio::sync::mpsc::channel(N);

(0..N).into_iter().for_each(move |i| {
let tx = tx.clone();
core.spawn(
fetch_async()
.then(move |x| {
tx.send((i, x))
})
.map(|_| ())
.map_err(|e| println!("{:?}", e)),
);
});

match rx.take(N as u64).collect().wait() {
Ok(x) => println!("{:?}", x.len()),
Err(e) => eprintln!("{:?}", e),
}
});
}

問題はfor_eachでmoveしてることっぽい。

この状態だと。

N=10でやると

test tests::test_fetch_async_rumtine_spawn ... 0

0.005 secs
ok

結果が 0 で終了する。

以下のようにmoveしない状態だと。

    #[test]

fn test_fetch_async_rumtine_spawn() {
use futures::prelude::*;

measure(|| {
let mut core = tokio::runtime::Runtime::new().unwrap();

let (tx, rx) = tokio::sync::mpsc::channel(N);

(0..N).into_iter().for_each(|i| {
let tx = tx.clone();
core.spawn(
fetch_async()
.then(move |x| {
tx.send((i, x))
})
.map(|_| ())
.map_err(|e| println!("{:?}", e)),
);
});

match rx.take(N as u64).collect().wait() {
Ok(x) => println!("{:?}", x.len()),
Err(e) => eprintln!("{:?}", e),
}
});
}

test tests::test_fetch_async_rumtine_spawn ... 10

0.875 secs
ok

ちゃんと10個取れた。

やはり、なぜmoveでそうなるのか、は理解に至らず。。。


tokio_core::reactor::Coreとtokio::runtime::Runtimeは何が違うのか?


tokio_core::reactor::Core

https://docs.rs/tokio-core/0.1.17/tokio_core/reactor/struct.Core.html#method.run

Runs a future until completion, driving the event loop while we're otherwise waiting for the future to complete.

This function will begin executing the event loop and will finish once the provided future is resolved. Note that the future argument here crucially does not require the 'static nor Send bounds. As a result the future will be "pinned" to not only this thread but also this stack frame.

This function will return the value that the future resolves to once the future has finished. If the future never resolves then this function will never return.

未来が完成するまで走り、未来が完成するのを待っている間にイベントループを駆動します。

この関数はイベントループの実行を開始し、提供された未来が解決されると終了します。ここでの将来の引数が決定的に重要なのは `` static`や `Send`の範囲を必要としないことに注意してください。その結果、将来はこのスレッドだけでなくこのスタックフレームにも「固定」されます。

この関数は、未来が終了したときに未来が解決する値を返します。未来が決して解決しないならば、この関数は決して戻りません。

んー、Threadpoolじゃなくてイベントループを作って実行するっていうことなのかな。

https://tokio-rs.github.io/tokio/tokio/runtime/struct.Runtime.html#method.reactor

Runtimeにもreactorっていうメソッドがあるけど、depreatedになっている。


tokio::runtime::Runtime

https://docs.rs/tokio/0.1.15/tokio/runtime/struct.Runtime.html#method.spawn

Spawn a future onto the Tokio runtime.

This spawns the given future onto the runtime's executor, usually a thread pool. The thread pool is then responsible for polling the future until it completes.

Tokioランタイムに未来を生み出します。

これは与えられた未来をランタイムのエクゼキュータ、通常はスレッドプールに生み出す。スレッドプールは、完了するまで将来のポーリングを行います。

https://docs.rs/tokio/0.1.15/tokio/runtime/struct.Runtime.html#method.block_on

Run a future to completion on the Tokio runtime.

This runs the given future on the runtime, blocking until it is complete, and yielding its resolved result. Any tasks or timers which the future spawns internally will be executed on the runtime.

This method should not be called from an asynchronous context.

これは指定された未来を実行時に実行し、それが完了するまでブロックして、解決された結果をもたらします。未来が内部的に生み出すすべてのタスクまたはタイマーはランタイムで実行されます。

このメソッドは非同期コンテキストから呼び出さないでください。


ベンチマーク

test tests::test_fetch_async_reactor ... 10000

6.013 secs

test tests::test_fetch_async_rumtine_spawn ... 10000
3.351 secs


まとめ