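What I want to do
I have one large job that I want to split evenly into about 100 tasks and run in parallel across multiple cores. (The sample program in this post uses 10 tasks.)
However, I may change my mind partway through, and in that case I want to cancel the whole run.
There may well be a more proper way to do this, but this approach worked for me, so I am writing it down as a quick note.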
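As a rough illustration of the "split evenly" step, here is a minimal sketch that divides a range of work items into contiguous chunks whose sizes differ by at most one. The names `SplitSketch` and `split` are hypothetical and are not part of the program in this post, which simply sends the task numbers 1 to 10.

```scala
object SplitSketch {
  // Split the half-open range [0, total) into `parts` contiguous chunks.
  // The first (total % parts) chunks get one extra item, so sizes differ by at most one.
  def split(total: Int, parts: Int): Vector[Range] = {
    require(parts > 0, "parts must be positive")
    require(total >= 0, "total must not be negative")
    val base = total / parts
    val extra = total % parts
    (0 until parts).map { i =>
      val start = i * base + math.min(i, extra)
      val size = base + (if (i < extra) 1 else 0)
      start until (start + size)
    }.toVector
  }
}
```

For example, `SplitSketch.split(1000, 100)` yields 100 ranges of 10 items each, and each range could then be sent to a worker as one task.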
Program
Application.scala
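import akka.actor.ActorSystem
import akka.actor.Props

object Application {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("system")
    val master = system.actorOf(Props[Master], "master")
    master ! Start
    Thread.sleep(500) // give the first tasks time to start
    master ! Cancel // Cancel!
    // Nothing in this sample shuts the actor system down,
    // so this call does not return on its own.
    system.awaitTermination()
  }
}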
Master.scala
import akka.actor.Actor
import akka.actor.Props
import akka.routing.RoundRobinRouter

class Master extends Actor {
  // One worker instance per available core
  private val coreNum = scala.collection.parallel.availableProcessors
  private val router = RoundRobinRouter(nrOfInstances = coreNum)
  private val worker = context.actorOf(Props[Worker].withRouter(router), "worker")
  private var status: Status = Waiting

  def receive = {
    case Start => {
      status = Running
      // Queue every task up front; the router hands them out round-robin
      (1 to 10).foreach { i =>
        worker ! i
      }
    }
    case Cancel => {
      status = Canceled
    }
    case WhichStatus => {
      // Workers ask for the status before starting each task
      sender ! status
    }
  }
}
Worker.scala
import akka.actor.Actor
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

class Worker extends Actor {
  implicit val timeout = Timeout(5.seconds)

  def receive = {
    case i: Int => {
      // Ask the master for the current status just before starting the task.
      // Note that Await blocks this worker's thread until the reply arrives.
      val future = sender ? WhichStatus
      val status = Await.result(future, timeout.duration).asInstanceOf[Status]
      if (status == Canceled) {
        doNotWork(i)
      } else {
        doWork(i)
      }
    }
  }

  // Stand-in for the real work: takes one second per task
  private def doWork(i: Int): Unit = {
    println("Task" + i + ": Start")
    Thread.sleep(1000)
    println("Task" + i + ": Finished")
  }

  private def doNotWork(i: Int): Unit = {
    println("Task" + i + ": Canceled!!!")
  }
}
Message.scala
sealed trait Message
case object Start extends Message
case object Cancel extends Message
case object WhichStatus extends Message
Status.scala
sealed trait Status
case object Waiting extends Status
case object Running extends Status
case object Canceled extends Status
Output
Without canceling (4 cores)
Task2: Start
Task1: Start
Task3: Start
Task4: Start
Task4: Finished
Task8: Start
Task1: Finished
Task3: Finished
Task5: Start
Task7: Start
Task2: Finished
Task6: Start
Task8: Finished
Task5: Finished
Task7: Finished
Task9: Start
Task6: Finished
Task10: Start
Task9: Finished
Task10: Finished
With canceling (4 cores)
Task1: Start
Task3: Start
Task4: Start
Task2: Start
Task3: Finished
Task4: Finished
Task1: Finished
Task7: Canceled!!!
Task8: Canceled!!!
Task5: Canceled!!!
Task9: Canceled!!!
Task2: Finished
Task6: Canceled!!!
Task10: Canceled!!!
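In the canceled run, the four tasks that had already started still run to completion, and only the tasks that had not started yet are skipped. This is because the worker checks the status once, just before starting each task. The same check-before-start idea can be sketched without Akka, using only the standard library. This is an alternative technique, not the actor-based method used in this post: a shared `AtomicBoolean` takes the place of asking the master for its status, and the names `CancelFlagSketch` and `run` are hypothetical.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CancelFlagSketch {
  // Runs one Future per task id on a fixed-size pool.
  // Each task reads the shared flag once, right before starting,
  // so a task that is already running is never interrupted.
  def run(taskIds: Seq[Int], canceled: AtomicBoolean, threads: Int)(work: Int => Unit): Seq[String] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val futures = taskIds.map { i =>
        Future {
          if (canceled.get()) {
            s"Task$i: Canceled"
          } else {
            work(i)
            s"Task$i: Finished"
          }
        }
      }
      // Results come back in the order of taskIds, not in completion order
      Await.result(Future.sequence(futures), 30.seconds)
    } finally {
      pool.shutdown()
    }
  }
}
```

To cancel, another thread calls `canceled.set(true)` while `run` is in progress; every task that has not started by then reports itself as canceled.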