Monitor tokio's unbounded_channels with metrics to make serious gains

Posted at 2020-12-10

Let's assume we have an application that's built from a series of async tasks that talk to each other through unbounded_channels. By counting each write and read, i.e. each send() and recv(), we gain valuable insight into performance bottlenecks and regressions as well as the overall health of the application.

Gains to be had

  • Find bottlenecks: The ratio R = writes/reads approximates the relative throughput of the two tasks on either end of a channel. If R > 1, the task writing to the channel is roughly R times faster than the task reading from it. R < 1 should not occur, since a message cannot be read before it has been written; if it persists, you are not counting correctly and should fix your bug.
  • Find and locate performance regressions: If we use this technique during development and keep an eye on the numbers then we'll not only spot regressions right away but also know which tasks are responsible.
  • Find potential health issues: Because unbounded_channels don't exert back-pressure, they can delay issues from manifesting themselves. By keeping an eye on the message backlog (writes minus reads) in our channels we can catch those problems early.
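The two quantities used in the list above, the ratio R and the message backlog, can be computed from a pair of counter values. The following is a minimal sketch using only the standard library; the function names `throughput_ratio` and `backlog` are made up for this illustration and are not part of metrics or tokio.

```rust
/// Ratio R = writes / reads for one channel.
/// Returns None while nothing has been read yet, to avoid dividing by zero.
fn throughput_ratio(writes: u64, reads: u64) -> Option<f64> {
    if reads == 0 {
        None
    } else {
        Some(writes as f64 / reads as f64)
    }
}

/// Number of messages that were written but not yet read.
/// Returns None when reads > writes, which points to a counting bug.
fn backlog(writes: u64, reads: u64) -> Option<u64> {
    writes.checked_sub(reads)
}
```

With the converter counts from the example output further down (954325 writes, 86379 reads), `throughput_ratio` yields an R of roughly 11 and `backlog` yields 867946 pending messages.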

The metrics crate is well suited to what we need: it provides a counter! macro, and together with metrics-exporter-prometheus it offers a very convenient way of querying all counters. On top of that we could hook it up to Grafana for a nice visual representation of the data.

Wrap your channels

For convenience we are going to write a wrapper that takes care of the bookkeeping. All we have to do is give the channel a name, so that when we query the application we know which numbers belong to which channel or task.

use metrics::counter;
use tokio::sync::mpsc::{
    self,
    error::SendError,
};

pub fn unbounded_channel<T>(id: String) -> (UnboundedSender<T>, UnboundedReceiver<T>) {
    let (tx, rx) = mpsc::unbounded_channel::<T>();
    (
        UnboundedSender::new(tx, id.clone()),
        UnboundedReceiver::new(rx, id),
    )
}

The UnboundedSender<T> and UnboundedReceiver<T> structs wrap their tokio counterparts and just add a field for the id. When the channel is created, the read and write counters are initialized to zero. We initialize all counters because a counter that is never written to won't show up in the query, and we don't want to deal with entries that may or may not exist.

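pub struct UnboundedSender<T> {
    tx: mpsc::UnboundedSender<T>,
    id: String,
}

// Implemented by hand because #[derive(Clone)] would require T: Clone,
// which the underlying tokio sender does not need.
impl<T> Clone for UnboundedSender<T> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
            id: self.id.clone(),
        }
    }
}

impl<T> UnboundedSender<T> {
    fn new(tx: mpsc::UnboundedSender<T>, id: String) -> Self {
        // Register the write counter at zero so it shows up in queries
        // even before the first message is sent.
        counter!(id.clone(), 0, "side" => "tx");
        Self { tx, id }
    }
}

pub struct UnboundedReceiver<T> {
    rx: mpsc::UnboundedReceiver<T>,
    id: String,
}

impl<T> UnboundedReceiver<T> {
    fn new(rx: mpsc::UnboundedReceiver<T>, id: String) -> Self {
        // Register the read counter at zero for the same reason.
        counter!(id.clone(), 0, "side" => "rx");
        Self { rx, id }
    }
}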

Access as usual

The signatures of send() and recv() don't change. Whenever we successfully send() or recv() a message we increase the corresponding counter by 1.

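impl<T> UnboundedSender<T> {
    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
        let res = self.tx.send(msg);
        // Count only messages that actually made it into the channel.
        if res.is_ok() {
            counter!(self.id.clone(), 1, "side" => "tx");
        }
        res
    }
}

impl<T> UnboundedReceiver<T> {
    pub async fn recv(&mut self) -> Option<T> {
        let res = self.rx.recv().await;
        // None means the channel is closed and empty, so nothing was read.
        if res.is_some() {
            counter!(self.id.clone(), 1, "side" => "rx");
        }
        res
    }
}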
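To try the counting logic without pulling in tokio or metrics, the same idea can be sketched with the standard library alone. This is an illustrative stand-in, not the wrapper from this article: it swaps in std::sync::mpsc for tokio's channel and plain atomics for the counter! macro, and the names `Counts`, `CountedSender`, `CountedReceiver` and `counted_channel` are hypothetical. As described above, only successful operations are counted.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{self, RecvError, SendError};
use std::sync::Arc;

/// Shared per-channel counters (a stand-in for the metrics crate).
#[derive(Default)]
struct Counts {
    tx: AtomicU64,
    rx: AtomicU64,
}

struct CountedSender<T> {
    tx: mpsc::Sender<T>,
    counts: Arc<Counts>,
}

struct CountedReceiver<T> {
    rx: mpsc::Receiver<T>,
    counts: Arc<Counts>,
}

/// Creates an unbounded std channel whose two ends share one set of counters.
/// The counters are also returned so the caller can inspect them.
fn counted_channel<T>() -> (CountedSender<T>, CountedReceiver<T>, Arc<Counts>) {
    let (tx, rx) = mpsc::channel();
    let counts = Arc::new(Counts::default());
    (
        CountedSender { tx, counts: Arc::clone(&counts) },
        CountedReceiver { rx, counts: Arc::clone(&counts) },
        counts,
    )
}

impl<T> CountedSender<T> {
    fn send(&self, msg: T) -> Result<(), SendError<T>> {
        let res = self.tx.send(msg);
        // Count the write only if the message was accepted.
        if res.is_ok() {
            self.counts.tx.fetch_add(1, Ordering::Relaxed);
        }
        res
    }
}

impl<T> CountedReceiver<T> {
    fn recv(&self) -> Result<T, RecvError> {
        let res = self.rx.recv();
        // Count the read only if a message was actually received.
        if res.is_ok() {
            self.counts.rx.fetch_add(1, Ordering::Relaxed);
        }
        res
    }
}
```

Sending five messages and receiving two of them leaves the counters at tx = 5 and rx = 2, i.e. a backlog of three. A send() after the receiver has been dropped fails and leaves the tx counter unchanged.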

For example

Let's assume we have an application with three tasks: udp, updates, and converter. To stress-test it we throw some data at it at a rate higher than the application's max throughput. If we use metrics-exporter-prometheus then this is what it would look like when we query our application:

rustymcchickenface@uat:~$ curl 127.0.0.1:8180
# metrics snapshot (ts=1607472059) (prometheus exposition format)
# TYPE Group 2: udp counter
Group 2: udp{side="rx"} 1000028
Group 2: udp{side="tx"} 1000028

# TYPE Group 2: updates counter
Group 2: updates{side="rx"} 954114
Group 2: updates{side="tx"} 1000028

# TYPE Group 2: converter counter
Group 2: converter{side="tx"} 954325
Group 2: converter{side="rx"} 86379

udp seems to have no issues at all: every message written has also been read. The ~5% gap between writes and reads in updates is also nothing to worry about at this point. If performance requirements were not met, converter would be the place to start: it has read only 86379 of the 954325 messages written to it and clearly cannot keep up with the preceding tasks.
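Reading these numbers by eye gets tedious with more channels, so the backlog per channel could also be computed from the query output. The sketch below is not a general Prometheus parser: it assumes only the line shape visible in the output above, `name{side="tx"} value`, and the names `ChannelCounts` and `parse_counters` are hypothetical.

```rust
use std::collections::BTreeMap;

/// Write and read counts of one channel, as found in the query output.
#[derive(Debug, Default, PartialEq)]
struct ChannelCounts {
    tx: u64,
    rx: u64,
}

impl ChannelCounts {
    /// Messages written but not yet read (0 if the counts are inconsistent).
    fn backlog(&self) -> u64 {
        self.tx.saturating_sub(self.rx)
    }
}

/// Parses lines of the form `name{side="tx"} 123`.
/// Comment lines, blank lines and anything else that doesn't fit are skipped.
fn parse_counters(text: &str) -> BTreeMap<String, ChannelCounts> {
    let mut channels: BTreeMap<String, ChannelCounts> = BTreeMap::new();
    for line in text.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        // Split `name{labels} value` into its three parts.
        let (name, rest) = match line.split_once('{') {
            Some(parts) => parts,
            None => continue,
        };
        let (labels, value) = match rest.split_once('}') {
            Some(parts) => parts,
            None => continue,
        };
        let value: u64 = match value.trim().parse() {
            Ok(v) => v,
            Err(_) => continue,
        };
        let name = name.trim().to_string();
        match labels {
            "side=\"tx\"" => channels.entry(name).or_default().tx = value,
            "side=\"rx\"" => channels.entry(name).or_default().rx = value,
            _ => {}
        }
    }
    channels
}
```

Fed with the output above, this reports a backlog of 0 for udp, 45914 for updates and 867946 for converter, which matches the reading that converter is the bottleneck.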

What's next

  • Get rid of those .clone()s.
  • Use Grafana for a nice visual representation of all the counters.

TL;DR

Keeping track of the number of reads from and writes to unbounded_channels can be a neat way of detecting performance issues and regressions in your application during development, and health issues later in production.
For this purpose, we use the metrics crate to count each send to and receive from the channel.
