-
Monixとは
- Scala及びScala.jsの非同期ライブラリ
-
- 遅延評価かつ非同期な計算を扱ってくれるもの
- つまりScala標準のFutureのようなもの
Futureの使いにくいところ
-
最低限の機能しか提供されてない
- sleepやcancelがない
- 結果Thread.sleepをつい書いてしまったり
- Scala 2.12で多少はましになったが部分もあるが。。
-
ほぼ全てのメソッドが呼び出す度にExecutionContextを渡す必要がある
- .map毎にスレッドプールが処理 効率的に微妙(?)
- 抽象化がとても辛い
_人人人人人_
> Monix Task <
 ̄Y^Y^^Y^Y^Y ̄
例)
1+1
を計算後一秒待ち1を足す。
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
val task = Task(1 + 1).delayResult(1.second).map(_ + 1)
task.runSyncUnsafe() //3
Futureと違うところ
- Futureは呼び出し後、即非同期計算を行い結果を保時する mapやflatMapを呼ぶ度にExecutionContextが必要。
- Taskはrunするまで計算せず結果はキャッシュしない runする時に渡したSchedulerを使う
- SchedulerはExecutionContextを継承したもの
- 効率のためmapやflatMapの度に非同期タスクを作らず複数のタスクを同期バッチで実行する
即時評価 | 遅延評価 | |
---|---|---|
同期 | A | () => A |
非同期 | Future[A] | Task[A] |
即時で非同期なのがFuture
遅延かつ非同期なのがTask
Scheduler
FutureでいうところのExecutionContext
実際 ExecutionContextを継承している
作成にはExecutionContextまたは、javaのExecutorServiceを使う
JVMのデフォルトではタスクの実行には
ScalaのExecutionContext.global
タスクのスケジュールには
Executors.newSingleThreadScheduledExecutor
を使っている
JSはsetTimeoutでまかなっている。
// 大体こんな感じ
trait Scheduler extends ExecutionContext with UncaughtExceptionReporter with Executor {
def execute(command: Runnable): Unit
def reportFailure(t: Throwable): Unit
def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable
def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable
def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable
def clockRealTime(unit: TimeUnit): Long
def clockMonotonic(unit: TimeUnit): Long
def executionModel: ExecutionModel
}
ExecutionModel
Schedulerに渡す、 Taskをどのくらいまとめて実行するかを指定するもの
FutureではmapやflatMapをする度にスレッドプールにタスクを送るがこれは効率が悪い
- SynchronousExecution
- 全て一つのタスクとして扱うテスト以外使わないだろう
- AlwaysAsyncExecution
- Futureと同じで常に非同期 - BatchedExecution
- 渡せるのは2の累乗のみでそれ以外渡すと2の累乗に丸められる。デフォルトはJVMでは1024(大きくない?) JSでは512
だだし Task(1).map(_ + 1).map(_.toString)
のようなmapをチェーンは上とはまた別に一つのMapとしてまとめて実行される。
デフォルトでmax127個
MonixのTaskの機能
Taskの作成
- Task.apply
- 遅延評価だが非同期ではない 何故
- Task.evalAsync
- 遅延かつ非同期で
- Task.now
- 即実行
- Task.raiseError
- 与えた例外で失敗したタスクを作成
- Task.never
- 完了しないTask作成
- Task.fromTry
- Tryから作成
- Task.fromFuture
- futureから作成
- ただしFutureが開始されているので遅延評価ではない
- deferFutureAction
-
Scheduler => Future[A]
を受け取る - 遅延評価
-
- など
Taskの実行と完了
FutureがExecutionと同じくTaskの実行にSchedulerを渡す必要がある。
結果は
- Futureで返すか
runToFuture
- cancelできるCancelableFutureを返す
- 結果をブロックして待つ
- runSyncUnsafe
- JSからはこの関数は呼べない
- 非同期に実行し結果を待たない
- runAsync
の三方法がある.
副作用があるメソッドにはUnsafeBecauseImpureアノテーション
スレッドをブロックするメソッドにはUnsafeBecauseBlockingアノテーション
が付いているので区別は容易
sleep系
- Task.sleep
- Futureに欲しかったやつ
- スレッドプールにあるスレッドをブロックせずに指定時間待つ
(for {
a <- Task(1)
_ <- Task.sleep(1.second)
b <- Task(1)
} yield a + b).runSyncUnsafe() // 2
- delayExecution
- Taskの実行前に指定時間待つ
Task(heavyCalculation).delayExecution(1.second)
- delayResult
- 実行はするが結果を指定時間まつ
上はこう書ける
Task(1).delayResult(1.second).map(_ + 1)
Timeout系
これもFutureに欲しいなぁ
- timeout
- 所定の時間内に終了しないとエラー
- timeoutTo
- 所定の時間内に終了しない場合渡したTaskが実行される
- timeoutL
- 所定の時間内に終了しないとエラーだが所定の時間自体がTaskとして渡される
- ただし時間の計算後からの時間ではなくtotalでかかった計算
Task.never.timeout(1.second).runUnsafeSync() //TimeoutException!
Task.never.timeout(1.second, Task(1)).runUnsafeSync() // 1
Task.never.timeoutL(Task(1.second).delayExecution(1.second)) //2秒ではなく1秒後にTimeoutException
Restart
エラー発生時の再実行
成功するまで繰り返し外部APIを叩き続きけないといけないパターンとか
代表的なもの
- restartUntil
- 渡した関数がtrueになるまで繰り返す
- onErrorRestart
- エラー時渡したmax回数繰り返し実行する
- onErrorRestartLoop
- もっと複雑なパターンをかきたい時、指定回数実行
Task(api.getRequest()).restartUntil(_.isSuccess) //成功するまで実行する
Task(api.getRequest()).onErrorRestart(10) //10回まで失敗しても再実行
Task(api.getRequest()).onErrorRestartLoop(10) { (err, maxRetries, retry) =>
if (maxRetries > 0)
// 一秒後、再実行
retry(maxRetries - 1).delayExecution(1.second)
else
// 10回 実行したのでエラーを返す
Task.raiseError(err)
}
複数のタスクを扱う1
- map2, map3,...map6
- 複数のTaskを実行するが渡したTaskの左から一つずつ実行
- sequence
-
Iterable[Task[A]]
をTask[Iterable[A]]
に変換するが左が順番に実行
-
- mapBoth, parMap2, ... parMap6
- 複数のTaskを並列に実行
- どれかが失敗した場合残りのTaskはcancelされる
- gather
-
Iterable[Task[A]]
をTask[Iterable[A]]
に変換し並列に実行 - 結果のリストは渡したリストの順番と同じ
-
- gatherUnordered
-
Iterable[Task[A]]
をTask[List[A]]
に変換し並列に実行 - 結果のリストはTaskが終了した順に構築(つまり終了順序と逆)
-
Task.map2(Task(1), Task(1))(_ + _)
Task.parMap2(Task(1), Task(1))(_ + _)
Task.sequence(Task(1) :: Task(2) :: Nil)// Task(List(1, 2))
Task.gather(Task(1) :: Task(2) :: Nil)// Task(List(1, 2))
Task.gatherUnordered(Task(1).delayExecution(1.second) :: Task(2) :: Nil)// Task(List(2, 1))
どこかのTaskが失敗した場合、残りのTaskはキャンセルされる
複数のタスクを扱う2
- race
- 二つTaskを走らせ、先に完了したものを返す。 残りはキャンセル
- raceMany
- 複数のTaskを走らせ、最初に完了したものを返す。 残りはキャンセル
- racePair
- 二つTaskを走らせ、先に完了したものを返すが残りをキャンセルするかどうかを選べる
Task.race(Task(1), Task("slow").delayExecution(1.second)).runSyncUnsafe() //Left(1)
Task.raceMany(Task(1) :: Task(2).delayExecution(1.second)).runSyncUnsafe() //1
Task.racePair(Task(1), Task("slow").delayExecution(1.second)).map {
case Left(a, taskB) => a -> taskB.join
case Right(taskA, b) => taskA.cancel -> b
}.runSyncUnsafe() //(1, "slow")
cancel
sleep中のTaskやスレッドプールのキューに置かれる前のTask(ここら辺あまり自信がない)はキャンセルする事ができる
TaskのcancelはrunToFutureが返すCancelableFutureやrunAsyncが返すCancelableのcancelを呼ぶとcancel出来る他
Fiberという仕組みを用いてTaskの中でもcancel出来る
Fiberを使うためにはstartメソッドを呼びそれに対してcancelを行う
(for {
fa <- Task(1).delayExecution(5.seconds).start
_ <- fa.cancel.delayExecution(1.second) //1秒後cancelされる
a <- fa.join //結果を待つ
} yield a).runSyncUnsafe() // cancelされたので永遠に完了しない
- onCancelRaiseError
- cancelされたら例外を発生させる
リソースの解放
Scalaではリソースの解放をfinally節やLoanパターンを使う事が多い?が非同期ではうまく行かない
val fs = new FileInputStream(file)
try {
Future(fs.read())
} finally {
fis.close() // まだFutureが完了していない!
}
これもMonix Taskはサポートしている
Task(new FileInputStream(file)).bracket { fs =>
Task(fs.read())
} { fs =>
Task(fs.close()) //リソース解放!
}
catsサポート
Monixはcats-effect(catsで副作用扱うやつ)に依存しているので
catsに対応している。
例えばさっきのgatherはcatsで
List(Task(1), Task(2)).parSequence
と書ける
その他
- timed
- Taskがかかった時間を計測する。 精度はnano秒
Task(1 + 1).timed {
case (duration, _) => duration.toMillis
}.runSyncUnsafe // millsecond
紹介したものはMonix Taskの機能の一部分なので興味が出たら追ってみて下さい。