はじめに
この記事は Wano Group Advent Calendar 2023の21日目の記事となります。
平行プログラミングの古典的な問題であるProducer-Consumer Problemの典型的な解を実装・確認していきます。実装には、golangを用います。golangは業務で扱うこともあるのですが、平行プログラミングを陽に意識したコードを書く機会は普段ないので、golangによる平行プログラミングの個人的な入門も兼ねた備忘録になります。
Producer-Consumer Problem
登場人物は、あるアイテムをためることができる有限サイズのキュー、そこにアイテムを入れるProducer、そこからアイテムを取り出すConsumerです。Producerはキューがいっぱいの時はアイテムを入れることはできないので待機する必要があり、Consumerはキューが空の時はアイテムを取り出せないので待機する必要があります。ProducerとConsumerが平行に動作する時、どうやって競合状態を回避すればよいかというのがこの問題になります。
セマフォを利用する
以下のpakageを利用します。
NewWeighted(n int64) *Weighted
で最大値がn
のセマフォを作成。
Weighted
にはAcquire
とRelease
メソッドが用意されています。前者が一般的なセマフォでいうところのdown
で後者がup
に相当するようです。
以下がセマフォ利用した実装になります。
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
type Item int
type Queue struct {
mtx sync.Mutex
empty *semaphore.Weighted // num of avalable slots
full *semaphore.Weighted // num of used slots
items []Item
}
func NewQueue(size int) *Queue {
q := &Queue{
mtx: sync.Mutex{},
empty: semaphore.NewWeighted(int64(size)),
full: semaphore.NewWeighted(int64(size)),
items: []Item{},
}
if err := q.full.Acquire(context.TODO(), int64(size)); err != nil {
panic(err)
}
return q
}
func (q *Queue) Produce(producerID int) {
if err := q.empty.Acquire(context.TODO(), 1); err != nil {
panic(err)
}
q.mtx.Lock()
q.items = append(q.items, Item(producerID))
q.mtx.Unlock()
q.full.Release(1)
}
func (q *Queue) Consume(consumerID int) Item {
if err := q.full.Acquire(context.TODO(), 1); err != nil {
panic(err)
}
q.mtx.Lock()
producerID := q.items[0]
q.items = q.items[1:]
fmt.Printf("%d:%d\n", producerID, consumerID)
q.mtx.Unlock()
q.empty.Release(1)
return producerID
}
func main() {
q := NewQueue(1)
n := 25 // num of consumer threads
// spawn consumers
wg := sync.WaitGroup{}
for i := 0; i < n; i++ {
i := i
wg.Add(1)
go func() {
q.Consume(i)
wg.Done()
}()
time.Sleep(50 * time.Millisecond)
}
fmt.Printf("%s:%s\n", "ProducerID", "ConsumerID")
for i := 0; i < n; i++ {
q.Produce(i)
}
wg.Wait()
}
キューのサイズを1としたときの結果が以下になります。
ProducerID:ConsumerID
0:0
1:1
2:2
3:3
4:4
5:5
6:6
7:7
8:8
9:9
10:10
11:11
12:12
13:13
14:14
15:15
16:16
17:17
18:18
19:19
20:20
21:21
22:22
23:23
24:24
Consumerが待機した順にProducerによって起こされている様子がわかります。つまり、セマフォ上での待機時間の長いスレッドが順番に起こされています。これは、実際にgolang.org/x/sync/semaphore
の実装からも確認できます (そんなにボリュームもないので軽く読める量だと思います)。
キューのサイズを1より大きい値にしたときはやや様子が変わってきます。以下はキューのサイズを3にしたときの結果です。
ProducerID:ConsumerID
0:2
1:1
2:4
3:5
4:6
5:0
6:8
7:9
8:10
9:11
10:12
11:13
12:14
13:15
14:16
15:17
16:18
17:19
18:20
19:21
20:22
21:23
22:24
23:7
24:3
競合状態は発生していませんが、キューサイズが1の時は待機時間の長いConsumerから順次動作していたのが、キューサイズを3に上げることでこの性質が失われました。キューに余裕がある間は、Producerはブロックされず、セマフォ上で待機しているConsumerを順次起こしていきます。起こされた複数のコンシューマーはクリティカルセクション手前でsync.Mutex
Lockを取り合います。セマフォ上での待機からの復帰とmutexの獲得はアトミックではありません。また、あるsync.Mutex
に対して複数のスレッドがロックを取り合っているとき、それらが実際にロックを獲得できる順番は必ずしも獲得を試みた順と一致するわけではありません。従ってこのような結果になったと推測されます(たぶん、おそらく、Maybe)。golangのsync.Mutex
の性質については最後に軽く触れます。
条件変数を利用する
標準packageに含まれているのでそちらを利用します。
メソッド名などは標準的な条件変数と変わらないようにみえます。
以下が条件変数利用した実装になります。
package main
import (
"fmt"
"sync"
"time"
)
type Item int
type Queue struct {
pcv *sync.Cond
ccv *sync.Cond
capacity int
items []Item
}
func NewQueue(size int) *Queue {
mtx := sync.Mutex{}
return &Queue{
pcv: sync.NewCond(&mtx),
ccv: sync.NewCond(&mtx),
capacity: size,
items: make([]Item, 0, size),
}
}
func (q *Queue) Produce(producerID int) {
q.pcv.L.Lock()
for len(q.items) >= q.capacity {
q.pcv.Wait()
}
q.items = append(q.items, Item(producerID))
q.pcv.L.Unlock()
q.ccv.Signal()
}
func (q *Queue) Consume(consumerID int) Item {
q.ccv.L.Lock()
for len(q.items) == 0 {
q.ccv.Wait()
}
producerID := q.items[0]
q.items = q.items[1:]
fmt.Printf("%d:%d\n", producerID, consumerID)
q.ccv.L.Unlock()
q.pcv.Signal()
return producerID
}
func main() {
q := NewQueue(1)
n := 25 // num of consumer threads
// spawn consumers
wg := sync.WaitGroup{}
for i := 0; i < n; i++ {
i := i
wg.Add(1)
go func() {
q.Consume(i)
wg.Done()
}()
time.Sleep(10 * time.Millisecond)
}
fmt.Printf("%s:%s\n", "ProducerID", "ConsumerID")
for i := 0; i < n; i++ {
q.Produce(i)
}
wg.Wait()
}
キューのサイズを1としたときの結果が以下になります。
ProducerID:ConsumerID
0:0
1:1
2:2
3:3
4:4
5:5
6:6
7:7
8:8
9:9
10:10
11:11
12:12
13:13
14:14
15:15
16:16
17:17
18:18
19:19
20:20
21:21
22:22
23:23
24:24
セマフォの時と同様にどうやらgolangの条件変数は、条件変数上での待機時間の長いスレッドが順番に起こすようになっていると推察されます。
キューのサイズを1より大きい値にしたときもセマフォ同様に順序に乱れが見られます。以下はキューのサイズを3にしたときの結果です。
ProducerID:ConsumerID
0:2
1:3
2:4
3:5
4:6
5:7
6:8
7:9
8:0
9:1
10:12
11:10
12:14
13:13
14:11
15:17
16:18
17:15
18:20
19:21
20:22
21:23
22:16
23:19
24:24
理由としては、おそらくセマフォの時と同様に条件変数上での待機からの復帰と待機開始時に解放したmutexの再取得がアトミックではないことと、mutex自体の性質によるものと推察されます。
メッセージ・パッシングを利用する
条件変数はセマフォより柔軟で便利ですが、golangではChannel
を利用した同期の仕組みが提供されています。sync.Cond
のドキュメントにも記載がありますが、sync.Cond
を使って解決できる問題の多くはChannelでも可能であり、そちらの方がベターだという旨が記載されています。今回取り組んでいるProducer-Consumer ProblemもChannelを利用したほうがよりシンプルなコードになります。
以下がChannelを利用したメッセージ・パッシングによるアプローチによる実装になります。
package main
import (
"fmt"
"sync"
"time"
)
type Item int
type Queue struct {
ch chan Item
}
func NewQueue(size int) *Queue {
return &Queue{
ch: make(chan Item, size),
}
}
func (q *Queue) Produce(producerID int) {
q.ch <- Item(producerID)
}
func (q *Queue) Consume(consumerID int) Item {
producerID, ok := <-q.ch
if !ok {
panic("channel has been closed")
}
fmt.Printf("%d:%d\n", producerID, consumerID)
return producerID
}
func (q *Queue) Close() {
close(q.ch)
}
func main() {
q := NewQueue(1)
n := 25 // num of consumer threads
// spawn consumers
wg := sync.WaitGroup{}
for i := 0; i < n; i++ {
i := i
wg.Add(1)
go func() {
q.Consume(i)
wg.Done()
}()
time.Sleep(50 * time.Millisecond)
}
fmt.Printf("%s:%s\n", "ProducerID", "ConsumerID")
for i := 0; i < n; i++ {
q.Produce(i)
}
wg.Wait()
q.Close()
}
キューのサイズを1としたときの結果が以下になります。
ProducerID:ConsumerID
6:6
9:9
7:7
8:8
24:24
0:0
1:1
2:2
3:3
4:4
5:5
16:16
10:10
22:22
12:12
13:13
14:14
15:15
20:20
17:17
18:18
19:19
11:11
23:23
21:21
Consumerスレッドが待機に入った順にProducerによって起こされています。今回の場合は、セマフォと条件変数の時と違いキューのサイズを2より大きくしても、この性質は変わりません。先の2つのアプローチと違って、itemを貯めるための共有変数Queue.items
を必要としなくなったため、mutexも不要になったことが理由として考えられます。
golangのMutexと飢餓
先に、あるsync.Mutex
に対して複数のスレッドがロックを取り合っているとき、それらが実際にロックを獲得できる順番は必ずしも獲得を試みた順と一致するわけではないと記載しました。こちらについて少し触れていきます。
もし、ロック獲得を試みた順と実際に獲得できる順が異なると飢餓の問題が生じます。つまり、先にロックの獲得を試みたスレッドが、後からロックを獲得しようとした他のスレッドに割り込まるようなことが運悪く何度も続くと実質スレッドが止まったままになってしまう可能性があります。このような状況は飢餓 (starvation) と呼ばれており、平行プログラミングにおける一般的な問題として知られています。
golangの場合はざっくり次のような形で飢餓を避けているようです。sync.Mutex
は1 ms以上mutexの上で待機しているスレッドがあれば、それをマークしてロックが解放された際に優先的にロックが取れるようにスケジューリングしているようです。なので、あるsync.Mutex
に対して複数のスレッドがロックを取り合っているとき、それらが実際にロックを獲得できる順番は必ずしも獲得を試みた順と一致するわけではないではないが、極端な飢餓は生じないようになっています。
こちらについては、sync.Mutex
のソースコード中に記載されているコメントや以下の記事が参考になります。
おわりに
結果に対する考察に確証が持てなかったり、条件変数の実装を追いかけようとして途中で断念したことが残念ポイントでした。goroutine自体の設計思想や平行プログラミング一般についての前提知識が十分にないと厳しそうだと感じたので、ちゃんと勉強してから再チャレンジしたい所存です。
参考
本稿を書くにあたって参考にした書籍を記載します。なお、Webから直接参照可能な資料については、本文中にリンクを記載しているのでここでは割愛させていただきます。
[1] 河野健ニ: オペレーティングシステムの仕組み, 朝倉書店 (2007)
[2] Andrew S. Tanenbaum, Herbert Bos: Modern Operating Systems: Global Edition, 4th edition, PEARSON (2014)
[3] 高野祐輝: 平行プログラミング入門, オライリー・ジャパン (2021)
募集
現在、Wanoグループでは人材募集をしています。興味のある方は下記を参照してください。