rust
hyper
tokio
futures-cpupool
futures-fs

Rust (tokio + hyper + ~~futures-cpupool~~ futures-fs) でシンプルな非同期IO WebServer を作る

※ 追追記 tokio_fs を使いましょう
※ 追記 futures_fs を使いましょう

Rust で非同期 IO をするには tokio が便利である。
しかし tokio は非同期ネットワークライブラリであり、node.js のように非同期でファイルシステムを扱うことができない。
これは、現状の Linux (Posix) には十分な非同期ファイルシステム用システムコールが整備されていないためである。
実際、node.js の中で用いられている非同期IOライブラリである libuv もブロッキングなファイル操作はスレッドプールで待つことで非同期ファイル操作を実現している
それに留まらず、 go, haskell(ghc), boost asio なども同様である。

そこで http ライブラリには hyper を使い、スレッドプールには futures_cpupool を使ったシンプルな静的ファイル配信Webサーバを実装した。

特徴

  • 非同期ネットワーク操作用の tokio_core::reactor::Handle と、非同期ファイル操作用の futures_cpupool::CpuPool を状態として持つ
  • ブロッキングな最低限のファイル操作を cpupool に投げる

コード

main.rs
extern crate getopts;
extern crate pretty_env_logger;
#[macro_use] extern crate log;
extern crate tokio_core;
extern crate mio;
extern crate futures;
extern crate futures_cpupool;
extern crate hyper;
extern crate hyper_tls;

use std::fs::{self};
use std::vec::{Vec};
use std::boxed::{Box};
use std::result::Result::{Ok, Err};
use std::io::{self, BufReader, Read};
use std::path::{Path};
use std::time::{Duration};
use futures::{Future};
use futures::future;
use futures::stream::{Stream};
use futures_cpupool::{CpuPool};
use tokio_core::reactor::{Core, Timeout, Handle};
use hyper::{Body, Client, Get, Post, StatusCode, Chunk};
use hyper::server::{Http, Service, Request, Response};
use hyper::header::{ContentLength};

struct WebService {
  handle: Handle, // for async network task
  pool: CpuPool, // for async fileio task
  publicDir: String, // cwd
}

impl WebService {
  fn static_file(&self, path: String) -> Box<Future<Item=String, Error=String>> {
    let handle = self.handle.clone();
    let publicDir = self.publicDir.clone();
    let task = self.pool.spawn_fn(move ||{
      let path = match path.as_str() {
        "/" => "/index.html".to_string(),
        _ => path
      };
      let filepath = format!("{}{}", publicDir, path);
      println!("{}", filepath);
      match std::fs::File::open(filepath) {
        Err(why) => Err(why.to_string()),
        Ok(mut file) => {
          let mut s = String::new();
          match file.read_to_string(&mut s) {
            Err(why) => Err("cannot read to string".to_string()),
            Ok(_) => Ok(s)
          }
        }
      }
    });
    return Box::new(task);
  }
}

impl Service for WebService {
  type Request = Request;
  type Response = Response<Box<Stream<Item=Chunk, Error=Self::Error>>>;
  type Error = hyper::Error;
  type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;
  fn call(&self, req: Self::Request) -> Self::Future {
    match (req.method(), req.path()) {
      (&Get, _) =>
        Box::new(
          self.static_file(req.path().to_string()).then(|res|{
            let (statusCode, bodyString) = match res {
              Ok(body) => (StatusCode::Ok, body),
              Err(_) => (StatusCode::NotFound, "Not Found".to_string()),
            };
            let length = bodyString.len() as u64;
            let body: Box<Stream<Item=Chunk, Error=Self::Error>> = Box::new(Body::from(bodyString));
            return future::ok(Response::new()
              .with_status(statusCode)
              .with_header(ContentLength(length))
              .with_body(body));
        })),
      _ => Box::new(future::ok(Response::new().with_status(StatusCode::InternalServerError))),
    }
  }
}

fn main() {
  pretty_env_logger::init();

  let args: Vec<String> = std::env::args().collect();
  let program = args[0].clone();
  let mut opts = getopts::Options::new();
  opts.optopt("p", "port", "port", "3000");
  opts.optflag("h", "help", "print this help menu");
  let matches = opts.parse(&args[1..]).unwrap();
  if matches.opt_present("h") {
    let help = format!("Usage: {} FILE [options]", program);
    print!("{}", opts.usage(&help));
    return;
  }

  let port = matches.opt_str("p").unwrap();
  let addr = format!("127.0.0.1:{}", port).parse().unwrap();

  // async io resource
  let pool = CpuPool::new_num_cpus(); // for file aio
  let mut core = Core::new().unwrap(); // for network aio

  // create web service
  let server_handle = core.handle();
  let service_pool = pool.clone();
  let service_handle = core.handle();
  let serve = Http::new().serve_addr_handle(&addr, &server_handle, move ||{
    let service = WebService{
      handle: service_handle.clone(),
      pool: service_pool.clone(),
      publicDir: std::env::current_dir().unwrap().to_str().unwrap().to_string(),
    };
    return Ok(service);
  }).unwrap();
  println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());

  // launch web server
  let connection_handle = core.handle();
  server_handle.spawn(
    serve
      .for_each(move |conn| {
        // handle new connection
        connection_handle.spawn(
          conn
            .map(|_| ())
            .map_err(|err| println!("serve error: {:?}", err))
        );
        return Ok(());
      }).map_err(|_| ())
  );

  core.run(future::empty::<(), ()>()).unwrap();
}
Cargo.toml
[package]
name = "SimpleAIOWebServer"
version = "0.1.0"
authors = ["foo <foo@foo.com>"]

[dependencies]
futures = "0.1"
futures-cpupool = "0.1"
futures-await = "0.1"
log = "0.4"
pretty_env_logger = "0.2"
mio = "0.6"
tokio-core = "0.1"
hyper = "0.11"
hyper-tls = "0.1"
websocket = "0.20"
getopts = "0.2"

後記

futures-fs 版

せっかくなので futures-fs 版も作った。
↑の cpupool 版と違ってファイルを stream で読み書きしているのが特徴です。

今後の async iron やその他非同期フレームワークは tokio + hyper + futures-fs + ??? の組み合わせが多くなりそう

main.rs
extern crate getopts;
extern crate pretty_env_logger;
#[macro_use] extern crate log;
extern crate tokio_core;
extern crate mio;
extern crate futures;
extern crate futures_cpupool;
extern crate futures_fs;
extern crate bytes;
extern crate hyper;
extern crate hyper_tls;

use std::vec::{Vec};
use std::boxed::{Box};
use std::result::Result::{Ok, Err};
use std::path::{Path, PathBuf};
use std::time::{Duration};
use futures::{Future, Stream};
use futures::future;
use futures::stream;
use futures_cpupool::{CpuPool};
use futures_fs::FsPool;
use bytes::{Bytes};
use tokio_core::reactor::{Core, Timeout, Handle};
use hyper::{Body, Client, Get, Post, StatusCode, Chunk};
use hyper::server::{Http, Service, Request, Response};


struct WebService {
  handle: Handle, // for async network task
  fs: FsPool, // for async fileio stream task
  public_dir: PathBuf, // cwd
}

impl WebService {
  /// (PathBuf path) -> std::unique_ptr<std::fstream>
  fn static_file(&self, path: PathBuf) -> Result<Box<Stream<Item=Bytes, Error=std::io::Error>>, std::io::Error> {
    let path = if path == Path::new("/") { PathBuf::from("index.html") }else{ path };
    let path = if path.is_absolute() { PathBuf::from(path.to_str().unwrap()[1..].to_string()) }else{ path };
    // need cwd check here
    let filepath = self.public_dir.join(path);
    println!("{}", filepath.to_str().unwrap());
    match filepath.canonicalize() {
      Err(why) => Err(why),
      Ok(path) =>{
        let fin = self.fs.read(filepath);
        Ok(Box::new(fin)) // std::make_unique
      }
    }
  }
}

impl Service for WebService {
  type Request = Request;
  type Response = Response<Box<Stream<Item=Chunk, Error=Self::Error>>>;
  type Error = hyper::Error;
  type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;
  fn call(&self, req: Self::Request) -> Self::Future {
    match (req.method(), req.path()) {
      (&Get, _) => {
        let body: Box<Stream<Item=Chunk, Error=Self::Error>> = match self.static_file(Path::new(req.path()).to_path_buf()){
          Ok(fin) => Box::new(fin.map(|byte| Chunk::from(byte)).map_err(|err| hyper::Error::Io(err))),
          Err(_) => Box::new(Body::from("Not Found".to_string())),
        };
        let res: Self::Response = Response::new().with_status(StatusCode::Ok).with_body(body);
        let fut = future::ok(res);
        Box::new(fut)
      },
      _ => {
        let body: Box<Stream<Item=Chunk, Error=Self::Error>> = Box::new(Body::from("Method Not Allowed".to_string()));
        let res: Self::Response = Response::new().with_status(StatusCode::MethodNotAllowed).with_body(body);
        let fut = future::ok(res);
        Box::new(fut)
      },
    }
  }
}

fn main() {
  pretty_env_logger::init();

  let args: Vec<String> = std::env::args().collect();
  let program = args[0].clone();
  let mut opts = getopts::Options::new();
  opts.optopt("p", "port", "port", "3000");
  opts.optflag("h", "help", "print this help menu");
  let matches = opts.parse(&args[1..]).unwrap();
  if matches.opt_present("h") {
    let help = format!("Usage: {} FILE [options]", program);
    print!("{}", opts.usage(&help));
    return;
  }

  let port = matches.opt_str("p").unwrap();
  let addr = format!("127.0.0.1:{}", port).parse().unwrap();

  // async io resource
  let fspool = FsPool::new(4);
  let mut core = Core::new().unwrap(); // for network aio

  // create web service
  let server_handle = core.handle();
  let service_handle = core.handle();
  let serve = Http::new().serve_addr_handle(&addr, &server_handle, move ||{
    let service = WebService{
      handle: service_handle.clone(),
      fs: fspool.clone(),
      public_dir: std::env::current_dir().unwrap().as_path().to_path_buf(),
    };
    return Ok(service);
  }).unwrap();
  println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());

  // launch web server
  let connection_handle = core.handle();
  server_handle.spawn(
    serve
      .for_each(move |conn| {
        // handle new connection
        connection_handle.spawn(
          conn
            .map(|_| ())
            .map_err(|err| println!("serve error: {:?}", err))
        );
        return Ok(());
      }).map_err(|_| ())
  );

  core.run(future::empty::<(), ()>()).unwrap();
}

参考