Scala

Futureをマスターする -インスタンス操作-

More than 3 years have passed since last update.

最後はFutureトレイトに関する操作をやりきる。


シリーズ目次

はじめに

コンパニオンオブジェクト操作

インスタンス操作

おまけ

Futureにハマる


変換


map (※要ExecutionContext、別スレッドでの実行)

お馴染みの。結果を何かしら変換する際に。

val f: Future[String] = Future("hello")

val ff: Future[Int] = f.map(_.size)

println(Await.result(ff, Duration.Inf)) // 5


mapTo

戻り値のFuture[A]を別の指定した型のFutureに変換します。

trait X

trait A
trait B extends A
case class Foo(z: Int) extends X with B

val f: Future[Foo] = Future{ Foo(1) }
val fx: Future[X] = f.mapTo[X]
val fb: Future[B] = fx.mapTo[B]
val fa: Future[A] = fb // Futureは共変なのでこれはmapToしなくてもそのまま代入可能


failed

Future内で例外が起きると、Future#valueした結果のTryはFailureになっている。このようなFutureに対してfailedメソッドを呼ぶことでSuccess[Throwable]なFutureを返すことが出来る。

但し、Successなものにfailedメソッドを実行するとNoSuchElementExceptionとなり今度はFailureになってしまう。

どういうところで使うのか一瞬戸惑うが、この性質上、失敗した時だけxxxするといった使い道しか無いと考えられる。

Future#mapなどはFailureになっていると実行されない。よって、failedメソッドを呼ぶことで失敗した時だけ後続処理を続けることができるようになる。


flatMap (※要ExecutionContext、別スレッドでの実行)

結果に対して何かしら加工する際、Futureの入れ子になるような場合はflatMapで。

結果が条件を満たさないような場合も失敗にしてしまえば後続処理は行われない。

  val f1: Future[Int] = Future{ Thread.sleep(3000); 1 }

val f2: Future[Int] = Future{ Thread.sleep(2000); 2 }

val f: Future[Int] = f1.flatMap{ n1: Int =>
val ff: Future[String] = f2.map{ n2: Int => "hello" * (n1 + n2) }
if (Await.result(ff, Duration.Inf).size > 10) ff else Future.failed(new Exception("not over 10"))
}.map { str: String =>
str.size
}

f.onComplete {
case Success(result) => println(result)
case Failure(t) => println(t.getMessage())
}

Await.ready(f, Duration.Inf)


transform (※要ExecutionContext、別スレッドでの実行)

成功時と失敗時、それぞれで変換処理を登録しておく。

なお、失敗時は-1に、というような変換はできない。第二引数の関数の戻り値はThrowableと定義されているためだ。

  val f: Future[Int] = Future{ 55 }

val ff: Future[Int] = f.transform(
{ n => n * 2 },
{ t => throw new Exception("error!") })
ff foreach println

Await.ready(ff, Duration.Inf)


フィルタリング


filterとfilterWith (※要ExecutionContext、別スレッドでの実行)

条件を満たさない場合はNoSuchElementExceptionが発生しFailureになる。

以下の例の場合はfilter後のmap処理は呼ばれない。

  val f: Future[Int] = Future{ 5 }

val ff: Future[Int] = f.filter(_ % 2 == 0).map(_ * 2)
ff.onComplete {
case Success(result) => println(result)
case Failure(t) => println(t.getMessage())
}

Await.ready(ff, Duration.Inf)

Futureの場合は、コレクション系と異なり複数要素があるわけではないのでwithFilterとfilterを使い分けるようなシーンはあまりないように思う。

と思っていたら、公式ドキュメントにもそう書いてあった。


For futures calling filter has exactly the same effect as does calling withFilter.


withFilterは直接使うというよりはfor内包記法のif文使用時に裏側で呼ばれるというくらいだろうか。

  val f1: Future[Int] = Future{ Thread.sleep(3000); 1 }

val f2: Future[Int] = Future{ Thread.sleep(2000); 2 }

val f: Future[Int] = for (n1 <- f1 if (n1 % 2 == 0); n2 <- f2) yield { n1 + n2 }


collect (※要ExecutionContext、別スレッドでの実行)

filterとwithFilterの明示的な使い分けが無いのであればcollectはパフォーマンスを意識した使い方というよりはfilter + mapと書くのかcollect一発で書くのかというスタイル的な選択かなと。

上記filter + mapの例は以下の一行で書き換え可能だ。

f.collect{ case v: Int if (v % 2 == 0) => v * 2 }


例外発生時のリカバリ処理


recoverとrecoverWith (※要ExecutionContext、別スレッドでの実行)

Future#traverseは複数のFutureのうち1つでもエラーがあるとonFailでしか結果を受け取れなくなってしまう。

また、mapやflatMapなどはFutureがFailureになっているとスキップされるので後続処理が行われなくなってしまう。

通常はこれで問題ない場合のほうが多いと思うが、例えば10個のFutureのうち、途中でエラーが起きたとしても全てをやりきり、最後に9つが成功して1つが失敗したといった情報を得たい場合はどうすればよいだろうか。

ひとつのアイディアとしてrecoverを使ってSuccess状態にすることで処理を続行するというアイディアがある。以下そのサンプル。

  val f: Future[List[Int]] = Future.traverse((1 to 10).toList) { i =>

Future {
if (i == 5 || i == 7) throw new Exception(s"error in $i") else i
} recover {
case t => -1
}
}

f.onSuccess{
case result: List[Int] => {
val grouped = result.groupBy(v => v != -1)
def count(key: Boolean): Int = grouped.get(key).map(_.size) getOrElse 0

println(s"success: ${count(true)}, fail: ${count(false)}")
}
}

Await.ready(f, Duration.Inf)

recover部分はrecoverWithを使ってFutureを返すように書くことも出来る。

実行済みFutureと組み合わせて使うことが多いだろうか。

} recoverWith {

case t => Future.successful(-1)
}


fallbackTo

recoverWithとほぼ同じ。fallbackToでSuccessなFutureを登録しておけば、失敗時にそちらが返される。

なお、recoverWithは失敗した時にだけ呼ばれるが、fallbackToは成功時も呼ばれ、結果は同じだが新たにFutureが生成されるという違いがある。

fallbackToの方がPartialFunctionでない分すっきり書けて好みだったが、あまり気にしなくてもいいレベルかも知れないものの基本はrecover/recoverWithを使うことになりそうだ。

  val f: Future[Int] = Future { throw new Exception("error!") } fallbackTo Future.successful(-1)

f.foreach(println)

Await.ready(f, Duration.Inf)


その他


foreach (※要ExecutionContext、別スレッドでの実行)

Future#foreachは公式ドキュメントによると、onSuccessと同等であるとのこと。


This means that the foreach has exactly the same semantics as the onSuccess callback.


val f: Future[Int] = Future.successful(5)

f.foreach(println)


andThen (※要ExecutionContext、別スレッドでの実行)

このメソッドは少し特殊で副作用を前提とした処理とのこと。


The andThen combinator is used purely for side-effecting purposes.


引数にPartialFunctionを取るものの、その戻り値は一切考慮されず常に同じ結果を持つ新しいFutureが返されるだけである。

なお、以下のプログラム例のように値を変えたり型を変えたりしても結局は100を保持したFuture[Int]が返ってくる。

もちろん、exeCntに対する処理は2回実行されることから出力時は2と表示される。

なお、FutureがFailure状態であってもandThenは都度実行されるというのも他のメソッドと違っているところである。

  var exeCnt = 0

val f: Future[Int] = Future.successful(5)

val ff: Future[Int] = f
.andThen { case n: Try[Int] => exeCnt += 1; n.get * 2 }
.andThen { case n: Try[Int] => exeCnt += 1; "hello" * n.get }
ff.foreach{ r => println(s"$r : $exeCnt") }


zip

2つのFutureの結果をzipする。

zipで処理する2つのFutureは並列で実行される。

  // これだと4sかかる

for {
v1 <- Future { Thread.sleep(1000); 1 }
v2 <- Future { Thread.sleep(3000); 3 }
} yield {
v1 + v2
}

// これだと3s
for {
v <- Future { Thread.sleep(1000); 1 } zip Future { Thread.sleep(3000); 3 }
} yield {
v._1 + v._2
}


メソッドコンプリート状況

完了!

trait Awaitable: 5/5

trait Future: 16/16

object Future: 10/10