RTMP実装でnetworkトラフィックコントロールする必要があって、一般的にトークンバケットで実装が多いので、そのアルゴリズムをswiftで実装しました。
似たようleaky bucketもありますが、これはトラフィックを一定するようなアルゴリズムで、rtmpなどの動画音声送信だとkey frameなどででかいデータ一時トラフィックあがることがあるので、トークンバケットが適しています。
トークンバケット
トークンバケット(英: token bucket)とは、ネットワークへのデータ流入量を制御するアルゴリズムの一種であり、バースト性のあるデータ送信を許容する。いくつかの用途があるがトラフィックシェーピングの手法として使うことが多い。
コンポーネント
バケット:トークンを保存する容器。最大 b 個のトークンを保持できます。
トークン:一定レート r でバケットに追加される仮想的な単位。
パケット(PDU):ネットワークを通過するデータの単位。
アルゴリズム
- 1/r 秒ごとに1トークンがバケットに追加されます。
- バケットが満杯の場合、新しく追加されたトークンは捨てられます。
- ネットワークにデータを送信する際、nバイトのデータ(PDU)がn個のトークンと交換されます。
- 必要なトークン数が不足している場合、そのデータは送信されず、特定の方法で処理されます(例:捨てる、送信delayなど)。
swift code
TokenBucket
actor TokenBucket {
private let defaultRate = 2500000 // Default refill rate in tokens per second
private var tokens: Int // Current number of tokens in the bucket
private var lastRefillTime: TimeInterval // Last time the bucket was refilled
private var refillRate: Int // Rate at which tokens are refilled
private var capacity: Int // Maximum capacity of the bucket
// Initialization, set the refill rate and capacity, and fill the bucket
init() {
self.refillRate = defaultRate
self.capacity = defaultRate
self.tokens = defaultRate
self.lastRefillTime = Date().timeIntervalSince1970
}
// Update the rate and capacity of the bucket
func update(rate: Int, capacity: Int) {
self.refillRate = rate
self.capacity = capacity
self.tokens = capacity
}
// Refill tokens in the bucket
func refill() {
let currentTime = Date().timeIntervalSince1970
let elapsedTime = Int((currentTime - lastRefillTime) * 1000)
let tokensToAdd = elapsedTime * refillRate / 1000 // Calculate tokens to add based on elapsed time and refill rate
tokens = min(tokens + tokensToAdd, capacity) // Add tokens but do not exceed maximum capacity
lastRefillTime = currentTime // Update the last refill time
}
// Consume tokens and return whether the operation was successful
func consume(tokensNeeded: Int) -> Bool {
refill() // First, refill the bucket
if tokens >= tokensNeeded { // If there are enough tokens
tokens -= tokensNeeded // Consume the tokens
return true // Return success
}
return false // Not enough tokens, return failure
}
}
データ送信側
let data = /* 送信するデータ */
// tokenが足りてればそのまま送信
if await tokenBucket.consume(tokensNeeded: data.count) {
do {
try await sendData(data)
} catch {
logger.error("send data failed. error: \(error)")
}
} else {
logger.info("token bucket is empty, waiting...")
// token足りないので、10ms待つ
try? await Task.sleep(nanoseconds: UInt64(10 * 1000 * 1000))
}
参考