25
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Rustその2Advent Calendar 2017

Day 13

Ironのコードを読んでみた

Last updated at Posted at 2017-12-12

本記事はRustその2 Advent Calendar 2017 の13日目です。

最近、有志でMastering Rustの輪読会を開き、少しずつRustの勉強をしています。
未だ、Rust入門者の域をでないレベルではありますが、先日 Rust User Community もくもく会@渋谷 1st に参加し、そこでIronのコードリーディングをしたので、以下に纏めたいと思います。

環境

本記事は下記の環境で確認しています。

  • rustc: 1.21.0
  • cargo: 0.22.0
  • Iron: 0.6.0

Ironとは?

IronはRustで書かれたWebFrameworkです。12月15日現在Githubのstar数は4500overといくつかあるRustのWebFramework(nickel, rocket等)の中で一番starを集めています。
Iron自体はより低レベルなWebFrameworkであるhyperをラップする形で開発されており、公式ドキュメントによると

Iron is meant to be as extensible and pluggable as possible; Iron's core is concentrated and avoids unnecessary features by leaving them to middleware, plugins, and modifiers.

ということで、Ironのコア自体は必要最低限の機能のみを提供し、middleware等を追加することで、機能を拡張することができるのが特徴のようです。

Ironの使い方

まずはIronの使い方について、簡単に確認します。

利用方法はCargo.tomlに以下を追加するだけです。

[dependencies]
iron = "*"

単純なリクエストを返すWebサーバは以下の通り。

extern crate iron;

use iron::prelude::*;
use iron::status;

fn main() {
    Iron::new(|_: &mut Request| {
        Ok(Response::with((status::Ok, "Hello World!")))
    }).http("localhost:3000").unwrap();
}

Iron::newに渡しているのはHandlerで&mut Requestを受け取って、IronResult<Response>を返す関数です。上記ではそれをクロージャーで渡しています。

Iron::newでIronインスタンスを作成、httpメソッドがHttpResult<Listening>を返すので、unwrapでListeningを取り出しています。(Result型はScalaで言うTry型に近いもので、unwrapで成功時の値を取り出すことができます。)

ここでlocalhost:3000にアクセスするとstatus 200Hellow world!をリクエストボディとして返します。

% curl -v http://localhost:3000                                                                                                                                        
* Rebuilt URL to: http://localhost:3000/
*   Trying ::1...
* Connected to localhost (::1) port 3000 (#0)
> GET / HTTP/1.1
> Host: localhost:3000
> User-Agent: curl/7.43.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Length: 12
< Content-Type: text/plain
< Date: Sat, 09 Dec 2017 14:47:05 GMT
<
* Connection #0 to host localhost left intact
Hello World!

Ironのコードリーディング

まずコードを追う前に、rustでFrameworkを使用せずに、listenする方法について確認しましょう。

rustではTCP/UDPを扱うstd::netが標準ライブラリとして用意されています。
std::netでlistenする方法は以下の通りです。

use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:3000").unwrap();
    match listener.accept() {
        Ok((_socket, addr)) => println!("new client: {:?}", addr),
        Err(e) => println!("couldn't get client: {:?}", e),
    }
}

上記の処理の内容は下記の通りです。

  1. TCPListenerでport 3000番にbind
  2. listener.accept()でクライアントからのconnectionを確立するまで待つ
  3. データを受け取ったら、クライアントのIPアドレスを表示して終了

非常にシンプルで分かりやすいですね。

さて前置きが長くなりましたが、早速Ironのコードを見ていきましょう。
今回、Ironがportをbindingしてlistenするまでの流れについて確認したいと思います。
(※ 以下、普段Scalaを利用している為、ScalaにおけるXXXみたいな表現がところどころ入っていますがご容赦ください。)

Iron::new(<handler>).http("localhost:3000").unwrap();

まずはnewによって生成されるIronの構造体から確認しましょう。

iron/src/iron.rs

pub struct Iron<H> {
    /// Iron contains a `Handler`, which it uses to create responses for client
    /// requests.
    pub handler: H,

    /// Server timeouts.
    pub timeouts: Timeouts,

    /// The number of request handling threads.
    ///
    /// Defaults to `8 * num_cpus`.
    pub threads: usize,
}

当然ですが、newする際に必要となるhandlerがフィールドとして定義されていますね。その他にもtimeoutやthreadを設定することができるようです。続いて、Ironのimplを確認しましょう。(コメント等は削除しています。)

iron/src/iron.rs

impl<H: Handler> Iron<H> {
    pub fn new(handler: H) -> Iron<H> {
        Iron {
            handler: handler,
            timeouts: Timeouts::default(),
            threads: 8 * ::num_cpus::get(),
        }
    }
    ...
}

まずトレイト境界として、Handlerトレイトが指定されています。Handlerトレイトは&mut Requestを受け取って、IronResult<Response>を返すメソッドが定義されている為、newが受け取る型を制限することができます。newの役割は受け取ったhandlerでIronインスタンスを生成するだけですね。

続いて、同様にimplに定義されているhttpを確認しましょう。

iron/src/iron.rs
pub fn http<A>(self, addr: A) -> HttpResult<Listening>
    where A: ToSocketAddrs
{
    HttpListener::new(addr).and_then(|l| self.listen(l, Protocol::http()))
}

指定されたアドレスでHttpListenerを生成し、それをlistenに渡しているだけですね。
HttpListenerResult型を返すので、and_then(Scalaで言うflatMapのようなもの)を使って、中身をlistenに渡しています。HttpListenerを確認してみましょう。

hyper/src/net.rs
...
impl From<TcpListener> for HttpListener {
    fn from(listener: TcpListener) -> HttpListener {
        HttpListener {
            listener: Arc::new(listener),

            read_timeout : None,
            write_timeout: None,
        }
    }
}

impl HttpListener {
    /// Start listening to an address over HTTP.
    pub fn new<To: ToSocketAddrs>(addr: To) -> ::Result<HttpListener> {
        Ok(HttpListener::from(try!(TcpListener::bind(addr))))
    }
}
...

HttpListenerhyper側で定義されています。newを見た際にTcpListener::bind(addr)を実行しているので、ここでportにbindしているのが分かります。
そのあと、fromメソッドでHttpListenerに変換していますが、これは後程見るように生成したTcpListenerを複数のthreadから参照する必要があるので、Arcでくるむ為だと思われます。

HttpListenerでportにbindしていることが分かったので、Ironに戻って、listenメソッドを確認しましょう。

iron/src/iron.rs
pub fn listen<L>(self, mut listener: L, protocol: Protocol) -> HttpResult<Listening>
    where L: 'static + NetworkListener + Send
{
    let handler = RawHandler {
        handler: self.handler,
        addr: try!(listener.local_addr()),
        protocol: protocol,
    };

    let mut server = Server::new(listener);
    server.keep_alive(self.timeouts.keep_alive);
    server.set_read_timeout(self.timeouts.read);
    server.set_write_timeout(self.timeouts.write);
    server.handle_threads(handler, self.threads)
}

listenは渡されたlistenerを元に新たにRawHandlerを生成し、その後、Serverを生成、timeout値等を設定した後、handle_threadsを実行という流れになります。RawHandlerを確認しましょう。

iron/src/iron.rs
struct RawHandler<H> {
    handler: H,
    addr: SocketAddr,
    protocol: Protocol,
}

impl<H: Handler> ::hyper::server::Handler for RawHandler<H> {
    fn handle(&self, http_req: HttpRequest, mut http_res: HttpResponse<Fresh>) {
        // Set some defaults in case request handler panics.
        // This should not be necessary anymore once stdlib's catch_panic becomes stable.
        *http_res.status_mut() = status::InternalServerError;

        // Create `Request` wrapper.
        match Request::from_http(http_req, self.addr, &self.protocol) {
            Ok(mut req) => {
                // Dispatch the request, write the response back to http_res
                self.handler.handle(&mut req).unwrap_or_else(|e| {
                    error!("Error handling:\n{:?}\nError was: {:?}", req, e.error);
                    e.response
                }).write_back(http_res)
            },
            Err(e) => {
                error!("Error creating request:\n    {}", e);
                bad_request(http_res)
            }
        }
    }
}

handleRequesthandlerIron::newにて、Requestを受け取った際に実行したい処理)に渡して実行しています。今回は説明を省略しますがhandlerが返すResponse型にwrite_backが定義されており、クライアントへのデータの返送処理を担当しています。

ここまでで、私達が定義したhandlerが実行され、クライアントへデータを返すまでの流れが分かりました。
後はRawHandlerhandleがどこで呼び出されるかを確認すれば良いだけですね。残されたhandler_threadsを確認しましょう。

hyper/src/server/mod.rs
impl<L: NetworkListener + Send + 'static> Server<L> {
    ...
    pub fn handle_threads<H: Handler + 'static>(self, handler: H,
            threads: usize) -> ::Result<Listening> {
        handle(self, handler, threads)
    }
}

fn handle<H, L>(mut server: Server<L>, handler: H, threads: usize) -> ::Result<Listening>
where H: Handler + 'static, L: NetworkListener + Send + 'static {
    let socket = try!(server.listener.local_addr());

    debug!("threads = {:?}", threads);
    let pool = ListenerPool::new(server.listener);
    let worker = Worker::new(handler, server.timeouts);
    let work = move |mut stream| worker.handle_connection(&mut stream);

    let guard = thread::spawn(move || pool.accept(work, threads));

    Ok(Listening {
        _guard: Some(guard),
        socket: socket,
    })
}

Serverhyper側で定義されています。handler_threadsは指定されたthread数をServerhandleに渡しているだけですね。

handleではlistener(HttpListener)を元にListenerPoolを生成し、handler(RawHandler)を元にWorkerを生成、streamを受取りhandler_connectionを実行するクロージャーを定義し、別スレッドでListenerPoolacceptを実行しています。

先にWorkerhandler_connectionを確認しましょう。

hyper/src/server/mod.rs
impl<H: Handler + 'static> Worker<H> {
    fn new(handler: H, timeouts: Timeouts) -> Worker<H> {
        Worker {
            handler: handler,
            timeouts: timeouts,
        }
    }

    fn handle_connection<S>(&self, stream: &mut S) where S: NetworkStream + Clone {
        ...
        while self.keep_alive_loop(&mut rdr, &mut wrt, addr) {
            if let Err(e) = self.set_read_timeout(*rdr.get_ref(), self.timeouts.keep_alive) {
                info!("set_read_timeout keep_alive {:?}", e);
                break;
            }
        }
        ...
    }
    ...

    fn keep_alive_loop<W: Write>(&self, rdr: &mut BufReader<&mut NetworkStream>,
            wrt: &mut W, addr: SocketAddr) -> bool {
        ...
        {
            let mut res = Response::new(wrt, &mut res_headers);
            res.version = version;
            self.handler.handle(req, res);
        }
        ...
    }

大分省略していますが、handle_connectionの中でkeep_alive_loopが呼ばれており、そこでhandler.handle(RawHandlerのhandle)が実行されていることが分かると思います。

続いて、ListenerPoolの定義は以下の通りです。

hyper/src/server/listener.rs
pub struct ListenerPool<A: NetworkListener> {
    acceptor: A
}

impl<A: NetworkListener + Send + 'static> ListenerPool<A> {
    /// Create a thread pool to manage the acceptor.
    pub fn new(acceptor: A) -> ListenerPool<A> {
        ListenerPool { acceptor: acceptor }
    }

    pub fn accept<F>(self, work: F, threads: usize)
        ...

        for _ in 0..threads {
            spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())
        }
        ...

    }
}

fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A)
where A: NetworkListener + Send + 'static,
      F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static {
    thread::spawn(move || {
        let _sentinel = Sentinel::new(supervisor, ());

        loop {
            match acceptor.accept() {
                Ok(stream) => work(stream),
                Err(e) => {
                    info!("Connection failed: {}", e);
                }
            }
        }
    });
}

こちらも大分省略していますが、処理の内容としては以下の通りです。

  1. acceptが指定されたthread数分、threadを生成(spawan_withの実行)
  2. 生成されたthreadでlisteneracceptして、クライアントからのconnectionを待つ
  3. クライアントからリクエストがあれば、work(Workerのhandle_connection)にデータを渡して実行する

上記で見たようにhandler_connectionRawHandlerhandle(私達が定義した処理の内容)が実行され、ユーザにレスポンスを返します。

spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())acceptor(HttpListener)cloneして別threadに渡しています。複数のthreadで共有される為、上記でlistenerArcでくるむ必要があったんですね。

まとめ

少し駆け足でしたが、Ironがportをbindingしてlistenするまでの流れを確認できました。

Iron自体はhyperを上手にラップし、bindingやlistenする等の低レベルの処理は実際はhyperが担当していることが分かったと思います。

Iron自体のコードはコンパクトでとても読みやすかったです。Ironにはchainというmiddlewareを連鎖させる機能が用意されている為、機会があれば、こちらも読んでみたいと思います。

25
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
25
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?