5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Monix Task

Last updated at Posted at 2019-10-08
1 / 23
  • Monixとは

    • Scala及びScala.jsの非同期ライブラリ
  • Monix-Task

    • 遅延評価かつ非同期な計算を扱ってくれるもの
    • つまりScala標準のFutureのようなもの

Futureの使いにくいところ

  • 最低限の機能しか提供されてない

    • sleepやcancelがない
    • 結果Thread.sleepをつい書いてしまったり
    • Scala 2.12で多少はましになったが部分もあるが。。
  • ほぼ全てのメソッドが呼び出す度にExecutionContextを渡す必要がある

    • .map毎にスレッドプールが処理 効率的に微妙(?)
    • 抽象化がとても辛い

    _人人人人人_
    > Monix Task <
     ̄Y^Y^^Y^Y^Y ̄


例)
1+1を計算後一秒待ち1を足す。

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

val task = Task(1 + 1).delayResult(1.second).map(_ + 1)
task.runSyncUnsafe() //3

Futureと違うところ

  • Futureは呼び出し後、即非同期計算を行い結果を保時する mapやflatMapを呼ぶ度にExecutionContextが必要。
  • Taskはrunするまで計算せず結果はキャッシュしない runする時に渡したSchedulerを使う
    • SchedulerはExecutionContextを継承したもの
    • 効率のためmapやflatMapの度に非同期タスクを作らず複数のタスクを同期バッチで実行する

即時評価 遅延評価
同期 A () => A
非同期 Future[A] Task[A]

即時で非同期なのがFuture
遅延かつ非同期なのがTask


Scheduler

FutureでいうところのExecutionContext
実際 ExecutionContextを継承している
作成にはExecutionContextまたは、javaのExecutorServiceを使う
JVMのデフォルトではタスクの実行には
ScalaのExecutionContext.global
タスクのスケジュールには
Executors.newSingleThreadScheduledExecutor を使っている
JSはsetTimeoutでまかなっている。


// 大体こんな感じ
trait Scheduler extends ExecutionContext with UncaughtExceptionReporter with Executor {
  
  def execute(command: Runnable): Unit

  def reportFailure(t: Throwable): Unit

  def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable

  def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable

  def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable

  def clockRealTime(unit: TimeUnit): Long

  def clockMonotonic(unit: TimeUnit): Long

  def executionModel: ExecutionModel
}

ExecutionModel

Schedulerに渡す、 Taskをどのくらいまとめて実行するかを指定するもの
FutureではmapやflatMapをする度にスレッドプールにタスクを送るがこれは効率が悪い

  • SynchronousExecution
    • 全て一つのタスクとして扱うテスト以外使わないだろう
  • AlwaysAsyncExecution
     - Futureと同じで常に非同期
  • BatchedExecution
    • 渡せるのは2の累乗のみでそれ以外渡すと2の累乗に丸められる。デフォルトはJVMでは1024(大きくない?) JSでは512

だだし Task(1).map(_ + 1).map(_.toString) のようなmapをチェーンは上とはまた別に一つのMapとしてまとめて実行される。
デフォルトでmax127個


MonixのTaskの機能


Taskの作成

  • Task.apply
    • 遅延評価だが非同期ではない 何故
  • Task.evalAsync
    • 遅延かつ非同期で
  • Task.now
    • 即実行
  • Task.raiseError
    • 与えた例外で失敗したタスクを作成
  • Task.never
    • 完了しないTask作成
  • Task.fromTry
    • Tryから作成
  • Task.fromFuture
    • futureから作成
    • ただしFutureが開始されているので遅延評価ではない
  • deferFutureAction
    • Scheduler => Future[A] を受け取る
    • 遅延評価
  • など

Taskの実行と完了

FutureがExecutionと同じくTaskの実行にSchedulerを渡す必要がある。
結果は

  • Futureで返すか
    • runToFuture
    • cancelできるCancelableFutureを返す
  • 結果をブロックして待つ
    • runSyncUnsafe
    • JSからはこの関数は呼べない
  • 非同期に実行し結果を待たない
    • runAsync

の三方法がある.
副作用があるメソッドにはUnsafeBecauseImpureアノテーション
スレッドをブロックするメソッドにはUnsafeBecauseBlockingアノテーション
が付いているので区別は容易


sleep系

  • Task.sleep
    • Futureに欲しかったやつ
    • スレッドプールにあるスレッドをブロックせずに指定時間待つ
(for {
 a <- Task(1)
 _ <- Task.sleep(1.second)
 b <- Task(1)
} yield a + b).runSyncUnsafe() // 2
  • delayExecution
    • Taskの実行前に指定時間待つ
Task(heavyCalculation).delayExecution(1.second)
  • delayResult
    • 実行はするが結果を指定時間まつ

上はこう書ける

Task(1).delayResult(1.second).map(_ + 1)

Timeout系

これもFutureに欲しいなぁ

  • timeout
    • 所定の時間内に終了しないとエラー
  • timeoutTo
    • 所定の時間内に終了しない場合渡したTaskが実行される
  • timeoutL
    • 所定の時間内に終了しないとエラーだが所定の時間自体がTaskとして渡される
    • ただし時間の計算後からの時間ではなくtotalでかかった計算
Task.never.timeout(1.second).runUnsafeSync() //TimeoutException!

Task.never.timeout(1.second, Task(1)).runUnsafeSync() // 1

Task.never.timeoutL(Task(1.second).delayExecution(1.second)) //2秒ではなく1秒後にTimeoutException

Restart

エラー発生時の再実行
成功するまで繰り返し外部APIを叩き続きけないといけないパターンとか
代表的なもの

  • restartUntil
    • 渡した関数がtrueになるまで繰り返す
  • onErrorRestart
    • エラー時渡したmax回数繰り返し実行する
  • onErrorRestartLoop
    • もっと複雑なパターンをかきたい時、指定回数実行
Task(api.getRequest()).restartUntil(_.isSuccess) //成功するまで実行する

Task(api.getRequest()).onErrorRestart(10) //10回まで失敗しても再実行

Task(api.getRequest()).onErrorRestartLoop(10) { (err, maxRetries, retry) =>
   if (maxRetries > 0)
     // 一秒後、再実行
     retry(maxRetries - 1).delayExecution(1.second)
   else
     // 10回 実行したのでエラーを返す
     Task.raiseError(err)
}

複数のタスクを扱う1

  • map2, map3,...map6
    • 複数のTaskを実行するが渡したTaskの左から一つずつ実行
  • sequence
    • Iterable[Task[A]]Task[Iterable[A]]に変換するが左が順番に実行
  • mapBoth, parMap2, ... parMap6
    • 複数のTaskを並列に実行
    • どれかが失敗した場合残りのTaskはcancelされる
  • gather
    • Iterable[Task[A]]Task[Iterable[A]]に変換し並列に実行
    • 結果のリストは渡したリストの順番と同じ
  • gatherUnordered
    • Iterable[Task[A]]Task[List[A]]に変換し並列に実行
    • 結果のリストはTaskが終了した順に構築(つまり終了順序と逆)
Task.map2(Task(1), Task(1))(_ + _)
Task.parMap2(Task(1), Task(1))(_ + _)  

Task.sequence(Task(1) :: Task(2) :: Nil)// Task(List(1, 2))
Task.gather(Task(1) :: Task(2) :: Nil)// Task(List(1, 2))
Task.gatherUnordered(Task(1).delayExecution(1.second) :: Task(2) :: Nil)// Task(List(2, 1))

どこかのTaskが失敗した場合、残りのTaskはキャンセルされる


複数のタスクを扱う2

  • race
    • 二つTaskを走らせ、先に完了したものを返す。 残りはキャンセル
  • raceMany
    • 複数のTaskを走らせ、最初に完了したものを返す。 残りはキャンセル
  • racePair
    • 二つTaskを走らせ、先に完了したものを返すが残りをキャンセルするかどうかを選べる
Task.race(Task(1), Task("slow").delayExecution(1.second)).runSyncUnsafe() //Left(1)
Task.raceMany(Task(1) :: Task(2).delayExecution(1.second)).runSyncUnsafe() //1
Task.racePair(Task(1), Task("slow").delayExecution(1.second)).map { 
  case Left(a, taskB) => a -> taskB.join
  case Right(taskA, b) => taskA.cancel -> b
}.runSyncUnsafe() //(1, "slow")

cancel

sleep中のTaskやスレッドプールのキューに置かれる前のTask(ここら辺あまり自信がない)はキャンセルする事ができる
TaskのcancelはrunToFutureが返すCancelableFutureやrunAsyncが返すCancelableのcancelを呼ぶとcancel出来る他
Fiberという仕組みを用いてTaskの中でもcancel出来る
Fiberを使うためにはstartメソッドを呼びそれに対してcancelを行う

(for {
  fa <- Task(1).delayExecution(5.seconds).start
  _ <- fa.cancel.delayExecution(1.second) //1秒後cancelされる
  a <- fa.join //結果を待つ
} yield a).runSyncUnsafe() // cancelされたので永遠に完了しない
  • onCancelRaiseError
  • cancelされたら例外を発生させる

リソースの解放

Scalaではリソースの解放をfinally節やLoanパターンを使う事が多い?が非同期ではうまく行かない

val fs = new FileInputStream(file)
try {
  Future(fs.read())
} finally {
  fis.close() // まだFutureが完了していない!
}

これもMonix Taskはサポートしている

Task(new FileInputStream(file)).bracket { fs =>
  Task(fs.read())
} { fs =>
  Task(fs.close()) //リソース解放!
}

catsサポート

Monixはcats-effect(catsで副作用扱うやつ)に依存しているので
catsに対応している。

例えばさっきのgatherはcatsで

List(Task(1), Task(2)).parSequence

と書ける


その他

  • timed
  • Taskがかかった時間を計測する。 精度はnano秒
Task(1 + 1).timed { 
  case (duration, _) => duration.toMillis 
}.runSyncUnsafe // millsecond

紹介したものはMonix Taskの機能の一部分なので興味が出たら追ってみて下さい。


5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?