TL;DR
- Reduce the cost of `poll()`
- Ignore unnecessary `poll()` calls
- Channels work nicely
- Guess the state of a busy stream
- Rewrite the underlying layer if you want to squeeze out more performance
Introduction
As many people know, tokio is a runtime framework that provides non-blocking I/O and event-driven features for developers writing their own asynchronous applications. We'll introduce some techniques and tricks for optimizing tokio v0.1.22. This article assumes the reader has development experience with the Tokio framework.
About New Tokio
Recently, Tokio released its 0.2 stable version, which is compatible with the async/await syntax and the latest futures. Even though the authors declared 0.2 stable, wait several months before you put such code into production. Setting aside the enormous changes from 0.2.0-alpha.6 to 0.2.0-stable, the APIs might still be modified in future 0.2 releases. Issues will be discovered and fixed, and the documentation for the APIs is still not ready. If you dig into the code, you'll see many APIs marked as hidden from the documentation. For all these reasons, this article will not cover much about 0.2.
A brief example
Let's start with a simple example:
```rust
// a stream that polls from two other sources of streams
impl Stream for Server {
    type Item = Vec<u8>;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let result1 = self.socket1.poll();
        let result2 = self.socket2.poll();
        // do something with the results
    }
}
```
One use case for this example is combining multiple streams into one. Each socket might be a file, a TcpStream, a Timer, or even an I/O device. Hidden behind the scenes is mio doing the task management. When any Stream returns `Async::NotReady`, the task is registered in a queue, waiting for a trigger event to wake it up. In this example, since we have to `poll()` both `socket1` and `socket2`, `Server::poll()` might be woken up by either of them. When `socket2` becomes `Ready` and wakes up `Server::poll` while `socket1` is still `NotReady`, we waste time polling `socket1`.
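To make the wakeup mechanism concrete, here is a minimal sketch of what "registering the task" means in futures 0.1. This is not how mio itself is implemented; the `Leaf` type and its in-memory queue are hypothetical stand-ins for a real event source:

```rust
use std::collections::VecDeque;
use futures::task::{self, Task};
use futures::{Async, Poll};

struct Leaf {
    queue: VecDeque<Vec<u8>>,
    parked: Option<Task>,
}

impl Leaf {
    // Must be called from inside a task, i.e. from some `poll`.
    fn poll_data(&mut self) -> Poll<Option<Vec<u8>>, std::io::Error> {
        if let Some(data) = self.queue.pop_front() {
            return Ok(Async::Ready(Some(data)));
        }
        // Remember the current task so the producer can wake it later.
        self.parked = Some(task::current());
        Ok(Async::NotReady)
    }

    fn push(&mut self, data: Vec<u8>) {
        self.queue.push_back(data);
        // Wake up whoever is parked on `poll_data`.
        if let Some(t) = self.parked.take() {
            t.notify();
        }
    }
}
```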
Here's where we can optimize. If we could:
- lower the cost of `poll()` on `socket1` and `socket2`
- ignore unnecessary `poll()` calls

then we could save that time for other operations. How to accomplish either point depends on how we make use of the output data from these streams.
1. Using Channel
If we want to avoid unnecessary `poll()` calls, we can pipe the streams of events into a channel. The channel's receiving stream is woken whenever any of its sinks gets data. [1]
```rust
let (tx, rx) = unbounded_channel();
let sink1 = tx.clone(); // unbounded_channel or channel
let sink2 = tx.clone(); // unbounded_channel or channel

let task = server.socket1
    .map_err(|e| error!("{:?}", e))
    .forward(sink1.sink_map_err(|e| error!("{:?}", e)).with(|p| Ok((1, p))))
    .map(|_| ());
exec.spawn(task); // spawned by TaskExecutor

let task = server.socket2
    .map_err(|e| error!("{:?}", e))
    .forward(sink2.sink_map_err(|e| error!("{:?}", e)).with(|p| Ok((2, p))))
    .map(|_| ());
exec.spawn(task); // spawned by TaskExecutor

let task = rx
    .map_err(|e| error!("{:?}", e))
    .for_each(|(from, data)| {
        // do something
        Ok(())
    })
    .map(|_| ());
exec.spawn(task); // consumer spawned by TaskExecutor

// no need to impl Stream for Server
```
One benefit of using a channel is that it provides buffer space for sudden bursts of input. If there are no other concerns, a channel should work for most cases.
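As a side note, tokio-sync (the crate cited in [1]) lets you choose how much buffering you get. A rough sketch, assuming tokio-sync 0.1:

```rust
use tokio_sync::mpsc;

// Absorbs any burst; memory is the only limit.
let (tx, rx) = mpsc::unbounded_channel::<(u8, Vec<u8>)>();

// Holds at most 1024 in-flight messages; once the buffer is full,
// `send` parks the producing task, applying backpressure.
let (btx, brx) = mpsc::channel::<(u8, Vec<u8>)>(1024);
```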
2. Guess State
Create a flag in Server to record the statuses of `socket1` and `socket2`. If either of them returns `Ready`, keep polling it until it returns `NotReady`. When streams are busy, they rarely go into the `NotReady` status, so polling the stream that was last `Ready` has a higher probability of yielding data. This minimizes the cost of polling both streams. The side effect of this method is that the produced data will be skewed toward one side if the stream we keep polling seldom returns `NotReady`.
```rust
impl Stream for Server {
    type Item = Vec<u8>;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        if self.is_socket1 {
            // Guessing that socket1 is ready.
            if self.alive_1 { // prevent polling an ended stream
                match self.socket1.poll()? {
                    // do something with `e`, then hand it out
                    Async::Ready(Some(e)) => return Ok(Async::Ready(Some(e))),
                    Async::Ready(None) => self.alive_1 = false,
                    // Wrong guess: socket1 registered its event; try socket2.
                    Async::NotReady => self.is_socket1 = false,
                }
            }
            if self.alive_2 { // prevent polling an ended stream
                match self.socket2.poll()? {
                    Async::Ready(Some(e)) => return Ok(Async::Ready(Some(e))),
                    Async::Ready(None) => self.alive_2 = false,
                    // Both sockets registered their events; park the task.
                    Async::NotReady => return Ok(Async::NotReady),
                }
            }
        } else {
            // Guessing that socket2 is ready.
            if self.alive_2 { // prevent polling an ended stream
                match self.socket2.poll()? {
                    Async::Ready(Some(e)) => return Ok(Async::Ready(Some(e))),
                    Async::Ready(None) => self.alive_2 = false,
                    Async::NotReady => self.is_socket1 = true,
                }
            }
            if self.alive_1 { // prevent polling an ended stream
                match self.socket1.poll()? {
                    Async::Ready(Some(e)) => return Ok(Async::Ready(Some(e))),
                    Async::Ready(None) => self.alive_1 = false,
                    Async::NotReady => return Ok(Async::NotReady),
                }
            }
        }
        if !self.alive_1 && !self.alive_2 {
            // Both streams have ended.
            Ok(Async::Ready(None))
        } else {
            // The stream that is still alive already registered its event
            // when it returned NotReady above.
            Ok(Async::NotReady)
        }
    }
}
```
3. Use Atomic Flags and AtomicTask
Other than guessing, if we could share a flag between `Server` and `socket1`/`socket2`, we would know which event triggered the current `Server::poll`. In the general case, this is only possible if you control the underlying `mio::Evented` object. However, if we are allowed to spawn `socket1`/`socket2` on other threads (as in the first method) and save their output in some container, we can set the flags from the following `Wrapper`:
```rust
struct Wrapper<T> {
    inner: T, // T: the `socket1`/`socket2` stream type
    from_me: Arc<AtomicBool>,
    // A HashMap as an example; a lock-free container would be better here.
    result: Arc<RwLock<HashMap<i32, i64>>>,
    // Task handle shared with `Server`.
    task: Arc<AtomicTask>,
}
```
```rust
impl<T: Stream> Stream for Wrapper<T> { // wraps `socket1` and `socket2`
    type Item = ();
    type Error = <T as Stream>::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.inner.poll() {
            Ok(Async::Ready(Some(e))) => {
                let mut w = self.result.write().unwrap();
                w.insert(...); // store what `e` produced
                self.from_me.store(true, Ordering::Relaxed);
                self.task.notify();
                Ok(Async::Ready(Some(())))
            }
            Ok(Async::Ready(None)) => {
                // set some flag to mark that this stream has ended
                self.task.notify();
                Ok(Async::Ready(None))
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => {
                // set some flag to mark that this stream has failed
                Err(e)
            }
        }
    }
}

impl<T> Drop for Wrapper<T> {
    fn drop(&mut self) {
        // On error or when the stream ends, wake up the Server task.
        self.task.notify();
    }
}
```
```rust
// `from_1` and `from_2` are the two flags shared with `Wrapper::from_me`.
// `Server::task` and `Wrapper::task` share the same `AtomicTask`.
// `Server::result` and `Wrapper::result` share the same HashMap.
impl Stream for Server {
    type Item = Vec<u8>;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let mut notready = true;
        if self.from_1.swap(false, Ordering::Relaxed) {
            notready = false;
            // do something with self.result
        }
        if self.from_2.swap(false, Ordering::Relaxed) {
            notready = false;
            // do something with self.result
        }
        if notready {
            // Nobody set a flag: register for a wakeup and park.
            self.task.register();
            return Ok(Async::NotReady);
        }
        ... // produce the output assembled from self.result
    }
}
```
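To make the wiring explicit, here is a minimal sketch of how the shared state could be connected before everything is spawned. The field names come from the structs above; the executor `exec` and the surrounding setup are assumed from the channel example:

```rust
let from_1 = Arc::new(AtomicBool::new(false));
let from_2 = Arc::new(AtomicBool::new(false));
let result = Arc::new(RwLock::new(HashMap::new()));
let task = Arc::new(AtomicTask::new());

let w1 = Wrapper { inner: socket1, from_me: from_1.clone(), result: result.clone(), task: task.clone() };
let w2 = Wrapper { inner: socket2, from_me: from_2.clone(), result: result.clone(), task: task.clone() };

// Drain each wrapper on its own task; every item it yields has already
// been written into `result`, so the unit items can be dropped.
exec.spawn(w1.for_each(|_| Ok(())).map_err(|_| ()));
exec.spawn(w2.for_each(|_| Ok(())).map_err(|_| ()));

// `Server` keeps clones of `from_1`, `from_2`, `result`, and `task`,
// and is polled like any other stream.
```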
If we could directly modify the implementations of `socket1` and `socket2`, that would work best. Basically, method 3 is an enhanced version of method 1: you can gain more performance from it, but you also take on the risk of deadlocks under race conditions.
Other Optimizations
- Replace Mutex with RwLock, or with atomic flags + UnsafeCell
- Use MPMC if you have enough CPUs (it may make a single Stream slower, but it reduces the overall processing time)
- Instead of using the default Debug trait, define your own and print only when necessary (see the sketch after this list)
- Upgrade tokio to 0.2 for a faster scheduler and faster channels
- Upgrade your old libraries, such as serde and bytes
- Don't use futures' mpsc channels; use tokio's mpsc channels instead (the former are 1.5x~2x slower)
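On the Debug point: deriving Debug on a type with a large payload makes every stray `{:?}` expensive. A minimal sketch, with a hypothetical `Packet` type, of printing only a cheap summary:

```rust
use std::fmt;

struct Packet {
    payload: Vec<u8>, // possibly a very large buffer
}

impl fmt::Debug for Packet {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Log the length instead of dumping the whole payload.
        f.debug_struct("Packet")
            .field("len", &self.payload.len())
            .finish()
    }
}
```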
Tokio 0.2
Tokio v0.2 announced a great improvement in its scheduler [2]. We ran several benchmarks on both versions to compare. The first table is the benchmark result of sending 2,700,000 bytes of u8 through TcpFramed, using LinesCodec as the Decoder, producing 100,000 messages of length 26:

| | v0.1.22 | v0.2.4 |
|---|---|---|
| TcpFramed | 181ns/iter | 172ns/iter |

For channels, we send 200,000 i32 numbers:

| | v0.1.22 | v0.2.4 |
|---|---|---|
| Unbounded Channel | 233ns/iter | 202ns/iter |
| Bounded Channel (1000) | 384ns/iter | 315ns/iter |
| Bounded Channel (1) | 2965ns/iter | 10082ns/iter |

The tricks introduced in this article also apply to v0.2, and they should benefit from the improvements introduced in v0.2, too. Just be aware of the API changes: `Task` becomes `Waker`, `poll()` in `Stream` becomes `poll_next()`, and you no longer control which `Runtime` runs your `async`/`await` code.
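For reference, a minimal sketch of the same `Server` under the 0.2-era trait (the `Stream` from futures 0.3): the implicit task is replaced by an explicit `Context` carrying a `Waker`, and the error migrates into the item type:

```rust
use futures::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

impl Stream for Server {
    // Errors are now part of the item instead of a second associated type.
    type Item = Result<Vec<u8>, std::io::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `cx.waker().clone()` plays the role the old `task::current()`
        // and `AtomicTask` did: store it, then call `.wake()` later.
        Poll::Pending
    }
}
```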
Reference
- [1] tokio-sync chan.rs, https://docs.rs/tokio-sync/0.1.6/src/tokio_sync/mpsc/chan.rs.html#173-188
- [2] Making the Tokio scheduler 10x faster, https://tokio.rs/blog/2019-10-scheduler/