今週のThis Week in Rustに出てきたosaka.rsにちょっと興味惹かれたので読んでみたときの個人メモ
osaka.rsとは?
mioをベースに、asyncもどきの#[osaka]とawait!もどきのsync!を実装したお試し非同期フレームワークという感じ。
#[osaka]
#[proc_macro_attribute]については
let output = match f.decl.output.clone() {
syn::ReturnType::Default => quote! {()},
syn::ReturnType::Type(_, t) => quote! {#t},
};
f.decl.output = match syn::parse((quote! {
-> osaka::Task<#output>
}).into(),
) {
Ok(v) => v,
Err(e) => return e.to_compile_error().into(),
};
返り値をosaka::Task<>でラップしている。
syn::ReturnType::Defaultは返り値無しの関数の場合。
let oblock = f.block;
f.block = match syn::parse(
(quote! {{
use std::ops::Generator;
let mut l = move||{
#oblock
};
let a = match unsafe { l.resume() } {
std::ops::GeneratorState::Complete(y) => {
return osaka::Task::immediate(y);
}
std::ops::GeneratorState::Yielded(y) => {
y
}
};
osaka::Task::new(Box::new(l),a)
}})
.into(),
) {
Ok(v) => v,
Err(e) => return e.to_compile_error().into(),
};
関数のブロックをラッピングしてstd::ops::Generatorを作ってすぐにresume()で実行し、作ったジェネレータと中断結果(完了した場合はosaka::Task::immediate)を返す関数を作り、それをf.blockに代入することで元の関数を作り変えている。
この#[osaka]はどんな関数にも付けられるというものではなく、ブロック内部でyield(または後述のsync!)を呼んでいる必要があるため、どちらかというとジェネレータを普通の関数に見せかける仕組みという方が正しいように思う。
sync!
macro_rules! sync {
($task:expr) => {{
use osaka::FutureResult;
use osaka::Future;
loop {
match $task.poll() {
FutureResult::Done(y) => {
break y;
}
FutureResult::Again(y) => {
yield y;
}
}
}
};};
}
引数の$taskは#[osaka]付きの関数(?)で生成されたosaka::Taskまたはジェネレータ。
yieldを上へと伝搬させていくawait!もどきの役割。
Task::run()
ブロックしてイベントを待つ。
a.poll.poll(&mut events, timeout).expect("poll");
for token in &a.tokens {
token.active.store(0, Ordering::SeqCst);
}
for event in &events {
for token in &a.tokens {
if event.token() == token.m {
debug!("token {:?} activated", token.m);
token.active.store(1, Ordering::SeqCst);
}
}
}
mio::Poll::poll()でイベントを待って、来たら対応するtoken.activeを有効にする。
if let FutureResult::Done(v) = self.poll() {
return v;
}
poll()を呼んでイベントを処理。Doneなら上に返る。Againならloopで再度mio::Poll::poll()のイベント待ちへ。
Task::poll()
osaka::Future::poll()の実装。
match self {
Task::Immediate{r} => {
return FutureResult::Done(r.take().expect("immediate polled after completion"));
}
Task::Later{f,a} => {
let mut ready = false;
if let Some(deadline) = a.deadline {
if Instant::now() >= deadline {
debug!("task wakeup caused by deadline");
a.deadline = None;
ready = true;
}
}
if !ready {
for token in &a.tokens {
if token.active.load(Ordering::SeqCst) > 0 {
debug!("task wakeup caused by token readyness");
ready = true;
break;
}
}
}
if ready {
match f.poll() {
FutureResult::Done(y) => {
return FutureResult::Done(y);
},
FutureResult::Again(a2) => {
*a = a2;
}
}
}
FutureResult::Again(a.clone())
}
}
デッドラインに到達、もしくはイベントが来ていれば、osaka::Future(ジェネレータ or osaka::Task)に対してFuture::poll()を実行しFutureResultを返す。
動かすものがないならそのままAgainを返す。
Again
# [derive(Clone)]
pub struct Again {
tokens: Vec<Token>,
deadline: Option<Instant>,
poll: Arc<mio::Poll>,
}
イベント待ちをするための情報。Task::Laterのメンバーとしてsync!で伝搬していき、Task::run()でイベント待ちするときに利用される。
README.mdの例
以上を踏まえてREADME.mdを見てみる
概要
resolve()では引数のpollに待つイベントを登録し、イベントがあるごとにyield poll.againから再開して応答を受信。
test()ではresolve()で作ったタスクをsync!で同期的に待つだけ。
詳細
# [osaka]
pub fn resolve(poll: Poll, names: Vec<String>) -> Result<Vec<String>, Error> {
//...
let sock = UdpSocket::bind(&"0.0.0.0:0".parse().unwrap()).map_err(|e| Error::Io(e))?;
let token = poll
.register(&sock, mio::Ready::readable(), mio::PollOpt::level())
.unwrap();
//...
// wait for a packet
let pkt = match loop {
// wait for the token to be ready, or timeout
yield poll.again(token.clone(), Some(Duration::from_secs(5)));
if now.elapsed() >= Duration::from_secs(5) {
// timeout
break None;
}
// now the socket _should_ be ready
let (len, from) = match sock.recv_from(&mut buf) {
Ok(v) => v,
Err(e) => {
// but just in case it isn't lets re-loop
if e.kind() == std::io::ErrorKind::WouldBlock {
continue;
}
return Err(Error::Io(e));
}
};
}
// do stuff with the pkt
// ...
}
pub fn test(poll: Poll) -> Result<(), Error> {
let mut a = resolve(
poll.clone(),
vec![
"3.carrier.devguard.io".into(),
],
);
let y = osaka::sync!(a);
println!("resolved: {:?}", y);
Ok(())
}
pub fn main() {
tinylogger::init().ok();
let poll = osaka::Poll::new();
test(poll).run().unwrap();
}
以下雑多なメモ
これ、test()に#[osaka]付ける必要あるのではないだろうか。以降test()に#[osaka]付いてる前提で話を進める。
あとこのままだといろいろコンパイルは通らないので動かすためには結構修正必要。
#[osaka]の効果でresolve()ではosaka::Task<>が返る。
最初の実行では引数のpollに対するイベント登録をした後、yield poll.again();で
osaka::Task::Later{Box<Generator>, Again}を返す。
test()もosaka::Task<>を返す。test()のブロックもジェネレータ化されているので、最初の呼出でresolve()を呼んでsync!の中で上のLaterに対してpoll()を実行する。
最初の呼び出しては通常イベントは発生していないので上にAgainが返って、そのAgainを元にTaskが作られてtest(poll)の返り値となる。
上の返り値にさらにTask::run()を実行する。Task::run()はブロッキングメソッドでmio::Poll::poll()を使って実際にブロッキングでイベント待ちをおこなう。イベントが来た場合はTask::poll()を呼び出す。あとは「mio::Poll::poll()してTask::poll()」の処理を全ての処理が終わるまで繰り返す。