rust
tokio

[Rust] [tokio]Futureの実装とasync/await

いわゆる「やってみた系」の記事なので多くは望まないで欲しい。

何をするのか

Futureの自前実装による非同期処理とそれがasync/await導入でどのように変わるかを示す。
なおコードはRust2018で記述している。

最初のFuture

まず簡単なFutureを実装してみる。

use tokio::prelude::*;

struct MyFuture;

impl Future for MyFuture {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        println!("poll called");
        Ok(Async::Ready(()))
    }
}

fn main() {
    tokio::run(MyFuture{});
    println!("finished");
}

poll()は正常時には計算終了を表すAsync::Ready(...)または、まだ計算中であることを示すAsync::NotReadyを返す。poll()は自分の処理がまだ終わっていなくても(例えばスリープ中や電文送信中等)でも自分以外の要因で呼び出される可能性があるため、その際にはAsync::NotReadyを返してまだ自分の処理が終わっていないことを明示する必要がある。

実行するとMyFuturepoll()が呼ばれOk(Async::Ready(()))を返すため即座に処理は終了する。

$ cargo run
poll called
finished
$

Ok(Async::Ready(()))Ok(Async::NotReady)に変更するとどうなるか。

@@ -8,7 +8,7 @@ impl Future for MyFuture {

     fn poll(&mut self) -> Poll<(), ()> {
         println!("poll called");
-        Ok(Async::Ready(()))
+        Ok(Async::NotReady)
     }
 }

初回にpoll()が呼ばれてOk(Async::NotReady)を返すため処理は終了しないが、poll()を呼び出す要因を誰も発生させないためそのままpoll()も呼ばれず、処理も終了しないままとなる。

$ cargo run
poll called

UDPパケットを待ち受ける

use tokio::prelude::*;
use tokio::net::UdpSocket;
use std::net::SocketAddr;

struct EchoServer {
    socket: UdpSocket,
    buf: Vec<u8>,
}

impl EchoServer {
    fn new(socket: UdpSocket) -> EchoServer {
        EchoServer{socket: socket, buf: vec![0; 1024]}
    }
}

impl Future for EchoServer {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        match self.socket.poll_recv_from(&mut self.buf) {
            Ok(Async::Ready((_size, _peer))) => {
                println!("recv");
                Ok(Async::Ready(()))
            },
            Ok(Async::NotReady) => {
                println!("not ready");
                Ok(Async::NotReady)
            },
            Err(_) => Err(()),
        }
    }
}

fn main() {
    let addr = "127.0.0.1:9000".to_string().parse::<SocketAddr>().unwrap();
    let socket = UdpSocket::bind(&addr).unwrap();
    println!("Listening on: {}", socket.local_addr().unwrap());

    let server = EchoServer::new(socket);

    tokio::run(server);
}

$ cargo runして$ echo hoge | socat - UDP:localhost:9000してやると以下のように出力される。

$ cargo run
# [...]
Listening on: 127.0.0.1:9000
not ready
recv
$

UdpSocket::poll_recv_from()は初回のpoll()では電文が届いていないためAsync::NotReadyを返す。socat実行後再度poll()が呼ばれその際にはUdpSocket::poll_recv_from()Async::Ready(...)を返しEchoServer::poll()Async::Ready(())を返すため、処理が終了する。

いちいちpoll()に対してmatch書くのは面倒なのでtry_ready!マクロを使うとちょっとだけ簡単になる。

use tokio::prelude::*;
use tokio::net::UdpSocket;
use std::net::SocketAddr;

use futures::try_ready;

struct EchoServer {
    socket: UdpSocket,
    buf: Vec<u8>,
}

impl EchoServer {
    fn new(socket: UdpSocket) -> EchoServer {
        EchoServer{socket: socket, buf: vec![0; 1024]}
    }
}

impl Future for EchoServer {
    type Item = ();
    type Error = std::io::Error;

    fn poll(&mut self) -> Poll<(), std::io::Error> {
        let _ = try_ready!(self.socket.poll_recv_from(&mut self.buf));
        println!("connect");
        Ok(Async::Ready(()))
    }
}

fn main() {
    let addr = "127.0.0.1:9000".to_string().parse::<SocketAddr>().unwrap();
    let socket = UdpSocket::bind(&addr).unwrap();
    println!("Listening on: {}", socket.local_addr().unwrap());

    let server = EchoServer::new(socket);

    tokio::run(server.map_err(|_| ()));
}

try_ready!()Errを直接returnするためEchoServer::Errorが変更されている点とtokio::run()の引数型に合わせるためmap_err()してる点に注意されたし。

Echoする

もう一歩進めてEchoしてみる。

struct EchoServer {
    socket: UdpSocket,
    buf: Vec<u8>,
    to_send: Option<(usize, SocketAddr)>,
}

[...]

impl Future for EchoServer {

    [...]

    fn poll(&mut self) -> Poll<(), std::io::Error> {
        loop {
            if self.to_send == None {
                self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
            }

            if let Some((size, peer)) = self.to_send {
                let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
                println!("Echoed {}/{} bytes to {}", amt, size, peer);
                self.to_send = None;
            }
        }
    }
}

recvしてsendするだけのはずが、loopだのifだのあってややこしい。少し解説したい。

まずUdpSocket::poll_recv_from()の部分は前回説明したとおり、受信していればself.to_sendに受信バイト数と送信元情報を格納する。

次に受信した電文をUdpSocket::poll_send_to()でechoするわけだが、ここでpeerが送信可能になっていない可能性が有り、その場合はUdpSocket::poll_send_to()Async::NotReadyが返ってくる。その際try_ready!()poll()を抜けるわけだが、次回poll()が呼ばれた際に1つめのif文が無いとechoしてないのに受信待ちを行ってしまうことになってしまうのだ。つまり1つめのif文があることで送信が完了するまでUdpSpcket::poll_recv_from()をスキップし続けることが可能となる。

またloopの意味だが、例えば以下のようにloop無しになっていると

    fn poll(&mut self) -> Poll<(), std::io::Error> {
        if self.to_send == None {
            self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
        }

        if let Some((size, peer)) = self.to_send {
            let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
            println!("Echoed {}/{} bytes to {}", amt, size, peer);
            self.to_send = None;
        }

        Ok(Async::NotReady)
    }

1回はechoされるが、次回の受信待ちができなくなってしまう。イベントループに対して受信時にpoll()が呼ばれるようにするためには再度UdpSocket::poll_recv_from()を呼び出す必要がある。

async/awaitを使う

以上で見てもらったようにFutureを自前実装する場合は、今は受信中なのか送信中なのかといった状態遷移を考えなければならい。これがasync/awaitでどう簡単になるか見てみよう。

Cargo.toml
[...]

[dependencies]
tokio = { git = "https://github.com/tokio-rs/tokio", features=["async-await-preview"] }
tokio-async-await = { git = "https://github.com/tokio-rs/tokio" }
tokio-tcp = { git = "https://github.com/tokio-rs/tokio" }
futures-preview = "0.3.0-alpha.9"
#![feature(pin, await_macro, async_await, futures_api)]
use tokio::await;
use tokio::net::UdpSocket;
use std::net::SocketAddr;

fn main() {
    let addr = "127.0.0.1:9000".to_string().parse::<SocketAddr>().unwrap();
    let mut socket = UdpSocket::bind(&addr).unwrap();
    println!("Listening on: {}", socket.local_addr().unwrap());

    tokio::run_async(async move {
        loop {
            let buf = vec![0; 1024];
            let (sock, buf, size, peer) = await!(socket.recv_dgram(buf)).unwrap();
            let (sock, _) = await!(sock.send_dgram(buf, &peer)).unwrap();
            socket = sock;
            println!("Echoed {} to {}", size, peer);
        }
    });
}

UdpSocketにはFuture(RecvDgram, SendDgram)を返すrecv_dgram(),send_dgram()があるのでそれらのFutureに対してawait!()を実行している。
RecvDgramSendDgramは自作Futurepoll()内でも利用可能である。
上記自作FutureRecvDgramSendDgramを利用するのは読者への宿題としたい。

Futureモナドを利用したコード

大変さを強調するためFutureを自前で実装するということをやってみたが、実はtokioの本来の使い方的にはasync/awaitを使用せずとももう少し簡単に書くことができる。

use tokio::prelude::*;
use futures::future::{loop_fn, Loop};
use tokio::net::UdpSocket;
use std::net::SocketAddr;

fn main() {
    let addr = "127.0.0.1:9000".to_string().parse::<SocketAddr>().unwrap();
    let socket = UdpSocket::bind(&addr).unwrap();
    println!("Listening on: {}", socket.local_addr().unwrap());

    let server = loop_fn(socket, |socket| {
        let buf = vec![0; 1024];
        socket.recv_dgram(buf)
            .and_then(|(socket, buf, _size, peer)| socket.send_dgram(buf, &peer))
            .and_then(|(socket, _)| Ok(Loop::Continue(socket)))
    });

    tokio::run(server.map_err(|_| ()));
}

ただこれも複雑な処理になってくるといわゆる「コールバック地獄」になるわけで、やはり早くasync/await来てほしいなという感じである。