LoginSignup
0
0

More than 3 years have passed since last update.

【Scala】処理中のものを処理し終わった上で終了するProducer-Consumerパターン

Last updated at Posted at 2019-07-07

並行処理のデザインパターンでよくあるProducer-Consumerパターンですが、
このパターンに、タイトルの通りの動きのロジックを作成しましたので、紹介いたします。

コード

import java.util.concurrent.ArrayBlockingQueue

class ProducerConsumer[E <: AnyRef](producerConcurrentNumber: Int, producer: () => E, consumerConcurrentNumber: Int, consumer: E => Unit, sentinelObject: E) {
  @volatile private[this] var terminateProducer: Boolean = _

  private[this] val queue = new ArrayBlockingQueue[E](consumerConcurrentNumber * 2)

  private[this] val producerThreadList = List.fill(producerConcurrentNumber) {
    new Thread(() => {
      while (!terminateProducer) queue.put(producer())
    })
  }
  private[this] val consumerThreadList = List.fill(producerConcurrentNumber){
    new Thread(() => {
      //whileを使うと微妙なようなので、再帰関数にして、それに末尾再帰最適化を効かせました
      //var item: E = sentinelObject//ignore value
      //while( {item = queue.take();item} ne sentinelObject)
      //  consumer(item)
      @tailrec
      def consumeLoop(): Unit = {
        val item = queue.take()
        if(sentinelObject eq item) return
        consumer(item)
        consumeLoop()
      }
      consumeLoop()
    })
  }

  def start() {
    producerThreadList.foreach(_.start())
    consumerThreadList.foreach(_.start())
  }

  def stop() {
    terminateProducer = true
    producerThreadList.foreach(_.join())

    1 to producerConcurrentNumber foreach(_ => queue.put(sentinelObject))
    consumerThreadList.foreach(_.join())
  }
}

1秒間に64スレッドで生産して、128スレッドで消費するパターンでの使用例

使用例
import java.util.concurrent.TimeUnit

object Main extends App {
  import java.util.concurrent.atomic.AtomicLong
  val prodCnt = new AtomicLong()
  val consCnt = new AtomicLong()
  val cc: ProducerConsumer[String] = new ProducerConsumer(
    64,
    () =>
      "aaa"+prodCnt.getAndIncrement()
    ,
    128,
    _a => consCnt.getAndIncrement(),
    new String(""))//終了処理の判定だけに使う番兵オブジェクト
  cc.start()
  Thread.sleep(TimeUnit.SECONDS.toMillis(1))
  cc.stop()
  println("----- finish -----")
  println(f"生産した個数: ${prodCnt.get}%,d")
  println(f"消費した個数: ${consCnt.get}%,d")
  assert( prodCnt.get == consCnt.get)
}

解説

Producerの終了方法

terminateProducerをtrueにすると、whileループから外れて生産しなくなって終了
terminateProducerはマルチスレッドから呼ばれるため、@volatileをつけておいたが、
つけなくても終了処理はされるため、外してもよい。

Consumerの終了方法

Producerのスレッドが全部終了し、queueへの追加が行われなくなった後に、
番兵オブジェクト(sentinelObject)をConsumerのスレッド数分だけ投入する。

Consumerスレッドでは、sentinelObjectが来たら終了という判定を行っている。
番兵オブジェクト(sentinelObject)の判定は、!=ではダメでneで同じ参照値かのチェックで判定を行う。

その他

ConsumerスレッドのロジックがScalaっぽくないコードになってしまった。
Scalaでは、ローカル変数宣言時の初期値Must、代入式の結果を利用できないため…。
こういう場合だけは、Javaのほうが短く書ける。

こういうところでwhileを使うと微妙なようなので、再帰関数にして、それに末尾再帰最適化を効かせました。
たぶん、Scalaではこう書くもんなんだと思われる。

参考資料

番兵 - Wikipedia

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0