Edited at
Fringe81Day 23

非同期処理を便利にするMonix

More than 1 year has passed since last update.

この記事は、Fringe81 アドベントカレンダー2017の23日目です。


はじめに

いつもはScalaで広告配信サーバの開発を行っているichizinです。

Scala.jsを調べている中でMonixというライブラリを知ったので紹介します。

けっこう便利そうなので見てください!!


Monixとは

Monixは、非同期プログラミングのためのライブラリでScalaやScala.jsで使うことができます。

公式ドキュメント

Monixは色々な機能があり、すべて取り上げるのはボリュームが大きすぎるので今回はTaskとSchedulerについて紹介したいと思います。


Task

ScalaのFutureは、未来の結果を保持する値であって、Futureを評価しようしているスレッドが既に開始済みで完了している可能性もあるしまだ終わってないかもしれなものであるのに対し、Taskは関数であり、こちらが実行を指定するまでは開始しません。

説明してもわかりづらいのでコードで説明します。

  val future: Future[Int] = Future { println("Future");  1 + 1 }

// Future
// 2
// 2
future.foreach(println)
future.foreach(println)

val task: Task[Int] = Task { println("Task"); 1 + 1 }

// Task
// 2
// Task
// 2
task.runAsync.foreach(println)
task.runAsync.foreach(println)

Futureは結果が最初の時に評価され、次呼ばれた時は結果がキャッシュされていることがわかります。

一方Taskは、実行が明示的に呼ばれた段階で都度評価されていることがわかります。

もちろん、下記をすることでTaskも1度の評価で判定することもできます。

  val taskValOnce = Task.evalOnce{ println("taskValOnce"); 1 + 1  }

// taskValOnce
// 2
// 2
taskValOnce.runAsync.foreach(println)
taskValOnce.runAsync.foreach(println)

Taskは、Futureと違ってこちらが明示的に実行しない限り処理は開始しません。

上記ではrunAsyncで実行開始しています。

また遅延実行を行うこともできます。

  val task = Task.eval("Hi, Monix!").delayExecution(3.seconds)

// 3秒間待ってから処理実行
task.runAsync.foreach(println)

runAsyncなどのメソッドで実行するとCancelableFuture[A]というCancelが可能なFuture型で返ってきます。以下のようにキャンセルするとGCの対象になります。

  val cancelable: CancelableFuture[String] =  Task.eval("CancelTask").delayExecution(10.seconds).runAsync

TimeUnit.SECONDS.sleep(5)
cancelable.cancel
println("task cancel")

実行した結果を Callback関数に渡すこともできます。

処理結果のインターフェースを変えたい時などにもってこいですね。

  val task = Task( 1 + 1 )

val callback = new Callback[Int] {
override def onSuccess(value: Int): Unit = println(value)

override def onError(ex: Throwable): Unit = ex.printStackTrace()
}

task.map(i => i * 4).runAsync(callback)

なんとなくTask便利そうな雰囲気は伝わってきましたかね?

Taskにはその他にも色々なメソッドがあってそれらを組み合わせることで非同期処理をより扱いやすくなります。

エラーハンドリングもよくできています。

例えば、何か外部に値を取得にいったが失敗してリトライしたいことってよくあると思います。

そんな時は、こんな感じに書けます。

  val retryTask = Task {

val i = Random.nextInt(10)

println(i)

if (i < 3) { "Success!!"
} else throw new IllegalStateException(i.toString)
}

retryTask.onErrorRestart(5).runAsync.foreach(println)

TimeUnit.SECONDS.sleep(3)

公式ドキュメントにも載っていますが、間隔あけてリトライも簡単に実装できます

  val retryTask = Task {

val i = Random.nextInt(10)

println(i)

if (i < 3) { "Success!!"
} else throw new IllegalStateException(i.toString)
}

retryBackoff(retryTask, 5, new FiniteDuration(2, TimeUnit.SECONDS)).runAsync.foreach(println)

def retryBackoff[A](source: Task[A],
maxRetries: Int, firstDelay: FiniteDuration): Task[A] = {
source.onErrorHandleWith {
case ex: Exception =>
if (maxRetries > 0)
// Recursive call, it's OK as Monix is stack-safe
retryBackoff(source, maxRetries - 1, firstDelay * 2)
.delayExecution(firstDelay)
else
Task.raiseError(ex)
}
}

Task色々使えそうですね!


Scheduler

MonixはScheduler機能も提供しています。Akka Schedulerを私はよく使うのですが。Akka SchedulerはAkkaに依存しているため、とてもheavyだがMonixはサードパーティに依存していないというところがポイントみたいです。Scala.jsを使う時はいいかもしれませんね。

訂正


Monixはサードパーティに依存していない


誤りのようです。コメントでxuwei_kさんが指摘してくださいました。

xuwei_kさん、ありがとうございます!

import java.util.concurrent.TimeUnit

import monix.execution.Scheduler.{global => scheduler}
import scala.concurrent.duration._

object SchedulerSample1 extends App{

val cancelable: Cancelable = scheduler.scheduleOnce(2.seconds) { println("Monix Scheduler") }

// cancelable.cancel()

TimeUnit.SECONDS.sleep(5)
}

2秒後に”Monix Scheduler”と出力する場合の例です。

まずSchedulerを実行するためには、import monix.execution.Scheduler.globalをimportする必要があります。ここではglobalをschedulerの名前に置き換えてます。

どのどれくらいスレッドプールを使うかを指定することができます。デフォルトでは、プロセッサの数からスレッド数を決めて使用しているようです。このあたりについてはまた今度記事を上げようと思います。

scheduler.scheduleOnce は、メソッド名から分かる通り、1回だけ実行するメソッドです。返り値にCancelableを受け取ります。お察しの通り、スケジューラのキャンセル処理ができます。

下記は定期実行する際の例です。


import java.util.concurrent.TimeUnit
import monix.execution.Scheduler.{global => schduler}

object SchedulerSample2 extends App {

val cancelable = schduler.scheduleWithFixedDelay(
0, 2, TimeUnit.SECONDS,
new Runnable {
def run(): Unit = {
println("start")
TimeUnit.SECONDS.sleep(2)
println("end")
}
})

TimeUnit.SECONDS.sleep(10)
cancelable.cancel()
}

このスケジューラの良い点は、スケジューラ処理を開始しようした際に処理を待ってくれるところです。

Akka Schedulerは私の知るかぎり自分で制御するように実装しなければいけなかったと思います(詳しい人教えてください〜)

import java.util.concurrent.TimeUnit

import monix.execution.Scheduler.{global => schduler}

object SchedulerSample3 extends App {
// val cancelable = schduler.scheduleWithFixedDelay(
// 0, 2, TimeUnit.SECONDS,
// new Runnable {
// def run(): Unit = {
// println("start")
// TimeUnit.SECONDS.sleep(4)
// println("end")
// }
// })

val cancelable = schduler.scheduleAtFixedRate(
0, 2, TimeUnit.SECONDS,
new Runnable {
def run(): Unit = {
println("start")
TimeUnit.SECONDS.sleep(4)
println("end")
}
})

TimeUnit.SECONDS.sleep(30)
cancelable.cancel()
}

scheduleWithFixedDelayで実行すると前の処理が完了してから2秒待ってから実行するのに対し、scheduleAtFixedRateは2秒おきに実行しますが、処理中の場合はその処理が終わってから実行されます。

便利ですね!!


さいごに

MonixのTaskとSchedulerについて紹介しました。

Monixはまだまだ紹介できない機能があります。例えばObserverはback-presureが効くノンブロッキングストリーミング処理が可能です。これについて書こう思った時にAkkaStream比べて(パファーマンスも含めて)書きたいなと思って今回は力つきました。。。

他にもCircuit Breakerの機能があったり(Akkaに強く影響されているなー)、lazy valCall-by-nameの代わりになるCoeval、スレッド間で共有したいmutableな変数を作成することができるMVarなどなどあります。

Monixについて調べてみて、エラーハンドリングに優れている点がとてもいいなと感じています。非同期処理をより使いやすくする道具としてMonixはいいライブラリだと思いましたー。

今回のサンプルコード

https://github.com/ichizin/MonixSample