Posted at

Finagle の Context の仕組みと使い方

More than 1 year has passed since last update.


概要

FinagleContexts.localContexts.broadcastの使い分けと、Context がどのように値を保持し、伝播させているのかなどを調べたので、その備忘録です。


対象


  • Java および Scala がなんとなく読める。

  • Finagle 使ってるか twitter.util.Futureへのある程度の理解がある。


環境


build.sbt

scalaVersion := "2.12.1"

libraryDependencies += "com.twitter" % "finagle-core_2.12" % "6.43.0"


せっかくなのでと Scala 2.12 と最新の Finagle で試してみましたが、私がプロダクション内で使ってるのはもっと古いバージョン1なので、今回試したところとコードには多少差異がありましたが、振る舞いは同じようでしたのでこちらのバージョンで試しています。転載した Finagle および Twitter uitl ライブラリのコードも、記事作成時点のものになります。


Context ?

端的に言えば、フィルターやサービス間を、引数として値を渡さずに値を引き回せる機能です。ただし、スコープが決まっておりグローバルな変数ではありません。

Finagle の Service はインターフェイスが決まっているため2、 implicit parameter で値を渡していくような実装ができません。しかし、トレースログ用にリクエスト単位で識別子を引き回したい場合など、引数として渡すドメインオブジェクトに含めるには微妙な値などがあります。

そういうときにこの Context が役に立ちます。3

Java で考えると、 Servlet の Request scope が近いかと思います(用途的な意味では)。

ただし、スレッドを跨いで値を参照するためには、非同期処理に必ず com.twitter.util.FuturePool を使う必要があります。(後述)


Example

実際にコードを書いてみます。

まず Context に保存するオブジェクトのクラスを定義します。

case class ExampleContext(value: String)

次に、 Context に保存するためのキーと、値を設定、参照するためのメソッドを定義します。

object ExampleContext {

import com.twitter.finagle.context.Contexts

private[this] val key = new Contexts.local.Key[ExampleContext]

def current: Option[ExampleContext] = Contexts.local.get(key)

object AsCurrent {
implicit final class AsCurrent(val value: ExampleContext) extends AnyVal {
def asCurrent[A](fn: => A): A = Contexts.local.let(key, value)(fn)
}
}
}

ここでは broadcast する必要はないため Contexts.local を用いています。使い分けについては後述します。

asCurrent メソッドは case class に直接定義しても大丈夫ですが、key の参照をコンパニオンオブジェクト内に限定したり、asCurrent メソッド自体へのアクセスも制限しやすくなるため、enrich my library パターンなどで分けておくと便利かと思います。

実際に使ってみます。


Example.scala

import java.util.concurrent.Executors

import java.util.concurrent.atomic.AtomicInteger

import com.twitter.util.{Await, Future, FuturePool}

object Main extends App {

val executor = {
val counter = new AtomicInteger(0)
Executors.newCachedThreadPool((r: Runnable) => new Thread(r, s"sub${counter.getAndIncrement}"))
}
val myFuturePool = FuturePool(executor)

def foo(num: Int): Unit = {
val exampleValue = ExampleContext.current.fold(ifEmpty = "context not set.")(_.value)
println(s"${num formatted "%02d"}: [${Thread.currentThread().getName}] $exampleValue")
}

// 設定されてない場合は参照できない
foo(1)

import ExampleContext.AsCurrent._

val f1: Future[Unit] = ExampleContext("bar bar") asCurrent {
// 設定すると値を渡さなくても参照ができる
foo(2)

val f2: Future[Unit] = myFuturePool {
// 別スレッドになっても参照できる
foo(3)
Thread.sleep(1000)
} flatMap { _ =>
// asCurrent 内で再設定されると上書きされるが、抜けると戻る
foo(4)
val f3: Future[Unit] = ExampleContext("baz baz baz") asCurrent {
foo(5)
myFuturePool {
foo(6)
Thread.sleep(1000)
}
}
foo(7)
Thread.sleep(1000)
f3
}
foo(8)

f2
}

foo(9)

Await.result(f1)
executor.shutdown()
}


01: [main] context not set.

02: [main] bar bar
03: [sub0] bar bar
08: [main] bar bar
09: [main] context not set.
04: [sub0] bar bar
05: [sub0] baz baz baz
07: [sub0] bar bar
06: [sub1] baz baz baz

Process finished with exit code 0

asCurrent メソッドに渡した関数内では、ExampleContext.current メソッドで ExampleContext が取得できています。また、FuturePool を用いてスレッドをまたいでいても、問題なく元の値が参照できています。

この例では main メソッド内に全て定義してますが、実際に使う場合は asCurrentを呼び出す Filter を定義するのが便利ではないかと思います。


local or broadcast

ここで一旦 Contexts.local (LocalContext) と Contexts.broadcast (MarshalledContext) の違いについても触れておきます。

LocalContext はその JVM 内でのみ使える Context を利用する場合に使用します。

MarshalledContext は、その名前の通り marshall(≒serialize) 可能なコンテキストを作ることができます。LocalContext と同じように使えるほか、Finagle クライアントから Finagle サーバーにも Context を伝播させることもできます。

ただし broadcast で伝播させるには、com.twitter.finagle.thrift に、finagle パッケージプライベートとして定義されている Filter である、TTwitterClientFilter および TTwitterServerFilter を使うように定義されている Thrift のクライアントとサーバーを用いるか、自前でその機能を持った client/server を作る必要があります。


つまりどちらを使うか

殆どの場合は、LocalContext で十分かと思います。

Thrift の Server/Client を使うか Contextを渡せるものを自前で用意し、かつクライアントからサーバーにもコンテキストを引き回したい、といった用途がない限りは broadcast を実装するのは冗長でしょう。


LocalContext の仕組み

どんな大層な仕組みかと思いきや、実は ThreadLocal 内に持たせた Map[LocalContext.Key, Any] から値を出し入れしているだけです。

ただ、直接 ThreadLocal を用いているわけではなく、twitter util で提供されている com.twitter.util.Local を使って実装されています。

50行ほどの小さいクラスなので、コードを追いながら簡単に説明していきます。4


12~19行目


LocalContext.scala#L12-L19

private[this] val local = new Local[Map[Key[_], Any]]

class Key[A]

/**
* A java-friendly key constructor.
*/
def newKey[A]() = new Key[A]


保存に用いる Local の生成とキーの定義です。見ての通り実態はただの Map で、キーも何の機能もないただのオブジェクトだということがわかります。


21~41行目


LocalContext.scala#L21-L41

def get[A](key: Key[A]): Option[A] = env.get(key).asInstanceOf[Option[A]]

def let[A, R](key: Key[A], value: A)(fn: => R): R =
letLocal(env.updated(key, value))(fn)

def let[A, B, R](key1: Key[A], value1: A, key2: Key[B], value2: B)(fn: => R): R = {
val next = env.updated(key1, value1).updated(key2, value2)
letLocal(next)(fn)
}

def let[R](pairs: Iterable[KeyValuePair[_]])(fn: => R): R = {
val next = pairs.foldLeft(env) { case (env, KeyValuePair(k, v)) => env.updated(k, v) }
letLocal(next)(fn)
}

def letClear[R](key: Key[_])(fn: => R): R = letLocal(env - key)(fn)

def letClear[R](keys: Iterable[Key[_]])(fn: => R): R = {
val next = keys.foldLeft(env) { (e, k) => e - k }
letLocal(next)(fn)
}


設定した値の取得用の get メソッドと、getを使う関数を実行するための letXxx メソッドが定義されています。

ここに出てくる env メソットの名前が示している環境というのは、今の実行環境であるスレッドのことです。つまり ThreadLocal に格納されている値を取り出しているコードになってます。


41~51行目


LocalContext.scala#L45-L51

def letClearAll[R](fn: => R): R = local.letClear(fn)

private[this] def env: Map[Key[_], Any] = local() match {
case Some(env) => env
case None => Map.empty
}

private[this] def letLocal[T](env: Map[Key[_], Any])(fn: => T): T =
local.let(env)(fn)


Local から今の環境の Map[Key, Any] を取り出す env メソッドと、渡した環境の値で関数を実行する letLocal および環境に設定されている値を消した状態で実行する letClear メソッドが定義されています。


この記事では触れませんが、LocalContext が利用している twitter.util.Local の仕組みも似たような感じになっています。コードはもう少々複雑で、ThreadLocalMap の代わりに配列を持たせて、キーに添え字を使って値を出し入れするコスト重視の泥臭いコードになっていますが、やっていることはほぼ同じです。


MarshalledContext の仕組み

長くなるので端折ります。実装を見て下さい。(丸投げ)

簡単に解説すると、環境が変わらない場合は LocalContext と同じくThreadLocalに保存してるだけで、環境が変わる場合は、MarshalledContext.Key に実装されたメソッドで、シリアライズ、デシリアライズを行います。ただし、環境の境界にシリアライズ/デリシアライズする機能が必要になります。

Thrift では、先にも書いたようにTTwitterClientFilter/TTwitterServerFilterが、クライアント側でシリアライズし、サーバー側でデシリアライズして実現しています。


スレッドが変わっても値が参照できる理由

Context はスレッドを跨いでいても同じ値が参照できますが、先にも書いたようにこれには FuturePool を使い非同期化した場合、という制約があります。

なぜ Finagle の Context は直接 ThreadLoacl を使わずに Local を使っているのかも、この部分にあります。

FuturePool の実装である ExecutorServiceFuturePoolapply メソッドを見てみましょう。

128行目から無名クラスの Runnable の実装が長々と続いてますが、Context に関係がある部分は、Runnable#run メソッドの129~151行目だけです。


FuturePool.scala#L129-L151

private[this] val saved = Local.save()

def run(): Unit = {
// Make an effort to skip work in the case the promise
// has been cancelled or already defined.
if (!runOk.compareAndSet(true, false))
return

val current = Local.save()
Local.restore(saved)

try
p.updateIfEmpty(Try(f))
catch {
case nlrc: NonLocalReturnControl[_] =>
val fnlrc = new FutureNonLocalReturnControl(nlrc)
p.updateIfEmpty(Throw(fnlrc))
throw fnlrc
case e: Throwable =>
p.updateIfEmpty(Throw(new ExecutionException(e)))
throw e
} finally Local.restore(current)
}


つまり、FuturePool が生成される際に Local の値を saved というローカル変数として保存しておき、別スレッド内で関数が実行される際にそのスレッドの ThreadLocal の値を

current に一時退避して saved で上書きし、関数の実行終了時に元の ThreadLocal に格納されてた current に書き戻しているだけでした。


Context を使う場合に気をつけること

実際にプロダクションコードでもやらかしたので、備忘録として。

当たり前といえば当たり前ですが、FuturePool を使わない非同期化されたコードでは Contextは参照出来ません。

例えば、Scala の Future を用いて非同期処理を実装します。この場合、実行される処理で Context を参照しても、ThreadLocal に値が書き戻されていないため参照出来ずに期待しない結果となってしまいます。


回避するには

どんな処理であっても、twitter.util.Futureや、twitter.util.Promise を使うようにします。また、ライブラリなどからの全ての非同期実行の戻り値を twitter.util.Future に揃えておくのが無難です。

他の非同期処理を twitter.util.Future に変換するのは簡単で、スレッドが変わる前に、tiwtter.util.Promise を作っておき、他の仕組みで実行された非同期処理の結果を Promise に反映するだけです。

このようなユーティリティを作っておくと便利かもしれません。


ToTwitterFutureSample.scala

object scala:ToTwitterFuture {

implicit final class SFutureToTFuture[A](
val sFuture: concurrent.Future[A]) extends AnyVal {

def tFuture: com.twitter.util.Future[A] = {
val p = com.twitter.util.Promise[A]()
sFuture.onComplete {
case util.Success(v) => p.setValue(v)
case util.Failure(t) => p.setException(t)
}
p
}
}
}


Context を使う場合、というかFinagle (ないしはその派生など)を使う場合は、必ずこの Future呪い 制約が常について回ると考えておいたほうがいいですね。:)


参考にした記事など





  1. おしごとではまだ finagle-http 6.33.0 使ってる。 



  2. abstract class Service[-Req, +Rep] extends (Req => Future[Rep]) with Closable {...} 



  3. ただ、トレース用途IDが欲しい場合は Finagle が用意してるトレース用の機能で実現できたりするので、再発明する必要はなさげ。なおこの機能も、実装には Context を利用しているので参考になる。ただしだいぶ複雑になってるので読むのはちょっと大変。 



  4. 過去の Finagle では、ほぼContext トレイトのデフォルトの実装だけで実装されてたから本当にほぼ実装がなかった。