動機
- 何時・どの程度の処理リクエストが飛んでくるか分からないというケースでGoを採用したとして、「裏側のインフラ・ミドルに掛かる負荷をアプリレベルでもそれなりに考慮する必要がある」という機能要件が上がって来た場合、Goだとどう書くのがstandardなんだろう?
- Web上で事例を探してみたところ、rate limit, worker pool, etcのサンプルを見かけるんだけど、情報が分散気味
- 「負荷やスループットを意識した書き方」をパターン/スニペット風にまとめておけば後で役に立つかも
という話。を調べてみた、という話。
※もっと良い書き方あるだろ!とか、他にもこんなケースがあるだろ!とか間違いなくあるはずなので、都度より良く改善していきたい。
使用したソース
- https://github.com/goldeneggg/go_limit_example
- 検証用のインタフェース
LimitExector
に「どんな制限を掛けるか?」という用途別にメソッドを定義
type LimitExector interface {
// 同時処理数に上限を設ける
// 上限を超えるリクエストは処理しない
ByMax(req ReqEntity)
// 1秒間の同時処理数に上限を設ける
// 上限を超えるリクエストは1秒待つ
ByMaxPerSecond(req ReqEntity)
// 処理に実行間隔を設ける => x秒間隔に1回処理を行う
ByTickerDuration(req ReqEntity)
// 処理に実行間隔を設ける => x秒間隔に1回処理を行う
// 瞬間的な高負荷(=burst)への対応として、先頭200リクエストまでは一気に処理する事を許可する
ByTickerDurationWithBurst(req ReqEntity)
}
-
LimitExector
インタフェースを実装するlimit
structと、limit
を生成するために使用するLimitParam
struct
// どんな制限を掛けるか?を指定するパラメータ用struct
type LimitParam struct {
BurstLimit int
DurationNanoSec int
}
// 制限を掛ける為のオブジェクト(ここではチャネル)を保持するstruct
// このstructにLimitExectorインタフェースのメソッドを実装する
type limit struct {
capacity chan time.Time // 最大同時処理数をコントロールするチャネル
ticker <-chan time.Time // 処理間隔をコントロールするチャネル
}
// LimitExectorのインスタンスを取得する
func NewLimitExector(limitParam LimitParam) LimitExector {
lmt := limit{}
if limitParam.BurstLimit > 0 {
lmt.capacity = make(chan time.Time, limitParam.BurstLimit)
for i := 0; i < limitParam.BurstLimit; i++ {
lmt.capacity <- time.Now()
}
}
if limitParam.DurationNanoSec > 0 {
lmt.ticker = time.Tick(time.Nanosecond * time.Duration(limitParam.DurationNanoSec))
}
return lmt
}
- デモ用のリクエスト群を生成して、forループで
LimitExector
の各メソッドに投げる
for req := range <リクエスト群> {
limitExector.ByXXXX(req)
}
検証ケース
ケース1:最大同時処理数を制御する
func (lmt limit) ByMax(req ReqEntity) {
select {
case <-lmt.capacity:
go lmt.kickService(req)
default:
// 処理量オーバーの場合
// 実際の例では sorryサーバ的なトコに飛ばす とかありそう
fmt.Println("Capacity Over ByMax", req.Id)
}
}
-
lmt.capacity
は 最大同時処理数と同じサイズのチャネル (事前に生成・値を送信済) -
select case
文でこのチャネルを受信するcaseを書く- 受信出来る要素があれば
go lmt.kickService(req)
が動く
- 受信出来る要素があれば
- 上記のコード例だと、
lmt.capacity
チャネルがサイズ=100で生成・値送信済の場合、最初の100個は一気に処理するが、この時点でチャネルから受信できる要素が無くなり、以降のリクエストが来てもdefault
節に処理が飛ぶ-
default
ブロックを書かなかった場合は、lmt.capacity
から要素を受信可能になるまでcase
部分で待つ挙動になる
-
-
go lmt.kickService(req)
は実際に起動したいゴルーチン(のダミー関数)- この関数内で 処理が完了したらチャネルに値を再送信する(再補充する)
- 再送信後は
case <-lmt.capacity
節の受信が再び動くようになる
func (lmt limit) kickService(req ReqEntity) {
if lmt.capacity != nil {
// 完了したらcapacityを再補充する
defer func() {
lmt.capacity <- time.Now()
fmt.Println("*refilled capacity")
}()
}
:
:
ケース2:1秒間の最大同時処理数を制御する
func (lmt limit) ByMaxPerSecond(req ReqEntity) {
select {
case <-lmt.capacity:
go lmt.kickService(req)
default:
// 処理量オーバーの場合
// 1秒待って再挑戦
time.Sleep(time.Second)
select {
case <-lmt.capacity:
go lmt.kickService(req)
default:
fmt.Println("Capacity Over ByMaxPerSecond", req.Id)
}
}
}
- ケース1の
ByMax
関数とほぼ同じだが、異なるのはdefault
ブロックで1秒待ってlmt.capacity
チャネルの受信に再挑戦している部分 - 書いておいて何だけど、 実サービスでこれをやると処理滞留引き起こしてサチりそう なので非推奨
ケース3:処理間隔を制御する
func (lmt limit) ByTickerDuration(req ReqEntity) {
// 指定した実行間隔を空けてサービスを起動する
<-lmt.ticker
go lmt.kickService(req)
}
- time.Ticker を使う
- Tickerは 指定した間隔で時刻を配信する同期チャネル
- 間隔=10millisecで作成したTickerの場合、
<-ticker
の受信が10millisec毎に処理される
- 間隔=10millisecで作成したTickerの場合、
- 例えば QPSを100に制限したい って場合はtickerを10millisecにすれば良い
-
<-lmt.ticker
すればそこで10millisec待機するので、その後に行いたい処理を書けばOK
ケース4:処理間隔を制御するが、瞬間的なリクエストのburstにも対応する
func (lmt limit) ByTickerDurationWithBurst(req ReqEntity) {
select {
case <-lmt.capacity:
// 指定した最大同時処理量のサービスを同時起動する
go lmt.kickService(req)
case t := <-lmt.ticker:
// burstが起こっていれば指定したtickerの間隔で捌く
// 完了後のcapacityの補充は行わない
go lmt.kickServiceAndRefillCap(req, false)
fmt.Println("process by ticker", req.Id, t)
}
}
:
:
func (lmt limit) kickService(req ReqEntity) {
lmt.kickServiceAndRefillCap(req, true)
}
func (lmt limit) kickServiceAndRefillCap(req ReqEntity, isRefillCap bool) {
if lmt.capacity != nil && isRefillCap {
// 完了したらcapacityを補充する
defer func() {
lmt.capacity <- time.Now()
fmt.Println("*refilled capacity")
}()
}
:
:
}
- ケース3のように処理間隔のみで制御する場合、負荷的に余裕があっても常に等間隔で処理されるので、リソースが余る事も考えられる
- ちょっとしたリクエスト爆発は一気に処理しても良い、という場合の実装例
- 書き方としては、
lmt.capacity
とlmt.ticker
の両チャネルを同じselect
文で受信待機すれば良い- このselectに
default
節を書くとちょっとハマる、ハマった - Goのtime.Tickをselect caseで受ける時にdefault節を記述した場合の挙動メモ - Qiita
- このselectに
- ticker側で受信した場合は 起動したゴルーチン完了後にcapacityの再補充を行わない