13
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Rustその3Advent Calendar 2019

Day 17

hyper v0.13.0 の make_service_fn と service_fn とは何か

Last updated at Posted at 2019-12-16

これは Rustその3 Advent Calendar 2019 の 17 日目の記事です.

はじめに

hyper というライブラリがあります. これは「 高速 かつ 正しい HTTP の Rust 実装」1 を謳うライブラリです. GitHub のスター数は 2019/12/16 現在で 5600 を超える人気のあるライブラリです. Rust で HTTP サーバを作ろうと思ったときに hyper を選択される方も多いのではないでしょうか.

先日 12/11 には v0.13.0 がリリースされました. futures や tokio 等の外部クレートが async / .await に対応したバージョンへと更新されており2, いよいよ Rust の非同期処理の基盤が整ってきたという感じがしますね.

さて, HTTP サーバというと Web API サーバとしての書き方が気になりますから, さっそく examples/web_api.rs を見てみましょう:

examples/web_api.rs
/* snip */

type GenericError = Box<dyn std::error::Error + Send + Sync>;
type Result<T> = std::result::Result<T, GenericError>;

/* snip */

async fn response_examples(
    req: Request<Body>,
    client: Client<HttpConnector>,
) -> Result<Response<Body>> {
    /* snip */
}

#[tokio::main]
async fn main() -> Result<()> {
    pretty_env_logger::init();

    let addr = "127.0.0.1:1337".parse().unwrap();

    // Share a `Client` with all `Service`s
    let client = Client::new();

    let new_service = make_service_fn(move |_| {
        // Move a clone of `client` into the `service_fn`.
        let client = client.clone();
        async {
            Ok::<_, GenericError>(service_fn(move |req| {
                // Clone again to ensure that client outlives this closure.
                response_examples(req, client.to_owned())
            }))
        }
    });

    let server = Server::bind(&addr).serve(new_service);

    println!("Listening on http://{}", addr);

    server.await?;

    Ok(())
}

Request を受けてルーティングを行い Response を返す処理の本体は response_examples 関数です.

そして main 関数では make_service_fnasync ブロックを持つクロージャを渡し, そのブロックの中では今度は service_fnresponse_examples (を呼ぶクロージャ) を渡して (その結果を Ok で包んで) います.

私が Rust や HTTP サーバに不慣れなだけかもしれませんが, 最初このコードを読んだとき, なぜ response_examplesmake_service_fnservice_fn という二重のメソッドに渡しているのだろう, それぞれ何をしているのか, この若干回りくどく見える手続きを踏む必要があるのはなぜだろうか? と不思議に思いました.

なので実装を読んでみることにしました.

Service

順番に定義を読んでいきましょう.

make_service_fn

src/service/make.rs
pub fn make_service_fn<F, Target, Ret>(f: F) -> MakeServiceFn<F>
where
    F: FnMut(&Target) -> Ret,
    Ret: Future,
{
    MakeServiceFn { f }
}

// Not exported from crate as this will likely be replaced with `impl Service`.
pub struct MakeServiceFn<F> {
    f: F,
}

impl<'t, F, Ret, Target, Svc, MkErr> Service<&'t Target> for MakeServiceFn<F>
where
    F: FnMut(&Target) -> Ret,
    Ret: Future<Output = Result<Svc, MkErr>>,
    MkErr: Into<Box<dyn StdError + Send + Sync>>,
{
    type Error = MkErr;
    type Response = Svc;
    type Future = Ret;

    fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, target: &'t Target) -> Self::Future {
        (self.f)(target)
    }
}

make_service_fn の引数 fFuture を返す FnMut となっています.

いまのところ struct MakeServiceFn<F> はとりあえず F 型のフィールド f を包むだけですね.

impl Service for MakeServiceFn の部分では, FFuture<Output = Result<Svc, MkErr>> を返す FnMut (ただし MkErr: Into<Box<dyn StdError + Send + Sync>>) の場合の MakeServiceFn<F> について trait Service を実装しています.

(この Service とは tower_service::Service のことです. 詳しくはリンク先のドキュメントを参照してください.)

よく見比べると, 確かにこれらの型は上記 web_api.rs の make_service_fn に渡されているクロージャの型と一致していますね. (型パラメータ Svc の部分は _ とされていますが.)

そして Service::callf の実行器3 となっており, f が返す Future を取り出せるようです.

service_fn

では service_fn のほうはどうでしょうか?

定義箇所を読むと

src/service/util.rs
pub fn service_fn<F, R, S>(f: F) -> ServiceFn<F, R>
where
    F: FnMut(Request<R>) -> S,
    S: Future,
{
    ServiceFn {
        f,
        _req: PhantomData,
    }
}

// Not exported from crate as this will likely be replaced with `impl Service`.
pub struct ServiceFn<F, R> {
    f: F,
    _req: PhantomData<fn(R)>,
}

impl<F, ReqBody, Ret, ResBody, E> tower_service::Service<crate::Request<ReqBody>>
    for ServiceFn<F, ReqBody>
where
    F: FnMut(Request<ReqBody>) -> Ret,
    ReqBody: Payload,
    Ret: Future<Output = Result<Response<ResBody>, E>>,
    E: Into<Box<dyn StdError + Send + Sync>>,
    ResBody: Payload,
{
    type Response = crate::Response<ResBody>;
    type Error = E;
    type Future = Ret;

    fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
        (self.f)(req)
    }
}

となっています.

FRequest<ReqBody> を受け取って Future<Output = Result<Response<ResBody>, E>> を返すと指定されているあたりを除いては, なんだか make_service_fn と似ていますね.

ここで出てくる f として web_api.rs では

move |req| {
    response_examples(req, client.to_owned())
}

というクロージャが渡されており, (当然ですが) 型も合います.

Server

Service をあれこれ作ったあとは, まず Server::bind&addr が渡されています. ここで

let addr = "127.0.0.1:1337".parse().unwrap();

でした.

Server の定義はこのようになっています:

src/server/mod.rs
#[pin_project]
pub struct Server<I, S, E = Exec> {
    #[pin]
    spawn_all: SpawnAll<I, S, E>,
}

Server::bind の定義を見ると

src/server/mod.rs
impl Server<AddrIncoming, ()> {
    pub fn bind(addr: &SocketAddr) -> Builder<AddrIncoming> {
        let incoming = AddrIncoming::new(addr).unwrap_or_else(|e| {
            panic!("error binding to {}: {}", addr, e);
        });
        Server::builder(incoming)
    }
}

となっており, struct Builder を返す Server::builder メソッドが別で定義されているようです.

実際すぐ上に

src/server/mod.rs
/// A builder for a [`Server`](Server).
#[derive(Debug)]
pub struct Builder<I, E = Exec> {
    incoming: I,
    protocol: Http_<E>,
}

// ===== impl Server =====

impl<I> Server<I, ()> {
    /// Starts a [`Builder`](Builder) with the provided incoming stream.
    pub fn builder(incoming: I) -> Builder<I> {
        Builder {
            incoming,
            protocol: Http_::new(),
        }
    }
}

が定義されています.

ここまで来るとおそらく Builder には Server を返す Builder::serve メソッドが定義されているというパターンなのだろうと推測できますね:

src/server/mod.rs
impl<I, E> Builder<I, E> {

    /* snip */

    pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
    where
        I: Accept,
        I::Error: Into<Box<dyn StdError + Send + Sync>>,
        I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
        S: MakeServiceRef<I::Conn, Body, ResBody = B>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Payload,
        E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
        E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
    {
        let serve = self.protocol.serve(self.incoming, new_service);
        let spawn_all = serve.spawn_all();
        Server { spawn_all }
    }
}

(こういう「状態遷移を型で表現する」話は The Book 17.3 章 にも記載されていました.)

ここで protocol フィールドに生えている Http::serve は引数の incomingnew_service から 新しい struct Serve を返すメソッドであり,

src/server/conn.rs
#[must_use = "streams do nothing unless polled"]
#[pin_project]
#[derive(Debug)]
pub(super) struct Serve<I, S, E = Exec> {
    #[pin]
    incoming: I,
    make_service: S,
    protocol: Http<E>,
}

src/server/conn.rs
impl<E> Http<E> {
    pub(super) fn serve<I, IO, IE, S, Bd>(&self, incoming: I, make_service: S) -> Serve<I, S, E>
    where
        I: Accept<Conn = IO, Error = IE>,
        IE: Into<Box<dyn StdError + Send + Sync>>,
        IO: AsyncRead + AsyncWrite + Unpin,
        S: MakeServiceRef<IO, Body, ResBody = Bd>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        Bd: Payload,
        E: H2Exec<<S::Service as HttpService<Body>>::Future, Bd>,
    {
        Serve {
            incoming,
            make_service,
            protocol: self.clone(),
        }
    }
}

Serve::spawn_all は自分自身を新しい struct SpawnAll のフィールドにして返すメソッドです:

src/server/conn.rs
#[must_use = "futures do nothing unless polled"]
#[pin_project]
#[derive(Debug)]
pub(super) struct SpawnAll<I, S, E> {
    #[pin]
    pub(super) serve: Serve<I, S, E>,
}

src/server/conn.rs
impl<I, S, E> Serve<I, S, E> {
    /// Spawn all incoming connections onto the executor in `Http`.
    pub(super) fn spawn_all(self) -> SpawnAll<I, S, E> {
        SpawnAll { serve: self }
    }
}

こうしてみると Builder::serve に渡された make_serviceServer が作られるまでの間に

Server -> SpawnAll -> Serve -> make_service

のように三重の struct に包まれることがわかります.

(それぞれが何らかの意義を持ったレイヤーなのだろうとは思いますがまだ実装を読み切れておらず...)

Future の実行

さて, web_api.rs では

let new_service = make_service_fn(/* snip */);

として MakeServiceFn 型の値を new_service に束縛し,

let server = Server::bind(&addr).serve(new_service);

として Builder::serve に渡し,

server.await?;

で実行していました.

ということはどこかで impl Future for Server している箇所がありそうです:

src/server/mod.rs
impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
where
    I: Accept<Conn = IO, Error = IE>,
    IE: Into<Box<dyn StdError + Send + Sync>>,
    IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    S: MakeServiceRef<IO, Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: Payload,
    E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
    E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
{
    type Output = crate::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        self.project().spawn_all.poll_watch(cx, &NoopWatcher)
    }
}

Future::poll の中で self.project() なるメソッドを呼んでいますが, これは pin-project という crate が提供するメソッドです.

私は Pin についてまだあまり詳しく理解できていないのですが, Pin された struct Pin<&mut Struct> の内部のフィールドにアクセスしようと思ったときは Pin<&mut Field>&mut Field のどちらも型として取ることが可能4 で, pin-project はそれらを簡便に記述できるマクロを提供する, ということのようです.

Server の定義は

src/server/mod.rs
#[pin_project]
pub struct Server<I, S, E = Exec> {
    #[pin]
    spawn_all: SpawnAll<I, S, E>,
}

だったので, self.project().spawn_allServer 内部の spawn_all フィールドにアクセスできるのですね.

poll_watch

src/server/conn.rs
impl<I, IO, IE, S, B, E> SpawnAll<I, S, E>
where
    I: Accept<Conn = IO, Error = IE>,
    IE: Into<Box<dyn StdError + Send + Sync>>,
    IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    S: MakeServiceRef<IO, Body, ResBody = B>,
    B: Payload,
    E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
{
    pub(super) fn poll_watch<W>(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        watcher: &W,
    ) -> Poll<crate::Result<()>>
    where
        E: NewSvcExec<IO, S::Future, S::Service, E, W>,
        W: Watcher<IO, S::Service, E>,
    {
        let mut me = self.project();
        loop {
            if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) {
                let fut = NewSvcTask::new(connecting, watcher.clone());
                me.serve
                    .as_mut()
                    .project()
                    .protocol
                    .exec
                    .execute_new_svc(fut);
            } else {
                return Poll::Ready(Ok(()));
            }
        }
    }
}

長い定義ですがメソッド本体は比較的シンプルで, loop の中で Serve::poll_next_ の結果により次の非同期タスクを実行するかどうかを決めるものになっています.

ready! は引数が Poll::Ready(v) なら包まれている v を取り出し, Poll::Pending ならその場で Poll::Pendingreturn するように定義されたマクロです:

src/common/mod.rs
macro_rules! ready {
    ($e:expr) => {
        match $e {
            ::std::task::Poll::Ready(v) => v,
            ::std::task::Poll::Pending => return ::std::task::Poll::Pending,
        }
    };
}

poll_next_ の後ろについている ? 演算子は std::ops::Try 5 によって動作が上書きされているのですが, ここでの詳しい説明は割愛します.

poll_next_

src/server/conn.rs
impl<I, IO, IE, S, B, E> Serve<I, S, E>
where
    I: Accept<Conn = IO, Error = IE>,
    IO: AsyncRead + AsyncWrite + Unpin,
    IE: Into<Box<dyn StdError + Send + Sync>>,
    S: MakeServiceRef<IO, Body, ResBody = B>,
    B: Payload,
    E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
{
    fn poll_next_(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<crate::Result<Connecting<IO, S::Future, E>>>> {
        let me = self.project();
        match ready!(me.make_service.poll_ready_ref(cx)) {
            Ok(()) => (),
            Err(e) => {
                trace!("make_service closed");
                return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e))));
            }
        }

        if let Some(item) = ready!(me.incoming.poll_accept(cx)) {
            let io = item.map_err(crate::Error::new_accept)?;
            let new_fut = me.make_service.make_service_ref(&io);
            Poll::Ready(Some(Ok(Connecting {
                future: new_fut,
                io: Some(io),
                protocol: me.protocol.clone(),
            })))
        } else {
            Poll::Ready(None)
        }
    }
}

ついに make_service のメソッドを呼んでいる箇所が現れました.

ここまで書いていませんでしたが,

  • MakeServiceRef::poll_ready_ref
  • MakeServiceRef::make_service_ref

はそれぞれ

  • MakeServiceFn::poll_ready
  • MakeServiceFn::call

をラップしているものです6.

サーバにバインドされたアドレス incoming が接続可能かどうか poll_accept7 で非同期に検査し, 接続可能になったら make_service_ref によって我々の response_examples を包んだ Future を取り出して, Connecting という struct に包んで (さらに Poll::Ready(Some(Ok())) で包んで) 返す, というのが poll_next_ の主要な役割です.

それにしても poll_next_poll_accept (さらに poll_accept の内部実装でも) といった Poll を返すメソッドがすべて ready! マクロの中にいるおかげで, 非同期タスクの連なり (?) のうちどこかがまだ Poll::Pending を返すのなら, Server::poll の返り値まで直通で Poll::Pending が上がっていく仕組みになっているのもポイントですね.

アドレスに接続した後

ふたたび poll_watch を振り返ってみると, Connecting (がいろいろ包まれたもの) が poll_next_ から返ってきたら,

let fut = NewSvcTask::new(connecting, watcher.clone());

として NewSvcTask なる型の値を fut に束縛し,

me.serve
    /* snip */
    .execute_new_svc(fut);

と渡しています.

長くなってきたので詳細は省きますが, execute_new_svc の実装をたどっていくと tokio::task::spawn(fut) まで辿り着きます8. そして NewSvcTask::pollConnecting::poll を経由して, Connecting に包まれていた futurepoll されます.

まとめ

そういうわけで,

  • make_service_fn はサーバにバインドされたアドレスへの接続を非同期に待つための Service を作るもの
  • service_fn は接続が確立してから HTTP リクエストの処理を非同期に行う Service を作るもの

という二段階の役割に別れていたのですね.

力尽きたのでこの辺で終わりにします. Rust 初心者ゆえ間違いも多々あるかと思いますので何かお気づきの方はご指摘くださるとありがたいです.

  1. https://github.com/hyperium/hyper/blob/v0.13.0/README.md

  2. https://github.com/hyperium/hyper/blob/v0.13.0/Cargo.toml#L22-L37

  3. 本当に「実行器」と呼んでいいのか?

  4. https://doc.rust-lang.org/std/pin/#projections-and-structural-pinning

  5. https://doc.rust-lang.org/std/ops/trait.Try.html

  6. https://github.com/hyperium/hyper/blob/v0.13.0/src/service/make.rs#L67-L92

  7. https://github.com/hyperium/hyper/blob/v0.13.0/src/server/tcp.rs#L155

  8. https://github.com/hyperium/hyper/blob/v0.13.0/src/common/exec.rs#L46

13
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?