25
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RustAdvent Calendar 2018

Day 6

Rustで書かれたVMM firecrackerのソースコードを読もう!①~起動編~

Posted at

はじめに

本記事は、Rust Advent Calendar 2018の6日目として、急遽、書かれました(埋まってないの悲しいしね!)。

起動編、としていますが、続きがあるかどうかは、わかりません。
ブログで書いているものをまとめた内容になります。

また、ソースコードリーディングの途中経過であるため、誤読している可能性がありますので、ご了承下さい。

firecrackerとは?

最高の餌です。
サーバレスコンピューティングのための安全、高速なmicroVMです。
Rustで書かれていて、REST APIが生えていて、とにかくモダンなVMMです。

firecracker
firecracker github

公式のドキュメントにあることは、公式ドキュメントに任せることにして、ソースコードを読みましょう!ソースコードを!

とは言え少し前提知識を

firecrackerは、KVMを使ってVMを作成します。VMMにつき1つのゲストOSだけを持つタイプのVMMのようです。つまり、複数のゲストを立ち上げる場合には、個別にVMMごと起動する、という方式です。Noahを思い出しました。聞いててよかったTuring Complete FM

ただし、firecrackerのバイナリには、リソースを隔離するための仕組みはないようです。jailerがcgroupsなどを使って、リソース隔離する責務を負っています。そのため、firecracker単体でも起動はできますが、基本は、jailerで隔離した状態でfirecrackerを立ち上げる、というのが想定する使い方のようです。

1つのインスタンスは、3つのスレッドから構成されます。API Server, VMM, vCPU, です。CPUの設定を複数個にすれば、vCPUのスレッドを複数個持つことができます。
ということで、API ServerがHTTPリクエストとしてVMMの制御コマンドを受け付けて、VMMがKVMを制御し、KVMから上がってきたイベントをVMMのハンドラが処理する、という流れで作られていそうです。

ちらっと覗いたとき、deviceは、ネットワークデバイス、ブロックデバイス、シリアル、くらいしかなかったです。

ソースコードは、起動周りを見て、HTTPリクエスト待ちっぽいところにたどり着いたら、リクエストハンドラ実装を見ていく、という方針でソースコードリーディングを進めています。

Cargo.toml

まずは、Cargoの設定ファイルから見てみましょう。

.cargo/config
[build]
target = "x86_64-unknown-linux-musl"

musl使ってますね。C言語ライブラリに依存しないバイナリにしているようです。ライブラリとの依存を持ちたくないでしょうし、妥当な感じですね。

Cargo.tomlは思ったより見るものなかったです。panic strategyがabortなことくらいでしょうか。

Cargo.toml
[profile.release]
lto = true
panic = "abort"

src

では、さっそくsrcから見ていきましょう。
jailerもいますね。

$ tree src/
src/
├── bin
│   └── jailer.rs
└── main.rs

logger

main.rsに移ります。まずは、ロガーの初期化をしています。

src/main.rs
fn main() {
    LOGGER
        .init(&"", None, None)
        .expect("Failed to register logger");
...

LOGGERは、logger crateでlazy_staticを使って、定義しています。典型的なグローバルオブジェクトの初期化っぽいです。
ロガーは、複数スレッドからアクセスされるため、排他制御を行うのも当然ですね。

logger/src/lib.rs
lazy_static! {
    static ref _LOGGER_INNER: Logger = Logger::new();
}

lazy_static! {
    /// Static instance used for handling human-readable logs.
    ///
    pub static ref LOGGER: &'static Logger = {
        set_logger(_LOGGER_INNER.deref()).unwrap();
        _LOGGER_INNER.deref()
    };
}

set_loggerlog crateの関数です。

pub fn set_logger(logger: &'static Log) -> Result<(), SetLoggerError> {
    set_logger_inner(|| logger)
}

fn set_logger_inner<F>(make_logger: F) -> Result<(), SetLoggerError>
where
    F: FnOnce() -> &'static Log,
{
    unsafe {
        match STATE.compare_and_swap(UNINITIALIZED, INITIALIZING, Ordering::SeqCst) {
            UNINITIALIZED => {
                // make_logger()はloggerのオブジェクトが返るだけ。`|| logger`
                LOGGER = make_logger();
                STATE.store(INITIALIZED, Ordering::SeqCst);
                Ok(())
            }
            INITIALIZING => {
                while STATE.load(Ordering::SeqCst) == INITIALIZING {}
                Err(SetLoggerError(()))
            }
            _ => Err(SetLoggerError(())),
        }
    }
}

set_logger_innerでは、正常ケースにおいて(UNINITIALIZEDのアーム)、compare_and_swapを使って、アトミックにLOGGERの状態を初期化済みに変更します。

LOGGERのオブジェクトを作った後、initで初期化しています。この時の引数は、instance id, log_pipe, metrics_pipeです。

/src/main.rs
fn main() {
    LOGGER
        .init(&"", None, None)
        .expect("Failed to register logger");

instance idは置いておいて、2つのpipeはログをどこに出力するか、を切り替えるのに利用します。Noneの場合は、stdout/stderrに出力されます。Someでファイルパスを渡すと、そのファイルにログを残すようにします。
これは後ほど、テストで出るため、覚えておきましょう!

次に、SIGSYSのsignal handlerを登録しています。中身を追ってみると、本当にSIGSYSのhandlerを登録しているだけでした。

    // If the signal handler can't be set, it's OK to panic.
    seccomp::setup_sigsys_handler().expect("Failed to register signal handler");

panic hook

次は、panic発生時のフックを登録しています。abortで死ぬ前に、panic発生時のbacktraceがログに残るようになっています。

src/main.rs
    // Start firecracker by setting up a panic hook, which will be called before
    // terminating as we're building with panic = "abort".
    // It's worth noting that the abort is caused by sending a SIG_ABORT signal to the process.
    panic::set_hook(Box::new(move |info| {
        // We're currently using the closure parameter, which is a &PanicInfo, for printing the
        // origin of the panic, including the payload passed to panic! and the source code location
        // from which the panic originated.
        error!("Panic occurred: {:?}", info);
        METRICS.vmm.panic_count.inc();

        let bt = Backtrace::new();
        error!("{:?}", bt);

        // Log the metrics before aborting.
        if let Err(e) = LOGGER.log_metrics() {
            error!("Failed to log metrics on abort. {}:?", e);
        }
    }));

このあたりはベアメタルプログラミングで良く書くのでおなじみですね。
panic発生時に渡されるPanicInfoは、プログラムの何行目でpanicになったか、といった情報を含んでいます。backtrace crateのBacktraceインスタンスを作ると、debug出力でbacktraceが取れるみたいです。

この機能に関しては、同ファイル内にユニットテストが書かれています。

まずは、ヘルパー関数です。

#[cfg(test)]
mod tests {
...
    // テストのヘルパー関数
    // log_pathのファイルに、expectationsの文字列3つが含まれているかどうかをテストする
    fn validate_backtrace(
        log_path: &str,
        expectations: &[(&'static str, &'static str, &'static str)],
    ) {
        let f = File::open(log_path).unwrap();
        let reader = BufReader::new(f);
        let mut pass = false;
        let mut expectation_iter = expectations.iter();
        let mut expected_words = expectation_iter.next().unwrap();

        for ln_res in reader.lines() {
            let line = ln_res.unwrap();
            if !(line.contains(expected_words.0)
                && line.contains(expected_words.1)
                && line.contains(expected_words.2))
            {
                continue;
            }
            if let Some(w) = expectation_iter.next() {
                expected_words = w;
                continue;
            }
            pass = true;
            break;
        }
        assert!(pass);
    }

続いて、main関数のテストです。ここでは、panicが発生したときに、backtraceが出力されることをテストしています。

    #[test]
    fn test_main() {
        const FIRECRACKER_INIT_TIMEOUT_MILLIS: u64 = 100;

        // There is no reason to run this test if the default socket path exists.
        assert!(!Path::new(DEFAULT_API_SOCK_PATH).exists());

         // ログを保存するための一時ファイルを作成します。
        let log_file_temp =
            NamedTempFile::new().expect("Failed to create temporary output logging file.");
        let metrics_file_temp =
            NamedTempFile::new().expect("Failed to create temporary metrics logging file.");
        let log_file = String::from(log_file_temp.path().to_path_buf().to_str().unwrap());

        // Start Firecracker in a separate thread
        thread::spawn(|| {
            main();
        });

        // Wait around for a bit, so Firecracker has time to initialize and create the
        // API socket.
        thread::sleep(Duration::from_millis(FIRECRACKER_INIT_TIMEOUT_MILLIS));

        // If Firecracker hasn't finished initializing yet, something is really wrong!
        assert!(Path::new(DEFAULT_API_SOCK_PATH).exists());

        // init()のpipeにSomeを与えているので、指定したログファイルにログを出力します。
        LOGGER
            .init(
                "TEST-ID",
                Some(log_file_temp.path().to_str().unwrap().to_string()),
                Some(metrics_file_temp.path().to_str().unwrap().to_string()),
            ).expect("Could not initialize logger.");

        // panicを起こします。panic発生時、main関数で設定したhook関数が呼ばれ、backtraceがログに出力されます。
        let _ = panic::catch_unwind(|| {
            panic!("Oh, noes!");
        });

        // 期待するbacktraceがログファイルに出力されていることをテストします。
        validate_backtrace(
            log_file.as_str(),
            &[
                // Lines containing these words should have appeared in the log, in this order
                ("ERROR", "main.rs", "Panic occurred"),
                ("ERROR", "main.rs", "stack backtrace:"),
                ("0:", "0x", "backtrace::"),
            ],
        );

        // Clean up
        fs::remove_file(DEFAULT_API_SOCK_PATH).expect("failure in removing socket file");
    }

先ほどのmain関数の中で、panic発生時のhookを仕掛けていました。hook関数の中ではbacktraceをログ出力しており、その機能が想定通り動作していることをテストしています。

コマンドラインオプションの処理

コマンドライン引数の処理をしています。特に面白みはないので、飛ばします。

    let cmd_arguments = App::new("firecracker")
        .version(crate_version!())
        .author(crate_authors!())
        .about("Launch a microvm.")
        .arg(
            Arg::with_name("api_sock")
                .long("api-sock")
                .help("Path to unix domain socket used by the API")
                .default_value(DEFAULT_API_SOCK_PATH)
                .takes_value(true),
        ).arg(
            Arg::with_name("context")
                .long("context")
                .help("Additional parameters sent to Firecracker.")
                .takes_value(true),
        ).get_matches();
...

API Serverオブジェクト作成

ここから、API ServerとVMMを作る準備が始まります。まずは、共有リソースを作って、API Serverを作っています。

    let shared_info = Arc::new(RwLock::new(InstanceInfo {
        state: InstanceState::Uninitialized,
        id: instance_id,
    }));
    let mmds_info = MMDS.clone();
    let (to_vmm, from_api) = channel();
    let server = ApiServer::new(mmds_info, shared_info.clone(), to_vmm).unwrap();

shared_infoは、API ServerとVMMとの間で共有される情報のようです。
MMDSは、microVM metadata service だそうです。今のところ、意味するところはよくわかりません。追々判明するでしょう。

channelで非同期のSenderとReceiverを作成しています。API ServerとVMMを繋ぐチャネルですね。

ここまでで作成した、VMMとの情報共有リソースを渡して、ApiServerを初期化しています。

VMMオブジェクト作成

次は、VMMです。API Serverからイベントを受ける取るために使うであろうファイルディスクリプタと、jailerで起動している場合に受け取れるらしいKVMのファイルディスクリプタを取得しています。

    let api_event_fd = server
        .get_event_fd_clone()
        .expect("Cannot clone API eventFD.");

    let kvm_fd = if is_jailed {
        Some(jailer::KVM_FD)
    } else {
        None
    };

    let _vmm_thread_handle =
        vmm::start_vmm_thread(shared_info, api_event_fd, from_api, seccomp_level, kvm_fd);

vmm/src/lib.rsでは、Vmmのオブジェクトを作成して、threadを生成します。

vmm/src/lib.rs
pub fn start_vmm_thread(
...
) -> thread::JoinHandle<()> {
    thread::Builder::new()
        .name("fc_vmm".to_string())
        .spawn(move || {
            // If this fails, consider it fatal. Use expect().
            let mut vmm = Vmm::new(
...
            ).expect("Cannot create VMM.");
            match vmm.run_control() {
                Ok(()) => vmm.stop(0),
                Err(_) => vmm.stop(1),
            }
        }).expect("VMM thread spawn failed.")
}

Vmmは巨大な構造体です。Vmm::new()では、この巨大な構造体を初期化する処理を行います。その中には、VMの作成も含まれています。

vmm/src/lib.rs
struct Vmm {
    kvm: KvmContext,

    vm_config: VmConfig,
    shared_info: Arc<RwLock<InstanceInfo>>,

    // guest VM core resources
    guest_memory: Option<GuestMemory>,
    kernel_config: Option<KernelConfig>,
    kill_signaled: Option<Arc<AtomicBool>>,
    vcpu_handles: Option<Vec<thread::JoinHandle<()>>>,
    exit_evt: Option<EpollEvent<EventFd>>,
    vm: Vm,

    // guest VM devices
    mmio_device_manager: Option<MMIODeviceManager>,
    legacy_device_manager: LegacyDeviceManager,
    drive_handler_id_map: HashMap<String, usize>,

    // If there is a Root Block Device, this should be added as the first element of the list
    // This is necessary because we want the root to always be mounted on /dev/vda
    block_device_configs: BlockDeviceConfigs,
    network_interface_configs: NetworkInterfaceConfigs,
    #[cfg(feature = "vsock")]
    vsock_device_configs: VsockDeviceConfigs,

    epoll_context: EpollContext,

    // api resources
    api_event: EpollEvent<EventFd>,
    from_api: Receiver<Box<VmmAction>>,

    write_metrics_event: EpollEvent<TimerFd>,

    // The level of seccomp filtering used. Seccomp filters are loaded before executing guest code.
    // See `seccomp::SeccompLevel` for more information about seccomp levels.
    seccomp_level: u32,
}

Vmmの初期化に必要な要素を準備しています。まずは、epollでファイルディスクリプタのイベント監視を追加しています。

vmm/src/lib.rs
impl Vmm {
    fn new(
        api_shared_info: Arc<RwLock<InstanceInfo>>,
        api_event_fd: EventFd,
        from_api: Receiver<Box<VmmAction>>,
        seccomp_level: u32,
        kvm_fd: Option<RawFd>,
    ) -> Result<Self> {
        // epollでイベント発生を監視する
        let mut epoll_context = EpollContext::new()?;
        // If this fails, it's fatal; using expect() to crash.
        let api_event = epoll_context
            .add_event(api_event_fd, EpollDispatch::VmmActionRequest)
            .expect("Cannot add API eventfd to epoll.");

        let write_metrics_event = epoll_context
            .add_event(
                // non-blocking & close on exec
                TimerFd::new_custom(ClockId::Monotonic, true, true).map_err(Error::TimerFd)?,
                EpollDispatch::WriteMetrics,
            ).expect("Cannot add write metrics TimerFd to epoll.");

api_event_fdは、API Serverからのイベント通知のためのファイルディスクリプタです。監視するイベントは、EpollDispatch::VmmActionRequestとあるので、API ServerからVMMへコマンドが送られるのだと推測できます。
main関数内で、API Serverのオブジェクトからファイルディスクリプタをcloneしていました。

src/main.rs main
    let api_event_fd = server
        .get_event_fd_clone()
        .expect("Cannot clone API eventFD.");

次にVMインスタンスを生成します。kvm_fdはjailerで隔離されていない場合、/dev/kvmになります。

        let block_device_configs = BlockDeviceConfigs::new();
        let kvm = KvmContext::new(kvm_fd)?;
        let vm = Vm::new(kvm.fd()).map_err(Error::Vm)?;

Vm::new()では、kvmのVMインスタンス作成をKVMに依頼します。とりあえず、ゲストOSのメモリはNoneで作成しています。

vmm/src/vstate.rs
impl Vm {
    /// Constructs a new `Vm` using the given `Kvm` instance.
    pub fn new(kvm: &Kvm) -> Result<Self> {
        //create fd for interacting with kvm-vm specific functions
        let vm_fd = kvm.create_vm().map_err(Error::VmFd)?;

        Ok(Vm {
            fd: vm_fd,
            guest_mem: None,
        })
    }

Kvm.create_vm()を見ていきます。

kvm/src/lib.rs
pub struct Kvm {
    kvm: File,
}

impl Kvm {
...
    /// Creates a VM fd using the KVM fd (KVM_CREATE_VM).
    /// A call to this function will also initialize the supported cpuid (KVM_GET_SUPPORTED_CPUID)
    /// and the size of the vcpu mmap area (KVM_GET_VCPU_MMAP_SIZE).
    pub fn create_vm(&self) -> Result<VmFd> {
        // Safe because we know kvm is a real kvm fd as this module is the only one that can make
        // Kvm objects.
        let ret = unsafe { ioctl(&self.kvm, KVM_CREATE_VM()) };
        if ret >= 0 {
            // Safe because we verify the value of ret and we are the owners of the fd.
            let vm_file = unsafe { File::from_raw_fd(ret) };
            let run_mmap_size = self.get_vcpu_mmap_size()?;
            let kvm_cpuid: CpuId = self.get_supported_cpuid(MAX_KVM_CPUID_ENTRIES)?;
            Ok(VmFd {
                vm: vm_file,
                supported_cpuid: kvm_cpuid,
                run_size: run_mmap_size,
            })
        } else {
            errno_result()
        }
    }

注目するのは let ret = unsafe { ioctl(&self.kvm, KVM_CREATE_VM()) };の部分で、普通に/dev/kvmをioctlで叩いてVMを作成しています。
作成したVMのファイルでVmFdを初期化し、関数の戻り値としています。

ここまでをまとめると、Vmm::new()では、API Serverからイベント通知を受け取れるためのデータと、/dev/kvmを叩いて生成したVMインスタンスから、Vmmオブジェクトを作成しようとしています。
最終的に、Vmmは次のように初期値を与えられて作成されます。

Vmm::new()

vmm/src/lib.rs
        Ok(Vmm {
            kvm,
            vm_config: VmConfig::default(),
            shared_info: api_shared_info,
            guest_memory: None,
            kernel_config: None,
            kill_signaled: None,
            vcpu_handles: None,
            exit_evt: None,
            vm,
            mmio_device_manager: None,
            legacy_device_manager: LegacyDeviceManager::new().map_err(Error::CreateLegacyDevice)?,
            block_device_configs,
            drive_handler_id_map: HashMap::new(),
            network_interface_configs: NetworkInterfaceConfigs::new(),
            #[cfg(feature = "vsock")]
            vsock_device_configs: VsockDeviceConfigs::new(),
            epoll_context,
            api_event,
            from_api,
            write_metrics_event,
            seccomp_level,
        })

少し拾っておくと、vm_configはデフォルト値が設定されています。デフォルト値は次の通りです。

vmm/src/vmm_config/machine_config.rs
impl Default for VmConfig {
    fn default() -> Self {
        VmConfig {
            vcpu_count: Some(1),
            mem_size_mib: Some(128),
            ht_enabled: Some(false),
            cpu_template: None,
        }
    }
}

CPU1コア、メモリサイズ128MB、(多分)hyperthreadingをdisable、がデフォルト設定になります。

もう1点、legacy_device_managerなるメンバーがいます。LegacyDeviceManagerは次のように定義されており、IOバス上のUARTとi8042(PS/2コントローラでしたっけ?)を管理しているようです。

vmm/src/device_manager/legacy.rs
/// The `LegacyDeviceManager` is a wrapper that is used for registering legacy devices
/// on an I/O Bus. It currently manages the uart and i8042 devices.
/// The `LegacyDeviceManger` should be initialized only by using the constructor.
pub struct LegacyDeviceManager {
    pub io_bus: devices::Bus,
    pub stdio_serial: Arc<Mutex<devices::legacy::Serial>>,
    pub i8042: Arc<Mutex<devices::legacy::I8042Device>>,

    pub com_evt_1_3: EventFd,
    pub com_evt_2_4: EventFd,
    pub stdin_handle: io::Stdin,
}

このlegacy_device_managerは、KVMでイベントが発生し、VMMに制御が戻ってきたときのハンドラで利用することが予想できます。

API Server起動

とうとう初期化大詰めです。API Serverのsocketをbindして、API Serverを起動します。jailerで隔離されている場合は、Unix domain socketのファイルディスクリプタを受け取るようです。そうでない場合、パスからUnix domain socketを取得します。

    let uds_path_or_fd = if is_jailed {
        UnixDomainSocket::Fd(jailer::LISTENER_FD)
    } else {
        UnixDomainSocket::Path(bind_path)
    };

    match server.bind_and_run(uds_path_or_fd, start_time_us, start_time_cpu_us) {
        Ok(_) => (),
        Err(Error::Io(inner)) => match inner.kind() {
            // エラー処理
        },
        Err(Error::Eventfd(inner)) => panic!(
            // エラー処理
        ),
    }

なぜ、Unix domain socketを用意しているかというと、Unix domain socketでHTTPリクエストをlistenするようです(そんなことできるの知らなかった)。
DockerでもdockerdのAPIは、Unix domain socketを介したREST APIになっているようです。
firecrackerも本来はjailerで隔離されるので、Unix domain socketで外の世界とやり取りする、のかもしれません。もう少し周りの実装を解析しないとなんとも言えないですね。

bind_and_runはエラーが発生するまで実行を続ける関数です。Coreはtokioのイベントループみたいですね。

    // TODO: does tokio_uds also support abstract domain sockets?
    pub fn bind_and_run<P: AsRef<Path>>(
...
    ) -> Result<()> {
        let mut core = Core::new().map_err(Error::Io)?;
        let handle = Rc::new(core.handle());
...
        // This runs forever, unless an error is returned somewhere within f (but nothing happens
        // for errors which might arise inside the connections we spawn from f, unless we explicitly
        // do something in their future chain). When this returns, ongoing connections will be
        // interrupted, and other futures will not complete, as the event loop stops working.
        core.run(f)
    }

tokioのCoreは、tokio::reactorで実装されているEvent loopです。
少し読んでみようかな、と下記ソースに飛びましたが、諦めてスゴスゴと帰ってきました。

tokio/src/reactor/mod.rs
impl Core {
    /// Creates a new event loop, returning any error that happened during the
    /// creation.
    // わからん!
    pub fn new() -> io::Result<Core> {
        let io = try!(mio::Poll::new());
        let future_pair = mio::Registration::new2();
        try!(io.register(&future_pair.0,
                         TOKEN_FUTURE,
                         mio::Ready::readable(),
                         mio::PollOpt::level()));
        let (tx, rx) = mpsc::unbounded();
        let channel_pair = mio::Registration::new2();
        try!(io.register(&channel_pair.0,
                         TOKEN_MESSAGES,
                         mio::Ready::readable(),
                         mio::PollOpt::level()));
        let rx_readiness = Arc::new(MySetReadiness(channel_pair.1));
        rx_readiness.notify(0);

        Ok(Core {
...

ただ、CoreがEvent loopであることがわかれば、後は、イベント発生時のハンドラを登録して、イベント待ちになるだけ、のはずです。続きを見ると、unix domain socketのlistenerを作成しています。

    pub fn bind_and_run<P: AsRef<Path>>(
...
        // `path_or_fd`はunix domain socketのパスまたはファイルディスクリプタです
        let listener = match path_or_fd {
            UnixDomainSocket::Path(path) => UnixListener::bind(path, &handle).map_err(Error::Io)?,
            UnixDomainSocket::Fd(fd) => {
                // Safe because we assume fd is a valid file descriptor number, associated with a
                // previously bound UnixListener.
                UnixListener::from_listener(
                    unsafe { std::os::unix::net::UnixListener::from_raw_fd(fd) },
                    &handle,
                ).map_err(Error::Io)?
            }
        };

次に、メトリクスに実行開始時間を出力しています。一瞬、目に馴染まない書き方されていますが、METRICSはlazy_staticで初期化されるグローバルオブジェクトで、.api_serverなどはただのpublicなフィールドです。

        if let Some(start_time) = start_time_us {
            let delta_us = (chrono::Utc::now().timestamp_nanos() / 1000) as u64 - start_time;
            METRICS
                .api_server
                .process_startup_time_us
                .add(delta_us as usize);
        }

        if let Some(cpu_start_time) = start_time_cpu_us {
            let delta_us = fc_util::now_cputime_us() - cpu_start_time;
            METRICS
                .api_server
                .process_startup_time_cpu_us
                .add(delta_us as usize);
        }

HTTPサーバには、hyperを使っています。

        let http: Http<hyper::Chunk> = Http::new();

さて、次がよくわからんポイントです。ここで、fFuture traitを実装する型です。

        let f = listener
            .incoming()
            .for_each(|(stream, _)| {
                // For the sake of clarity: when we use self.efd.clone(), the intent is to
                // clone the wrapping Rc, not the EventFd itself.
                let service = ApiServerHttpService::new(
                    self.mmds_info.clone(),
                    self.vmm_shared_info.clone(),
                    self.api_request_sender.clone(),
                    self.efd.clone(),
                );
                let connection = http.serve_connection(stream, service);
                // todo: is spawn() any better/worse than execute()?
                // We have to adjust the future item and error, to fit spawn()'s definition.
                handle.spawn(connection.map(|_| ()).map_err(|_| ()));
                Ok(())
            }).map_err(Error::Io);

listener.incoming()は、IoStream<(UnixStream, SocketAddr)>を返します。incoming()は、listener自身を消費して、IoStream<(UnixStream, SocketAddr)>を作り出すようです。
ソースコードを見てみますが、難しいですね。

tokio-uds-0.1.7/src/lib,rs

    /// Consumes this listener, returning a stream of the sockets this listener
    /// accepts.
    ///
    /// This method returns an implementation of the `Stream` trait which
    /// resolves to the sockets the are accepted on this listener.
    pub fn incoming(self) -> IoStream<(UnixStream, SocketAddr)> {
        struct Incoming {
            inner: UnixListener,
        }

        impl Stream for Incoming {
            type Item = (UnixStream, SocketAddr);
            type Error = io::Error;

            fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
                Ok(Some(try_nb!(self.inner.accept())).into())
            }
        }

        Incoming { inner: self }.boxed()
    }

Stream traitが、SelfからItemへの変換する機能を実装していないと辻褄が合わないですね。
えーっと、Stream traitは、futures::Streamで、Iteratorの非同期バージョン、とのことです。なので、for_each()で処理しているのですね。
IoStreamは、下のように定義されているので、`Incoming { inner: self }.boxed()‘のところで型が合っていますね。

type IoStream<T> = Box<Stream<Item = T, Error = Error> + Send>;

incoming()の謎が解けたところで、for_each()に行くと、UnixStreamstreamとしてクロージャで受け取り、HTTP Serviceを作成して、UnixStreamとbindしてHTTPリクエストを捌いている、ということみたいですね。

            .incoming()
            .for_each(|(stream, _)| {
                // For the sake of clarity: when we use self.efd.clone(), the intent is to
                // clone the wrapping Rc, not the EventFd itself.
                let service = ApiServerHttpService::new(
                    self.mmds_info.clone(),
                    self.vmm_shared_info.clone(),
                    self.api_request_sender.clone(),
                    self.efd.clone(),
                );
                let connection = http.serve_connection(stream, service);
                // todo: is spawn() any better/worse than execute()?
                // We have to adjust the future item and error, to fit spawn()'s definition.
                handle.spawn(connection.map(|_| ()).map_err(|_| ()));
                Ok(())
            }).map_err(Error::Io);

ApiServiceHttpServiceは、来たrequestごとに作成されるサービスのようです。

// In hyper, a struct that implements the Service trait is created to handle each incoming
// request. This is the one for our ApiServer.
pub struct ApiServerHttpService {
    // MMDS info directly accessible from this API thread.
    mmds_info: Arc<Mutex<Mmds>>,
    // VMM instance info directly accessible from this API thread.
    vmm_shared_info: Arc<RwLock<InstanceInfo>>,
    // This allows sending messages to the VMM thread. It makes sense to use a Rc for the sender
    // (instead of cloning) because everything happens on a single thread, so there's no risk of
    // having races (if that was even a problem to begin with).
    api_request_sender: Rc<mpsc::Sender<Box<VmmAction>>>,
    // We write to this EventFd to let the VMM know about new messages.
    vmm_send_event: Rc<EventFd>,
}

ApiServerHttpServiceは、hyper::server::Service traitを実装しています。hyperでは、届いたリクエストごとにService` traitを実装したstructが生成されます。

api_server/src/http_service.rs
impl hyper::server::Service for ApiServerHttpService {
    type Request = hyper::Request;
    type Response = hyper::Response;
    type Error = hyper::error::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
...

serve_connectionは、Futureを返すようです。
hyper/src/server/mod.rsを見ると、Connectionを返しています。

hyper/src/server/mod.rs
    /// Bind a connection together with a Service.
    ///
    /// This returns a Future that must be polled in order for HTTP to be
    /// driven on the connection.
    pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S>

Connectionの定義を見ると、下のようになっていました。うーん、わからん…。

/// A future binding a connection with a Service.
///
/// Polling this future will drive HTTP forward.
///
/// # Note
///
/// This will currently yield an unnameable (`Opaque`) value
/// on success. The purpose of this is that nothing can be assumed about
/// the type, not even it's name. It's probable that in a later release,
/// this future yields the underlying IO object, which could be done without
/// a breaking change.
///
/// It is likely best to just map the value to `()`, for now.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<I, S>
where
    S: HyperService,
    S::ResponseBody: Stream<Error=::Error>,
    <S::ResponseBody as Stream>::Item: AsRef<[u8]>,
{
...
}

ただ、コメントを見ると、It is likely best to just map the value to (), for now.とあるので、mapで()を返すクロージャを渡していることと辻褄が合います。

                let connection = http.serve_connection(stream, service);
                // todo: is spawn() any better/worse than execute()?
                // We have to adjust the future item and error, to fit spawn()'s definition.
                handle.spawn(connection.map(|_| ()).map_err(|_| ()));

おそらく、ここのconnection.map()で、ApiServerHttpServicecallが呼ばれる気がします。ApiServerHttpServicecallは次のようになっており、HTTPリクエストを処理しています。

api_server/src/http_service.rs
impl hyper::server::Service for ApiServerHttpService {
...
    fn call(&self, req: Self::Request) -> Self::Future {
...
        Box::new(req.body().concat2().and_then(move |b| {
            // When this will be executed, the body is available. We start by parsing the request.
            match parse_request(method, path.as_ref(), &b) {
                Ok(parsed_req) => match parsed_req {
                    GetInstanceInfo => {
...

matchのparse_request()へ潜っていくと、VMM Actionへパースされる処理が見つかります。下のようなテストが書かれています。

    #[test]
    fn test_parse_actions_req() {
        // PUT InstanceStart
        let json = "{
                \"action_type\": \"InstanceStart\"
              }";
        let body: Chunk = Chunk::from(json);
        let path = "/foo";

        match parse_actions_req(path, Method::Put, &body) {
            Ok(pr) => {
                let (sender, receiver) = oneshot::channel();
                assert!(pr.eq(&ParsedRequest::Sync(
                    VmmAction::StartMicroVm(sender),
                    receiver
                )));
            }
            _ => assert!(false),
        }
...

hyperの使い方がわかっていなくて、少し怪しいですが、これで、Unix domain socketに届いたHTTPリクエストが、その内容に応じて、VMM threadに届きそうです。
後は、REST APIが叩かれるの待ち、というところまで読んだところで、本記事は終了です!

ここまで拙い解説に付き合って下さった方、ありがとうございました。

25
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
25
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?