六本木の某FinTech集団Advent Calendar 2019

Day 4

Tokio Performance Optimization

Posted at 2019-12-03

TL;DR

  • reduce the cost of poll()
  • skip unnecessary poll() calls
  • channels work nicely
  • guess the state of a busy stream
  • rewrite the underlying layer if you want to squeeze out more performance

Introduction

As many people know, Tokio is a runtime framework that provides non-blocking I/O and event-driven features for developers writing asynchronous applications. This article introduces some techniques and tricks for optimizing applications built on tokio v0.1.22. We assume that readers have development experience with the Tokio framework.

About New Tokio

Recently, Tokio released its stable 0.2 version, which is compatible with the async/await syntax and the latest futures. Even though the authors have declared 0.2 stable, consider waiting several months before putting it into production. Apart from the enormous changes between 0.2.0-alpha.6 and the stable 0.2.0, the APIs might still be modified in future 0.2 releases. Issues might still be discovered and fixed, and the API documentation is not yet complete. If you dig into the code, you'll see that many APIs are hidden from the documentation. For these reasons, this article does not cover much of 0.2.

A brief example

Let's start from a simple example:

// a stream that polls from two other sources of streams
impl Stream for Server {
    type Item = Vec<u8>;
    type Error = io::Error;
    fn poll(&mut self) -> Poll<Option<<Self as Stream>::Item>, <Self as Stream>::Error> {
        let result1 = self.socket1.poll();
        let result2 = self.socket2.poll();
        // do something on the results
    }
}

One use case for this example is combining multiple streams into one. Each socket might be a file, a TcpStream, a Timer, or even an I/O device. Behind the scenes, mio handles the event notification. When a Stream returns Async::NotReady, it registers the current task, which then waits for an event to wake it up. In this example, since we have to poll() both socket1 and socket2, Server::poll() might be woken up by either of them. When socket2 becomes Ready and wakes up Server::poll() while socket1 is still NotReady, the time spent polling socket1 is wasted.

This is where we can optimize. If we could:

  1. Lower the cost of poll() on socket1 and socket2
  2. Skip unnecessary poll() calls

then we could save time for other operations.
How to achieve either of these two points depends on how we use the output data from these streams.

1. Using Channel

If we want to avoid unnecessary poll() calls, we can pipe the streams of events into a channel. The receiving end of the channel is woken up only when one of the sinks gets data. [1]

use futures::{Future, Sink, Stream};
use tokio::sync::mpsc::unbounded_channel; // or the bounded `channel`

let (tx, rx) = unbounded_channel();
let sink = tx.clone(); // sender for socket1
let sink2 = tx.clone(); // sender for socket2
// forward everything from socket1 into the channel, tagged with 1
let task = server.socket1
    .map_err(|e| error!("{:?}", e))
    .forward(sink.sink_map_err(|e| error!("{:?}", e)).with(|p| Ok((1, p))))
    .map(|_| ());
exec.spawn(task); // spawned by TaskExecutor
// forward everything from socket2 into the channel, tagged with 2
let task = server.socket2
    .map_err(|e| error!("{:?}", e))
    .forward(sink2.sink_map_err(|e| error!("{:?}", e)).with(|p| Ok((2, p))))
    .map(|_| ());
exec.spawn(task); // spawned by TaskExecutor
// the consumer is woken up only when one of the senders delivers data
let task = rx
    .map_err(|e| error!("{:?}", e))
    .for_each(|(from, data)| {
        // do something
        Ok(())
    })
    .map(|_| ());
exec.spawn(task); // consumer spawned by TaskExecutor

// no need to impl Stream for Server

One benefit of using a channel is that it provides buffer space for sudden bursts of input. If there are no other concerns, a channel should work for most cases.
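
To look at the fan-in pattern in isolation, here is a minimal sketch that uses only the standard library. It is an analogy, not tokio code: plain threads and std::sync::mpsc stand in for spawned tasks and tokio's channel, and the `fan_in` helper is a hypothetical name introduced for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Forwards every item of each source into one shared channel, tagged with
/// the index of the source it came from, and returns everything the single
/// consumer received. (Hypothetical helper, for illustration only.)
fn fan_in(sources: Vec<Vec<u8>>) -> Vec<(usize, u8)> {
    let (tx, rx) = mpsc::channel();
    let mut producers = Vec::new();
    for (id, items) in sources.into_iter().enumerate() {
        let sink = tx.clone(); // one sender per source
        producers.push(thread::spawn(move || {
            for item in items {
                sink.send((id, item)).expect("receiver is still alive");
            }
        }));
    }
    // Drop the original sender so the receiver ends once all clones are gone.
    drop(tx);
    // The consumer waits only on the channel, never on an idle source.
    let received: Vec<(usize, u8)> = rx.iter().collect();
    for p in producers {
        p.join().expect("producer thread panicked");
    }
    received
}
```

Calling `fan_in(vec![vec![1, 2, 3], vec![10, 20]])` returns all five tagged items. The order between sources is not deterministic, but items from the same source keep their relative order.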


2. Guess State

Create a flag in Server to record the status of socket1 and socket2. If either of them returns Ready, keep polling it until it returns NotReady. Busy streams seldom go into the NotReady state, so polling the stream that was last Ready has a higher probability of yielding data. This minimizes the cost of polling both streams. The side effect of this method is that the produced data will be skewed toward one side if the stream we keep polling seldom returns NotReady.

impl Stream for Server {
    type Item = Vec<u8>;
    type Error = io::Error;
    fn poll(&mut self) -> Poll<Option<<Self as Stream>::Item>, <Self as Stream>::Error> {
        let result = {
            if self.is_socket1 { // guessing that socket1 is ready
                if self.alive_1 { // prevent polling on ended stream
                    match self.socket1.poll()? {
                        Async::Ready(Some(e)) => e,
                        Async::Ready(None) => {
                            self.alive_1 = false;
                            // do something
                        }
                        Async::NotReady => { // register event for socket1
                            self.is_socket1 = false;
                            ...
                        }
                    }
                }
                if self.alive_2 { // prevent polling on ended stream
                    match self.socket2.poll()? { // register event for socket2
                        Async::Ready(Some(e)) => e,
                        Async::Ready(None) => {
                            self.alive_2 = false;
                            // do something
                        }
                        Async::NotReady => return Ok(Async::NotReady)
                    }
                }
            } else { // guessing that socket2 is ready
                if self.alive_2 { // prevent polling on ended stream
                    match self.socket2.poll()? {
                        Async::Ready(Some(e)) => e,
                        Async::Ready(None) => {
                            self.alive_2 = false;
                            // do something
                        }
                        Async::NotReady => {
                            self.is_socket1 = true;
                            ...
                        }
                    }
                }
                if self.alive_1 { // prevent polling on ended stream
                    match self.socket1.poll()? { // register event for socket1
                        Async::Ready(Some(e)) => e,
                        Async::Ready(None) => {
                            self.alive_1 = false;
                            // do something
                        }
                        Async::NotReady => return Ok(Async::NotReady)
                    }
                }
            }
            // both streams have ended
            return Ok(Async::Ready(None))
        }
        // do something on the results
    }
}
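
The guessing logic can also be tried out without any I/O. The sketch below is a simplified model under stated assumptions: `Polled` is a local stand-in for `Async<Option<T>>`, `MockSource` replays a scripted list of poll results, and `drain` simply polls again on NotReady where a real task would wait to be woken. None of these names come from tokio or futures.

```rust
use std::collections::VecDeque;

/// Local stand-in for `Async<Option<u8>>` (assumption of this sketch).
#[derive(Debug)]
enum Polled { Ready(u8), NotReady, Ended }

/// Replays scripted poll results and counts how often it was polled.
struct MockSource { script: VecDeque<Polled>, polls: usize }

impl MockSource {
    fn new(script: Vec<Polled>) -> Self {
        MockSource { script: script.into(), polls: 0 }
    }
    fn poll(&mut self) -> Polled {
        self.polls += 1;
        self.script.pop_front().unwrap_or(Polled::Ended)
    }
}

struct Server { sockets: [MockSource; 2], alive: [bool; 2], guess: usize }

impl Server {
    fn new(a: MockSource, b: MockSource) -> Self {
        Server { sockets: [a, b], alive: [true, true], guess: 0 }
    }
    fn poll(&mut self) -> Polled {
        // Try the guessed socket first, then the other one.
        for _ in 0..2 {
            let i = self.guess;
            if self.alive[i] {
                match self.sockets[i].poll() {
                    // Keep the guess: a busy socket is likely Ready again.
                    Polled::Ready(v) => return Polled::Ready(v),
                    Polled::Ended => self.alive[i] = false,
                    Polled::NotReady => {}
                }
            }
            self.guess = 1 - i; // switch the guess to the other socket
        }
        if self.alive[0] || self.alive[1] { Polled::NotReady } else { Polled::Ended }
    }
}

/// Polls the server to completion; returns the items and per-socket poll counts.
fn drain(mut server: Server) -> (Vec<u8>, [usize; 2]) {
    let mut items = Vec::new();
    loop {
        match server.poll() {
            Polled::Ready(v) => items.push(v),
            Polled::NotReady => continue, // a real task would yield here
            Polled::Ended => break,
        }
    }
    (items, [server.sockets[0].polls, server.sockets[1].polls])
}
```

In this model, the socket that is not the current guess is polled only after the guessed one stops returning data, so its poll count stays low while the other one is busy.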

3. Use Atomic Flags and AtomicTask

Rather than guessing, if we could share a flag between Server and socket1/socket2, we would know which event triggered the current Server::poll(). In the general case, this is only possible if you have control over the underlying mio::Evented object. However, if we are allowed to spawn socket1/socket2 in other threads (refer to our first method) and save their output in some container, we can set the flags from the following Wrapper:

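use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use futures::task::AtomicTask;
use futures::{Async, Poll, Stream};

struct Wrapper<T> {
    inner: T, // T: `socket1`/`socket2` type
    from_me: Arc<AtomicBool>,
    // a HashMap is used as an example; a lock-free container would be better here
    result: Arc<RwLock<HashMap<i32, i64>>>,
    // task from Server
    task: Arc<AtomicTask>,
}
impl<T: Stream> Stream for Wrapper<T> { // Struct for `socket1` and `socket2`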
    type Item = ();
    type Error = <T as Stream>::Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.inner.poll() {
            Ok(Async::Ready(Some(e))) => {
                let mut w = self.result.write().unwrap();
                w.insert(...);
                ...
                self.from_me.store(true, Ordering::Relaxed);
                self.task.notify();
                Ok(Async::Ready(Some(())))
            }
            Ok(Async::Ready(None)) => {
                ... // set some flags to notify the status of this stream
                self.task.notify();
                Ok(Async::Ready(None))
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => {
                ... // set some flags to notify the status of this stream
                Err(e)
            }
        }
    }
}
impl<T> Drop for Wrapper<T> {
    fn drop(&mut self) {
        // on Error or stream ends, wake up the Server task
        self.task.notify();
    }
}

// `from_1` and `from_2` are two flags coming from `Wrapper::from_me`
// Server::task and Wrapper::task share the same `AtomicTask`
// Server::result and Wrapper::result share the same HashMap
impl Stream for Server {
    type Item = Vec<u8>;
    type Error = io::Error;
    fn poll(&mut self) -> Poll<Option<<Self as Stream>::Item>, <Self as Stream>::Error> {
        // register before checking the flags, so that a notify() arriving
        // between the check and the return is not lost
        self.task.register();
        let mut notready = true;
        if self.from_1.swap(false, Ordering::Relaxed) {
            notready = false;
            // self.result .... do something on self.result
        }
        if self.from_2.swap(false, Ordering::Relaxed) {
            notready = false;
            // self.result .... do something on self.result
        }
        if notready {
            return Ok(Async::NotReady)
        }
        // do something on the results
    }
}

If we could directly modify the implementation of socket1 and socket2, that would work best. Basically, method 3 is an enhanced version of method 1. You can gain more performance from this method, but you also take on the risk of deadlocks caused by race conditions.
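
The flag handshake itself can be shown with the standard library alone. This is a minimal sketch: `mark_ready` and `take_ready` are hypothetical helper names, and the Release/AcqRel orderings are a conservative choice made for this sketch rather than something the code above prescribes.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Producer side: record that this source has produced new data.
fn mark_ready(flag: &AtomicBool) {
    flag.store(true, Ordering::Release);
}

/// Consumer side: return the indices of all sources that signalled since
/// the last call, clearing each flag in the same atomic step.
fn take_ready(flags: &[Arc<AtomicBool>]) -> Vec<usize> {
    flags
        .iter()
        .enumerate()
        // swap(false) reads the old value and resets the flag atomically,
        // so a signal is consumed exactly once.
        .filter(|(_, flag)| flag.swap(false, Ordering::AcqRel))
        .map(|(index, _)| index)
        .collect()
}
```

Because `swap` reads and clears in one operation, a signal set between two calls to `take_ready` is reported by exactly one of them.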


Other Optimization

  • Replace Mutex with RwLock or with atomic flags + UnsafeCell
  • Use MPMC if you have enough CPUs (this may make each Stream slower, but reduces the overall processing time)
  • Instead of using the default Debug trait, define your own and print only when necessary
  • Upgrade tokio to 0.2 for a faster scheduler and faster channels
  • Upgrade your old libraries, such as serde and bytes
  • Don't use futures' mpsc channels (1.5x~2x slower); use tokio's mpsc channels instead
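
As an illustration of the Debug point in the list above, here is a minimal sketch of a hand-written Debug implementation that prints only a summary of a large buffer. The `Packet` type and its fields are hypothetical and exist only for this example.

```rust
use std::fmt;

/// Hypothetical message type carrying a potentially large payload.
struct Packet {
    id: u32,
    payload: Vec<u8>,
}

impl fmt::Debug for Packet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Print the payload length instead of every byte, which keeps
        // debug logging cheap even for large packets.
        f.debug_struct("Packet")
            .field("id", &self.id)
            .field("payload_len", &self.payload.len())
            .finish()
    }
}
```

Formatting a packet with id 7 and a 1024-byte payload using `{:?}` then produces `Packet { id: 7, payload_len: 1024 }` instead of a dump of all 1024 bytes.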

Tokio 0.2

Tokio v0.2 claims a great improvement in its scheduler [2]. We ran several benchmarks on both versions to compare them. The first table shows the result of sending 2,700,000 bytes through TcpFramed, using LinesCodec as the Decoder to output 100,000 messages of length 26:

            v0.1.22     v0.2.4
TcpFramed   181ns/iter  172ns/iter

For the channels, we sent 200,000 i32 numbers:

                        v0.1.22      v0.2.4
Unbounded Channel       233ns/iter   202ns/iter
Bounded Channel(1000)   384ns/iter   315ns/iter
Bounded Channel(1)      2965ns/iter  10082ns/iter

The tricks introduced in this article are also applicable to v0.2 and should benefit from its improvements, too. Just be aware of the API changes: Task becomes Waker, poll() in Stream becomes poll_next(), and you have no control over which Runtime runs your async/await code.
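
To give a feel for the new shape of the API, the sketch below uses only the standard library. The local `Stream` trait is a stand-in that mirrors the `poll_next` signature style of the newer futures crate, and `Counter`, `CountingWaker`, and `drive` are hypothetical names for this illustration. Instead of an implicit current task, the waker is passed in explicitly through `Context`.

```rust
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for the new-style Stream trait (assumption of this sketch).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Yields 0..limit and returns Pending once before every item.
struct Counter { next: u32, limit: u32, parked: bool }

impl Stream for Counter {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next == self.limit {
            return Poll::Ready(None);
        }
        if !self.parked {
            self.parked = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            return Poll::Pending;
        }
        self.parked = false;
        self.next += 1;
        Poll::Ready(Some(self.next - 1))
    }
}

/// Waker that only counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls the counter to completion; returns the items and the wake count.
fn drive(limit: u32) -> (Vec<u32>, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut stream = Counter { next: 0, limit, parked: false };
    let mut items = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(v)) => items.push(v),
            Poll::Ready(None) => break,
            Poll::Pending => continue, // a real executor would wait for the wake
        }
    }
    (items, counter.0.load(Ordering::SeqCst))
}
```

The busy loop in `drive` is only there to keep the sketch self-contained. In a real application the runtime would park the task and poll it again after the waker fires.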

Reference

  1. tokio-sync chan.rs, https://docs.rs/tokio-sync/0.1.6/src/tokio_sync/mpsc/chan.rs.html#173-188
  2. Making the Tokio scheduler 10x faster, https://tokio.rs/blog/2019-10-scheduler/