14
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Wano GroupAdvent Calendar 2023

Day 21

Producer-Consumer Problemとふれあう

Last updated at Posted at 2023-12-21

はじめに

この記事は Wano Group Advent Calendar 2023の21日目の記事となります。

平行プログラミングの古典的な問題であるProducer-Consumer Problemの典型的な解を実装・確認していきます。実装には、golangを用います。golangは業務で扱うこともあるのですが、平行プログラミングを陽に意識したコードを書く機会は普段ないので、golangによる平行プログラミングの個人的な入門も兼ねた備忘録になります。

Producer-Consumer Problem

登場人物は、あるアイテムをためることができる有限サイズのキュー、そこにアイテムを入れるProducer、そこからアイテムを取り出すConsumerです。Producerはキューがいっぱいの時はアイテムを入れることはできないので待機する必要があり、Consumerはキューが空の時はアイテムを取り出せないので待機する必要があります。ProducerとConsumerが平行に動作する時、どうやって競合状態を回避すればよいかというのがこの問題になります。

セマフォを利用する

以下のpakageを利用します。

NewWeighted(n int64) *Weightedで最大値がnのセマフォを作成。
WeightedにはAcquireReleaseメソッドが用意されています。前者が一般的なセマフォでいうところのdownで後者がupに相当するようです。

以下がセマフォ利用した実装になります。

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

type Item int

type Queue struct {
	mtx   sync.Mutex
	empty *semaphore.Weighted // num of avalable slots
	full  *semaphore.Weighted // num of used slots
	items []Item
}

func NewQueue(size int) *Queue {
	q := &Queue{
		mtx:   sync.Mutex{},
		empty: semaphore.NewWeighted(int64(size)),
		full:  semaphore.NewWeighted(int64(size)),
		items: []Item{},
	}
	if err := q.full.Acquire(context.TODO(), int64(size)); err != nil {
		panic(err)
	}

	return q
}

func (q *Queue) Produce(producerID int) {
	if err := q.empty.Acquire(context.TODO(), 1); err != nil {
		panic(err)
	}
	q.mtx.Lock()
	q.items = append(q.items, Item(producerID))
	q.mtx.Unlock()
	q.full.Release(1)
}

func (q *Queue) Consume(consumerID int) Item {
	if err := q.full.Acquire(context.TODO(), 1); err != nil {
		panic(err)
	}
	q.mtx.Lock()
	producerID := q.items[0]
	q.items = q.items[1:]
	fmt.Printf("%d:%d\n", producerID, consumerID)
	q.mtx.Unlock()
	q.empty.Release(1)

	return producerID
}

func main() {
	q := NewQueue(1)
	n := 25 // num of consumer threads
	// spawn consumers
	wg := sync.WaitGroup{}
	for i := 0; i < n; i++ {
		i := i
		wg.Add(1)
		go func() {
			q.Consume(i)
			wg.Done()
		}()
		time.Sleep(50 * time.Millisecond)
	}

	fmt.Printf("%s:%s\n", "ProducerID", "ConsumerID")
	for i := 0; i < n; i++ {
		q.Produce(i)
	}

	wg.Wait()
}

キューのサイズを1としたときの結果が以下になります。

ProducerID:ConsumerID
0:0
1:1
2:2
3:3
4:4
5:5
6:6
7:7
8:8
9:9
10:10
11:11
12:12
13:13
14:14
15:15
16:16
17:17
18:18
19:19
20:20
21:21
22:22
23:23
24:24

Consumerが待機した順にProducerによって起こされている様子がわかります。つまり、セマフォ上での待機時間の長いスレッドが順番に起こされています。これは、実際にgolang.org/x/sync/semaphoreの実装からも確認できます (そんなにボリュームもないので軽く読める量だと思います)。

キューのサイズを1より大きい値にしたときはやや様子が変わってきます。以下はキューのサイズを3にしたときの結果です。

ProducerID:ConsumerID
0:2
1:1
2:4
3:5
4:6
5:0
6:8
7:9
8:10
9:11
10:12
11:13
12:14
13:15
14:16
15:17
16:18
17:19
18:20
19:21
20:22
21:23
22:24
23:7
24:3

競合状態は発生していませんが、キューサイズが1の時は待機時間の長いConsumerから順次動作していたのが、キューサイズを3に上げることでこの性質が失われました。キューに余裕がある間は、Producerはブロックされず、セマフォ上で待機しているConsumerを順次起こしていきます。起こされた複数のコンシューマーはクリティカルセクション手前でsync.Mutex Lockを取り合います。セマフォ上での待機からの復帰とmutexの獲得はアトミックではありません。また、あるsync.Mutexに対して複数のスレッドがロックを取り合っているとき、それらが実際にロックを獲得できる順番は必ずしも獲得を試みた順と一致するわけではありません。従ってこのような結果になったと推測されます(たぶん、おそらく、Maybe)。golangのsync.Mutexの性質については最後に軽く触れます。

条件変数を利用する

標準packageに含まれているのでそちらを利用します。

メソッド名などは標準的な条件変数と変わらないようにみえます。

以下が条件変数利用した実装になります。

package main

import (
	"fmt"
	"sync"
	"time"
)

type Item int

type Queue struct {
	pcv      *sync.Cond
	ccv      *sync.Cond
	capacity int
	items    []Item
}

func NewQueue(size int) *Queue {
	mtx := sync.Mutex{}

	return &Queue{
		pcv:      sync.NewCond(&mtx),
		ccv:      sync.NewCond(&mtx),
		capacity: size,
		items:    make([]Item, 0, size),
	}
}

func (q *Queue) Produce(producerID int) {
	q.pcv.L.Lock()
	for len(q.items) >= q.capacity {
		q.pcv.Wait()
	}
	q.items = append(q.items, Item(producerID))
	q.pcv.L.Unlock()
	q.ccv.Signal()
}

func (q *Queue) Consume(consumerID int) Item {
	q.ccv.L.Lock()
	for len(q.items) == 0 {
		q.ccv.Wait()
	}
	producerID := q.items[0]
	q.items = q.items[1:]
	fmt.Printf("%d:%d\n", producerID, consumerID)
	q.ccv.L.Unlock()

	q.pcv.Signal()

	return producerID
}

func main() {
	q := NewQueue(1)
	n := 25 // num of consumer threads
	// spawn consumers
	wg := sync.WaitGroup{}
	for i := 0; i < n; i++ {
		i := i
		wg.Add(1)
		go func() {
			q.Consume(i)
			wg.Done()
		}()
		time.Sleep(10 * time.Millisecond)
	}

	fmt.Printf("%s:%s\n", "ProducerID", "ConsumerID")
	for i := 0; i < n; i++ {
		q.Produce(i)
	}

	wg.Wait()
}

キューのサイズを1としたときの結果が以下になります。

ProducerID:ConsumerID
0:0
1:1
2:2
3:3
4:4
5:5
6:6
7:7
8:8
9:9
10:10
11:11
12:12
13:13
14:14
15:15
16:16
17:17
18:18
19:19
20:20
21:21
22:22
23:23
24:24

セマフォの時と同様にどうやらgolangの条件変数は、条件変数上での待機時間の長いスレッドが順番に起こすようになっていると推察されます。

キューのサイズを1より大きい値にしたときもセマフォ同様に順序に乱れが見られます。以下はキューのサイズを3にしたときの結果です。

ProducerID:ConsumerID
0:2
1:3
2:4
3:5
4:6
5:7
6:8
7:9
8:0
9:1
10:12
11:10
12:14
13:13
14:11
15:17
16:18
17:15
18:20
19:21
20:22
21:23
22:16
23:19
24:24

理由としては、おそらくセマフォの時と同様に条件変数上での待機からの復帰と待機開始時に解放したmutexの再取得がアトミックではないことと、mutex自体の性質によるものと推察されます。

メッセージ・パッシングを利用する

条件変数はセマフォより柔軟で便利ですが、golangではChannelを利用した同期の仕組みが提供されています。sync.Condのドキュメントにも記載がありますが、sync.Condを使って解決できる問題の多くはChannelでも可能であり、そちらの方がベターだという旨が記載されています。今回取り組んでいるProducer-Consumer ProblemもChannelを利用したほうがよりシンプルなコードになります。

以下がChannelを利用したメッセージ・パッシングによるアプローチによる実装になります。

package main

import (
	"fmt"
	"sync"
	"time"
)

type Item int

type Queue struct {
	ch chan Item
}

func NewQueue(size int) *Queue {
	return &Queue{
		ch: make(chan Item, size),
	}
}

func (q *Queue) Produce(producerID int) {
	q.ch <- Item(producerID)
}

func (q *Queue) Consume(consumerID int) Item {
	producerID, ok := <-q.ch
	if !ok {
		panic("channel has been closed")
	}
	fmt.Printf("%d:%d\n", producerID, consumerID)

	return producerID
}

func (q *Queue) Close() {
	close(q.ch)
}

func main() {
	q := NewQueue(1)
	n := 25 // num of consumer threads
	// spawn consumers
	wg := sync.WaitGroup{}
	for i := 0; i < n; i++ {
		i := i
		wg.Add(1)
		go func() {
			q.Consume(i)
			wg.Done()
		}()
		time.Sleep(50 * time.Millisecond)
	}

	fmt.Printf("%s:%s\n", "ProducerID", "ConsumerID")
	for i := 0; i < n; i++ {
		q.Produce(i)
	}

	wg.Wait()

	q.Close()
}

キューのサイズを1としたときの結果が以下になります。

ProducerID:ConsumerID
6:6
9:9
7:7
8:8
24:24
0:0
1:1
2:2
3:3
4:4
5:5
16:16
10:10
22:22
12:12
13:13
14:14
15:15
20:20
17:17
18:18
19:19
11:11
23:23
21:21

Consumerスレッドが待機に入った順にProducerによって起こされています。今回の場合は、セマフォと条件変数の時と違いキューのサイズを2より大きくしても、この性質は変わりません。先の2つのアプローチと違って、itemを貯めるための共有変数Queue.itemsを必要としなくなったため、mutexも不要になったことが理由として考えられます。

golangのMutexと飢餓

先に、あるsync.Mutexに対して複数のスレッドがロックを取り合っているとき、それらが実際にロックを獲得できる順番は必ずしも獲得を試みた順と一致するわけではないと記載しました。こちらについて少し触れていきます。

もし、ロック獲得を試みた順と実際に獲得できる順が異なると飢餓の問題が生じます。つまり、先にロックの獲得を試みたスレッドが、後からロックを獲得しようとした他のスレッドに割り込まるようなことが運悪く何度も続くと実質スレッドが止まったままになってしまう可能性があります。このような状況は飢餓 (starvation) と呼ばれており、平行プログラミングにおける一般的な問題として知られています。

golangの場合はざっくり次のような形で飢餓を避けているようです。sync.Mutexは1 ms以上mutexの上で待機しているスレッドがあれば、それをマークしてロックが解放された際に優先的にロックが取れるようにスケジューリングしているようです。なので、あるsync.Mutexに対して複数のスレッドがロックを取り合っているとき、それらが実際にロックを獲得できる順番は必ずしも獲得を試みた順と一致するわけではないではないが、極端な飢餓は生じないようになっています。

こちらについては、sync.Mutexのソースコード中に記載されているコメントや以下の記事が参考になります。

おわりに

結果に対する考察に確証が持てなかったり、条件変数の実装を追いかけようとして途中で断念したことが残念ポイントでした。goroutine自体の設計思想や平行プログラミング一般についての前提知識が十分にないと厳しそうだと感じたので、ちゃんと勉強してから再チャレンジしたい所存です。

参考

本稿を書くにあたって参考にした書籍を記載します。なお、Webから直接参照可能な資料については、本文中にリンクを記載しているのでここでは割愛させていただきます。

[1] 河野健ニ: オペレーティングシステムの仕組み, 朝倉書店 (2007)
[2] Andrew S. Tanenbaum, Herbert Bos: Modern Operating Systems: Global Edition, 4th edition, PEARSON (2014)
[3] 高野祐輝: 平行プログラミング入門, オライリー・ジャパン (2021)

募集

現在、Wanoグループでは人材募集をしています。興味のある方は下記を参照してください。

14
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?