はじめに
N予備校でScalaの「並行処理プログラミング」を受講しています。
Future
非同期プログラミングにおいて、終了しているかどうかわからない処理結果を抽象化した型。
非同期に処理される結果が入ったOption型のようなもの。
Node.js
でいうところのPromise
に近い。
import scala.concurrent.Future
val s = "Hello"
val f: Future[String] = Future {
Thread.sleep(1000)
s + " future!"
}
implicit メソッド
import scala.concurrent.ExecutionContext.Implicits.global
関数型プログラミングとマルチスレッドの並行処理プログラミングを、暗黙的に結びつけるメソッド。
JavaのExecutorService
を自動実行してくれる。
スレッドプールの仕組みを意識せず並行処理を実装していくことができる。
Future シングルトンは関数を与えるとその関数を非同期に与える Future[+T] を返す
onComplete
val random = new Random()
val waitMaxMilliSec = 3000
val futureMilliSec: Future[Int] = Future {
val waitMilliSec = random.nextInt(waitMaxMilliSec);
if (waitMilliSec < 1000) throw new RuntimeException(s"waitMilliSec is ${waitMilliSec}")
Thread.sleep(waitMilliSec)
waitMilliSec
}
val futureSec: Future[Double] = futureMilliSec.map(i => i.toDouble / 1000)
futureSec onComplete {
case Success(waitSec) => println(s"Success! ${waitSec} sec")
case Failure(t) => println(s"Failure: ${t.getMessage}")
}
Success(value)
, Failure(throwable)
の形でパターンマッチが可能。
アクターモデル
ユーザー => メッセージ(仕事) => メールボックス => アクター
アクターにはそれぞれメールアドレス(パス)があり、パスに基づいてアクター同士がメッセージを送り合うこともできる。
Akka Actor
において、すべてのアクターは ActorSystem
という一番偉いオブジェクトによって管理されている。
また全てのアクターは、スーパーバイザー
というアクターの動作を監視するオブジェクトでもある。
通信エラーが発生した場合、スーパーバイザーがアクターの仕事を辞めさせ、接続を作り直して、アクターも作り直す。
import akka.actor.{Actor, ActorSystem, Props}
import akka.event.Logging
import scala.concurrent.Await
import scala.concurrent.duration._
// メッセージを受け取るアクターを定義
class MyActor extends Actor {
// Logging インスタンス
val log = Logging(context.system, this)
// メッセージを処理する メソッド
def receive = {
case "test" => log.info("received test")
case _ => log.info("received unknown message")
}
}
object ActorStudy extends App {
// ActorSystem
val system = ActorSystem("actorSystem")
// Actor の作成
val myActor = system.actorOf(Props[MyActor], "myActor")
// メッセージ を送信
// ! は、内部的には tell メソッドへの転送メソッド
// myActor.tell("text", Actor.noSender) と同義
myActor ! "test"
myActor ! "hoge"
Thread.currentThread().join()
}