sekikatsu
@sekikatsu

Are you sure you want to delete the question?

Leaving a resolved question undeleted may help others!

ScalaのFutureでコールバックをFuture到達前のスレッドで実行する方法

funcX() //ThreadLocalに何か詰め込む
val future = Future { funcY() }
future.onComplete {
  case Success() => funcZ(true) //ThreadLocalから何か取得する
  case Failure(e) => funcZ(false) //ThreadLocalから何か取得する
}

上記のようなコードで、funcXfuncZを同一スレッドで実行する方法はありますでしょうか。

スレッドが一つしかないようなExecutionContextを指定すれば可能そうですが、その場合はfuncX/funcZが重い処理の場合にボトルネックになってしまうため、あくまでExecutionContextには複数スレッドが存在する状態で、funcX実行時のスレッドに処理を戻したいのです。

(ThreadLocalなんて使うな、という意見もあるかと思いますが、外部のJavaライブラリのため変更できず。。。)

0

2Answer

Comments

  1. @sekikatsu

    Questioner

    1スレッドしかないスレッドプールを複数束ねる、という感じでしょうか。
    仰る通り、それで実現できそうな気はします。
    ただ、可能な限りあまり自前で複雑な処理は実装したくない気持ちもあります。。。(せっかくスレッドプールの仕組みがあるわけなので)

    他に方法がなさそうであれば、最終手段として検討してみようと思います。
    ありがとうございました!

このコードを拝見した限りでは、次のような順序(時系列)で実行されることがとなっていそうな気がします。

  1. funcXの、少なくともThreadLocalでデータを詰め込むところまでが実行される
  2. funcXの残りの処理とfuncYが並列実行される
    • このときfuncXの残りの処理が終了することを待つ(同期する)必要はない
  3. funcYが終ってからfuncZを実行する

したがって、このような同期タイミングを利用すれば副作用的なThreadLocalを利用しなくてもよさそうに思いました。まず前提としてThreadLocalに突っ込んでおきたい(= funcZであとから使いたい)型をAとして

def funcXPhase1: Future[A] // funcZで使う型`A`な値をつくるまで
def funcXPhase2: Future[?] // 型`A`な値を作ったあと

def funcX(): Future[?] = {
  funcXPhase1 flatMap { _ =>
    funcXPhase2
  }
}

という :point_up_2: となっているものとします。そうすれば

for {
  a <- funcXPhase1
  y <- (funcXPhase2 zip Future(funcY())) {
    // 並列実行のためにzipするが`funcXPhase2`の返り値は捨てる
    case (_, y) => y
  } recover {
    // ナイスキャッチ
    case _ => funcZ(a, false)
  }
  z <- funcZ(a, true)
} yield z

というようにできるのではないでしょうか。この例だと正常系な場合はfuncXPhase2が終わってからfuncZをはじめているので、元のコードとは多少意味が違っているので、そこをもっと再現する必要はあるかもしれませんが、そこまで複雑なコードにせずに状態を排除できているのでいったんこれでとしました。

0Like

Your answer might help someone💌