
六本木の某FinTech集団Advent Calendar 2019

Day 7

Migrate Tokio from 0.1.22 to 0.2

Posted at 2019-12-09

Tokio Changes Cheatsheet

Poll Type
  Tokio 0.1.22: Result<Async<Item>, Error> or Poll<Item, Error>
  Tokio 0.2:    Poll<Output>

Future Required Method
  Tokio 0.1.22: fn poll(&mut self) -> Poll<Item, Error>
  Tokio 0.2:    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>

Dependencies
  Tokio 0.1.22: futures v0.1.18~23, bytes v0.4.7, mio v0.6.14
  Tokio 0.2:    futures 0.3.0, bytes v0.5.0, mio v0.6.20

Stream Required Method
  Tokio 0.1.22: fn poll(&mut self) -> Poll<Option<Item>, Error>
  Tokio 0.2:    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>

Sink Required Methods
  Tokio 0.1.22:
    fn start_send(&mut self, item: SinkItem) -> Result<AsyncSink<SinkItem>, SinkError>
    fn poll_complete(&mut self) -> Result<Async<()>, SinkError>
  Tokio 0.2:
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>>
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Error>
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>>
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>>

Interval Path [1][2]
  Tokio 0.1.22: tokio::timer::Interval
  Tokio 0.2:    tokio::time::Interval

Delay Path [3][2]
  Tokio 0.1.22: tokio::timer::Delay
  Tokio 0.2:    tokio::time::Delay

Execution Unit
  Tokio 0.1.22: tokio::prelude::task::Task
  Tokio 0.2:    std::task::Waker

Runtime Changes (0.1.22 to 0.2)
  - replace reactor() with handle()
  - remove executor() and TaskExecutor
  - remove shutdown_on_idle()
  - spawn() now returns a JoinHandle future
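
To make the Poll, Future, and Waker entries above more concrete, here is a minimal std-only sketch of a hand-written future with the new poll() signature. Countdown, NoopWaker, and busy_block_on are hypothetical names invented for this illustration; a real program would let the tokio Runtime drive the future instead of busy-polling it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical future: becomes ready after `remaining` Pending polls.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    // 0.1 had `type Item` and `type Error`; now there is a single `Output`.
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // Ask to be polled again. With futures 0.1 this would have been
            // `task::current().notify()`; now the Waker comes from the Context.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Waker that does nothing; enough for a loop that polls unconditionally.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls `fut` in a loop; returns its output and the number of polls needed.
fn busy_block_on<F: Future + Unpin>(mut fut: F) -> (F::Output, u32) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return (out, polls);
        }
    }
}
```

Calling busy_block_on(Countdown { remaining: 3 }) returns ("done", 4): three Pending polls followed by one Ready.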

Migration in Action

Let's start with an example written for tokio 0.1.22:

#![feature(test)]

extern crate test;
use futures::future;
use tokio::prelude::*;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::unbounded_channel;

struct Dual<T> {
    r1: T,
    r2: T,
}

impl<E, T: Stream<Item = i32, Error = E>> Stream for Dual<T> {
    type Item = i32;
    type Error = E;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let mut end = false;
        // poll both streams in a naive way:
        match self.r1.poll()? {
            Async::Ready(Some(e)) => return Ok(Async::Ready(Some(e))),
            Async::Ready(None) => {
                end = true;
            }
            Async::NotReady => {}
        };
        match self.r2.poll()? {
            Async::Ready(Some(e)) => Ok(Async::Ready(Some(e))),
            Async::Ready(None) => {
                if end {
                    Ok(Async::Ready(None))
                } else {
                    Ok(Async::NotReady)
                }
            }
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}

#[test]
fn test_dual_poll() {
    let mut rt = Runtime::new().unwrap();
    let (mut tx, rx) = unbounded_channel();
    let (mut txx, rxx) = unbounded_channel();
    rt.spawn(future::lazy(move || {
        for i in 0..100000_i32 {
            tx.try_send(i).unwrap();
        }
        Ok(())
    }));
    rt.spawn(future::lazy(move || {
        for i in 100000..200000_i32 {
            txx.try_send(i).unwrap();
        }
        Ok(())
    }));
    let dual = Dual { r1: rxx, r2: rx };
    let task = dual.take(200000).collect().map_err(|_| ()).and_then(|x| {
        assert_eq!(x.len(), 200000);
        Ok(())
    });
    rt.spawn(task);
    rt.shutdown_on_idle().wait().unwrap();
}

Let's start by rewriting the old Stream as a new one. First, we have to change poll() into poll_next().
Notice that a Stream no longer has a separate Error type: errors are only surfaced if you define type Item as a Result. To keep the new Stream compatible with the old one, we give the new Item the same type as the old Item.

So our code becomes:

impl<T: std::marker::Unpin + Stream<Item = i32>> Stream for Dual<T> {
    type Item = i32;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {

To work with the pinned self, we can call Pin::get_mut() to get a plain mutable reference (this requires Self: Unpin, which holds here because T: Unpin). To poll r1, we wrap it in a Pin with Pin::new() and call poll_next() on it. Don't forget to pass the Context along to poll_next():

        let pinned = Pin::get_mut(self);
        match Pin::new(&mut pinned.r1).poll_next(cx) {

In tokio 0.2, poll functions return the Poll enum. For a Stream there are three possibilities to handle:

Poll::Ready(Some(item)) => {/* meaning we have one item.*/}
Poll::Ready(None) => { /* meaning Stream ends */}
Poll::Pending => { /* meaning Stream is not ready */}
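
As a small illustration of these cases, the following std-only sketch polls a scripted stream by hand and names each outcome. It also shows how an error appears once Item is defined as a Result. The local Stream trait is a stand-in with the same required method as the futures 0.3 trait, and Scripted, NoopWaker, and describe are hypothetical names made up for this example.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in with the same required method as the futures 0.3 `Stream`
// trait, defined here only to keep the sketch std-only.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>;
}

// Hypothetical stream that replays a fixed script of poll results.
struct Scripted {
    steps: VecDeque<Poll<Option<Result<i32, String>>>>,
}

impl Scripted {
    fn new(steps: Vec<Poll<Option<Result<i32, String>>>>) -> Self {
        Scripted { steps: steps.into_iter().collect() }
    }
}

impl Stream for Scripted {
    // There is no separate error type any more; errors travel inside Item.
    type Item = Result<i32, String>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
        // A real stream must arrange a wake-up before returning Pending.
        // The script skips that because `describe` polls it by hand.
        self.get_mut().steps.pop_front().unwrap_or(Poll::Ready(None))
    }
}

// Waker that does nothing; we never rely on being woken in this sketch.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls the stream once and reports which case was hit.
fn describe(stream: &mut Scripted) -> String {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match Pin::new(stream).poll_next(&mut cx) {
        Poll::Ready(Some(Ok(item))) => format!("item {}", item),
        Poll::Ready(Some(Err(e))) => format!("error {}", e),
        Poll::Ready(None) => String::from("end of stream"),
        Poll::Pending => String::from("not ready"),
    }
}
```

Note that an Err item does not end the stream by itself; the stream only ends when poll_next() returns Poll::Ready(None).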

Since r1 and r2 yield the same item type as our Stream, whenever one of them produces an item we can return it as is. So now our Stream becomes:

impl<T: std::marker::Unpin + Stream<Item = i32>> Stream for Dual<T> {
    type Item = i32;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut end = false;
        let pinned = Pin::get_mut(self);
        match Pin::new(&mut pinned.r1).poll_next(cx) {
            x @ Poll::Ready(Some(_)) => return x,
            Poll::Ready(None) => {
                end = true;
            }
            Poll::Pending => (),
        };

        match Pin::new(&mut pinned.r2).poll_next(cx) {
            x @ Poll::Ready(Some(_)) => return x,
            Poll::Ready(None) => {
                if end {
                    Poll::Ready(None)
                } else {
                    Poll::Pending
                }
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

For the tests, there is not much difference. You only need to adjust how the Runtime is used. In tokio v0.1.22, Runtime::spawn() gives you no handle to the spawned task, so you cannot wait on a specific group of spawned Futures. In tokio v0.2, spawn() returns a JoinHandle, another Future that completes when the spawned Future finishes. With JoinHandle and the async/await syntax, we can do something like pthread_join on our spawned Futures:

let mut rt = Runtime::new().unwrap(); // block_on() takes &mut self in 0.2
let j1 = rt.spawn(async move {/* any code snippet with async/await syntax */});
let j2 = rt.spawn(async move {/* any code snippet with async/await syntax */});
rt.block_on(async move {
    j1.await.expect("first spawn failed");
    j2.await.expect("second spawn failed");
});
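
For readers who are more used to threads than to futures, the same spawn-then-join shape can be sketched with std::thread. This is only an analogy and not tokio code: a thread handle is joined with a blocking call, whereas a tokio JoinHandle is awaited. The function name spawn_and_join is made up for the illustration.

```rust
use std::thread;

// Spawns two workers and waits for both, mirroring j1 and j2 above.
fn spawn_and_join() -> (i32, i32) {
    let h1 = thread::spawn(|| (0..100).sum::<i32>());
    let h2 = thread::spawn(|| (100..200).sum::<i32>());
    // join() blocks and returns Err if the thread panicked, much like
    // awaiting a tokio JoinHandle yields Err if the task panicked.
    let a = h1.join().expect("first spawn failed");
    let b = h2.join().expect("second spawn failed");
    (a, b)
}
```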

After adapting to a few API changes in unbounded_channel (for example, try_send() becomes send()), the final code for 0.2 looks like this:

#![feature(test)]

extern crate test;
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::runtime::Runtime;
use tokio::sync::mpsc::unbounded_channel;

struct Dual<T> {
    r1: T,
    r2: T,
}

impl<T: std::marker::Unpin + Stream<Item = i32>> Stream for Dual<T> {
    type Item = i32;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut end = false;
        let pinned = Pin::get_mut(self);
        match Pin::new(&mut pinned.r1).poll_next(cx) {
            x @ Poll::Ready(Some(_)) => return x,
            Poll::Ready(None) => {
                end = true;
            }
            Poll::Pending => (),
        };

        match Pin::new(&mut pinned.r2).poll_next(cx) {
            x @ Poll::Ready(Some(_)) => return x,
            Poll::Ready(None) => {
                if end {
                    Poll::Ready(None)
                } else {
                    Poll::Pending
                }
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

#[test]
fn test_dual_poll() {
    let mut rt = Runtime::new().unwrap();
    // send() takes &self in 0.2, so the senders no longer need to be `mut`
    let (tx, rx) = unbounded_channel();
    let (txx, rxx) = unbounded_channel();
    let j1 = rt.spawn(async move {
        for i in 0..100000_i32 {
            tx.send(i).unwrap();
        }
    });

    let j2 = rt.spawn(async move {
        for i in 100000..200000_i32 {
            txx.send(i).unwrap();
        }
    });

    let dual = Dual { r1: rxx, r2: rx };
    let j3 = rt.spawn(async move {
        let v: Vec<i32> = dual.take(200000).collect().await;
        assert_eq!(v.len(), 200000);
    });
    rt.block_on(async move {
        j1.await.unwrap();
        j2.await.unwrap();
        j3.await.unwrap();
    });
}

For Sink

In v0.1.22, we only need to define start_send() and poll_complete(). In v0.2 there are four required methods. The old start_send(), which both checked for capacity and sent the item, is split into poll_ready() and start_send(). The old poll_complete() corresponds to poll_flush(), the poll-based counterpart of the provided method flush() in v0.1.22, and poll_close() corresponds to the provided method close() in v0.1.22. Apart from that, the details of these APIs are quite similar to the old ones.
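
To show how the four methods fit together, here is a std-only sketch. The local Sink trait mirrors the required methods of the futures 0.3 trait and is defined here only so that the example compiles without external crates. BufferedSink, NoopWaker, and send_all are hypothetical names, and this sink never returns Pending, which a real one often will.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in mirroring the four required methods of futures 0.3 `Sink`.
trait Sink<Item> {
    type Error;
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
}

// Hypothetical sink: buffers up to `cap` items and moves them to `written`
// when flushed.
struct BufferedSink {
    cap: usize,
    buffer: Vec<i32>,
    written: Vec<i32>,
    closed: bool,
}

impl BufferedSink {
    fn new(cap: usize) -> Self {
        BufferedSink { cap, buffer: Vec::new(), written: Vec::new(), closed: false }
    }

    fn write_out(&mut self) {
        self.written.append(&mut self.buffer);
    }
}

impl Sink<i32> for BufferedSink {
    type Error = &'static str;

    // Backpressure check: make room before the caller may send.
    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        if this.closed {
            return Poll::Ready(Err("sink is closed"));
        }
        if this.buffer.len() >= this.cap {
            this.write_out();
        }
        Poll::Ready(Ok(()))
    }

    // Only valid after poll_ready() returned Ready(Ok(())).
    fn start_send(self: Pin<&mut Self>, item: i32) -> Result<(), Self::Error> {
        self.get_mut().buffer.push(item);
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        self.get_mut().write_out();
        Poll::Ready(Ok(()))
    }

    // Flushes what is left and refuses further items.
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        this.write_out();
        this.closed = true;
        Poll::Ready(Ok(()))
    }
}

// Waker that does nothing; this sketch never waits to be woken.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Drives the sink by hand: poll_ready, start_send for each item, then close.
fn send_all(sink: &mut BufferedSink, items: &[i32]) -> Result<(), &'static str> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    for &item in items {
        match Pin::new(&mut *sink).poll_ready(&mut cx) {
            Poll::Ready(Ok(())) => Pin::new(&mut *sink).start_send(item)?,
            Poll::Ready(Err(e)) => return Err(e),
            Poll::Pending => return Err("unexpected Pending in this sketch"),
        }
    }
    match Pin::new(&mut *sink).poll_close(&mut cx) {
        Poll::Ready(res) => res,
        Poll::Pending => Err("unexpected Pending in this sketch"),
    }
}
```

In real code you would rarely call these methods directly; the send() and close() helpers provided by futures' SinkExt do the polling for you.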

enter() in v0.2

If you run into a panic with the message "thread 'main' panicked at 'no current reactor'", it is time to use the enter() function provided by Runtime and Handle. Let's first look at the implementation of Handle::enter() in tokio:

    /// Enter the runtime context
    pub fn enter<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        self.blocking_spawner.enter(|| {
            let _io = io::set_default(&self.io_handle);

            time::with_default(&self.time_handle, &self.clock, || self.spawner.enter(f))
        })
    }

We can see that before our closure is called, the runtime's I/O handle (the reactor, in v0.1.22 terms) is set as the default, and the resulting guard is bound to _io. Until _io is dropped, this Handle acts as the default reactor. Any function that relies on PollEvented and is called outside of spawn() and block_on() should be called through enter().
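
The "set a default, then restore it when a guard is dropped" pattern can be sketched with a thread-local and nothing but std. This is not tokio's implementation. CURRENT_REACTOR, DefaultGuard, set_default, current_reactor, and this free-standing enter are hypothetical names, and the reactor is represented by a plain String.

```rust
use std::cell::RefCell;

thread_local! {
    // Stand-in for the "current reactor" slot of this thread.
    static CURRENT_REACTOR: RefCell<Option<String>> = RefCell::new(None);
}

// Restores the previous default when dropped, like `_io` in Handle::enter().
struct DefaultGuard {
    previous: Option<String>,
}

impl Drop for DefaultGuard {
    fn drop(&mut self) {
        let previous = self.previous.take();
        CURRENT_REACTOR.with(|slot| *slot.borrow_mut() = previous);
    }
}

// Installs `name` as the default and remembers what was there before.
fn set_default(name: &str) -> DefaultGuard {
    let previous = CURRENT_REACTOR.with(|slot| slot.borrow_mut().replace(name.to_string()));
    DefaultGuard { previous }
}

// What code relying on the default would call; tokio panics in the failure
// case, this sketch returns an Err instead.
fn current_reactor() -> Result<String, &'static str> {
    CURRENT_REACTOR.with(|slot| slot.borrow().clone().ok_or("no current reactor"))
}

// Runs `f` with `name` installed as the default reactor.
fn enter<F, R>(name: &str, f: F) -> R
where
    F: FnOnce() -> R,
{
    let _guard = set_default(name);
    f()
}
```

Outside of enter(), current_reactor() returns Err("no current reactor"). Inside the closure it returns the installed name, and once the guard is dropped the previous state is back.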

For example, if we want to create a UdpSocket from std::net::UdpSocket, we will have:

let udp = rt.enter(move || UdpSocket::from_std(socket).unwrap());

Footnotes

  1. Interval in 0.2 no longer has a new() method. Use interval() or interval_at() instead.

  2. All tokio 0.1 timer items have been moved to the time module in 0.2.

  3. Delay in 0.2 no longer has a new() method. Use delay_until() or delay_for() instead.
