これは Rustその3 Advent Calendar 2019 の 17 日目の記事です.
はじめに
hyper というライブラリがあります. これは「 高速 かつ 正しい HTTP の Rust 実装」1 を謳うライブラリです. GitHub のスター数は 2019/12/16 現在で 5600 を超える人気のあるライブラリです. Rust で HTTP サーバを作ろうと思ったときに hyper を選択される方も多いのではないでしょうか.
先日 12/11 には v0.13.0 がリリースされました. futures や tokio 等の外部クレートが async / .await
に対応したバージョンへと更新されており2, いよいよ Rust の非同期処理の基盤が整ってきたという感じがしますね.
さて, HTTP サーバというと Web API サーバとしての書き方が気になりますから, さっそく examples/web_api.rs を見てみましょう:
/* snip */
type GenericError = Box<dyn std::error::Error + Send + Sync>;
type Result<T> = std::result::Result<T, GenericError>;
/* snip */
async fn response_examples(
req: Request<Body>,
client: Client<HttpConnector>,
) -> Result<Response<Body>> {
/* snip */
}
#[tokio::main]
async fn main() -> Result<()> {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
// Share a `Client` with all `Service`s
let client = Client::new();
let new_service = make_service_fn(move |_| {
// Move a clone of `client` into the `service_fn`.
let client = client.clone();
async {
Ok::<_, GenericError>(service_fn(move |req| {
// Clone again to ensure that client outlives this closure.
response_examples(req, client.to_owned())
}))
}
});
let server = Server::bind(&addr).serve(new_service);
println!("Listening on http://{}", addr);
server.await?;
Ok(())
}
Request
を受けてルーティングを行い Response
を返す処理の本体は response_examples
関数です.
そして main
関数では make_service_fn
に async
ブロックを持つクロージャを渡し, そのブロックの中では今度は service_fn
に response_examples
(を呼ぶクロージャ) を渡して (その結果を Ok
で包んで) います.
私が Rust や HTTP サーバに不慣れなだけかもしれませんが, 最初このコードを読んだとき, なぜ response_examples
を make_service_fn
と service_fn
という二重のメソッドに渡しているのだろう, それぞれ何をしているのか, この若干回りくどく見える手続きを踏む必要があるのはなぜだろうか? と不思議に思いました.
なので実装を読んでみることにしました.
Service
順番に定義を読んでいきましょう.
make_service_fn
pub fn make_service_fn<F, Target, Ret>(f: F) -> MakeServiceFn<F>
where
F: FnMut(&Target) -> Ret,
Ret: Future,
{
MakeServiceFn { f }
}
// Not exported from crate as this will likely be replaced with `impl Service`.
pub struct MakeServiceFn<F> {
f: F,
}
impl<'t, F, Ret, Target, Svc, MkErr> Service<&'t Target> for MakeServiceFn<F>
where
F: FnMut(&Target) -> Ret,
Ret: Future<Output = Result<Svc, MkErr>>,
MkErr: Into<Box<dyn StdError + Send + Sync>>,
{
type Error = MkErr;
type Response = Svc;
type Future = Ret;
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, target: &'t Target) -> Self::Future {
(self.f)(target)
}
}
make_service_fn
の引数 f
は Future
を返す FnMut
となっています.
いまのところ struct MakeServiceFn<F>
はとりあえず F
型のフィールド f
を包むだけですね.
impl Service for MakeServiceFn
の部分では, F
が Future<Output = Result<Svc, MkErr>>
を返す FnMut
(ただし MkErr: Into<Box<dyn StdError + Send + Sync>>
) の場合の MakeServiceFn<F>
について trait Service
を実装しています.
(この Service
とは tower_service::Service のことです. 詳しくはリンク先のドキュメントを参照してください.)
よく見比べると, 確かにこれらの型は上記 web_api.rs の make_service_fn
に渡されているクロージャの型と一致していますね. (型パラメータ Svc
の部分は _
とされていますが.)
そして Service::call
が f
の実行器3 となっており, f
が返す Future
を取り出せるようです.
service_fn
では service_fn
のほうはどうでしょうか?
定義箇所を読むと
pub fn service_fn<F, R, S>(f: F) -> ServiceFn<F, R>
where
F: FnMut(Request<R>) -> S,
S: Future,
{
ServiceFn {
f,
_req: PhantomData,
}
}
// Not exported from crate as this will likely be replaced with `impl Service`.
pub struct ServiceFn<F, R> {
f: F,
_req: PhantomData<fn(R)>,
}
impl<F, ReqBody, Ret, ResBody, E> tower_service::Service<crate::Request<ReqBody>>
for ServiceFn<F, ReqBody>
where
F: FnMut(Request<ReqBody>) -> Ret,
ReqBody: Payload,
Ret: Future<Output = Result<Response<ResBody>, E>>,
E: Into<Box<dyn StdError + Send + Sync>>,
ResBody: Payload,
{
type Response = crate::Response<ResBody>;
type Error = E;
type Future = Ret;
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
(self.f)(req)
}
}
となっています.
F
が Request<ReqBody>
を受け取って Future<Output = Result<Response<ResBody>, E>>
を返すと指定されているあたりを除いては, なんだか make_service_fn
と似ていますね.
ここで出てくる f
として web_api.rs では
move |req| {
response_examples(req, client.to_owned())
}
というクロージャが渡されており, (当然ですが) 型も合います.
Server
Service をあれこれ作ったあとは, まず Server::bind
に &addr
が渡されています. ここで
let addr = "127.0.0.1:1337".parse().unwrap();
でした.
Server
の定義はこのようになっています:
#[pin_project]
pub struct Server<I, S, E = Exec> {
#[pin]
spawn_all: SpawnAll<I, S, E>,
}
Server::bind
の定義を見ると
impl Server<AddrIncoming, ()> {
pub fn bind(addr: &SocketAddr) -> Builder<AddrIncoming> {
let incoming = AddrIncoming::new(addr).unwrap_or_else(|e| {
panic!("error binding to {}: {}", addr, e);
});
Server::builder(incoming)
}
}
となっており, struct Builder
を返す Server::builder
メソッドが別で定義されているようです.
実際すぐ上に
/// A builder for a [`Server`](Server).
#[derive(Debug)]
pub struct Builder<I, E = Exec> {
incoming: I,
protocol: Http_<E>,
}
// ===== impl Server =====
impl<I> Server<I, ()> {
/// Starts a [`Builder`](Builder) with the provided incoming stream.
pub fn builder(incoming: I) -> Builder<I> {
Builder {
incoming,
protocol: Http_::new(),
}
}
}
が定義されています.
ここまで来るとおそらく Builder
には Server
を返す Builder::serve
メソッドが定義されているというパターンなのだろうと推測できますね:
impl<I, E> Builder<I, E> {
/* snip */
pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
where
I: Accept,
I::Error: Into<Box<dyn StdError + Send + Sync>>,
I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<I::Conn, Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload,
E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
{
let serve = self.protocol.serve(self.incoming, new_service);
let spawn_all = serve.spawn_all();
Server { spawn_all }
}
}
(こういう「状態遷移を型で表現する」話は The Book 17.3 章 にも記載されていました.)
ここで protocol
フィールドに生えている Http::serve
は引数の incoming
と new_service
から 新しい struct Serve
を返すメソッドであり,
#[must_use = "streams do nothing unless polled"]
#[pin_project]
#[derive(Debug)]
pub(super) struct Serve<I, S, E = Exec> {
#[pin]
incoming: I,
make_service: S,
protocol: Http<E>,
}
impl<E> Http<E> {
pub(super) fn serve<I, IO, IE, S, Bd>(&self, incoming: I, make_service: S) -> Serve<I, S, E>
where
I: Accept<Conn = IO, Error = IE>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin,
S: MakeServiceRef<IO, Body, ResBody = Bd>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bd: Payload,
E: H2Exec<<S::Service as HttpService<Body>>::Future, Bd>,
{
Serve {
incoming,
make_service,
protocol: self.clone(),
}
}
}
Serve::spawn_all
は自分自身を新しい struct SpawnAll
のフィールドにして返すメソッドです:
#[must_use = "futures do nothing unless polled"]
#[pin_project]
#[derive(Debug)]
pub(super) struct SpawnAll<I, S, E> {
#[pin]
pub(super) serve: Serve<I, S, E>,
}
impl<I, S, E> Serve<I, S, E> {
/// Spawn all incoming connections onto the executor in `Http`.
pub(super) fn spawn_all(self) -> SpawnAll<I, S, E> {
SpawnAll { serve: self }
}
}
こうしてみると Builder::serve
に渡された make_service
は Server
が作られるまでの間に
Server
-> SpawnAll
-> Serve
-> make_service
のように三重の struct に包まれることがわかります.
(それぞれが何らかの意義を持ったレイヤーなのだろうとは思いますがまだ実装を読み切れておらず...)
Future の実行
さて, web_api.rs では
let new_service = make_service_fn(/* snip */);
として MakeServiceFn
型の値を new_service
に束縛し,
let server = Server::bind(&addr).serve(new_service);
として Builder::serve
に渡し,
server.await?;
で実行していました.
ということはどこかで impl Future for Server
している箇所がありそうです:
impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
where
I: Accept<Conn = IO, Error = IE>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
{
type Output = crate::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
self.project().spawn_all.poll_watch(cx, &NoopWatcher)
}
}
Future::poll
の中で self.project()
なるメソッドを呼んでいますが, これは pin-project という crate が提供するメソッドです.
私は Pin
についてまだあまり詳しく理解できていないのですが, Pin された struct Pin<&mut Struct>
の内部のフィールドにアクセスしようと思ったときは Pin<&mut Field>
と &mut Field
のどちらも型として取ることが可能4 で, pin-project はそれらを簡便に記述できるマクロを提供する, ということのようです.
Server
の定義は
#[pin_project]
pub struct Server<I, S, E = Exec> {
#[pin]
spawn_all: SpawnAll<I, S, E>,
}
だったので, self.project().spawn_all
で Server
内部の spawn_all
フィールドにアクセスできるのですね.
poll_watch
impl<I, IO, IE, S, B, E> SpawnAll<I, S, E>
where
I: Accept<Conn = IO, Error = IE>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody = B>,
B: Payload,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
{
pub(super) fn poll_watch<W>(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
watcher: &W,
) -> Poll<crate::Result<()>>
where
E: NewSvcExec<IO, S::Future, S::Service, E, W>,
W: Watcher<IO, S::Service, E>,
{
let mut me = self.project();
loop {
if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) {
let fut = NewSvcTask::new(connecting, watcher.clone());
me.serve
.as_mut()
.project()
.protocol
.exec
.execute_new_svc(fut);
} else {
return Poll::Ready(Ok(()));
}
}
}
}
長い定義ですがメソッド本体は比較的シンプルで, loop
の中で Serve::poll_next_
の結果により次の非同期タスクを実行するかどうかを決めるものになっています.
ready!
は引数が Poll::Ready(v)
なら包まれている v
を取り出し, Poll::Pending
ならその場で Poll::Pending
を return
するように定義されたマクロです:
macro_rules! ready {
($e:expr) => {
match $e {
::std::task::Poll::Ready(v) => v,
::std::task::Poll::Pending => return ::std::task::Poll::Pending,
}
};
}
poll_next_
の後ろについている ?
演算子は std::ops::Try
5 によって動作が上書きされているのですが, ここでの詳しい説明は割愛します.
poll_next_
impl<I, IO, IE, S, B, E> Serve<I, S, E>
where
I: Accept<Conn = IO, Error = IE>,
IO: AsyncRead + AsyncWrite + Unpin,
IE: Into<Box<dyn StdError + Send + Sync>>,
S: MakeServiceRef<IO, Body, ResBody = B>,
B: Payload,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
{
fn poll_next_(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<crate::Result<Connecting<IO, S::Future, E>>>> {
let me = self.project();
match ready!(me.make_service.poll_ready_ref(cx)) {
Ok(()) => (),
Err(e) => {
trace!("make_service closed");
return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e))));
}
}
if let Some(item) = ready!(me.incoming.poll_accept(cx)) {
let io = item.map_err(crate::Error::new_accept)?;
let new_fut = me.make_service.make_service_ref(&io);
Poll::Ready(Some(Ok(Connecting {
future: new_fut,
io: Some(io),
protocol: me.protocol.clone(),
})))
} else {
Poll::Ready(None)
}
}
}
ついに make_service
のメソッドを呼んでいる箇所が現れました.
ここまで書いていませんでしたが,
MakeServiceRef::poll_ready_ref
MakeServiceRef::make_service_ref
はそれぞれ
MakeServiceFn::poll_ready
MakeServiceFn::call
をラップしているものです6.
サーバにバインドされたアドレス incoming
が接続可能かどうか poll_accept
7 で非同期に検査し, 接続可能になったら make_service_ref
によって我々の response_examples
を包んだ Future
を取り出して, Connecting
という struct に包んで (さらに Poll::Ready(Some(Ok()))
で包んで) 返す, というのが poll_next_
の主要な役割です.
それにしても poll_next_
や poll_accept
(さらに poll_accept
の内部実装でも) といった Poll
を返すメソッドがすべて ready!
マクロの中にいるおかげで, 非同期タスクの連なり (?) のうちどこかがまだ Poll::Pending
を返すのなら, Server::poll
の返り値まで直通で Poll::Pending
が上がっていく仕組みになっているのもポイントですね.
アドレスに接続した後
ふたたび poll_watch
を振り返ってみると, Connecting
(がいろいろ包まれたもの) が poll_next_
から返ってきたら,
let fut = NewSvcTask::new(connecting, watcher.clone());
として NewSvcTask
なる型の値を fut
に束縛し,
me.serve
/* snip */
.execute_new_svc(fut);
と渡しています.
長くなってきたので詳細は省きますが, execute_new_svc
の実装をたどっていくと tokio::task::spawn(fut)
まで辿り着きます8. そして NewSvcTask::poll
や Connecting::poll
を経由して, Connecting
に包まれていた future
が poll
されます.
まとめ
そういうわけで,
-
make_service_fn
はサーバにバインドされたアドレスへの接続を非同期に待つためのService
を作るもの -
service_fn
は接続が確立してから HTTP リクエストの処理を非同期に行うService
を作るもの
という二段階の役割に別れていたのですね.
力尽きたのでこの辺で終わりにします. Rust 初心者ゆえ間違いも多々あるかと思いますので何かお気づきの方はご指摘くださるとありがたいです.
-
https://github.com/hyperium/hyper/blob/v0.13.0/Cargo.toml#L22-L37 ↩
-
本当に「実行器」と呼んでいいのか? ↩
-
https://doc.rust-lang.org/std/pin/#projections-and-structural-pinning ↩
-
https://github.com/hyperium/hyper/blob/v0.13.0/src/service/make.rs#L67-L92 ↩
-
https://github.com/hyperium/hyper/blob/v0.13.0/src/server/tcp.rs#L155 ↩
-
https://github.com/hyperium/hyper/blob/v0.13.0/src/common/exec.rs#L46 ↩