この記事は、[Fringe81 アドベントカレンダー2017](Fringe81 Advent Calendar 2017 - Qiita)の23日目です。
はじめに
いつもはScalaで広告配信サーバの開発を行っているichizinです。
Scala.jsを調べている中でMonixというライブラリを知ったので紹介します。
けっこう便利そうなので見てください!!
Monixとは
Monixは、非同期プログラミングのためのライブラリでScalaやScala.jsで使うことができます。
Monixは色々な機能があり、すべて取り上げるのはボリュームが大きすぎるので今回はTaskとSchedulerについて紹介したいと思います。
Task
ScalaのFutureは、未来の結果を保持する値であって、Futureを評価しようしているスレッドが既に開始済みで完了している可能性もあるしまだ終わってないかもしれなものであるのに対し、Taskは関数であり、こちらが実行を指定するまでは開始しません。
説明してもわかりづらいのでコードで説明します。
val future: Future[Int] = Future { println("Future"); 1 + 1 }
// Future
// 2
// 2
future.foreach(println)
future.foreach(println)
val task: Task[Int] = Task { println("Task"); 1 + 1 }
// Task
// 2
// Task
// 2
task.runAsync.foreach(println)
task.runAsync.foreach(println)
Futureは結果が最初の時に評価され、次呼ばれた時は結果がキャッシュされていることがわかります。
一方Taskは、実行が明示的に呼ばれた段階で都度評価されていることがわかります。
もちろん、下記をすることでTaskも1度の評価で判定することもできます。
val taskValOnce = Task.evalOnce{ println("taskValOnce"); 1 + 1 }
// taskValOnce
// 2
// 2
taskValOnce.runAsync.foreach(println)
taskValOnce.runAsync.foreach(println)
Taskは、Futureと違ってこちらが明示的に実行しない限り処理は開始しません。
上記ではrunAsync
で実行開始しています。
また遅延実行を行うこともできます。
val task = Task.eval("Hi, Monix!").delayExecution(3.seconds)
// 3秒間待ってから処理実行
task.runAsync.foreach(println)
runAsync
などのメソッドで実行するとCancelableFuture[A]
というCancelが可能なFuture型で返ってきます。以下のようにキャンセルするとGCの対象になります。
val cancelable: CancelableFuture[String] = Task.eval("CancelTask").delayExecution(10.seconds).runAsync
TimeUnit.SECONDS.sleep(5)
cancelable.cancel
println("task cancel")
実行した結果を Callback
関数に渡すこともできます。
処理結果のインターフェースを変えたい時などにもってこいですね。
val task = Task( 1 + 1 )
val callback = new Callback[Int] {
override def onSuccess(value: Int): Unit = println(value)
override def onError(ex: Throwable): Unit = ex.printStackTrace()
}
task.map(i => i * 4).runAsync(callback)
なんとなくTask便利そうな雰囲気は伝わってきましたかね?
Taskにはその他にも色々なメソッドがあってそれらを組み合わせることで非同期処理をより扱いやすくなります。
エラーハンドリングもよくできています。
例えば、何か外部に値を取得にいったが失敗してリトライしたいことってよくあると思います。
そんな時は、こんな感じに書けます。
val retryTask = Task {
val i = Random.nextInt(10)
println(i)
if (i < 3) { "Success!!"
} else throw new IllegalStateException(i.toString)
}
retryTask.onErrorRestart(5).runAsync.foreach(println)
TimeUnit.SECONDS.sleep(3)
公式ドキュメントにも載っていますが、間隔あけてリトライも簡単に実装できます
val retryTask = Task {
val i = Random.nextInt(10)
println(i)
if (i < 3) { "Success!!"
} else throw new IllegalStateException(i.toString)
}
retryBackoff(retryTask, 5, new FiniteDuration(2, TimeUnit.SECONDS)).runAsync.foreach(println)
def retryBackoff[A](source: Task[A],
maxRetries: Int, firstDelay: FiniteDuration): Task[A] = {
source.onErrorHandleWith {
case ex: Exception =>
if (maxRetries > 0)
// Recursive call, it's OK as Monix is stack-safe
retryBackoff(source, maxRetries - 1, firstDelay * 2)
.delayExecution(firstDelay)
else
Task.raiseError(ex)
}
}
Task色々使えそうですね!
Scheduler
MonixはScheduler機能も提供しています。Akka Schedulerを私はよく使うのですが。Akka SchedulerはAkkaに依存しているため、とてもheavyだがMonixはサードパーティに依存していないというところがポイントみたいです。Scala.jsを使う時はいいかもしれませんね。
訂正
Monixはサードパーティに依存していない
誤りのようです。コメントでxuwei_kさんが指摘してくださいました。
xuwei_kさん、ありがとうございます!
import java.util.concurrent.TimeUnit
import monix.execution.Scheduler.{global => scheduler}
import scala.concurrent.duration._
object SchedulerSample1 extends App{
val cancelable: Cancelable = scheduler.scheduleOnce(2.seconds) { println("Monix Scheduler") }
// cancelable.cancel()
TimeUnit.SECONDS.sleep(5)
}
2秒後に”Monix Scheduler”と出力する場合の例です。
まずSchedulerを実行するためには、import monix.execution.Scheduler.global
をimportする必要があります。ここではglobalをschedulerの名前に置き換えてます。
どのどれくらいスレッドプールを使うかを指定することができます。デフォルトでは、プロセッサの数からスレッド数を決めて使用しているようです。このあたりについてはまた今度記事を上げようと思います。
scheduler.scheduleOnce
は、メソッド名から分かる通り、1回だけ実行するメソッドです。返り値にCancelable
を受け取ります。お察しの通り、スケジューラのキャンセル処理ができます。
下記は定期実行する際の例です。
import java.util.concurrent.TimeUnit
import monix.execution.Scheduler.{global => schduler}
object SchedulerSample2 extends App {
val cancelable = schduler.scheduleWithFixedDelay(
0, 2, TimeUnit.SECONDS,
new Runnable {
def run(): Unit = {
println("start")
TimeUnit.SECONDS.sleep(2)
println("end")
}
})
TimeUnit.SECONDS.sleep(10)
cancelable.cancel()
}
このスケジューラの良い点は、スケジューラ処理を開始しようした際に処理を待ってくれるところです。
Akka Schedulerは私の知るかぎり自分で制御するように実装しなければいけなかったと思います(詳しい人教えてください〜)
import java.util.concurrent.TimeUnit
import monix.execution.Scheduler.{global => schduler}
object SchedulerSample3 extends App {
// val cancelable = schduler.scheduleWithFixedDelay(
// 0, 2, TimeUnit.SECONDS,
// new Runnable {
// def run(): Unit = {
// println("start")
// TimeUnit.SECONDS.sleep(4)
// println("end")
// }
// })
val cancelable = schduler.scheduleAtFixedRate(
0, 2, TimeUnit.SECONDS,
new Runnable {
def run(): Unit = {
println("start")
TimeUnit.SECONDS.sleep(4)
println("end")
}
})
TimeUnit.SECONDS.sleep(30)
cancelable.cancel()
}
scheduleWithFixedDelay
で実行すると前の処理が完了してから2秒待ってから実行するのに対し、scheduleAtFixedRate
は2秒おきに実行しますが、処理中の場合はその処理が終わってから実行されます。
便利ですね!!
さいごに
MonixのTaskとSchedulerについて紹介しました。
Monixはまだまだ紹介できない機能があります。例えばObserver
はback-presureが効くノンブロッキングストリーミング処理が可能です。これについて書こう思った時にAkkaStream比べて(パファーマンスも含めて)書きたいなと思って今回は力つきました。。。
他にもCircuit Breaker
の機能があったり(Akkaに強く影響されているなー)、lazy val
やCall-by-name
の代わりになるCoeval
、スレッド間で共有したいmutableな変数を作成することができるMVar
などなどあります。
Monixについて調べてみて、エラーハンドリングに優れている点がとてもいいなと感じています。非同期処理をより使いやすくする道具としてMonixはいいライブラリだと思いましたー。
今回のサンプルコード
https://github.com/ichizin/MonixSample