いわゆる「やってみた系」の記事なので多くは望まないで欲しい。
何をするのか
Future
の自前実装による非同期処理とそれがasync/await導入でどのように変わるかを示す。
なおコードはRust2018で記述している。
最初のFuture
まず簡単なFutureを実装してみる。
use tokio::prelude::*;
struct MyFuture;
impl Future for MyFuture {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
println!("poll called");
Ok(Async::Ready(()))
}
}
fn main() {
tokio::run(MyFuture{});
println!("finished");
}
poll()
は正常時には計算終了を表すAsync::Ready(...)
または、まだ計算中であることを示すAsync::NotReady
を返す。poll()
は自分の処理がまだ終わっていなくても(例えばスリープ中や電文送信中等)でも自分以外の要因で(要調査)呼び出される可能性があるため、その際にはAsync::NotReady
を返してまだ自分の処理が終わっていないことを明示する必要がある。
実行するとMyFuture
のpoll()
が呼ばれOk(Async::Ready(()))
を返すため即座に処理は終了する。
$ cargo run
poll called
finished
$
Ok(Async::Ready(()))
をOk(Async::NotReady)
に変更するとどうなるか。
@@ -8,7 +8,7 @@ impl Future for MyFuture {
fn poll(&mut self) -> Poll<(), ()> {
println!("poll called");
- Ok(Async::Ready(()))
+ Ok(Async::NotReady)
}
}
初回にpoll()
が呼ばれてOk(Async::NotReady)
を返すため処理は終了しないが、poll()
を呼び出す要因を誰も発生させないためそのままpoll()
も呼ばれず、処理も終了しないままとなる。
$ cargo run
poll called
UDPパケットを待ち受ける
use tokio::prelude::*;
use tokio::net::UdpSocket;
use std::net::SocketAddr;
struct EchoServer {
socket: UdpSocket,
buf: Vec<u8>,
}
impl EchoServer {
fn new(socket: UdpSocket) -> EchoServer {
EchoServer{socket: socket, buf: vec![0; 1024]}
}
}
impl Future for EchoServer {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
match self.socket.poll_recv_from(&mut self.buf) {
Ok(Async::Ready((_size, _peer))) => {
println!("recv");
Ok(Async::Ready(()))
},
Ok(Async::NotReady) => {
println!("not ready");
Ok(Async::NotReady)
},
Err(_) => Err(()),
}
}
}
fn main() {
let addr = "127.0.0.1:9000".to_string().parse::<SocketAddr>().unwrap();
let socket = UdpSocket::bind(&addr).unwrap();
println!("Listening on: {}", socket.local_addr().unwrap());
let server = EchoServer::new(socket);
tokio::run(server);
}
$ cargo run
して$ echo hoge | socat - UDP:localhost:9000
してやると以下のように出力される。
$ cargo run
# [...]
Listening on: 127.0.0.1:9000
not ready
recv
$
UdpSocket::poll_recv_from()
は初回のpoll()
では電文が届いていないためAsync::NotReady
を返す。socat
実行後再度poll()
が呼ばれその際にはUdpSocket::poll_recv_from()
はAsync::Ready(...)
を返しEchoServer::poll()
もAsync::Ready(())
を返すため、処理が終了する。
いちいちpoll()
に対してmatch
書くのは面倒なのでtry_ready!
マクロを使うとちょっとだけ簡単になる。
use tokio::prelude::*;
use tokio::net::UdpSocket;
use std::net::SocketAddr;
use futures::try_ready;
struct EchoServer {
socket: UdpSocket,
buf: Vec<u8>,
}
impl EchoServer {
fn new(socket: UdpSocket) -> EchoServer {
EchoServer{socket: socket, buf: vec![0; 1024]}
}
}
impl Future for EchoServer {
type Item = ();
type Error = std::io::Error;
fn poll(&mut self) -> Poll<(), std::io::Error> {
let _ = try_ready!(self.socket.poll_recv_from(&mut self.buf));
println!("connect");
Ok(Async::Ready(()))
}
}
fn main() {
let addr = "127.0.0.1:9000".to_string().parse::<SocketAddr>().unwrap();
let socket = UdpSocket::bind(&addr).unwrap();
println!("Listening on: {}", socket.local_addr().unwrap());
let server = EchoServer::new(socket);
tokio::run(server.map_err(|_| ()));
}
try_ready!()
がErr
を直接returnするためEchoServer::Error
が変更されている点とtokio::run()
の引数型に合わせるためmap_err()
してる点に注意されたし。
Echoする
もう一歩進めてEchoしてみる。
struct EchoServer {
socket: UdpSocket,
buf: Vec<u8>,
to_send: Option<(usize, SocketAddr)>,
}
[...]
impl Future for EchoServer {
[...]
fn poll(&mut self) -> Poll<(), std::io::Error> {
loop {
if self.to_send == None {
self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
}
if let Some((size, peer)) = self.to_send {
let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
println!("Echoed {}/{} bytes to {}", amt, size, peer);
self.to_send = None;
}
}
}
}
recvしてsendするだけのはずが、loopだのifだのあってややこしい。少し解説したい。
まずUdpSocket::poll_recv_from()
の部分は前回説明したとおり、受信していればself.to_send
に受信バイト数と送信元情報を格納する。
次に受信した電文をUdpSocket::poll_send_to()
でechoするわけだが、ここでpeer
が送信可能になっていない可能性が有り、その場合はUdpSocket::poll_send_to()
でAsync::NotReady
が返ってくる。その際try_ready!()
でpoll()
を抜けるわけだが、次回poll()
が呼ばれた際に1つめのif文が無いとechoしてないのに受信待ちを行ってしまうことになってしまうのだ。つまり1つめのif文があることで送信が完了するまでUdpSpcket::poll_recv_from()
をスキップし続けることが可能となる。
またloopの意味だが、例えば以下のようにloop無しになっていると
fn poll(&mut self) -> Poll<(), std::io::Error> {
if self.to_send == None {
self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
}
if let Some((size, peer)) = self.to_send {
let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
println!("Echoed {}/{} bytes to {}", amt, size, peer);
self.to_send = None;
}
Ok(Async::NotReady)
}
1回はechoされるが、次回の受信待ちができなくなってしまう。イベントループに対して受信時にpoll()
が呼ばれるようにするためには再度UdpSocket::poll_recv_from()
を呼び出す必要がある。
async/awaitを使う
以上で見てもらったようにFuture
を自前実装する場合は、今は受信中なのか送信中なのかといった状態遷移を考えなければならい。これがasync/awaitでどう簡単になるか見てみよう。
[...]
[dependencies]
tokio = { git = "https://github.com/tokio-rs/tokio", features=["async-await-preview"] }
tokio-async-await = { git = "https://github.com/tokio-rs/tokio" }
tokio-tcp = { git = "https://github.com/tokio-rs/tokio" }
futures-preview = "0.3.0-alpha.9"
#![feature(pin, await_macro, async_await, futures_api)]
use tokio::await;
use tokio::net::UdpSocket;
use std::net::SocketAddr;
fn main() {
let addr = "127.0.0.1:9000".to_string().parse::<SocketAddr>().unwrap();
let mut socket = UdpSocket::bind(&addr).unwrap();
println!("Listening on: {}", socket.local_addr().unwrap());
tokio::run_async(async move {
loop {
let buf = vec![0; 1024];
let (sock, buf, size, peer) = await!(socket.recv_dgram(buf)).unwrap();
let (sock, _) = await!(sock.send_dgram(buf, &peer)).unwrap();
socket = sock;
println!("Echoed {} to {}", size, peer);
}
});
}
UdpSocket
にはFuture
(RecvDgram
, SendDgram
)を返すrecv_dgram()
,send_dgram()
があるのでそれらのFuture
に対してawait!()
を実行している。
RecvDgram
やSendDgram
は自作Future
のpoll()
内でも利用可能である。
上記自作Future
でRecvDgram
やSendDgram
を利用するのは読者への宿題としたい。
Future
モナドを利用したコード
大変さを強調するためFuture
を自前で実装するということをやってみたが、実はtokioの本来の使い方的にはasync/awaitを使用せずとももう少し簡単に書くことができる。
use tokio::prelude::*;
use futures::future::{loop_fn, Loop};
use tokio::net::UdpSocket;
use std::net::SocketAddr;
fn main() {
let addr = "127.0.0.1:9000".to_string().parse::<SocketAddr>().unwrap();
let socket = UdpSocket::bind(&addr).unwrap();
println!("Listening on: {}", socket.local_addr().unwrap());
let server = loop_fn(socket, |socket| {
let buf = vec![0; 1024];
socket.recv_dgram(buf)
.and_then(|(socket, buf, _size, peer)| socket.send_dgram(buf, &peer))
.and_then(|(socket, _)| Ok(Loop::Continue(socket)))
});
tokio::run(server.map_err(|_| ()));
}
ただこれも複雑な処理になってくるといわゆる「コールバック地獄」になるわけで、やはり早くasync/await来てほしいなという感じである。