Leaky Bucketアルゴリズム(下記Rustクレート)に遭遇したので、その動作確認メモです。
Leaky Bucketとは
リーキーバケットはネットワークに送信するトラフィックに対して、その転送レートを制御する。名前の通り、穴の開いたバケツに相当し、様々な流量の水流がそのバケツに流れ込むと、そこまでの流量がどうであっても小さな穴からは一定の水流が流れ出す。同様にリーキーバケット・アルゴリズムは様々なバースト性のトラフィックを一定のトラフィックに変換する機構を提供する。
リーキーバケット - Wikipedia
主にレート制限の手法として活用されているようです。
つまりサーバー都合でリクエスト回数に制限を設けることにより、サービスやトラフィックに制限を加えます。
画像引用元:How we built rate limiting capable of scaling to millions of domains
なお、レート制限の文脈では下記アルゴリズムも紹介されていることが多いみたいです。
いずれも制限の対象を工夫することで、要求に合わせて最適化している印象でした。
- Token Bucket
- Fixed Window
- Sliding Window
参考:
レート制限アルゴリズム - システム設計 - GeeksforGeeks
様々なrate limitアルゴリズム - Carpe Diem
QoS(9)帯域制御の制御方法 トークンバケット/リーキーバケット/万能リーキーバケットモデルとは – Ethernet TSN がIoTを変える
leaky_bucket crateについて
Leaky BucketのRust実装であるleaky_bucket1の動作を確認します。
このクレートはtokioの機能に依存しているのでtokioランタイム上で使用する必要があるようです。
動作確認
やや冗長ですが、axumサーバーで簡単に動作確認しました。
RateLimiter
の設定項目がいくつかあり、単純な構成でレート制限する上ではこの設定項目でおおよそ十分な印象を勝手に受けました。
設定項目について詳しくはドキュメントを参照ください。
確認内容
- バケツに空きがある(発行したtokenが残っている)間はリクエストを受け付けます
- バケツが満杯になる(発行したtokenを全て消費する)とリクエストを拒否します2
- tokenは毎秒1つ補充されるので、バケツが満杯になった(発行したtokenを全て消費した)状態から時間が経てば再度リクエストを受け付けます
use std::{sync::Arc, time::Duration};
use axum::{
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
routing::{get, post},
Router,
};
use leaky_bucket::RateLimiter;
use tracing::{debug, level_filters::LevelFilter};
struct AppState {
tokens: Arc<RateLimiter>,
}
impl AppState {
pub fn new() -> AppState {
AppState {
tokens: Arc::new(
RateLimiter::builder()
.initial(5)
.interval(Duration::from_secs(1))
.refill(1)
.max(5)
.build(),
),
}
}
}
#[tokio::main]
async fn main() {
use tracing_subscriber::prelude::*;
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::builder()
.with_default_directive(LevelFilter::DEBUG.into())
.from_env_lossy(),
)
.with(
tracing_subscriber::fmt::layer()
.with_target(false)
.with_level(true)
.compact(),
)
.init();
let state = Arc::new(AppState::new());
let app = Router::new()
.route("/bucket", post(use_bucket))
.with_state(state);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
async fn use_bucket(State(state): State<Arc<AppState>>) -> Response {
debug!("API called.");
if !state.tokens.try_acquire(1) {
return (StatusCode::TOO_MANY_REQUESTS, "No token left.\n").into_response();
}
(StatusCode::OK, "Token used.\n").into_response()
}
リクエスト例
curl -X POST http://localhost:3000/bucket
curl -X POST http://localhost:3000/bucket
curl -X POST http://localhost:3000/bucket
curl -X POST http://localhost:3000/bucket
curl -X POST http://localhost:3000/bucket
curl -X POST http://localhost:3000/bucket
sleep 2
curl -X POST http://localhost:3000/bucket
curl -X POST http://localhost:3000/bucket
curl -X POST http://localhost:3000/bucket
レスポンス出力結果
Token used.
Token used.
Token used.
Token used.
Token used.
No token left.
Token used.
Token used.
No token left.
ログ出力結果
2024-12-25T17:14:24.064404Z DEBUG API called.
2024-12-25T17:14:24.069417Z DEBUG API called.
2024-12-25T17:14:24.075361Z DEBUG API called.
2024-12-25T17:14:24.081062Z DEBUG API called.
2024-12-25T17:14:24.088231Z DEBUG API called.
2024-12-25T17:14:24.094721Z DEBUG API called.
2024-12-25T17:14:26.101068Z DEBUG API called.
2024-12-25T17:14:26.105920Z DEBUG API called.
2024-12-25T17:14:26.111101Z DEBUG API called.
バージョン情報
tokio ={ version = "1.42.0" , features = ["full"]}
leaky-bucket = "1.1.2"
axum = "0.7.9"
tracing = "0.1.41"
tracing-subscriber = {version = "0.3.19", features = ["env-filter"]}