Tokio Changes Cheatsheet
Tokio 0.1.22 | Tokio 0.2 | |
---|---|---|
Poll Type | Result<Async<Item>, Error> or Poll<Item, Error> |
Poll<Output> |
Future Required Method | fn poll(&mut self) -> Poll<Item, Error> | fn poll(self: Pin<&mut Self>, cx: &mut [Context]) -> [Poll]\ |
Dependencies | futures v0.1.18~23 bytes v0.4.7 mio v0.6.14 |
futures 0.3.0 bytes v0.5.0 mio v0.6.20 |
Stream Required Method | fn poll(&mut self) -> Poll<Option<Item>, Error> | fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> |
Sink Required Methods |
fn start_send(&mut self, item: SinkItem) -> Result<[AsyncSink], SinkError> ***fn [poll_complete]***(&mut self) -> Result<[Async]<()>, SinkError> |
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Error> fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> fn poll_close(self: Pin<&mut Self>, cx: &mut [Context]) -> [Poll]\> |
Interval Path12 | tokio::timer ::Interval |
tokio::time::Interval |
Delay Path32 | tokio::timer ::Delay |
tokio::time::Delay |
Execution Unit | tokio::prelude::task::Task | std::task::Waker |
Runtime Changes | - replace reactor() with handle() - remove executor() and TaskExecutor - remove shutdown_on_idle() - spawn() now returns JoinHandle future |
Migrate in Action
Let's start from an example:
#![feature(test)]
extern crate test;
use futures::future;
use tokio::prelude::*;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::unbounded_channel;
struct Dual<T> {
r1: T,
r2: T,
}
impl<E, T: Stream<Item = i32, Error = E>> Stream for Dual<T> {
type Item = i32;
type Error = E;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut end = false;
// let us poll both in naiive ways:
match self.r1.poll()? {
Async::Ready(Some(e)) => return Ok(Async::Ready(Some(e))),
Async::Ready(None) => {
end = true;
}
Async::NotReady => {}
};
match self.r2.poll()? {
Async::Ready(Some(e)) => Ok(Async::Ready(Some(e))),
Async::Ready(None) => {
if end {
Ok(Async::Ready(None))
} else {
Ok(Async::NotReady)
}
}
Async::NotReady => Ok(Async::NotReady),
}
}
}
#[test]
fn test_dual_poll() {
let mut rt = Runtime::new().unwrap();
let (mut tx, rx) = unbounded_channel();
let (mut txx, rxx) = unbounded_channel();
rt.spawn(future::lazy(move || {
for i in 0..100000_i32 {
tx.try_send(i).unwrap();
}
Ok(())
}));
rt.spawn(future::lazy(move || {
for i in 100000..200000_i32 {
txx.try_send(i).unwrap();
}
Ok(())
}));
let dual = Dual { r1: rxx, r2: rx };
let task = dual.take(200000).collect().map_err(|_| ()).and_then(|x| {
assert_eq!(x.len(), 200000);
Ok(())
});
rt.spawn(task);
rt.shutdown_on_idle().wait().unwrap();
}
Let's start from rewriting old Stream to a newer one. First, we have to change from poll()
into poll_next()
.
Notice that Error
s won't be exported anymore from the Stream unless you define type Item
as Result
. Normally, if we want to make it compatible with our old Stream, we'll give the new Item
with the same type as our old Item
.
So our code becomes:
impl<T: std::marker::Unpin + Stream<Item = i32>> Stream for Dual<T> {
type Item = i32;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
To make use of the Pinned self, we could call Pin::get_mut()
to get the mutable reference. And to poll on our r1
, we have to pass in a pinned r1
to the poll_next()
function. Don't forget to pass the Context
to poll_next()
:
let pinned = Pin::get_mut(self);
match Pin::new(&mut pinned.r1).poll_next(cx) {
In tokio 0.2, future will return Poll
enums. We should handle these three possibilities here:
Poll::Ready(Some(item)) => {/* meaning we have one item.*/}
Poll::Ready(None) => { /* meaning Stream ends */}
Poll::Pending => { /* meaning Stream is not ready */}
Since our r1
and r2
have the same return type as our Stream
, if there's one item exported, we could just return it. So now our Stream
becomes:
impl<T: std::marker::Unpin + Stream<Item = i32>> Stream for Dual<T> {
type Item = i32;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut end = false;
let pinned = Pin::get_mut(self);
match Pin::new(&mut pinned.r1).poll_next(cx) {
x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {
end = true;
}
Poll::Pending => (),
};
match Pin::new(&mut pinned.r2).poll_next(cx) {
x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {
if end {
Poll::Ready(None)
} else {
Poll::Pending
}
}
Poll::Pending => Poll::Pending,
}
}
}
For tests, there's no much difference. What you need to do is to modify slightly on the use of Runtime
. In tokio v0.1.22, Runtime::spawn()
doesn't return any Object, so you cannot wait on a specific group of spawned Futures
. Now in tokio v0.2, we have JoinHandler
- another Future
that waits for the spawned Future
. With this JoinHandler
, now we could use async/await
syntax sugar to accomplish something like pthread join on our spawned Future
s:
let rt = Runtime::new().unwrap();
let j1 = rt.spawn(async move {/* any code snippet with async/await syntax */});
let j2 = rt.spawn(async move {/* any code snippet with async/await synatx */});
rt.block_on(async move {
j1.await.expect("first spawn failed");
j2.await.expect("second spawn failed");
});
After fixing several API changes in unbounded_channel
,
The final code in 0.2 looks like:
#![feature(test)]
extern crate test;
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::runtime::Runtime;
use tokio::sync::mpsc::unbounded_channel;
struct Dual<T> {
r1: T,
r2: T,
}
impl<T: std::marker::Unpin + Stream<Item = i32>> Stream for Dual<T> {
type Item = i32;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut end = false;
let pinned = Pin::get_mut(self);
match Pin::new(&mut pinned.r1).poll_next(cx) {
x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {
end = true;
}
Poll::Pending => (),
};
match Pin::new(&mut pinned.r2).poll_next(cx) {
x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {
if end {
Poll::Ready(None)
} else {
Poll::Pending
}
}
Poll::Pending => Poll::Pending,
}
}
}
#[test]
fn test_dual_poll() {
let mut rt = Runtime::new().unwrap();
let (mut tx, rx) = unbounded_channel();
let (mut txx, rxx) = unbounded_channel();
let j1 = rt.spawn(async move {
for i in 0..100000_i32 {
tx.send(i).unwrap();
}
});
let j2 = rt.spawn(async move {
for i in 100000..200000_i32 {
txx.send(i).unwrap();
}
});
let dual = Dual { r1: rxx, r2: rx };
let j3 = rt.spawn(async move {
let v: Vec<i32> = dual.take(200000).collect().await;
assert_eq!(v.len(), 200000);
});
rt.block_on(async move {
j1.await.unwrap();
j2.await.unwrap();
j3.await.unwrap();
});
}
For Sink
In v0.1.22, we only need to define start_send()
and poll_complete()
. These two maps to start_send()
and poll_ready()
in v0.2. Other than these, we have to also define poll_flush()
- maps to the provided method flush()
in v0.1.22, and poll_close()
- maps to privided method close
in v0.1.22. Details for these APIs are quite similar to old ones.
enter()
in v0.2
If you met any issue with the error message of "thread 'main' panicked at 'no current reactor'", then it's time to use the enter() function provided by Runtime
and Handle
. Let's first look at the implementation of Handle in tokio:
/// Enter the runtime context
pub fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
self.blocking_spawner.enter(|| {
let _io = io::set_default(&self.io_handle);
time::with_default(&self.time_handle, &self.clock, || self.spawner.enter(f))
})
}
We could see before our function been called, the handler will be assigned (or reactor, in v0.1.22) to _io. Before _io been dropped, we'll have this Handle
as the default reactor. Any function that relies on PollEvented and called outside of spawn()
and block__on()
should be called through this function.
For example, if we want to create a UdpSocket from std::net::UdpSocket, we will have:
let udp = rt.enter(move || {UdpSocket::from_std(socket).unwrap()});
-
Interval in 0.2 now doesn't have a new method. You have to use interval or interval_at. ↩
-
All tokio 0.1
timer
items have been moved totime
in 0.2. ↩ ↩2 -
Delay in 0.2 now doesn't have a new method. You have to use delay_until and delay_for ↩