Let's assume we have an application that's built from a series of async tasks that talk to each other through `unbounded_channel`s. By counting each write and read access, e.g. `send()` and `recv()`, we can gain valuable insights into performance bottlenecks, regressions, and the overall health of the application.
Gains to be had
- Find bottlenecks: The ratio R = writes / reads of a channel gives an approximation of the relative throughput of the tasks on either end. If R > 1, then under sustained load the task writing to the channel is roughly R times faster than the task reading from it. If R < 1, then you are not counting correctly and should fix your bug: a message cannot be read before it has been written.
- Locate performance regressions: If we use this technique during development and keep an eye on the numbers, we will not only spot regressions right away but also know which tasks are responsible.
- Find potential health issues: Because `unbounded_channel`s don't exert back-pressure, a slow reader never slows down its writer. Messages pile up in the channel instead, which can delay issues from manifesting themselves. By keeping an eye on the message back-log (writes − reads) in our channels, we can catch those problems early.
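The three cases for R, plus the back-log, can be captured in a small helper. This is a std-only sketch. The names `ChannelHealth` and `classify` are hypothetical, and it assumes both counters come from the same snapshot, because reading them at slightly different times can briefly skew the numbers.

```rust
/// How a channel looks based on its write/read counters (hypothetical helper).
#[derive(Debug, PartialEq)]
pub enum ChannelHealth {
    /// Every written message has been read: R = 1 and the back-log is empty.
    KeepingUp,
    /// The writer is ahead of the reader: R = writes / reads > 1.
    Lagging { ratio: f64, backlog: u64 },
    /// More reads than writes (R < 1) is impossible, so the counting is broken.
    CountingBug,
}

pub fn classify(writes: u64, reads: u64) -> ChannelHealth {
    if reads > writes {
        return ChannelHealth::CountingBug;
    }
    if reads == writes {
        return ChannelHealth::KeepingUp;
    }
    // From here on reads < writes, so the subtraction cannot underflow.
    let backlog = writes - reads;
    // Nothing read yet: the reader is "infinitely" slower than the writer.
    let ratio = if reads == 0 {
        f64::INFINITY
    } else {
        writes as f64 / reads as f64
    };
    ChannelHealth::Lagging { ratio, backlog }
}
```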
The crate `metrics` is well suited to what we need. It can count (via its `counter!` macro), and together with `metrics-exporter-prometheus` it offers a very convenient way of querying all counters. On top of that, we could hook it up to Grafana for a nice visual representation of the data.
Wrap your channels
For convenience, we are going to write a wrapper that takes care of the bookkeeping. All we have to do is give each channel a name, so that when we query the application we know which numbers belong to which channel or task.
```rust
use metrics::counter;
use tokio::sync::mpsc::{self, error::SendError};

/// Creates an unbounded channel whose sender and receiver count
/// every message under the given `id`.
pub fn unbounded_channel<T>(id: String) -> (UnboundedSender<T>, UnboundedReceiver<T>) {
    let (tx, rx) = mpsc::unbounded_channel::<T>();
    (
        UnboundedSender::new(tx, id.clone()),
        UnboundedReceiver::new(rx, id),
    )
}
```
The `UnboundedSender<T>` and `UnboundedReceiver<T>` structs wrap their `tokio` counterparts and just add a field for the `id`. When the channel is created, its read and write counters are initialized to zero. We initialize all counters because a counter that is never written to won't show up in the query, and we don't want to deal with entries that may or may not exist.
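```rust
pub struct UnboundedSender<T> {
    tx: mpsc::UnboundedSender<T>,
    id: String,
}

// Implemented by hand: `#[derive(Clone)]` would add a `T: Clone` bound,
// although `mpsc::UnboundedSender<T>` is cloneable for any `T`.
impl<T> Clone for UnboundedSender<T> {
    fn clone(&self) -> Self {
        Self { tx: self.tx.clone(), id: self.id.clone() }
    }
}

impl<T> UnboundedSender<T> {
    fn new(tx: mpsc::UnboundedSender<T>, id: String) -> Self {
        // Register the write counter with a value of 0 so it always shows up.
        counter!(id.clone(), 0, "side" => "tx");
        Self { tx, id }
    }
}

pub struct UnboundedReceiver<T> {
    rx: mpsc::UnboundedReceiver<T>,
    id: String,
}

impl<T> UnboundedReceiver<T> {
    fn new(rx: mpsc::UnboundedReceiver<T>, id: String) -> Self {
        // Register the read counter with a value of 0 so it always shows up.
        counter!(id.clone(), 0, "side" => "rx");
        Self { rx, id }
    }
}
```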
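To illustrate why the zero-initialization matters, here is a toy, std-only stand-in for a metrics registry. The `Registry` type is hypothetical and is not the `metrics` API. A counter only exists once it has been touched, so registering both sides with 0 up front guarantees that a query always finds them.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a metrics registry: (channel id, side) -> count.
#[derive(Default)]
pub struct Registry {
    counters: BTreeMap<(String, &'static str), u64>,
}

impl Registry {
    /// Adds `value` to a counter, creating it on first use.
    pub fn increment(&mut self, id: &str, side: &'static str, value: u64) {
        *self.counters.entry((id.to_string(), side)).or_insert(0) += value;
    }

    /// Creates both counters of a channel up front by adding 0 to each.
    /// Existing values are left unchanged.
    pub fn register_channel(&mut self, id: &str) {
        self.increment(id, "tx", 0);
        self.increment(id, "rx", 0);
    }

    /// Returns the current value, or `None` if the counter was never touched.
    pub fn get(&self, id: &str, side: &'static str) -> Option<u64> {
        self.counters.get(&(id.to_string(), side)).copied()
    }
}
```

Without registration, a query for a channel that has only ever been written to has no `rx` entry at all, rather than an `rx` entry of 0.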
Access as usual
The signatures of `send()` and `recv()` don't change. Whenever we successfully `send()` or `recv()` a message, we increment the corresponding counter by 1.
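```rust
impl<T> UnboundedSender<T> {
    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
        let res = self.tx.send(msg);
        // Only count messages that were actually enqueued; `send` fails
        // once the receiver has been dropped.
        if res.is_ok() {
            counter!(self.id.clone(), 1, "side" => "tx");
        }
        res
    }
}

impl<T> UnboundedReceiver<T> {
    pub async fn recv(&mut self) -> Option<T> {
        let res = self.rx.recv().await;
        // `None` means the channel is closed and drained: nothing was read.
        if res.is_some() {
            counter!(self.id.clone(), 1, "side" => "rx");
        }
        res
    }
}
```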
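If you want to try the counting idea without `tokio` and `metrics`, the following sketch applies it to `std::sync::mpsc::channel`, which is unbounded as well. The names `counted_channel`, `CountedSender`, `CountedReceiver`, and `ChannelStats` are hypothetical, and the counters are plain `AtomicU64`s instead of `metrics` counters. As described above, only successful sends and receives are counted.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender};
use std::sync::Arc;

/// Shared write (`tx`) and read (`rx`) counters for one channel.
#[derive(Default)]
pub struct ChannelStats {
    pub tx: AtomicU64,
    pub rx: AtomicU64,
}

pub struct CountedSender<T> {
    tx: Sender<T>,
    stats: Arc<ChannelStats>,
}

// Manual impl so cloning the sender doesn't require `T: Clone`.
impl<T> Clone for CountedSender<T> {
    fn clone(&self) -> Self {
        Self { tx: self.tx.clone(), stats: Arc::clone(&self.stats) }
    }
}

pub struct CountedReceiver<T> {
    rx: Receiver<T>,
    stats: Arc<ChannelStats>,
}

/// Creates a counted std channel and also hands out the stats for querying.
pub fn counted_channel<T>() -> (CountedSender<T>, CountedReceiver<T>, Arc<ChannelStats>) {
    let (tx, rx) = mpsc::channel();
    let stats = Arc::new(ChannelStats::default());
    (
        CountedSender { tx, stats: Arc::clone(&stats) },
        CountedReceiver { rx, stats: Arc::clone(&stats) },
        stats,
    )
}

impl<T> CountedSender<T> {
    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
        let res = self.tx.send(msg);
        // Count only if the message was enqueued (the receiver still exists).
        if res.is_ok() {
            self.stats.tx.fetch_add(1, Ordering::Relaxed);
        }
        res
    }
}

impl<T> CountedReceiver<T> {
    pub fn recv(&self) -> Result<T, RecvError> {
        let res = self.rx.recv();
        // Count only if a message was actually received.
        if res.is_ok() {
            self.stats.rx.fetch_add(1, Ordering::Relaxed);
        }
        res
    }
}
```

Because each wrapper holds an `Arc` to its counters, nothing has to be looked up or cloned per message. `Relaxed` ordering is enough here because the counters are independent tallies and don't synchronize other data.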
For example
Let's assume we have an application with three tasks: `udp`, `updates`, and `converter`. To stress-test it, we throw data at it at a rate higher than the application's maximum throughput. If we use `metrics-exporter-prometheus`, this is what it looks like when we query our application:
```text
rustymcchickenface@uat:~$ curl 127.0.0.1:8180
# metrics snapshot (ts=1607472059) (prometheus exposition format)
# TYPE Group 2: udp counter
Group 2: udp{side="rx"} 1000028
Group 2: udp{side="tx"} 1000028
# TYPE Group 2: updates counter
Group 2: updates{side="rx"} 954114
Group 2: updates{side="tx"} 1000028
# TYPE Group 2: converter counter
Group 2: converter{side="tx"} 954325
Group 2: converter{side="rx"} 86379
```
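`udp` seems to have no issues at all (R = 1). The roughly 5% difference in `updates` (R ≈ 1000028 / 954114 ≈ 1.05) is also nothing to worry about at this point. If performance requirements were not met, `converter` would be the place to start. With R ≈ 954325 / 86379 ≈ 11, it reads only about one message for every eleven written and has built up a back-log of roughly 868,000 messages, so it clearly cannot keep up with the preceding tasks.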
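The arithmetic above can be reproduced directly from a snapshot. This sketch uses only std, and the `parse_snapshot` and `ratio` helpers are hypothetical. It assumes every counter line has the exact shape `name{side="tx"} value` shown in the output and skips everything else, including the `# TYPE` comments and the shell prompt.

```rust
use std::collections::BTreeMap;

/// Parses lines such as `Group 2: converter{side="tx"} 954325` into
/// `channel name -> (tx, rx)`. Lines that don't match are skipped.
pub fn parse_snapshot(text: &str) -> BTreeMap<String, (u64, u64)> {
    let mut channels: BTreeMap<String, (u64, u64)> = BTreeMap::new();
    for line in text.lines() {
        let line = line.trim();
        if line.starts_with('#') {
            continue;
        }
        // Split `name{side="tx"} value` into its three parts.
        let Some((name, rest)) = line.split_once("{side=\"") else { continue };
        let Some((side, value)) = rest.split_once("\"} ") else { continue };
        let Ok(value) = value.trim().parse::<u64>() else { continue };
        let entry = channels.entry(name.to_string()).or_insert((0, 0));
        match side {
            "tx" => entry.0 = value,
            "rx" => entry.1 = value,
            _ => {}
        }
    }
    channels
}

/// R = writes / reads for one channel; `None` if nothing has been read yet.
pub fn ratio(tx: u64, rx: u64) -> Option<f64> {
    if rx == 0 {
        None
    } else {
        Some(tx as f64 / rx as f64)
    }
}
```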
What's next
- Get rid of those `.clone()`s.
- Use Grafana for a nice visual representation of all the counters.
TL;DR
Keeping track of the number of read and write accesses to `unbounded_channel`s can be a neat way of detecting performance issues and regressions in your application during development, and health issues later in production. For this purpose, we are going to use `metrics` to count each send to and receive from a channel.