本記事はRustその2 Advent Calendar 2017 の13日目です。
最近、有志でMastering Rustの輪読会を開き、少しずつRustの勉強をしています。
未だ、Rust入門者の域をでないレベルではありますが、先日 Rust User Community もくもく会@渋谷 1st に参加し、そこでIronのコードリーディングをしたので、以下に纏めたいと思います。
環境
本記事は下記の環境で確認しています。
- rustc: 1.21.0
- cargo: 0.22.0
- Iron: 0.6.0
Ironとは?
IronはRustで書かれたWebFrameworkです。12月15日現在Githubのstar数は4500overといくつかあるRustのWebFramework(nickel, rocket等)の中で一番starを集めています。
Iron自体はより低レベルなWebFrameworkであるhyperをラップする形で開発されており、公式ドキュメントによると
Iron is meant to be as extensible and pluggable as possible; Iron's core is concentrated and avoids unnecessary features by leaving them to middleware, plugins, and modifiers.
ということで、Ironのコア自体は必要最低限の機能のみを提供し、middleware等を追加することで、機能を拡張することができるのが特徴のようです。
Ironの使い方
まずはIronの使い方について、簡単に確認します。
利用方法はCargo.toml
に以下を追加するだけです。
[dependencies]
iron = "*"
単純なリクエストを返すWebサーバは以下の通り。
extern crate iron;
use iron::prelude::*;
use iron::status;
fn main() {
Iron::new(|_: &mut Request| {
Ok(Response::with((status::Ok, "Hello World!")))
}).http("localhost:3000").unwrap();
}
Iron::new
に渡しているのはHandlerで&mut Request
を受け取って、IronResult<Response>
を返す関数です。上記ではそれをクロージャーで渡しています。
Iron::new
でIronインスタンスを作成、httpメソッドがHttpResult<Listening>
を返すので、unwrapでListeningを取り出しています。(Result型はScalaで言うTry型に近いもので、unwrapで成功時の値を取り出すことができます。)
ここでlocalhost:3000
にアクセスするとstatus 200
で Hellow world!
をリクエストボディとして返します。
% curl -v http://localhost:3000
* Rebuilt URL to: http://localhost:3000/
* Trying ::1...
* Connected to localhost (::1) port 3000 (#0)
> GET / HTTP/1.1
> Host: localhost:3000
> User-Agent: curl/7.43.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Length: 12
< Content-Type: text/plain
< Date: Sat, 09 Dec 2017 14:47:05 GMT
<
* Connection #0 to host localhost left intact
Hello World!
Ironのコードリーディング
まずコードを追う前に、rustでFrameworkを使用せずに、listenする方法について確認しましょう。
rustではTCP/UDPを扱うstd::netが標準ライブラリとして用意されています。
std::netでlistenする方法は以下の通りです。
use std::net::TcpListener;
fn main() {
let listener = TcpListener::bind("127.0.0.1:3000").unwrap();
match listener.accept() {
Ok((_socket, addr)) => println!("new client: {:?}", addr),
Err(e) => println!("couldn't get client: {:?}", e),
}
}
上記の処理の内容は下記の通りです。
-
TCPListener
でport 3000番にbind -
listener.accept()
でクライアントからのconnectionを確立するまで待つ - データを受け取ったら、クライアントのIPアドレスを表示して終了
非常にシンプルで分かりやすいですね。
さて前置きが長くなりましたが、早速Ironのコードを見ていきましょう。
今回、Ironがportをbindingしてlistenするまでの流れについて確認したいと思います。
(※ 以下、普段Scalaを利用している為、ScalaにおけるXXXみたいな表現がところどころ入っていますがご容赦ください。)
Iron::new(<handler>).http("localhost:3000").unwrap();
まずはnew
によって生成されるIronの構造体から確認しましょう。
pub struct Iron<H> {
/// Iron contains a `Handler`, which it uses to create responses for client
/// requests.
pub handler: H,
/// Server timeouts.
pub timeouts: Timeouts,
/// The number of request handling threads.
///
/// Defaults to `8 * num_cpus`.
pub threads: usize,
}
当然ですが、new
する際に必要となるhandler
がフィールドとして定義されていますね。その他にもtimeoutやthreadを設定することができるようです。続いて、Ironのimplを確認しましょう。(コメント等は削除しています。)
impl<H: Handler> Iron<H> {
pub fn new(handler: H) -> Iron<H> {
Iron {
handler: handler,
timeouts: Timeouts::default(),
threads: 8 * ::num_cpus::get(),
}
}
...
}
まずトレイト境界として、Handler
トレイトが指定されています。Handler
トレイトは&mut Request
を受け取って、IronResult<Response>
を返すメソッドが定義されている為、new
が受け取る型を制限することができます。new
の役割は受け取ったhandlerでIronインスタンスを生成するだけですね。
続いて、同様にimplに定義されているhttp
を確認しましょう。
pub fn http<A>(self, addr: A) -> HttpResult<Listening>
where A: ToSocketAddrs
{
HttpListener::new(addr).and_then(|l| self.listen(l, Protocol::http()))
}
指定されたアドレスでHttpListener
を生成し、それをlisten
に渡しているだけですね。
HttpListener
がResult
型を返すので、and_then
(Scalaで言うflatMapのようなもの)を使って、中身をlisten
に渡しています。HttpListener
を確認してみましょう。
...
impl From<TcpListener> for HttpListener {
fn from(listener: TcpListener) -> HttpListener {
HttpListener {
listener: Arc::new(listener),
read_timeout : None,
write_timeout: None,
}
}
}
impl HttpListener {
/// Start listening to an address over HTTP.
pub fn new<To: ToSocketAddrs>(addr: To) -> ::Result<HttpListener> {
Ok(HttpListener::from(try!(TcpListener::bind(addr))))
}
}
...
HttpListener
はhyper
側で定義されています。new
を見た際にTcpListener::bind(addr)
を実行しているので、ここでportにbindしているのが分かります。
そのあと、from
メソッドでHttpListener
に変換していますが、これは後程見るように生成したTcpListenerを複数のthreadから参照する必要があるので、Arc
でくるむ為だと思われます。
HttpListener
でportにbindしていることが分かったので、Ironに戻って、listen
メソッドを確認しましょう。
pub fn listen<L>(self, mut listener: L, protocol: Protocol) -> HttpResult<Listening>
where L: 'static + NetworkListener + Send
{
let handler = RawHandler {
handler: self.handler,
addr: try!(listener.local_addr()),
protocol: protocol,
};
let mut server = Server::new(listener);
server.keep_alive(self.timeouts.keep_alive);
server.set_read_timeout(self.timeouts.read);
server.set_write_timeout(self.timeouts.write);
server.handle_threads(handler, self.threads)
}
listen
は渡されたlistenerを元に新たにRawHandlerを生成し、その後、Server
を生成、timeout値等を設定した後、handle_threads
を実行という流れになります。RawHandler
を確認しましょう。
struct RawHandler<H> {
handler: H,
addr: SocketAddr,
protocol: Protocol,
}
impl<H: Handler> ::hyper::server::Handler for RawHandler<H> {
fn handle(&self, http_req: HttpRequest, mut http_res: HttpResponse<Fresh>) {
// Set some defaults in case request handler panics.
// This should not be necessary anymore once stdlib's catch_panic becomes stable.
*http_res.status_mut() = status::InternalServerError;
// Create `Request` wrapper.
match Request::from_http(http_req, self.addr, &self.protocol) {
Ok(mut req) => {
// Dispatch the request, write the response back to http_res
self.handler.handle(&mut req).unwrap_or_else(|e| {
error!("Error handling:\n{:?}\nError was: {:?}", req, e.error);
e.response
}).write_back(http_res)
},
Err(e) => {
error!("Error creating request:\n {}", e);
bad_request(http_res)
}
}
}
}
handle
でRequest
をhandler
(Iron::new
にて、Requestを受け取った際に実行したい処理)に渡して実行しています。今回は説明を省略しますがhandler
が返すResponse
型にwrite_back
が定義されており、クライアントへのデータの返送処理を担当しています。
ここまでで、私達が定義したhandler
が実行され、クライアントへデータを返すまでの流れが分かりました。
後はRawHandler
のhandle
がどこで呼び出されるかを確認すれば良いだけですね。残されたhandler_threads
を確認しましょう。
impl<L: NetworkListener + Send + 'static> Server<L> {
...
pub fn handle_threads<H: Handler + 'static>(self, handler: H,
threads: usize) -> ::Result<Listening> {
handle(self, handler, threads)
}
}
fn handle<H, L>(mut server: Server<L>, handler: H, threads: usize) -> ::Result<Listening>
where H: Handler + 'static, L: NetworkListener + Send + 'static {
let socket = try!(server.listener.local_addr());
debug!("threads = {:?}", threads);
let pool = ListenerPool::new(server.listener);
let worker = Worker::new(handler, server.timeouts);
let work = move |mut stream| worker.handle_connection(&mut stream);
let guard = thread::spawn(move || pool.accept(work, threads));
Ok(Listening {
_guard: Some(guard),
socket: socket,
})
}
Server
もhyper
側で定義されています。handler_threads
は指定されたthread数をServer
のhandle
に渡しているだけですね。
handle
ではlistener(HttpListener)
を元にListenerPool
を生成し、handler(RawHandler)
を元にWorkerを生成、stream
を受取りhandler_connection
を実行するクロージャーを定義し、別スレッドでListenerPool
のaccept
を実行しています。
先にWorker
とhandler_connection
を確認しましょう。
impl<H: Handler + 'static> Worker<H> {
fn new(handler: H, timeouts: Timeouts) -> Worker<H> {
Worker {
handler: handler,
timeouts: timeouts,
}
}
fn handle_connection<S>(&self, stream: &mut S) where S: NetworkStream + Clone {
...
while self.keep_alive_loop(&mut rdr, &mut wrt, addr) {
if let Err(e) = self.set_read_timeout(*rdr.get_ref(), self.timeouts.keep_alive) {
info!("set_read_timeout keep_alive {:?}", e);
break;
}
}
...
}
...
fn keep_alive_loop<W: Write>(&self, rdr: &mut BufReader<&mut NetworkStream>,
wrt: &mut W, addr: SocketAddr) -> bool {
...
{
let mut res = Response::new(wrt, &mut res_headers);
res.version = version;
self.handler.handle(req, res);
}
...
}
大分省略していますが、handle_connection
の中でkeep_alive_loop
が呼ばれており、そこでhandler.handle(RawHandlerのhandle)
が実行されていることが分かると思います。
続いて、ListenerPool
の定義は以下の通りです。
pub struct ListenerPool<A: NetworkListener> {
acceptor: A
}
impl<A: NetworkListener + Send + 'static> ListenerPool<A> {
/// Create a thread pool to manage the acceptor.
pub fn new(acceptor: A) -> ListenerPool<A> {
ListenerPool { acceptor: acceptor }
}
pub fn accept<F>(self, work: F, threads: usize)
...
for _ in 0..threads {
spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())
}
...
}
}
fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A)
where A: NetworkListener + Send + 'static,
F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static {
thread::spawn(move || {
let _sentinel = Sentinel::new(supervisor, ());
loop {
match acceptor.accept() {
Ok(stream) => work(stream),
Err(e) => {
info!("Connection failed: {}", e);
}
}
}
});
}
こちらも大分省略していますが、処理の内容としては以下の通りです。
-
accept
が指定されたthread数分、threadを生成(spawan_with
の実行) - 生成されたthreadで
listener
がaccept
して、クライアントからのconnectionを待つ - クライアントからリクエストがあれば、
work(Workerのhandle_connection)
にデータを渡して実行する
上記で見たようにhandler_connection
でRawHandler
のhandle(私達が定義した処理の内容)
が実行され、ユーザにレスポンスを返します。
spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())
でacceptor(HttpListener)
をclone
して別threadに渡しています。複数のthreadで共有される為、上記でlistener
をArc
でくるむ必要があったんですね。
まとめ
少し駆け足でしたが、Ironがportをbindingしてlistenするまでの流れを確認できました。
Iron自体はhyperを上手にラップし、bindingやlistenする等の低レベルの処理は実際はhyperが担当していることが分かったと思います。
Iron自体のコードはコンパクトでとても読みやすかったです。Ironにはchain
というmiddleware
を連鎖させる機能が用意されている為、機会があれば、こちらも読んでみたいと思います。