3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

[Rust] osaka.rs メモ

Last updated at Posted at 2019-02-10

今週のThis Week in Rustに出てきたosaka.rsにちょっと興味惹かれたので読んでみたときの個人メモ

osaka.rsとは?

mioをベースに、asyncもどきの#[osaka]await!もどきのsync!を実装したお試し非同期フレームワークという感じ。

#[osaka]

#[proc_macro_attribute]については

osaka-macro/lib.rs
    let output = match f.decl.output.clone() {
        syn::ReturnType::Default => quote! {()},
        syn::ReturnType::Type(_, t) => quote! {#t},
    };

    f.decl.output = match syn::parse((quote! {
        -> osaka::Task<#output>
    }).into(),
    ) {
        Ok(v) => v,
        Err(e) => return e.to_compile_error().into(),
    };

返り値をosaka::Task<>でラップしている。

syn::ReturnType::Defaultは返り値無しの関数の場合。

osaka/lib.rs
    let oblock = f.block;
    f.block = match syn::parse(
        (quote! {{
            use std::ops::Generator;
            let mut l = move||{
                #oblock
            };

            let a = match unsafe { l.resume() } {
                std::ops::GeneratorState::Complete(y) => {
                    return osaka::Task::immediate(y);
                }
                std::ops::GeneratorState::Yielded(y) => {
                    y
                }
            };
            osaka::Task::new(Box::new(l),a)
        }})
        .into(),
    ) {
        Ok(v) => v,
        Err(e) => return e.to_compile_error().into(),
    };

関数のブロックをラッピングしてstd::ops::Generatorを作ってすぐにresume()で実行し、作ったジェネレータと中断結果(完了した場合はosaka::Task::immediate)を返す関数を作り、それをf.blockに代入することで元の関数を作り変えている。

この#[osaka]はどんな関数にも付けられるというものではなく、ブロック内部でyield(または後述のsync!)を呼んでいる必要があるため、どちらかというとジェネレータを普通の関数に見せかける仕組みという方が正しいように思う。

sync!

osaka/lib.rs
macro_rules! sync {
    ($task:expr) => {{
        use osaka::FutureResult;
        use osaka::Future;
        loop {
            match $task.poll() {
                FutureResult::Done(y) => {
                    break y;
                }
                FutureResult::Again(y) => {
                    yield y;
                }
            }
        }
    };};
}

引数の$task#[osaka]付きの関数(?)で生成されたosaka::Taskまたはジェネレータ。
yieldを上へと伝搬させていくawait!もどきの役割。

Task::run()

ブロックしてイベントを待つ。

osaka/lib.rs
                    a.poll.poll(&mut events, timeout).expect("poll");

                    for token in &a.tokens {
                        token.active.store(0, Ordering::SeqCst);
                    }
                    for event in &events {
                        for token in &a.tokens {
                            if event.token() == token.m {
                                debug!("token {:?} activated", token.m);
                                token.active.store(1, Ordering::SeqCst);
                            }
                        }
                    }

mio::Poll::poll()でイベントを待って、来たら対応するtoken.activeを有効にする。

osaka/lib.rs
            if let FutureResult::Done(v) = self.poll() {
                return v;
            }

poll()を呼んでイベントを処理。Doneなら上に返る。Againならloopで再度mio::Poll::poll()のイベント待ちへ。

Task::poll()

osaka::Future::poll()の実装。

osaka/lib.rs
        match self {
            Task::Immediate{r} =>  {
                return FutureResult::Done(r.take().expect("immediate polled after completion"));
            }
            Task::Later{f,a} =>  {
                let mut ready = false;

                if let Some(deadline) = a.deadline {
                    if Instant::now() >= deadline {
                        debug!("task wakeup caused by deadline");
                        a.deadline = None;
                        ready = true;
                    }
                }

                if !ready {
                    for token in &a.tokens {
                        if token.active.load(Ordering::SeqCst) > 0 {
                            debug!("task wakeup caused by token readyness");
                            ready = true;
                            break;
                        }
                    }
                }

                if ready {
                    match f.poll() {
                        FutureResult::Done(y) => {
                            return FutureResult::Done(y);
                        },
                        FutureResult::Again(a2) => {
                            *a = a2;
                        }
                    }
                }

                FutureResult::Again(a.clone())
            }
        }

デッドラインに到達、もしくはイベントが来ていれば、osaka::Future(ジェネレータ or osaka::Task)に対してFuture::poll()を実行しFutureResultを返す。

動かすものがないならそのままAgainを返す。

Again

osaka/lib.rs
#[derive(Clone)]
pub struct Again {
    tokens:     Vec<Token>,
    deadline:   Option<Instant>,
    poll:       Arc<mio::Poll>,
}

イベント待ちをするための情報。Task::Laterのメンバーとしてsync!で伝搬していき、Task::run()でイベント待ちするときに利用される。

README.mdの例

以上を踏まえてREADME.mdを見てみる

概要

resolve()では引数のpollに待つイベントを登録し、イベントがあるごとにyield poll.againから再開して応答を受信。
test()ではresolve()で作ったタスクをsync!で同期的に待つだけ。

詳細

README.md
#[osaka]
pub fn resolve(poll: Poll, names: Vec<String>) -> Result<Vec<String>, Error> {
    //...
    let sock = UdpSocket::bind(&"0.0.0.0:0".parse().unwrap()).map_err(|e| Error::Io(e))?;
    let token = poll
        .register(&sock, mio::Ready::readable(), mio::PollOpt::level())
        .unwrap();
    //...

    // wait for a packet
    let pkt = match loop {
        // wait for the token to be ready, or timeout
        yield poll.again(token.clone(), Some(Duration::from_secs(5)));
        if now.elapsed() >= Duration::from_secs(5) {
            // timeout
            break None;
        }
        // now the socket _should_ be ready
        let (len, from) = match sock.recv_from(&mut buf) {
            Ok(v) => v,
            Err(e) => {
                // but just in case it isn't lets re-loop
                if e.kind() == std::io::ErrorKind::WouldBlock {
                    continue;
                }
                return Err(Error::Io(e));
            }
        };
    }

    // do stuff with the pkt
    // ...
}

pub fn test(poll: Poll) -> Result<(), Error> {
    let mut a = resolve(
        poll.clone(),
        vec![
            "3.carrier.devguard.io".into(),
        ],
    );
    let y = osaka::sync!(a);
    println!("resolved: {:?}", y);
    Ok(())
}

pub fn main() {
    tinylogger::init().ok();
    let poll = osaka::Poll::new();
    test(poll).run().unwrap();
}

以下雑多なメモ

これ、test()#[osaka]付ける必要あるのではないだろうか。以降test()#[osaka]付いてる前提で話を進める。
あとこのままだといろいろコンパイルは通らないので動かすためには結構修正必要。

#[osaka]の効果でresolve()ではosaka::Task<>が返る。
最初の実行では引数のpollに対するイベント登録をした後、yield poll.again();
osaka::Task::Later{Box<Generator>, Again}を返す。

test()osaka::Task<>を返す。test()のブロックもジェネレータ化されているので、最初の呼出でresolve()を呼んでsync!の中で上のLaterに対してpoll()を実行する。
最初の呼び出しては通常イベントは発生していないので上にAgainが返って、そのAgainを元にTaskが作られてtest(poll)の返り値となる。

上の返り値にさらにTask::run()を実行する。Task::run()はブロッキングメソッドでmio::Poll::poll()を使って実際にブロッキングでイベント待ちをおこなう。イベントが来た場合はTask::poll()を呼び出す。あとは「mio::Poll::poll()してTask::poll()」の処理を全ての処理が終わるまで繰り返す。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?