並行処理のデザインパターンでよくあるProducer-Consumerパターンですが、
このパターンに、タイトルの通りの動きのロジックを作成しましたので、紹介いたします。
コード
import java.util.concurrent.ArrayBlockingQueue
class ProducerConsumer[E <: AnyRef](producerConcurrentNumber: Int, producer: () => E, consumerConcurrentNumber: Int, consumer: E => Unit, sentinelObject: E) {
@volatile private[this] var terminateProducer: Boolean = _
private[this] val queue = new ArrayBlockingQueue[E](consumerConcurrentNumber * 2)
private[this] val producerThreadList = List.fill(producerConcurrentNumber) {
new Thread(() => {
while (!terminateProducer) queue.put(producer())
})
}
private[this] val consumerThreadList = List.fill(producerConcurrentNumber){
new Thread(() => {
//whileを使うと微妙なようなので、再帰関数にして、それに末尾再帰最適化を効かせました
//var item: E = sentinelObject//ignore value
//while( {item = queue.take();item} ne sentinelObject)
// consumer(item)
@tailrec
def consumeLoop(): Unit = {
val item = queue.take()
if(sentinelObject eq item) return
consumer(item)
consumeLoop()
}
consumeLoop()
})
}
def start() {
producerThreadList.foreach(_.start())
consumerThreadList.foreach(_.start())
}
def stop() {
terminateProducer = true
producerThreadList.foreach(_.join())
1 to producerConcurrentNumber foreach(_ => queue.put(sentinelObject))
consumerThreadList.foreach(_.join())
}
}
1秒間に64スレッドで生産して、128スレッドで消費するパターンでの使用例
使用例
import java.util.concurrent.TimeUnit
object Main extends App {
import java.util.concurrent.atomic.AtomicLong
val prodCnt = new AtomicLong()
val consCnt = new AtomicLong()
val cc: ProducerConsumer[String] = new ProducerConsumer(
64,
() =>
"aaa"+prodCnt.getAndIncrement()
,
128,
_a => consCnt.getAndIncrement(),
new String(""))//終了処理の判定だけに使う番兵オブジェクト
cc.start()
Thread.sleep(TimeUnit.SECONDS.toMillis(1))
cc.stop()
println("----- finish -----")
println(f"生産した個数: ${prodCnt.get}%,d")
println(f"消費した個数: ${consCnt.get}%,d")
assert( prodCnt.get == consCnt.get)
}
解説
Producerの終了方法
terminateProducer
をtrueにすると、whileループから外れて生産しなくなって終了
terminateProducer
はマルチスレッドから呼ばれるため、@volatile
をつけておいたが、
つけなくても終了処理はされるため、外してもよい。
Consumerの終了方法
Producerのスレッドが全部終了し、queueへの追加が行われなくなった後に、
番兵オブジェクト(sentinelObject
)をConsumerのスレッド数分だけ投入する。
Consumerスレッドでは、sentinelObject
が来たら終了という判定を行っている。
番兵オブジェクト(sentinelObject
)の判定は、!=
ではダメでne
で同じ参照値かのチェックで判定を行う。
その他
ConsumerスレッドのロジックがScalaっぽくないコードになってしまった。
Scalaでは、ローカル変数宣言時の初期値Must、代入式の結果を利用できないため…。
こういう場合だけは、Javaのほうが短く書ける。
こういうところでwhileを使うと微妙なようなので、再帰関数にして、それに末尾再帰最適化を効かせました。
たぶん、Scalaではこう書くもんなんだと思われる。