記事作成の背景
先日、大規模リクエストを捌く計測サーバ構築を学習する機会があったので
そのアウトプットとして残します!!
結論
大規模リクエストを捌くコツは簡潔に言うと、以下の2点です!
- DB周りの設定をしっかり行う
- ネットワークコストを減らす
このコツに関して色々とお話します!
以下の流れで進んでいきます!
改善前
パッケージ
- net/http
- log
- go-sql-driver/mysql
ソースコード
package main
import (
"net/http"
"log"
"database/sql"
_ "github.com/go-sql-driver/mysql"
"os"
)
func main() {
dataSourceName := os.Getenv("DATASOURCENAME")
if dataSourceName == "" {
dataSourceName = "root:password@tcp(127.0.0.1:13306)/test"
}
testhandler := func(w http.ResponseWriter, r *http.Request) {
db, err := sql.Open("mysql", dataSourceName)
if err != nil {
panic(err.Error())
}
defer db.Close()
stmt, e := db.Prepare("INSERT INTO eventlog(at, name, value) values(NOW(), ?, ?)")
if e != nil {
panic(e.Error())
}
defer stmt.Close()
name := r.URL.Query().Get("name")
value := r.URL.Query().Get("value")
_, _ = stmt.Exec(name, value)
origin := r.Header.Get("Origin")
if origin != "" {
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Allow-Credentials", "true")
} else {
w.Header().Set("Access-Control-Allow-Origin", "*")
}
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Allow-Methods", "GET")
}
http.HandleFunc("/test", testHandler)
http.HandleFunc("/ok", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) })
// start server
if err := http.ListenAndServe(":8081", nil); err != nil {
log.Fatal(err)
}
}
GoのDB周りの設定とコードの修正
背景
初期状態では、大規模リクエストを捌くことは到底できませんでした。
負荷をかけるツールにて、30000rpsを15s試したところ
too many connections
というSQLの接続数に関するエラーが出ており
レスポンスも途中でタイムアウトし、負荷をかけるツールで
計測できなくなってしまいました。
そこで、too many connections
を解決する手段として
- DBを強くする
- GoのDBまわりの設定
- コードのDB周りの修正
- アプリケーションコードを修正する
の4つの手法が挙げられました。
そこで、まずはDBを強くする手法を行うことにしましたが
今回はスケールアップが禁止されていたため、
DB自体を強くするのではなく、GoのDB周りの設定を行いました。
また、それに伴い、DB周りに関してサクッと修正できる
アプリケーションコードの修正を行いました。
MaxOpenConns
MaxOpenConnsとは、DBと最大何本の接続をできるかの上限で
デフォルト値は0で、無限に接続できます。
このパラメータの値が初期値だと無限に接続しようとして
too many connections
を発生させるため
変更しました。
(今回使用したMySQLでは65だったので、とりあえず30程度に設定しました。)
変更後
// ~~~省略
func main() {
dataSourceName := os.Getenv("DATASOURCENAME")
if dataSourceName == "" {
dataSourceName = "root:password@tcp(127.0.0.1:13306)/test"
}
testhandler := func(w http.ResponseWriter, r *http.Request) {
db, err := sql.Open("mysql", dataSourceName)
+ db.SetMaxOpenConns(30)
if err != nil {
panic(err.Error())
}
defer db.Close()
stmt, e := db.Prepare("INSERT INTO eventlog(at, name, value) values(NOW(), ?, ?)")
if e != nil {
panic(e.Error())
}
// ~~~変更がないので省略
Open, Close
testhandler := func(w http.ResponseWriter, r *http.Request) {
db, err := sql.Open("mysql", dataSourceName)
if err != nil {
panic(err.Error())
}
defer db.Close()
stmt, e := db.Prepare("INSERT INTO eventlog(at, name, value) values(NOW(), ?, ?)")
if e != nil {
panic(e.Error())
}
defer stmt.Close()
次に、Open, Close関数はDBを開く・閉じることができる関数です。
DBの開閉はサーバーが起動した時に開閉するだけで十分で
SQLパッケージの方にも一度使用するだけで良いとあるので
ハンドラの外に出します。
返されたDBは複数のゴルーチンによる同時使用にも安全で、アイドル接続のプールを独自に維持します。従って、Open 関数は一度だけ呼ばれるべきです。DB を閉じる必要はほとんどない。
公式の翻訳(deepl先生)
https://pkg.go.dev/database/sql#Open
変更後
func main() {
dataSourceName := os.Getenv("DATASOURCENAME")
if dataSourceName == "" {
dataSourceName = "root:password@tcp(127.0.0.1:13306)/test"
}
+ db, err := sql.Open("mysql", dataSourceName)
+ db.SetMaxOpenConns(30)
+ if err != nil {
+ panic(err.Error())
+ }
+
+ defer db.Close()
testhandler := func(w http.ResponseWriter, r *http.Request) {
stmt, e := db.Prepare("INSERT INTO eventlog(at, name, value) values(NOW(), ?, ?)")
if e != nil {
panic(e.Error())
}
ここまででDB周りの設定になります。
次から重要な非同期での処理に関する部分になります🔥
バルクインサート
背景
DB周りをGoで設定してもtoo many connections
のエラーは
あまり減少しませんでした。
その原因としてDBに挿入する処理 で時間を食っていたことが挙げられます。
具体的に時間を計測した結果がこちらです。
{
"prepare":0.066958017,
"insert":0.09718244,
"response":0.164149096,
}
prepareステートメントまでをprepare
prepare後~Execステートメントまでをinsert
Exec後〜処理完了までをresponse
としています。
この結果から明らかにデータの挿入回数を減らす必要があり、
なおかつ限界までデータの挿入を行う必要があります。
つまり、非同期に複数のハンドラーを実行&&一気にデータを挿入
しなければならないということです。
そこで、Goの非同期に関するgoroutine, channelを利用しました✊
goroutine, channelの利用
goroutineとは
詳しくはこちらの Tour of Goで確認される方が
早いですが、Goのランタイムに管理される軽量なスレッドです。
スレッドとは、プロセス(実行中のプログラム)内で複数のプログラムを動かすことが
できるやつで、普段のプログラムはシングルスレッドですが
goroutineであればマルチスレッドに実行できます。
ちなみに、読みは「ゴルーチン」です笑
channelとは
公式を読んだり、実際に実行したりした方がイメージが湧きやすいですが
チャネルはチャネルオペレータの<-
を用いて値の送受信が
できる通り道(ちょっとしたバッファのようなもの)です。
ちょっとしたバッファのようなものなので、バッファとして使うこともできます。
ここまでが非同期に関する部分で、次から
一気にデータを挿入するバルクインサートの話になります。
バルクインサートとは
バルクインサートとは、複数のデータをまとめて挿入することです。
実装としては、一定数になるまでデータを溜め込むキューと
挿入の実行部分を用意することになります。
sqlxの利用
バルクインサートを便利なパッケージの利用なしで
自前に頑張る方法もありますが、SQLインジェクション対策まで
できていたり、既存のsqlに似た形で利用できたりするを考慮すると
このsqlxが優秀です。
sqlxとは
sqlxとは、Goのsqlパッケージの軽めのラッパです。軽いラッパなので
通常のsqlパッケージとほとんど似た形で実装できます。
また、先述の通りsqlxは変数を?
の文字でバインドしており
SQLインジェクションの対策も行っているそうです。
更に、バルクインサートもNamedExec(query, array)
の形式で
簡単に実装できます🌸
バルクインサートの部分
func recordEvents(db *sqlx.DB, eventChannel <-chan Event) {
events := make([]Event, 0)
for {
select {
case event := <-eventChannel:
events = append(events, event)
if len(events) >= MaxEvents {
if err := insertEvents(db, events); err != nil {
zlog.Error().Err(err).Msg("failure to record events")
}
events = make([]Event, 0)
}
case <-time.After(EventRecordingInterval):
if len(events) == 0 {
continue
}
if err := insertEvents(db, events); err != nil {
zlog.Error().Err(err).Msg("failure to record events")
}
events = make([]Event, 0)
}
}
}
func insertEvents(db *sqlx.DB, events []Event) error {
query := "INSERT INTO eventlog (at, name, value) VALUES (:at, :name, :value)"
if _, err := db.NamedExec(query, events); err != nil {
return err
}
return nil
}
改善後
主にDB周りの設定やバルクインサートについて
変更を加えました。そのついでに
定数化できるものを定数化したりしています。
また、goroutineのスレッドは30としています。
これらの変更を加えることで、too many connections
(接続数が多すぎる件)も
解消され、無事30000rps捌くことができました✨
package main
import (
"log"
"net/http"
"strconv"
"time"
"os"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
zlog "github.com/rs/zerolog/log"
)
type Event struct {
CreatedAt time.Time `db:"at"`
Name string `db:"name"`
Value int `db:"value"`
}
const (
MaxEvents int = 10000
EventQueueSize int = 100000
MaxConcurrency int = 30
DBMaxOpenConns int = 30
DBMaxIdleConns int = 30
EventRecordingInterval = time.Second
)
func main() {
dataSourceName := os.Getenv("DATASOURCENAME")
if dataSourceName == "" {
dataSourceName = "root:password@tcp(127.0.0.1:13306)/test"
}
db, err := sqlx.Open("mysql", dataSourceName)
if err != nil {
log.Panicf("sql open failure:%s", err.Error())
}
defer db.Close()
db.SetMaxOpenConns(DBMaxOpenConns)
if err := db.Ping(); err != nil {
log.Panicf("Ping failure:%s", err.Error())
}
eventChannel := make(chan Event, EventQueueSize)
for i := 0; i < MaxConcurrency; i++ {
go recordEvents(db, eventChannel)
}
testHandler := func(w http.ResponseWriter, r *http.Request) {
value, err := strconv.Atoi(r.URL.Query().Get("value"))
if err != nil {
w.WriteHeader(400)
zlog.Error().Err(err).Str("value", r.URL.Query().Get("value")).Msg("'value' is not an integer value")
return
}
event := Event{
CreatedAt: time.Now(),
Name: r.URL.Query().Get("name"),
Value: value,
}
eventChannel <- event
origin := r.Header.Get("Origin")
if origin != "" {
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Allow-Credentials", "true")
} else {
w.Header().Set("Access-Control-Allow-Origin", "*")
}
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Allow-Methods", "GET")
}
http.HandleFunc("/test", testHandler)
http.HandleFunc("/ok", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) })
// start server
if err := http.ListenAndServe(":8081", nil); err != nil {
log.Panicf("server couldn't start:%s", err.Error())
}
}
func recordEvents(db *sqlx.DB, eventChannel <-chan Event) {
events := make([]Event, 0)
for {
select {
case event := <-eventChannel:
events = append(events, event)
if len(events) >= MaxEvents {
if err := insertEvents(db, events); err != nil {
zlog.Error().Err(err).Msg("failure to record events")
}
events = make([]Event, 0)
}
case <-time.After(EventRecordingInterval):
if len(events) == 0 {
continue
}
if err := insertEvents(db, events); err != nil {
zlog.Error().Err(err).Msg("failure to record events")
}
events = make([]Event, 0)
}
}
}
func insertEvents(db *sqlx.DB, events []Event) error {
query := "INSERT INTO eventlog (at, name, value) VALUES (:at, :name, :value)"
if _, err := db.NamedExec(query, events); err != nil {
return err
}
return nil
}
感想
今まではフロントメインで情報技術に触れてきましたが、大規模リクエストを捌く計測サーバ構築を学習するにあたって、大学で習ったサーバーサイドの知識を実践的に使用することができました。また、サーバーサイドでは必要な知識はサーバーだけでなく、オペレーティングシステムの技術も深く関わりがあり、インフラに関しても軽く理解が深まる形となりました。初めはGo言語で何かしらできたらいいな、程度のきっかけでしたが、色々学べたので楽しかったです🙌
読んでいただいたみなさんもサーバーサイドに関して少しでも理解が深まりますと幸いです🙇
参考文献