今週のThis Week in Rustに出てきたosaka.rsにちょっと興味惹かれたので読んでみたときの個人メモ
osaka.rsとは?
mioをベースに、async
もどきの#[osaka]
とawait!
もどきのsync!
を実装したお試し非同期フレームワークという感じ。
#[osaka]
#[proc_macro_attribute]
については
let output = match f.decl.output.clone() {
syn::ReturnType::Default => quote! {()},
syn::ReturnType::Type(_, t) => quote! {#t},
};
f.decl.output = match syn::parse((quote! {
-> osaka::Task<#output>
}).into(),
) {
Ok(v) => v,
Err(e) => return e.to_compile_error().into(),
};
返り値をosaka::Task<>
でラップしている。
syn::ReturnType::Default
は返り値無しの関数の場合。
let oblock = f.block;
f.block = match syn::parse(
(quote! {{
use std::ops::Generator;
let mut l = move||{
#oblock
};
let a = match unsafe { l.resume() } {
std::ops::GeneratorState::Complete(y) => {
return osaka::Task::immediate(y);
}
std::ops::GeneratorState::Yielded(y) => {
y
}
};
osaka::Task::new(Box::new(l),a)
}})
.into(),
) {
Ok(v) => v,
Err(e) => return e.to_compile_error().into(),
};
関数のブロックをラッピングしてstd::ops::Generator
を作ってすぐにresume()
で実行し、作ったジェネレータと中断結果(完了した場合はosaka::Task::immediate
)を返す関数を作り、それをf.block
に代入することで元の関数を作り変えている。
この#[osaka]
はどんな関数にも付けられるというものではなく、ブロック内部でyield
(または後述のsync!
)を呼んでいる必要があるため、どちらかというとジェネレータを普通の関数に見せかける仕組みという方が正しいように思う。
sync!
macro_rules! sync {
($task:expr) => {{
use osaka::FutureResult;
use osaka::Future;
loop {
match $task.poll() {
FutureResult::Done(y) => {
break y;
}
FutureResult::Again(y) => {
yield y;
}
}
}
};};
}
引数の$task
は#[osaka]
付きの関数(?)で生成されたosaka::Task
またはジェネレータ。
yield
を上へと伝搬させていくawait!
もどきの役割。
Task::run()
ブロックしてイベントを待つ。
a.poll.poll(&mut events, timeout).expect("poll");
for token in &a.tokens {
token.active.store(0, Ordering::SeqCst);
}
for event in &events {
for token in &a.tokens {
if event.token() == token.m {
debug!("token {:?} activated", token.m);
token.active.store(1, Ordering::SeqCst);
}
}
}
mio::Poll::poll()
でイベントを待って、来たら対応するtoken.active
を有効にする。
if let FutureResult::Done(v) = self.poll() {
return v;
}
poll()
を呼んでイベントを処理。Done
なら上に返る。Again
ならloop
で再度mio::Poll::poll()
のイベント待ちへ。
Task::poll()
osaka::Future::poll()
の実装。
match self {
Task::Immediate{r} => {
return FutureResult::Done(r.take().expect("immediate polled after completion"));
}
Task::Later{f,a} => {
let mut ready = false;
if let Some(deadline) = a.deadline {
if Instant::now() >= deadline {
debug!("task wakeup caused by deadline");
a.deadline = None;
ready = true;
}
}
if !ready {
for token in &a.tokens {
if token.active.load(Ordering::SeqCst) > 0 {
debug!("task wakeup caused by token readyness");
ready = true;
break;
}
}
}
if ready {
match f.poll() {
FutureResult::Done(y) => {
return FutureResult::Done(y);
},
FutureResult::Again(a2) => {
*a = a2;
}
}
}
FutureResult::Again(a.clone())
}
}
デッドラインに到達、もしくはイベントが来ていれば、osaka::Future
(ジェネレータ or osaka::Task
)に対してFuture::poll()
を実行しFutureResult
を返す。
動かすものがないならそのままAgain
を返す。
Again
#[derive(Clone)]
pub struct Again {
tokens: Vec<Token>,
deadline: Option<Instant>,
poll: Arc<mio::Poll>,
}
イベント待ちをするための情報。Task::Later
のメンバーとしてsync!
で伝搬していき、Task::run()
でイベント待ちするときに利用される。
README.mdの例
以上を踏まえてREADME.mdを見てみる
概要
resolve()
では引数のpoll
に待つイベントを登録し、イベントがあるごとにyield poll.again
から再開して応答を受信。
test()
ではresolve()
で作ったタスクをsync!
で同期的に待つだけ。
詳細
#[osaka]
pub fn resolve(poll: Poll, names: Vec<String>) -> Result<Vec<String>, Error> {
//...
let sock = UdpSocket::bind(&"0.0.0.0:0".parse().unwrap()).map_err(|e| Error::Io(e))?;
let token = poll
.register(&sock, mio::Ready::readable(), mio::PollOpt::level())
.unwrap();
//...
// wait for a packet
let pkt = match loop {
// wait for the token to be ready, or timeout
yield poll.again(token.clone(), Some(Duration::from_secs(5)));
if now.elapsed() >= Duration::from_secs(5) {
// timeout
break None;
}
// now the socket _should_ be ready
let (len, from) = match sock.recv_from(&mut buf) {
Ok(v) => v,
Err(e) => {
// but just in case it isn't lets re-loop
if e.kind() == std::io::ErrorKind::WouldBlock {
continue;
}
return Err(Error::Io(e));
}
};
}
// do stuff with the pkt
// ...
}
pub fn test(poll: Poll) -> Result<(), Error> {
let mut a = resolve(
poll.clone(),
vec![
"3.carrier.devguard.io".into(),
],
);
let y = osaka::sync!(a);
println!("resolved: {:?}", y);
Ok(())
}
pub fn main() {
tinylogger::init().ok();
let poll = osaka::Poll::new();
test(poll).run().unwrap();
}
以下雑多なメモ
これ、test()
に#[osaka]
付ける必要あるのではないだろうか。以降test()
に#[osaka]
付いてる前提で話を進める。
あとこのままだといろいろコンパイルは通らないので動かすためには結構修正必要。
#[osaka]
の効果でresolve()
ではosaka::Task<>
が返る。
最初の実行では引数のpoll
に対するイベント登録をした後、yield poll.again();
で
osaka::Task::Later{Box<Generator>, Again}
を返す。
test()
もosaka::Task<>
を返す。test()
のブロックもジェネレータ化されているので、最初の呼出でresolve()
を呼んでsync!
の中で上のLater
に対してpoll()
を実行する。
最初の呼び出しては通常イベントは発生していないので上にAgain
が返って、そのAgain
を元にTask
が作られてtest(poll)
の返り値となる。
上の返り値にさらにTask::run()
を実行する。Task::run()
はブロッキングメソッドでmio::Poll::poll()
を使って実際にブロッキングでイベント待ちをおこなう。イベントが来た場合はTask::poll()
を呼び出す。あとは「mio::Poll::poll()
してTask::poll()
」の処理を全ての処理が終わるまで繰り返す。