LoginSignup
8
8

More than 5 years have passed since last update.

メモ:Akka でオレオレ「キャンセルパターン」

Last updated at Posted at 2013-05-20

やりたいこと

大きな処理があるので、100個くらいのタスクに均等分割して、マルチコアで並列実行したい。(以下のサンプルコードでは、簡単のためタスク数を10個にしています。)
ただし、実行している途中で「やっぱやーめた」となることもあるので、そのときは実行全体をキャンセルしたい。
もっとちゃんとしたやり方があるのかもしれないけど、とりあえずうまくいったのでメモ程度に書いておきます。

プログラム

Application.scala
import akka.actor.ActorSystem
import akka.actor.Props

/** Entry point of the cancellation demo.
  *
  * Creates the actor system and the `Master` actor, asks the master to start,
  * waits half a second and then asks it to cancel.
  */
object Application {

  /** Runs the demo: sends `Start`, pauses for 500 ms, then sends `Cancel`.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("system")
    val master = system.actorOf(Props[Master], "master")

    master ! Start

    // Let the run proceed for a while before requesting cancellation.
    Thread.sleep(500)

    master ! Cancel // Cancel!

    // NOTE(review): nothing in the visible code shuts the actor system down, so this
    // call presumably blocks until the process is stopped externally — confirm.
    system.awaitTermination()
  }
}
Master.scala
import akka.actor.Actor
import akka.actor.Props
import akka.routing.RoundRobinRouter

/** Coordinator actor: dispatches the tasks to a routed pool of `Worker`s and keeps
  * the current run status so that it can be queried with `WhichStatus`.
  *
  * Handled messages:
  *  - `Start`       marks the run as `Running` and sends the task numbers 1 to 10 to the workers
  *  - `Cancel`      marks the run as `Canceled`
  *  - `WhichStatus` replies to the sender with the current status
  */
class Master extends Actor {
  // One routee per available processor. Read directly from the JVM runtime so the
  // class does not depend on the parallel-collections package just for a core count.
  private val coreNum = Runtime.getRuntime.availableProcessors
  private val router = RoundRobinRouter(nrOfInstances = coreNum)
  private val worker = context.actorOf(Props[Worker].withRouter(router), "worker")

  // Mutable on purpose: only read and written from within receive.
  private var status: Status = Waiting

  def receive: Receive = {
    case Start =>
      status = Running
      // Sent from inside the actor, so this actor is the implicit sender of each task.
      (1 to 10).foreach { i =>
        worker ! i
      }

    case Cancel =>
      // Only the flag changes here; no message is sent to the workers from this block.
      status = Canceled

    case WhichStatus =>
      sender ! status
  }
}
Worker.scala
import akka.actor.Actor
import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

/** Worker actor: for every `Int` task it first asks the sender of the task for the
  * current run status and then either performs the task or reports it as canceled.
  */
class Worker extends Actor {
  // Upper bound for both the ask below and the Await that waits for its reply.
  implicit val timeout: Timeout = Timeout(5.seconds)

  def receive: Receive = {
    case i: Int =>
      // Ask whoever sent the task for the status; mapTo replaces an unchecked
      // asInstanceOf downcast of the reply.
      val reply = (sender ? WhichStatus).mapTo[Status]
      // NOTE(review): blocking inside receive is part of the original design of this
      // demo; it ties up the worker's thread until the reply arrives or the timeout hits.
      val status = Await.result(reply, timeout.duration)
      if (status == Canceled) doNotWork(i) else doWork(i)
  }

  /** Simulates one second of work for task `i`, printing a line before and after. */
  private def doWork(i: Int): Unit = {
    println(s"Task$i: Start")
    Thread.sleep(1000)
    println(s"Task$i: Finished")
  }

  /** Reports that task `i` was skipped because the run was canceled. */
  private def doNotWork(i: Int): Unit =
    println(s"Task$i: Canceled!!!")
}
Message.scala
/** Control messages exchanged between the application, the master and the workers. */
sealed trait Message
/** Tells the master to begin dispatching tasks. */
case object Start extends Message
/** Tells the master to mark the run as canceled. */
case object Cancel extends Message
/** Asks the master for its current status; the master replies with a `Status`. */
case object WhichStatus extends Message
Status.scala
/** Run state kept by the master and reported in reply to `WhichStatus`. */
sealed trait Status
/** Initial state, before `Start` has been received. */
case object Waiting extends Status
/** State after `Start` has been received. */
case object Running extends Status
/** State after `Cancel` has been received; workers skip tasks while this is reported. */
case object Canceled extends Status

実行結果

キャンセルしない場合(4コア)

Task2: Start
Task1: Start
Task3: Start
Task4: Start
Task4: Finished
Task8: Start
Task1: Finished
Task3: Finished
Task5: Start
Task7: Start
Task2: Finished
Task6: Start
Task8: Finished
Task5: Finished
Task7: Finished
Task9: Start
Task6: Finished
Task10: Start
Task9: Finished
Task10: Finished

キャンセルした場合(4コア)

※ キャンセル時点ですでに開始していたタスク(Task1〜4)は最後まで実行され、未着手のタスクだけがキャンセルされます。

Task1: Start
Task3: Start
Task4: Start
Task2: Start
Task3: Finished
Task4: Finished
Task1: Finished
Task7: Canceled!!!
Task8: Canceled!!!
Task5: Canceled!!!
Task9: Canceled!!!
Task2: Finished
Task6: Canceled!!!
Task10: Canceled!!!
8
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
8