とあるマイクロサービスにスレッドセーフな連想カウンタを仕込みたくなったので、ここのところ格闘していました。要件はこんな感じです。
- リクエストごとに様々なキーのカウント値をインクリメントする
- 定期的に全キーのカウント値をログ出力して0にリセットする
- インクリメントされなくなったキーをメモリに保持しつづけない
フレームワークとして使っている Finagle の mterics 機能を初めに検討したのですが、リセットする方法がなさそうだったのと、数十万個のキーを登録するようなものではなさそうだったので、自前で作ることにしました。
ということで Java のスレッドセーフな連想配列 ConcurrentHashMap のドキュメントを見てみると
ConcurrentHashMapは、LongAdderの値を使用し、computeIfAbsentで初期化することにより、スケーラブルな頻度マップ(ヒストグラムやマルチセットの形式)として使用できます。たとえば、ConcurrentHashMap freqsにカウントを追加するには、freqs.computeIfAbsent(k -> new LongAdder()).increment();を使用できます。
とあるので、これがまさに求めていたもののようです。
LongAdder って、あれ AtomicLong じゃないんだとも思いましたが、これは用途によって使い分けるものみたいです。
きめ細かい同期制御のためではなく統計収集などの目的に使用される共通の合計が、複数のスレッドによって更新される場合、通常はAtomicLongよりもこのクラスをお薦めします。更新の競合が少ないときは、2つのクラスの特徴は似ています。競合が多いときは、期待されるスループットはこのクラスの方がかなり高くなります。ただし、容量消費も多くなります。
そして実は Scala にも TrieMap というスレッドセーフな連想配列があるので、今回は TrieMap と LongAdder を使って実装することにしました。すなわち、こんな感じで連想カウンタを宣言して、
val counter = TrieMap[Key, LongAdder]()
各リクエスト処理の随所でインクリメントして、もしもキーが存在しない場合はメモリ確保して、
counter.getOrElseUpdate(key, new LongAdder()).increment()
別途、タイマー仕掛けて定期的にログ出力しながら要素削除して、
for (key <- counter.keys) {
val sum = counter.remove(key).sum()
logger.debug(s"$key: $sum")
}
こんなもんでおっけーおっけー簡単じゃん、というノリで実装していたのですが、これ実は正確なカウンタになってないんですねorz
利用しているメソッドは全て atomic なのですが、 atomic なものを組み合わせても atomic にはならないというやつで、実例を確かめたわけではないのですが、こういうケースがありえるはずです。
- スレッドA:
getOrElseUpdate(key, new LongAdder())
- スレッドB:
remove(key)
- スレッドB:
sum()
- スレッドA:
increment()
この順で同じキーが処理された場合、最後の increment()
された LongAdder はもう TrieMap から削除されていて、ログ出力値も評価済みなので、インクリメントした情報は闇に飲まれてしまうんですね。
また計算負荷的な観点から見ても、ログ出力時に TrieMap は一気に空になって、そのあとまたインクリメントされるたび膨らんでいくため、ログ出力前後で負荷が上がってしまうデメリットもあります。
これはリセットと要素削除も一緒に行ってしまうのが筋悪そうで、とりあえずログ出力時には、
for {
key <- counter.keys
value <- counter.get(key)
} {
val sum = value.sumThenReset()
logger.debug(s"$key: $sum")
}
とリセットだけすれば良いかなとも思ったのですが、
public long sumThenReset()
sum()に続いてreset()を実行した場合と効果は同じです。たとえば、マルチスレッド計算の間の静止点の期間中に、このメソッドが適用される場合があります。このメソッドと並行して更新が行われている場合、返される値は、リセット前に発生した最終の値であることは保証されません。
なんと、これもインクリメントした情報は闇飲まな可能性があります。
これは LongAdder のデータ構造上仕方なさそうなので、ならば AtomicLong はどうだろうと見てみると、
public final long getAndSet(long newValue)
指定された値に原子的に設定して、以前の値を返します。
こちらは atomic になってますね。では連想カウンタのデータ構造を切り替えて、
val counter = TrieMap[Key, AtomicLong]()
for {
key <- counter.keys
value <- counter.get(key)
} {
val v = value.getAndSet(0)
logger.debug(s"$key: $v")
}
ログ出力とは別のタイマーで、カウント値が 0 だったら要素削除すれば良さそうですね。この実装は、
for {
key <- counter.keys
value <- counter.get(key)
} {
val v = value.get()
if (v == 0) counter.remove(key)
}
などとしてしまってはもちろんダメで get()
してから remove(key)
するまでにインクリメントされると、はい、やみのま!(おつかれさまです!)
すなわち値の条件判定と要素削除を atomic に行わなくてはならなくて、そんな都合の良いものがあるかというと、
def remove(k: K, v: V): Boolean
Removes the entry for the specified key if it's currently mapped to the specified value.
あるにはあるんですが、一致判定なんですよね。
AtomicLong は参照型なので、カウント値としては一緒でもインスタンスとしては、
scala> new AtomicLong(0) == new AtomicLong(0)
res0: Boolean = false
というふうに等しくないわけですね。この事情は ConcurrentHashMap でも同じです。
public boolean remove(Object key,
Object value)
指定された値にキーが現在マッピングされている場合にのみ、そのキーのエントリを削除します。これは次の記述と同等です。
よって、この remove を利用するためには value の型をプリミティブな Long にせざるをえなくて、 atomic にインクリメントをする排他制御は ConcurrentHashMap.compute などに頼る必要があります。
compute
public V compute(K key,
BiFunction super K,? super V,? extends V> remappingFunction)
指定されたキーと現在マップされている値に対するマッピングの計算を試みます(現在のマッピングが存在しない場合はnull)。メソッドの呼出し全体は原子的に実行されます。他のスレッドがこのマップに対して試行する更新オペレーションの一部は計算の進行中にブロックされる可能性があるため、計算は短く簡単にしてください。また、計算でこのマップの他のマッピングを更新しようとしないでください
そして実は実は実は、このちょっと手前まで頭ひねったところで気づいたのですが、そういう実装の例が Gauva にありました。
public final class AtomicLongMap<K> implements Serializable {
private final ConcurrentHashMap<K, Long> map;
これも昔は AtomicLong で実装されていたようですが、
public final class AtomicLongMap<K> {
private final ConcurrentHashMap<K, AtomicLong> map;
なにか問題があって切り替えたんだろうなーという気がしています。
結論としては、GauvaのAtomicLongMapを使うか、真似て実装すれば良さそうです。
並列処理は難しいデスねー。