LoginSignup
20
22

More than 5 years have passed since last update.

負荷やスループットを意識してgoroutineを実行する

Posted at

動機

  • 何時・どの程度の処理リクエストが飛んでくるか分からないというケースでGoを採用したとして、「裏側のインフラ・ミドルに掛かる負荷をアプリレベルでもそれなりに考慮する必要がある」という機能要件が上がって来た場合、Goだとどう書くのがstandardなんだろう?
  • Web上で事例を探してみたところ、rate limit, worker pool, etcのサンプルを見かけるんだけど、情報が分散気味
  • 「負荷やスループットを意識した書き方」をパターン/スニペット風にまとめておけば後で役に立つかも という話。を調べてみた、という話。

※もっと良い書き方あるだろ!とか、他にもこんなケースがあるだろ!とか間違いなくあるはずなので、都度より良く改善していきたい。

使用したソース

type LimitExector interface {
  // 同時処理数に上限を設ける
  // 上限を超えるリクエストは処理しない
  ByMax(req ReqEntity)

  // 1秒間の同時処理数に上限を設ける
  // 上限を超えるリクエストは1秒待つ
  ByMaxPerSecond(req ReqEntity)

  // 処理に実行間隔を設ける => x秒間隔に1回処理を行う
  ByTickerDuration(req ReqEntity)

  // 処理に実行間隔を設ける => x秒間隔に1回処理を行う
  // 瞬間的な高負荷(=burst)への対応として、先頭200リクエストまでは一気に処理する事を許可する
  ByTickerDurationWithBurst(req ReqEntity)
}
  • LimitExectorインタフェースを実装するlimitstructと、limitを生成するために使用するLimitParamstruct
// どんな制限を掛けるか?を指定するパラメータ用struct
type LimitParam struct {
  BurstLimit      int
  DurationNanoSec int
}

// 制限を掛ける為のオブジェクト(ここではチャネル)を保持するstruct
// このstructにLimitExectorインタフェースのメソッドを実装する
type limit struct {
  capacity chan time.Time    // 最大同時処理数をコントロールするチャネル
  ticker   <-chan time.Time  // 処理間隔をコントロールするチャネル
}

// LimitExectorのインスタンスを取得する
func NewLimitExector(limitParam LimitParam) LimitExector {
  lmt := limit{}

  if limitParam.BurstLimit > 0 {
    lmt.capacity = make(chan time.Time, limitParam.BurstLimit)
    for i := 0; i < limitParam.BurstLimit; i++ {
      lmt.capacity <- time.Now()
    }
  }

  if limitParam.DurationNanoSec > 0 {
    lmt.ticker = time.Tick(time.Nanosecond * time.Duration(limitParam.DurationNanoSec))
  }

  return lmt
}
  • デモ用のリクエスト群を生成して、forループでLimitExectorの各メソッドに投げる
for req := range <リクエスト群> {
  limitExector.ByXXXX(req)
}

検証ケース

ケース1:最大同時処理数を制御する

func (lmt limit) ByMax(req ReqEntity) {
  select {
  case <-lmt.capacity:
    go lmt.kickService(req)
  default:
    // 処理量オーバーの場合
    // 実際の例では sorryサーバ的なトコに飛ばす とかありそう
    fmt.Println("Capacity Over ByMax", req.Id)
  }
}
  • lmt.capacity最大同時処理数と同じサイズのチャネル (事前に生成・値を送信済)
  • select case文でこのチャネルを受信するcaseを書く
    • 受信出来る要素があればgo lmt.kickService(req)が動く
  • 上記のコード例だと、lmt.capacityチャネルがサイズ=100で生成・値送信済の場合、最初の100個は一気に処理するが、この時点でチャネルから受信できる要素が無くなり、以降のリクエストが来てもdefault節に処理が飛ぶ
    • defaultブロックを書かなかった場合は、lmt.capacityから要素を受信可能になるまでcase部分で待つ挙動になる
  • go lmt.kickService(req)は実際に起動したいゴルーチン(のダミー関数)
    • この関数内で 処理が完了したらチャネルに値を再送信する(再補充する)
    • 再送信後はcase <-lmt.capacity節の受信が再び動くようになる
func (lmt limit) kickService(req ReqEntity) {
  if lmt.capacity != nil {
    // 完了したらcapacityを再補充する
    defer func() {
      lmt.capacity <- time.Now()
      fmt.Println("*refilled capacity")
    }()
  }
:
:

ケース2:1秒間の最大同時処理数を制御する

func (lmt limit) ByMaxPerSecond(req ReqEntity) {
  select {
  case <-lmt.capacity:
    go lmt.kickService(req)
  default:
    // 処理量オーバーの場合
    // 1秒待って再挑戦
    time.Sleep(time.Second)
    select {
    case <-lmt.capacity:
      go lmt.kickService(req)
    default:
      fmt.Println("Capacity Over ByMaxPerSecond", req.Id)
    }
  }
}
  • ケース1のByMax関数とほぼ同じだが、異なるのはdefaultブロックで1秒待ってlmt.capacityチャネルの受信に再挑戦している部分
  • 書いておいて何だけど、 実サービスでこれをやると処理滞留引き起こしてサチりそう なので非推奨

ケース3:処理間隔を制御する

func (lmt limit) ByTickerDuration(req ReqEntity) {
  // 指定した実行間隔を空けてサービスを起動する
  <-lmt.ticker
  go lmt.kickService(req)
}
  • time.Ticker を使う
  • Tickerは 指定した間隔で時刻を配信する同期チャネル
    • 間隔=10millisecで作成したTickerの場合、<-tickerの受信が10millisec毎に処理される
  • 例えば QPSを100に制限したい って場合はtickerを10millisecにすれば良い
  • <-lmt.tickerすればそこで10millisec待機するので、その後に行いたい処理を書けばOK

ケース4:処理間隔を制御するが、瞬間的なリクエストのburstにも対応する

func (lmt limit) ByTickerDurationWithBurst(req ReqEntity) {
  select {
  case <-lmt.capacity:
    // 指定した最大同時処理量のサービスを同時起動する
    go lmt.kickService(req)
  case t := <-lmt.ticker:
    // burstが起こっていれば指定したtickerの間隔で捌く
    // 完了後のcapacityの補充は行わない
    go lmt.kickServiceAndRefillCap(req, false)
    fmt.Println("process by ticker", req.Id, t)
  }
}
:
:
func (lmt limit) kickService(req ReqEntity) {
  lmt.kickServiceAndRefillCap(req, true)
}

func (lmt limit) kickServiceAndRefillCap(req ReqEntity, isRefillCap bool) {
  if lmt.capacity != nil && isRefillCap {
    // 完了したらcapacityを補充する
    defer func() {
      lmt.capacity <- time.Now()
      fmt.Println("*refilled capacity")
    }()
  }
:
:
}
  • ケース3のように処理間隔のみで制御する場合、負荷的に余裕があっても常に等間隔で処理されるので、リソースが余る事も考えられる
  • ちょっとしたリクエスト爆発は一気に処理しても良い、という場合の実装例
  • 書き方としては、lmt.capacitylmt.tickerの両チャネルを同じselect文で受信待機すれば良い
  • ticker側で受信した場合は 起動したゴルーチン完了後にcapacityの再補充を行わない

参考

20
22
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
20
22