Rust の練習に、昔 Haskell や Clojure で書いた、並列にHTTP Request を送るプログラムを書いてみました。
但し、 HTTP ライブラリの hyper の非同期処理は絶賛開発中?のようだったので、今回は非同期IOによる並列処理ではなくて、スレッドによるものです。
extern crate hyper;
use std::io::prelude::*;
use std::io::stderr;
use std::sync::Arc;
use std::thread;
use std::time::{ Duration, SystemTime };
use hyper::Client;
fn to_seconds(duration: &Duration) -> f64 {
let nanos = duration.subsec_nanos() as u64;
let secs = duration.as_secs();
((1000_000_000 * secs + nanos) as f64) / 1000_000_000.
}
fn do_http_request(client: Arc<Client>, index: usize, url: &str) {
let time = SystemTime::now();
let result = client.get(url).send();
let duration = to_seconds(&time.elapsed().unwrap());
match result {
Ok(response) => {
println!("{:2}: {:5.3}: {} => {}",
index, duration, url, response.status);
}
Err(e) => {
let _ = writeln!(&mut stderr(), "{:2}: Error: {}", index, e);
}
};
}
fn main() {
let gen_url = |q: &str| -> String { format!("http://www.google.com/search?q={}", q) };
let urls: Vec<String> =
vec!["clojure", "haskell", "ocaml", "rust", "scala"]
.iter().map(|q| gen_url(q))
.chain(vec!["http://127.0.0.1:5000/".to_owned()].into_iter())
.chain(vec!["javascript", "python", "perl", "php", "ruby"]
.iter().map(|q| gen_url(q)))
.collect();
let client = Arc::new(Client::new());
let mut children = vec![];
for (i, url) in urls.into_iter().enumerate() {
let client = client.clone();
children.push(thread::spawn(move || {
do_http_request(client, i+1, &url)
}));
}
for child in children {
let _ = child.join();
}
}
// To check parallel http requests, prepare a long time request:
// $ rackup -s webrick --port 5000 -b 'run(proc { |env| sleep 5; ["200", {"Content-Type" => "text/html"}, ["Hello World"]] })'
// $ cargo run
// ...
// 1: 0.221: http://www.google.com/search?q=clojure => 200 OK
// 9: 0.223: http://www.google.com/search?q=perl => 200 OK
// 8: 0.223: http://www.google.com/search?q=python => 200 OK
// 10: 0.229: http://www.google.com/search?q=php => 200 OK
// 7: 0.235: http://www.google.com/search?q=javascript => 200 OK
// 5: 0.287: http://www.google.com/search?q=scala => 200 OK
// 11: 0.294: http://www.google.com/search?q=ruby => 200 OK
// 3: 0.303: http://www.google.com/search?q=ocaml => 200 OK
// 2: 0.429: http://www.google.com/search?q=haskell => 200 OK
// 4: 0.550: http://www.google.com/search?q=rust => 200 OK
// 6: 5.005: http://127.0.0.1:5000/ => 200 OK
Cargo.toml
[package]
name = "http_request1"
version = "0.1.0"
[dependencies]
hyper = "*"
はまったところ
最初、スレッド一覧を生成するのに、イテレーターを使って以下のようにしていたのですが、
let children = urls.into_iter().enumerate().map(|(i, url)| {
let client = client.clone();
thread::spawn(move || do_http_request(client, i+1, &url))
});
これだと、以下のように、常に HTTP Request が逐次実行されてしまい、並列に実行されずに悩んだのですが、
$ cargo run
...
1: 0.708: http://www.google.com/search?q=clojure => 200 OK
2: 0.186: http://www.google.com/search?q=haskell => 200 OK
3: 0.179: http://www.google.com/search?q=ocaml => 200 OK
4: 0.508: http://www.google.com/search?q=rust => 200 OK
5: 0.179: http://www.google.com/search?q=scala => 200 OK
6: 5.045: http://127.0.0.1:5000/ => 200 OK
7: 0.173: http://www.google.com/search?q=javascript => 200 OK
8: 0.181: http://www.google.com/search?q=python => 200 OK
9: 0.177: http://www.google.com/search?q=perl => 200 OK
10: 0.192: http://www.google.com/search?q=php => 200 OK
11: 0.202: http://www.google.com/search?q=ruby => 200 OK
どうやら イテレーターは遅延評価されるらしく 、そのため今回、 map
に渡したクロージャが即時評価されず、 child.join()
で初めて値が評価されて実行が開始された、という感じのようです。
追記 - 2017-05-29
はまったところ でイテレーターの遅延評価について書きましたが、コメント にて、イテレーターとコンシューマの関係、そして for 以外にも collect
メソッドがコンシューマであることを指摘頂いたので、 for
の代わりに collect
メソッドを呼ぶように修正しました。
extern crate hyper;
use std::io::prelude::*;
use std::io::stderr;
use std::sync::Arc;
use std::thread;
use std::time::{ Duration, SystemTime };
use hyper::Client;
fn to_seconds(duration: &Duration) -> f64 {
let nanos = duration.subsec_nanos() as u64;
let secs = duration.as_secs();
((1000_000_000 * secs + nanos) as f64) / 1000_000_000.
}
fn do_http_request(client: Arc<Client>, index: usize, url: &str) {
let time = SystemTime::now();
let result = client.get(url).send();
let duration = to_seconds(&time.elapsed().unwrap());
match result {
Ok(response) => {
println!("{:2}: {:5.3}: {} => {}",
index, duration, url, response.status);
}
Err(e) => {
let _ = writeln!(&mut stderr(), "{:2}: Error: {}", index, e);
}
};
}
fn main() {
let gen_url = |q: &str| -> String { format!("http://www.google.com/search?q={}", q) };
let urls: Vec<String> =
vec![].into_iter()
.chain(vec!["clojure", "haskell", "ocaml", "rust", "scala"]
.iter().map(|q| gen_url(q)))
.chain(vec!["http://127.0.0.1:5000/".to_owned()].into_iter())
.chain(vec!["javascript", "python", "perl", "php", "ruby"]
.iter().map(|q| gen_url(q)))
.collect();
let client = Arc::new(Client::new());
let children = urls.into_iter().enumerate().map(|(i, url)| {
let client = client.clone();
thread::spawn(move || do_http_request(client, i+1, &url))
}).collect::<Vec<_>>();
for child in children {
let _ = child.join();
}
}