みなさん,高速化に勤しんでますか?キャッシュは使ったことありますか?,Go言語でキャッシュ機構を取り入れると,「やった,高速化だ!」と思いたくなるものです.
だが,そこに並行処理が絡むと,一見シンプルだったはずのキャッシュが 「ややこしい問題」 を呼び寄せることがある.
本記事では,複数のゴルーチンが同時にキャッシュされたデータへアクセスして「アレっ?」と思うような不整合が起こる状況を例示し,その原因と対処法をわかりやすく解説する.
最近これにめちゃくちゃ苦しめられました
例えばキャッシュ内のデータを使用して同時にソートのアクセスがあった場合の並行処理を実装した時以下のような結果になることがあった.
Goroutine 0 result: [3 2 1]
Goroutine 1 result: [2 3 1]
Goroutine 2 result: [3 2 1]
Goroutine 3 result: [1 3 2]
Goroutine 4 result: [3 1 2]
ソートできてないやんけ
なんじゃこりゃ
なぜこんな問題が起きるのか
キャッシュ内に保持したスライスを複数のゴルーチンが同時に扱うと,誰がいつ並べ替えたか,バラバラになってしまう.例えば,sort.Sliceなどの操作はスライスを「その場で」並べ替えるため,並行アクセス下での衝突が発生する.結果として, 「なんだか並び順がおかしいぞ?」 と首を傾げるハメになる...
実際にコードをもとに考えてみよう
前提
- UnsafeCache スレッドセーフでないキャッシュ
- RepositoryImpl キャッシュを通じて商品データを取得する
- Executor 取得した商品を並べ替える
- main関数 複数のゴルーチンでExecutorを呼び出して不整合を確認する
// 商品データ
type Product struct {
ID string
Price int64
}
// データ取得の抽象
type Repository interface {
FindAllWithCache(ctx context.Context) ([]*Product, error)
}
// Executorは取得データを並べ替える
type Executor struct {
repo Repository
}
func NewExecutor(repo Repository) *Executor {
return &Executor{repo: repo}
}
// sortTypeが"price"なら価格基準,それ以外はID基準で降順ソート
func (e *Executor) Execute(sortType string) []*Product {
products, err := e.repo.FindAllWithCache(context.Background())
if err != nil {
panic(err)
}
// このproductsはキャッシュ内スライス参照であり,並列アクセスでは他ゴルーチンも同じものをいじる可能性あり
if sortType == "price" {
sort.SliceStable(products, func(i, j int) bool {
return products[i].Price > products[j].Price
})
} else {
sort.SliceStable(products, func(i, j int) bool {
return products[i].ID > products[j].ID
})
}
return products
}
キャッシュとリポジトリ実装
ここではスレッドセーフでないUnsafeCacheを使ってみる.
GetとSetを行うが,ロックはなし.
並行アクセスされたらどうなるか,このコードを見るとちょっとハラハラしますね...
type UnsafeCache struct {
data map[string][]*Product
}
func NewUnsafeCache() *UnsafeCache {
return &UnsafeCache{data: map[string][]*Product{}}
}
func (c *UnsafeCache) Get(key string) ([]*Product, bool) {
p, ok := c.data[key]
return p, ok
}
func (c *UnsafeCache) Set(key string, val []*Product) {
c.data[key] = val
}
// リポジトリはキャッシュから取得し,なければロード
type RepositoryImpl struct {
cache *UnsafeCache
loader func() []*Product
}
func NewRepositoryImpl(cache *UnsafeCache) *RepositoryImpl {
return &RepositoryImpl{
cache: cache,
loader: func() []*Product {
// 仮のデータ
return []*Product{
{ID: "1", Price: 100},
{ID: "2", Price: 200},
{ID: "3", Price: 300},
}
},
}
}
func (r *RepositoryImpl) FindAllWithCache(ctx context.Context) ([]*Product, error) {
if data, ok := r.cache.Get("products"); ok {
return data, nil
}
data := r.loader()
r.cache.Set("products", data)
return data, nil
}
main関数で並行実行してみる
複数ゴルーチンを走らせると,ちょっとしたスロットマシン状態になり,出てくる並びは予測不能.
どのゴルーチンがどのタイミングでどのソートをかけるかによって結果が変動する.
func main() {
cache := NewUnsafeCache()
repo := NewRepositoryImpl(cache)
executor := NewExecutor(repo)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
sortType := ""
if i%2 == 0 {
sortType = "price"
}
products := executor.Execute(sortType)
fmt.Printf("Goroutine %d result: %v\n", i, extractIDs(products))
}(i)
}
wg.Wait()
}
func extractIDs(products []*Product) []string {
var ids []string
for _, p := range products {
ids = append(ids, p.ID)
}
return ids
}
実行すると,時々「アレ?おかしいな?」という並びが得られるかもしれない.
これが並行アクセスによるデータ不整合だ.
ここからは実行結果の一例と対策案を具体的なコード例で示していく.
実行結果は状況次第で異なるため,あくまで一例として眺めてほしい.
実行結果の一例(対策なしの状態)
対策を何も行わずに前述のサンプルを実行すると,例えば以下のような結果が出る場合がある.
(出力例)
Goroutine 0 result: [3 2 1]
Goroutine 1 result: [2 3 1]
Goroutine 2 result: [3 2 1]
Goroutine 3 result: [1 3 2]
Goroutine 4 result: [3 1 2]
... など,実行するたびに微妙に結果がブレる
本来,ID基準の並びなら[3 2 1]や[2 1]などの安定した結果が期待できるが,別のゴルーチンがprice基準で並び替えた直後のスライスを参照すると中途半端な順序が混入する.
対策アイデア
"えー,どうやって直すの?"
そんな時は以下を検討しよう!
対策1. データ取得時にコピーを返す例
RepositoryImplのFindAllWithCacheで取得したスライスをコピーしてから返却する.
こうすることで,每回Executorは独立したスライスを操作し,他ゴルーチンの影響を受けなくなる.
func (r *RepositoryImpl) FindAllWithCache(ctx context.Context) ([]*Product, error) {
if data, ok := r.cache.Get("products"); ok {
// キャッシュヒット時もコピーを作る
copyData := make([]*Product, len(data))
copy(copyData, data)
return copyData, nil
}
data := r.loader()
r.cache.Set("products", data)
// ローダーから取得した場合もコピーして返す(厳密にはこの段階では不要かもしれないが,
// 一貫性のためにコピーを返すようにする)
copyData := make([]*Product, len(data))
copy(copyData, data)
return copyData, nil
}
この変更により,Executor.Execute内でsort.SliceStableを呼んでも,他のゴルーチンとスライスを共有しなくなる.
結果として,不整合な並びがほぼ発生しなくなる.
対策2. 排他制御(mutex)を導入する例
キャッシュアクセスや並び替え処理にミューテックスを用いてロックをかけることで,同時アクセスを防ぐ方法.
以下はUnsafeCacheをMutexで保護した例だ.
type SafeCache struct {
mu sync.Mutex
data map[string][]*Product
}
func NewSafeCache() *SafeCache {
return &SafeCache{data: map[string][]*Product{}}
}
func (c *SafeCache) Get(key string) ([]*Product, bool) {
c.mu.Lock()
defer c.mu.Unlock()
p, ok := c.data[key]
return p, ok
}
func (c *SafeCache) Set(key string, val []*Product) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = val
}
さらに,Executor.Executeで並び替える時にもロックを使えば,他ゴルーチンが並べ替え中に割り込めなくなる.
func (e *Executor) Execute(sortType string) []*Product {
products, err := e.repo.FindAllWithCache(context.Background())
if err != nil {
panic(err)
}
// 排他制御(実運用では共有のロックや細分化されたロック戦略が必要)
var mu sync.Mutex
mu.Lock()
defer mu.Unlock()
if sortType == "price" {
sort.SliceStable(products, func(i, j int) bool {
return products[i].Price > products[j].Price
})
} else {
sort.SliceStable(products, func(i, j int) bool {
return products[i].ID > products[j].ID
})
}
return products
}
この例は単純にMutexで全体を覆っているが,実際にはスコープやロック範囲を考慮して効率的な排他を行うべきだ.
対策3. スレッドセーフなデータ構造を利用する例
sync.Mapを使ってキャッシュを管理すれば,内部的にスレッドセーフな実装が行われる.
以下はSyncMapCacheという例示的な実装である.
type SyncMapCache struct {
data sync.Map // keyはstring, valueは[]*Productを想定
}
func (c *SyncMapCache) Get(key string) ([]*Product, bool) {
val, ok := c.data.Load(key)
if !ok {
return nil, false
}
// valはinterface{}として取得するため,型アサーションする
return val.([]*Product), true
}
func (c *SyncMapCache) Set(key string, products []*Product) {
c.data.Store(key, products)
}
SyncMapCacheを使えば,基本的なキャッシュ操作はスレッドセーフになる.
ただし,同じ問題を完全に回避するには,並べ替え対象のスライスをいかに扱うか(コピーするかどうか)を引き続き検討する必要がある.
対策4. イミュータブル設計の例
キャッシュに格納したデータを変更しない方針を徹底する場合,
例えばGet時には常に新規スライスを生成するイミュータブル戦略を導入する.
func (r *RepositoryImpl) FindAllWithCache(ctx context.Context) ([]*Product, error) {
if data, ok := r.cache.Get("products"); ok {
// キャッシュから取得後に必ず新規スライスを返し,元のスライスは不変とする
newData := make([]*Product, len(data))
for i, p := range data {
newP := *p // Productは値コピー
newData[i] = &newP
}
return newData, nil
}
// ローディング時も不変データを格納し,参照のみ行う
data := r.loader()
r.cache.Set("products", data)
// 取得時には必ず新規作成
copyData := make([]*Product, len(data))
for i, p := range data {
newP := *p
copyData[i] = &newP
}
return copyData, nil
}
このように不変データとして扱うことで,他ゴルーチンが同時に参照していても安全が担保される.
並行処理とキャッシュは夢の高速化コンビに思えるが,扱いを間違えると「お祭り状態」で手が付けられなくなる.
ここで示したような問題点を意識し,コピーやロック,スレッドセーフな仕組みの活用をしてみよう. 結果的に,こうした事前対策は将来のデバッグ時間を節約し,安心してサービスを運用するための投資となる.
さあ,これで「アレっ?」となる不整合ともサヨナラだ!
それではQiitaアドカレ.24企画の今日のクリスマスツリーです.
詳しくはこちらの記事から