LoginSignup
5

More than 5 years have passed since last update.

JavaのFutureをScalaのFutureへ変換する

Last updated at Posted at 2017-06-08

はじめに

ScalaでJavaのライブラリを利用することがしばしばあるが、ScalaのFutureとJavaのFutureは別である。Scalaを利用する時はJavaのFutureではなく、ScalaのFutureを利用したい。この記事ではJavaのFutureをScalaのFutureへ変換する方法について述べる。なお、この記事の完全なコードは次のGitHubリポジトリに置かれている。

表記

この記事ではScalaのFutureとJavaのFutureを次のように区別する。

import java.util.concurrent.{Future => JavaFuture}
import scala.concurrent.{Future => ScalaFuture}

つまり、JavaFuture[A]はJavaのFutureであり、一方でScalaFuture[A]はScalaのFutureである。

直感的な実装

Stackoverflowなどでは次のようなコードで変換できると書かれている。

BrokenJavaFutureConverter.scala
object BrokenJavaFutureConverter {
  def toScala[A](jf: JavaFuture[A])(implicit ec: ExecutionContext): ScalaFuture[A] = {
    val p: Promise[A] = Promise[A]

    ec.execute(
      new Runnable {
        override def run() =
          p.complete(
            Try(jf.get())
          )
      }
    )

    p.future
  }

  implicit class RichJavaFuture[A](val jf: JavaFuture[A]) extends AnyVal {
    def asScala(implicit ec: ExecutionContext): ScalaFuture[A] = toScala(jf)
  }
}

これはJavaのFutureをExecutionContextに基づいて実行しScalaのPromiseで受け取ってそれでScalaのFutureを返すというコードである。これで一見良さそうで、次のように正常に動作するように見える。

BrokenJavaFutureConverterSpec.scala
class BrokenJavaFutureConverterSpec extends WordSpec {
  import BrokenJavaFutureConverter._

  trait SetupWithFixedThreadPool {
    val timeout = Duration(1, TimeUnit.SECONDS)

    val threadPool: ExecutorService = Executors.newFixedThreadPool(1)

    val executor: Executor = new ExecutorFromExecutorService(threadPool)

    implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
  }

  "toScala" should {
    "return the value" in new SetupWithFixedThreadPool {
      val javaFuture: JavaFuture[Int] = threadPool.submit { () =>
        Thread.sleep(200)
        10
      }

      assert(Await.result(toScala(javaFuture), timeout) == 10)
    }
  }
}

一方で次のように、作成したScalaのFutureをrecoverしようとすると正しく動作しない。

BrokenJavaFutureConverterSpec.scala
"not be able to recover the exception" in new SetupWithFixedThreadPool {
  val javaFuture: JavaFuture[Int] = threadPool.submit{ () =>
    throw new TestException()
  }

  val recover = javaFuture.asScala.recover {
    case e: TestException => 10
  }

  assertThrows[ExecutionException](Await.result(recover, timeout))
}
TestException.scala
class TestException(message: String = null, cause: Throwable = null)
  extends Exception(message, cause)

上記のテストではJavaのFutureは実行時に例外TestExceptionを送出し、それをScalaFuture#recoverで変換しているはずだが、これの結果をAwait#resultで取り出すと例外ExecutionExceptionが送出される。このように、JavaのFutureは実行時の例外をExecutionExceptionでラップしてしまう。これではScalaのFutureとしてrecoverがやりずらいので改善を考える。

ExecutionExceptionをアンラップする

次のようなコードで先ほどの問題を解決する。

JavaFutureConverter.scala
object JavaFutureConverter {
  def toScala[A](jf: JavaFuture[A])(implicit ec: ExecutionContext): ScalaFuture[A] = {
    val p: Promise[A] = Promise[A]

    ec.execute { () =>
      p.complete(
        Try(jf.get()) match {
          case Failure(e: ExecutionException) =>
            Failure(e.getCause)
          case x =>
            x
        }
      )
    }

    p.future
  }

  implicit class RichJavaFuture[A](val jf: JavaFuture[A]) extends AnyVal {
    def asScala(implicit ec: ExecutionContext): ScalaFuture[A] = toScala(jf)
  }
}

これはJavaのFutureをgetしたものをTryで包み、もし結果がFailureでかつ例外の型がExecutionExceptionである場合は、getCauseを利用してアンラップするようにしている。こうすることで次のようにrecoverが動作する。

JavaFutureConverterSpec.scala
"be able to recover the exception" in new SetupWithFixedThreadPool {
  val javaFuture: JavaFuture[Int] = threadPool.submit { () =>
    throw new TestException()
  }

  val recover = javaFuture.asScala.recover {
    case e: TestException => 10
  }

  assert(Await.result(recover, timeout) == 10)
}

ForkJoinPoolによる問題

これでよいものができたかに見えたが、調べたところForkJoinPool.commonPool()といった方法で作成されたExecutorServiceを利用するとこれは次のように正しく動作しないことがあると分った。

JavaFutureConverterSpec.scala
trait SetupWithForkJoinPool {
  val timeout = Duration(1, TimeUnit.SECONDS)

  val forkJoinPool: ExecutorService = ForkJoinPool.commonPool()

  val executor: Executor = new ExecutorFromExecutorService(forkJoinPool)

  implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
}

"return RuntimeException despite it returns IOException if you use the ForkJoinPool executor" in new SetupWithForkJoinPool {
  val javaFuture: JavaFuture[Unit] = forkJoinPool.submit { () =>
    throw new IOException()
  }

  assertThrows[RuntimeException](Await.result(javaFuture.asScala, timeout))
}

これはIOExceptionを送出しているにも関わらず、RuntimeExceptionにラップされていることを示している。このように、このJavaFutureConverter#toScalaは利用するExecutorServiceによっては正しく動作しないことがあることに注意が必要である。もしこのコードを本番で利用する場合は、そのコードがどのような方法で作成したExecutorServiceを利用しているのかを調べて、そのExecutorServiceを利用したテストを実行してからこのコードを利用するべきである。

がくぞさんからのアドバイス

がくぞさんから次のようなコードでもよいという意見をいただいた。

JavaFutureConverter#toScalaExecutionContextを暗黙に受け取ってそれでJavaのFutureを実行するが、ScalaのFutureも同じような動作だったのでこちらの方がシンプルであるという理由でこちらに修正した。

JavaFutureConverter.scala
object JavaFutureConverter {
  def toScala[A](jf: JavaFuture[A])(implicit ec: ExecutionContext): ScalaFuture[A] = {
    ScalaFuture(jf.get()).transform {
      case Failure(e: ExecutionException) =>
        Failure(e.getCause)
      case x => x
    }
  }
}

まとめ

JavaのFutureをScalaのFutureへ変換するのは思っていたよりも難しいということが分かった。もしこれよりもよい方法があれば、気軽にこの記事のコメントなどで指摘して欲しい。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5