rust
Iron

Ironのコードを読んでみた

本記事はRustその2 Advent Calendar 2017 の13日目です。

最近、有志でMastering Rustの輪読会を開き、少しずつRustの勉強をしています。
未だ、Rust入門者の域をでないレベルではありますが、先日 Rust User Community もくもく会@渋谷 1st に参加し、そこでIronのコードリーディングをしたので、以下に纏めたいと思います。

環境

本記事は下記の環境で確認しています。

  • rustc: 1.21.0
  • cargo: 0.22.0
  • Iron: 0.6.0

Ironとは?

IronはRustで書かれたWebFrameworkです。12月15日現在Githubのstar数は4500overといくつかあるRustのWebFramework(nickel, rocket等)の中で一番starを集めています。
Iron自体はより低レベルなWebFrameworkであるhyperをラップする形で開発されており、公式ドキュメントによると

Iron is meant to be as extensible and pluggable as possible; Iron's core is concentrated and avoids unnecessary features by leaving them to middleware, plugins, and modifiers.

ということで、Ironのコア自体は必要最低限の機能のみを提供し、middleware等を追加することで、機能を拡張することができるのが特徴のようです。

Ironの使い方

まずはIronの使い方について、簡単に確認します。

利用方法はCargo.tomlに以下を追加するだけです。

[dependencies]
iron = "*"

単純なリクエストを返すWebサーバは以下の通り。

extern crate iron;

use iron::prelude::*;
use iron::status;

fn main() {
    Iron::new(|_: &mut Request| {
        Ok(Response::with((status::Ok, "Hello World!")))
    }).http("localhost:3000").unwrap();
}

Iron::newに渡しているのはHandlerで&mut Requestを受け取って、IronResult<Response>を返す関数です。上記ではそれをクロージャーで渡しています。

Iron::newでIronインスタンスを作成、httpメソッドがHttpResult<Listening>を返すので、unwrapでListeningを取り出しています。(Result型はScalaで言うTry型に近いもので、unwrapで成功時の値を取り出すことができます。)

ここでlocalhost:3000にアクセスするとstatus 200Hellow world!をリクエストボディとして返します。

% curl -v http://localhost:3000                                                                                                                                        
* Rebuilt URL to: http://localhost:3000/
*   Trying ::1...
* Connected to localhost (::1) port 3000 (#0)
> GET / HTTP/1.1
> Host: localhost:3000
> User-Agent: curl/7.43.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Length: 12
< Content-Type: text/plain
< Date: Sat, 09 Dec 2017 14:47:05 GMT
<
* Connection #0 to host localhost left intact
Hello World!

Ironのコードリーディング

まずコードを追う前に、rustでFrameworkを使用せずに、listenする方法について確認しましょう。

rustではTCP/UDPを扱うstd::netが標準ライブラリとして用意されています。
std::netでlistenする方法は以下の通りです。

use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:3000").unwrap();
    match listener.accept() {
        Ok((_socket, addr)) => println!("new client: {:?}", addr),
        Err(e) => println!("couldn't get client: {:?}", e),
    }
}

上記の処理の内容は下記の通りです。

  1. TCPListenerでport 3000番にbind
  2. listener.accept()でクライアントからのconnectionを確立するまで待つ
  3. データを受け取ったら、クライアントのIPアドレスを表示して終了

非常にシンプルで分かりやすいですね。

さて前置きが長くなりましたが、早速Ironのコードを見ていきましょう。
今回、Ironがportをbindingしてlistenするまでの流れについて確認したいと思います。
(※ 以下、普段Scalaを利用している為、ScalaにおけるXXXみたいな表現がところどころ入っていますがご容赦ください。)

Iron::new(<handler>).http("localhost:3000").unwrap();

まずはnewによって生成されるIronの構造体から確認しましょう。

iron/src/iron.rs
pub struct Iron<H> {
    /// Iron contains a `Handler`, which it uses to create responses for client
    /// requests.
    pub handler: H,

    /// Server timeouts.
    pub timeouts: Timeouts,

    /// The number of request handling threads.
    ///
    /// Defaults to `8 * num_cpus`.
    pub threads: usize,
}

当然ですが、newする際に必要となるhandlerがフィールドとして定義されていますね。その他にもtimeoutやthreadを設定することができるようです。続いて、Ironのimplを確認しましょう。(コメント等は削除しています。)

iron/src/iron.rs
impl<H: Handler> Iron<H> {
    pub fn new(handler: H) -> Iron<H> {
        Iron {
            handler: handler,
            timeouts: Timeouts::default(),
            threads: 8 * ::num_cpus::get(),
        }
    }
    ...
}

まずトレイト境界として、Handlerトレイトが指定されています。Handlerトレイトは&mut Requestを受け取って、IronResult<Response>を返すメソッドが定義されている為、newが受け取る型を制限することができます。newの役割は受け取ったhandlerでIronインスタンスを生成するだけですね。

続いて、同様にimplに定義されているhttpを確認しましょう。

iron/src/iron.rs
pub fn http<A>(self, addr: A) -> HttpResult<Listening>
    where A: ToSocketAddrs
{
    HttpListener::new(addr).and_then(|l| self.listen(l, Protocol::http()))
}

指定されたアドレスでHttpListenerを生成し、それをlistenに渡しているだけですね。
HttpListenerResult型を返すので、and_then(Scalaで言うflatMapのようなもの)を使って、中身をlistenに渡しています。HttpListenerを確認してみましょう。

hyper/src/net.rs
...
impl From<TcpListener> for HttpListener {
    fn from(listener: TcpListener) -> HttpListener {
        HttpListener {
            listener: Arc::new(listener),

            read_timeout : None,
            write_timeout: None,
        }
    }
}

impl HttpListener {
    /// Start listening to an address over HTTP.
    pub fn new<To: ToSocketAddrs>(addr: To) -> ::Result<HttpListener> {
        Ok(HttpListener::from(try!(TcpListener::bind(addr))))
    }
}
...

HttpListenerhyper側で定義されています。newを見た際にTcpListener::bind(addr)を実行しているので、ここでportにbindしているのが分かります。
そのあと、fromメソッドでHttpListenerに変換していますが、これは後程見るように生成したTcpListenerを複数のthreadから参照する必要があるので、Arcでくるむ為だと思われます。

HttpListenerでportにbindしていることが分かったので、Ironに戻って、listenメソッドを確認しましょう。

iron/src/iron.rs
pub fn listen<L>(self, mut listener: L, protocol: Protocol) -> HttpResult<Listening>
    where L: 'static + NetworkListener + Send
{
    let handler = RawHandler {
        handler: self.handler,
        addr: try!(listener.local_addr()),
        protocol: protocol,
    };

    let mut server = Server::new(listener);
    server.keep_alive(self.timeouts.keep_alive);
    server.set_read_timeout(self.timeouts.read);
    server.set_write_timeout(self.timeouts.write);
    server.handle_threads(handler, self.threads)
}

listenは渡されたlistenerを元に新たにRawHandlerを生成し、その後、Serverを生成、timeout値等を設定した後、handle_threadsを実行という流れになります。RawHandlerを確認しましょう。

iron/src/iron.rs
struct RawHandler<H> {
    handler: H,
    addr: SocketAddr,
    protocol: Protocol,
}

impl<H: Handler> ::hyper::server::Handler for RawHandler<H> {
    fn handle(&self, http_req: HttpRequest, mut http_res: HttpResponse<Fresh>) {
        // Set some defaults in case request handler panics.
        // This should not be necessary anymore once stdlib's catch_panic becomes stable.
        *http_res.status_mut() = status::InternalServerError;

        // Create `Request` wrapper.
        match Request::from_http(http_req, self.addr, &self.protocol) {
            Ok(mut req) => {
                // Dispatch the request, write the response back to http_res
                self.handler.handle(&mut req).unwrap_or_else(|e| {
                    error!("Error handling:\n{:?}\nError was: {:?}", req, e.error);
                    e.response
                }).write_back(http_res)
            },
            Err(e) => {
                error!("Error creating request:\n    {}", e);
                bad_request(http_res)
            }
        }
    }
}

handleRequesthandlerIron::newにて、Requestを受け取った際に実行したい処理)に渡して実行しています。今回は説明を省略しますがhandlerが返すResponse型にwrite_backが定義されており、クライアントへのデータの返送処理を担当しています。

ここまでで、私達が定義したhandlerが実行され、クライアントへデータを返すまでの流れが分かりました。
後はRawHandlerhandleがどこで呼び出されるかを確認すれば良いだけですね。残されたhandler_threadsを確認しましょう。

hyper/src/server/mod.rs
impl<L: NetworkListener + Send + 'static> Server<L> {
    ...
    pub fn handle_threads<H: Handler + 'static>(self, handler: H,
            threads: usize) -> ::Result<Listening> {
        handle(self, handler, threads)
    }
}

fn handle<H, L>(mut server: Server<L>, handler: H, threads: usize) -> ::Result<Listening>
where H: Handler + 'static, L: NetworkListener + Send + 'static {
    let socket = try!(server.listener.local_addr());

    debug!("threads = {:?}", threads);
    let pool = ListenerPool::new(server.listener);
    let worker = Worker::new(handler, server.timeouts);
    let work = move |mut stream| worker.handle_connection(&mut stream);

    let guard = thread::spawn(move || pool.accept(work, threads));

    Ok(Listening {
        _guard: Some(guard),
        socket: socket,
    })
}

Serverhyper側で定義されています。handler_threadsは指定されたthread数をServerhandleに渡しているだけですね。

handleではlistener(HttpListener)を元にListenerPoolを生成し、handler(RawHandler)を元にWorkerを生成、streamを受取りhandler_connectionを実行するクロージャーを定義し、別スレッドでListenerPoolacceptを実行しています。

先にWorkerhandler_connectionを確認しましょう。

hyper/src/server/mod.rs
impl<H: Handler + 'static> Worker<H> {
    fn new(handler: H, timeouts: Timeouts) -> Worker<H> {
        Worker {
            handler: handler,
            timeouts: timeouts,
        }
    }

    fn handle_connection<S>(&self, stream: &mut S) where S: NetworkStream + Clone {
        ...
        while self.keep_alive_loop(&mut rdr, &mut wrt, addr) {
            if let Err(e) = self.set_read_timeout(*rdr.get_ref(), self.timeouts.keep_alive) {
                info!("set_read_timeout keep_alive {:?}", e);
                break;
            }
        }
        ...
    }
    ...

    fn keep_alive_loop<W: Write>(&self, rdr: &mut BufReader<&mut NetworkStream>,
            wrt: &mut W, addr: SocketAddr) -> bool {
        ...
        {
            let mut res = Response::new(wrt, &mut res_headers);
            res.version = version;
            self.handler.handle(req, res);
        }
        ...
    }

大分省略していますが、handle_connectionの中でkeep_alive_loopが呼ばれており、そこでhandler.handle(RawHandlerのhandle)が実行されていることが分かると思います。

続いて、ListenerPoolの定義は以下の通りです。

hyper/src/server/listener.rs
pub struct ListenerPool<A: NetworkListener> {
    acceptor: A
}

impl<A: NetworkListener + Send + 'static> ListenerPool<A> {
    /// Create a thread pool to manage the acceptor.
    pub fn new(acceptor: A) -> ListenerPool<A> {
        ListenerPool { acceptor: acceptor }
    }

    pub fn accept<F>(self, work: F, threads: usize)
        ...

        for _ in 0..threads {
            spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())
        }
        ...

    }
}

fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A)
where A: NetworkListener + Send + 'static,
      F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static {
    thread::spawn(move || {
        let _sentinel = Sentinel::new(supervisor, ());

        loop {
            match acceptor.accept() {
                Ok(stream) => work(stream),
                Err(e) => {
                    info!("Connection failed: {}", e);
                }
            }
        }
    });
}

こちらも大分省略していますが、処理の内容としては以下の通りです。

  1. acceptが指定されたthread数分、threadを生成(spawan_withの実行)
  2. 生成されたthreadでlisteneracceptして、クライアントからのconnectionを待つ
  3. クライアントからリクエストがあれば、work(Workerのhandle_connection)にデータを渡して実行する

上記で見たようにhandler_connectionRawHandlerhandle(私達が定義した処理の内容)が実行され、ユーザにレスポンスを返します。

spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())acceptor(HttpListener)cloneして別threadに渡しています。複数のthreadで共有される為、上記でlistenerArcでくるむ必要があったんですね。

まとめ

少し駆け足でしたが、Ironがportをbindingしてlistenするまでの流れを確認できました。

Iron自体はhyperを上手にラップし、bindingやlistenする等の低レベルの処理は実際はhyperが担当していることが分かったと思います。

Iron自体のコードはコンパクトでとても読みやすかったです。Ironにはchainというmiddlewareを連鎖させる機能が用意されている為、機会があれば、こちらも読んでみたいと思います。