概要
- 分散システムのコンシューマーに置ける1回以上のメッセージ到達の問題を解決するには
- イベント冪等性キーを基盤とする冪等的なサービスを構成する必要がある
- 同じ冪等性キーが一定の時間内に何回も処理されることを防ぐのだ
- そのためにはRedisを分散ロックシステムKVSとして取り入れるのがベストプラクティスとなっている
- 今回は冪等的な処理のためのDistributedLockクラスを実装する
DistributedLock
- まずは全体クラスを見てみよう
- 3つのメソッド全部キーとパラメーターのない返り値が一個のlambda関数を引数として受け
- Redisからロックを獲得し、lambda関数を実行する
- その中、ロックがかかっていればwaitTimeだけ待つ
- ロックを獲得してからはleaseTime以内にロックを解除する
import dev.ktcloud.black.client.redis.extension.awaitResponse
import org.redisson.api.RedissonClient
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
class DistributedLock(
private val redissonClient: RedissonClient
) {
fun <R> execute(
key: String,
func: () -> R,
timeUnit: TimeUnit = TimeUnit.SECONDS,
waitTime: Long = 5L,
leaseTime: Long = 3L
): R {
val lock = redissonClient.getLock(key)
val available = lock.tryLock(waitTime, leaseTime, timeUnit)
if (!available) throw TimeoutException("Redisson Lock timed out for key: $key")
return try {
func.invoke()
} finally {
lock.unlock()
}
}
suspend fun <R> executeAsync(
key: String,
func: () -> R,
timeUnit: TimeUnit = TimeUnit.SECONDS,
waitTime: Long = 5L,
leaseTime: Long = 3L
): R {
val lock = redissonClient.getLock(key)
val available = lock.tryLockAsync(waitTime, leaseTime, timeUnit).awaitResponse()
if (!available) throw TimeoutException("Redisson Lock timed out for key: $key")
return try {
func.invoke()
} finally {
lock.unlockAsync()
}
}
fun <R> executeNowOrFail(
key: String,
func: () -> R,
timeUnit: TimeUnit = TimeUnit.SECONDS,
leaseTime: Long = 3L,
): R {
return try {
execute(key, func, timeUnit, 0, leaseTime)
} catch (_: TimeoutException) {
throw IllegalStateException("Already Locked Key for key: $key")
}
}
}
Redidisson
- まずはRedissonから見ると
- RedissonはJedisとLettuceとは違って、Redisのコマンドを直接扱うことなく、Javaインターフェースで抽象化された高水準のAPIが使える
- getLockメソッドで簡単にRedis HashベースのRLockが使える
import org.redisson.api.RedissonClient
private val redissonClient: RedissonClient
- そして、getLockはPub/Subを用いたノンブロキング方式として、メモリとCPUを無駄遣いしない
- 逆にスピンロック方式を使える「getSpinLock」メソッドも使えるのだ
/**
* Returns Lock instance by name.
* <p>
* Implements a <b>non-fair</b> locking so doesn't guarantees an acquire order by threads.
* <p>
* To increase reliability during failover, all operations wait for propagation to all Redis slaves.
*
* @param name name of object
* @return Lock object
*/
RLock getLock(String name);
/**
* Returns Spin lock instance by name.
* <p>
* Implements a <b>non-fair</b> locking so doesn't guarantees an acquire order by threads.
* <p>
* Lock doesn't use a pub/sub mechanism
*
* @param name name of object
* @return Lock object
*/
RLock getSpinLock(String name);
RedissonLock.java
- 実際にRLockのtryLockメソッドはnon-fairロックとして実装されている
- ロックに並んでいる待ち行列の順序を保証しないのである
- 下記のコードを見るとその原理が明確になる
- ロック待ちスレッドは平等にロックを奪い合い
- 待ち時間やデッドライン等は考慮されていない
- もし、fairなロックが必要な場合、getFairLockメソッドを使おう
- 無論、Lock処理にかかるオーバーヘッドは嵩張る
// redisson/redisson/src/main/java/org/redisson/RedissonLock.java - L227
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
...
// まずはロックの獲得を試みる
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// ロックが獲得できたなら、trueをリターンする
if (ttl == null) {
return true;
}
...
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
// ロックの獲得に失敗したのなら、ロック解除をサブスクしている
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
...
acquireFailed(waitTime, unit, threadId);
return false;
}
...
try {
...
while (true) {
long currentTime = System.currentTimeMillis();
// Blockからreleaseされたらロックの獲得に出る
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// ロックが獲得できたなら、trueをリターンする
if (ttl == null) {
return true;
}
...
// セマフォーのLatchを用いてPubからreleaseになるまでスレッドはCPUを占有しないBlock状態になる
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// waitTimeがタイムしたときに例外処理をする
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 終わったら、Pubのサブスクを解除する
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
}
- 実際にRedisに問い合わせするコードを見てみると
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
- Redisの原子的な処理のためのLuaスクリプトを見てみよう
-- KEYS[1]: ロックのキー、Hashの名前
-- ARGV[1]: ロックのミリ秒単位の生存期間
-- ARGV[2]: ロックをリクエストした自分自身(UUID、ThreadID)
if (
(redis.call('exists', KEYS[1]) == 0) -- ロックキーがかかっている
or
(redis.call('hexists', KEYS[1], ARGV[2]) == 1) -- ロックキーがかかっているけど持ち主がロックをリクエストした自分自身である
) then
redis.call('hincrby', KEYS[1], ARGV[2], 1); -- Hashロックキーのロックをリクエストした自分自身の値を1増加させる。増加処理をするため自分自身は何度も再侵入(re-entrant)できる
redis.call('pexpire', KEYS[1], ARGV[1]); -- ロックのTTLを設定する
return nil; -- 成功を変換、ここではnilを成功として扱う
end;
return redis.call('pttl', KEYS[1]); -- ロックが他によってかかっていると、残り時間を変換する
executeAsync
- RedissonはRFutureを返すtryLockAsyncメソッドも提供している
suspend fun <R> executeAsync(
key: String,
func: () -> R,
timeUnit: TimeUnit = TimeUnit.SECONDS,
waitTime: Long = 5L,
leaseTime: Long = 3L
): R {
val lock = redissonClient.getLock(key)
val available = lock.tryLockAsync(waitTime, leaseTime, timeUnit).awaitResponse()
if (!available) throw TimeoutException("Redisson Lock timed out for key: $key")
return try {
func.invoke()
} finally {
lock.unlockAsync()
}
}
- RFutureはJavaの標準非同期クラスFutureを受け継いでいる
public interface RFuture<V> extends java.util.concurrent.Future<V>, CompletionStage<V>
- そのため、RFutureのawaitを使ってしまうと、内部的にはfuture.getが呼び出され、スレッドがブロッキングされる
- kotlinのためのExtensionが必要な理由だ
@Deprecated
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
RFutureExtension
- Lockが獲得できるまで、現在スレッドに処理を譲るようにRFutureをCoroutineに変換するExtensionである
import kotlinx.coroutines.suspendCancellableCoroutine
import org.redisson.api.RFuture
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
suspend fun <T> RFuture<T>.awaitResponse(): T = suspendCancellableCoroutine { continuation ->
this.onComplete { result, throwable ->
if (throwable != null) {
continuation.resumeWithException(throwable)
} else {
continuation.resume(result)
}
}
continuation.invokeOnCancellation {
this.cancel(true)
}
}
結論
- 分散環境で冪等的な処理のために必要なKVSとしてRedisを取り入れるようにしよう
- RedissonをKotlinのCoroutineで使用するにはExtensionで拡張しないと、スレッドがブロックされてしまうことを覚えておこう