遅延キューとは?どんな業務シーンに使えるのか?
実践シナリオ
- 注文の支払いに失敗した場合、一定時間ごとにユーザーにリマインドを送る
- ユーザーの同時接続数が多い状況では、2 分遅らせてメールを送信する
Redis による一般的なメッセージキューの実装
Kafka や RabbitMQ などの専門的なメッセージキュー・ミドルウェアでは、コンシューマーがメッセージを消費する前に一連の煩雑な手順を踏む必要があります。
例えば RabbitMQ では、メッセージ送信前に Exchange を作成し、Queue を作成し、さらに Queue と Exchange をあるルールでバインドしなければなりません。メッセージを送信する際は routingkey を指定する必要があり、ヘッダー情報も制御しなければなりません。
しかし、ほとんどの場合、メッセージキューには 1 つのコンシューマーしか存在しないにもかかわらず、上記の手順を経る必要があります。
Redis を使えば、1 つのコンシューマーしかいないようなシンプルなメッセージキューであれば、非常に簡単に実装できます。Redis は専門のメッセージキューではないため、高度な機能や ack の保証はありません。メッセージの信頼性に非常に高い要求がある場合、Redis は適していません。
非同期メッセージキューの基本的な実装
Redis の list(リスト)データ構造は、非同期メッセージキューとしてよく使われます。rpush/lpush
でキューに追加し、lpop/rpop
でキューから取り出します。
> rpush queue Leapcell_1 Leapcell_2 Leapcell_3
(integer) 3
> lpop queue
"Leapcell_1"
> llen queue
(integer) 2
問題 1:キューが空になったら?
クライアントはキューの pop 操作を通じてメッセージを取得し、処理します。処理が終わるとまた次のメッセージを pop して処理し続けます。これがコンシューマーとしてのクライアントのライフサイクルです。
しかしキューが空になった場合、クライアントは pop の無限ループに陥ります。データがないのに pop を繰り返す、いわゆる「無駄なポーリング」です。これはクライアントの CPU を浪費し、Redis の QPS(クエリ毎秒)も上昇させます。こういった無駄なポーリングを行うクライアントが数十台いると、Redis のスロークエリが急増する可能性があります。
この問題の一般的な対処法は、sleep を使ってスレッドを少し休ませることです。1 秒間スリープするだけでも、クライアントの CPU 使用率と Redis の QPS を下げることができます。
問題 2:キューの遅延
前述の sleep による方法で問題はある程度解決できます。しかし、コンシューマーが 1 人だけの場合、その遅延は 1 秒になります。複数のコンシューマーがいる場合は、各コンシューマーのスリープタイミングがずれるため、遅延は多少改善されます。
では、遅延を大幅に削減する方法はあるのでしょうか?
それが blpop
や brpop
です。
これら 2 つのコマンドの接頭辞 b
は「blocking」(ブロッキング=待機)を意味します。
ブロッキング読み取りは、キューにデータがない場合はすぐにスリープ状態に入り、データが到着すると即座に目覚めます。つまり、メッセージの遅延はほぼゼロになります。lpop/rpop
の代わりに blpop/brpop
を使うことで、上記の問題は完全に解決されます。
問題 3:アイドル接続の自動切断
実は、もう 1 つ解決すべき問題があります。それがアイドル接続の問題です。
スレッドがずっとブロッキング状態で待機していると、Redis クライアントの接続はアイドル(待機)状態になります。長時間アイドル状態が続くと、サーバーは接続を自動的に切断してリソース消費を抑えようとします。この時、blpop/brpop
は例外をスローします。
そのため、クライアント側のコンシューマーを実装する際は例外処理を丁寧に記述し、リトライ処理も忘れずに実装する必要があります。
分散ロックの競合処理
もしクライアントがリクエスト処理中に分散ロックの取得に失敗したら、どうすればよいでしょう?
一般的に、ロック取得失敗への対応策は以下の 3 つです:
- 例外をそのまま投げて、ユーザーに後で再試行するよう促す
- 少し sleep してから再試行する
- リクエストを遅延キューに送って、時間を置いてから再実行する
特定のタイプの例外を投げる
この方法は、ユーザーが直接発行したリクエストに適しています。ユーザーはエラーダイアログを見て、内容を確認し、「再試行」ボタンを押すなどの操作で再実行します。つまり、ユーザー自身による「手動の遅延再試行」になります。
ユーザー体験をより良くするには、フロントエンドのコードが代わりに遅延処理を行うのも良い方法です。本質的にはこの方法は「現在のリクエストを放棄する」アプローチであり、再実行の判断をユーザーに委ねます。
sleep を使う
この方法では現在のメッセージ処理スレッドをブロックしてしまい、キュー内の後続メッセージ処理に遅延を発生させます。もしロックの衝突が頻発したり、キューにメッセージが大量にある場合は、sleep は適していません。特定の key がデッドロックを引き起こしてロック取得できない場合、スレッドが完全に詰まり、その後のメッセージが永遠に処理されなくなる可能性もあります。
遅延キューに移す
この方法は非同期メッセージ処理に適しており、現在競合中のリクエストを別のキューに移して、時間をずらして再実行することで競合を回避します。
遅延キューの実装
zset
(ソート済みセット)を利用し、設定されたタイムスタンプを score
として使ってソートできます。
zadd score1 value1 ...
コマンドを使ってメモリにメッセージを次々と投入します。
zrangebyscore
を使って、条件に合致する全てのタスクを取得し、ループ処理することでメッセージを順次処理できます。
また、zrangebyscore key min max withscores limit 0 1
を使えば、最も早い 1 件だけを取り出して処理することもできます。
private Jedis jedis;
public void redisDelayQueueTest() {
String key = "delay_queue";
// 実際の開発では、業務ID + ランダムなユニークID を value にするのが推奨される。
// ランダムなIDはメッセージの一意性を保証し、業務IDは value に過剰な情報を含めずに済む。
String orderId1 = UUID.randomUUID().toString();
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId1);
String orderId2 = UUID.randomUUID().toString();
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId2);
new Thread() {
@Override
public void run() {
while (true) {
Set<String> resultList;
// 最初の1件だけを取得(この時点では削除しない)
resultList = jedis.zrangebyscore(key, System.currentTimeMillis(), 0, 1);
if (resultList.size() == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
} else {
// 取得したデータを削除(zrem)して処理を行う
if (jedis.zrem(key, resultList.get(0)) > 0) {
String orderId = resultList.get(0);
log.info("orderId = {}", resultList.get(0));
this.handleMsg(orderId);
}
}
}
}
}.start();
}
public void handleMsg(T msg) {
System.out.println(msg);
}
この実装はマルチスレッドでも問題ありません。
仮に 2 つのスレッド T1、T2(または他多数)で以下のような処理を行うとします:
- T1、T2、および他スレッドが
zrangebyscore
でメッセージ A を取得。 - T1 が A の削除準備を開始(
zrem
は原子操作)。T2 や他スレッドは T1 の処理を待機。 - T1 が A を削除し、削除成功(戻り値が 1)→ A を処理。
- T2 他スレッドが A の
zrem
を試みるが、既に削除済みなので全て失敗し、A の処理を放棄。
注意点:handleMsg
の処理内では例外キャッチを行い、個別のタスク処理の失敗がループ全体に影響しないようにすること。
さらなる最適化
前述のアルゴリズムでは、同じタスクが複数のプロセスに一時的に取得され、最終的に zrem
で競合するという無駄が生じる可能性があります。
タスクを取得したものの、zrem
に失敗して何もできず終わるプロセスが出てくるのは非効率です。
この問題は Lua スクリプト を使って、zrangebyscore
と zrem
を Redis サーバー側でまとめて原子的に実行することで解決できます。
複数のプロセスが同時にタスクを取り合っても、無駄な競合を防ぐことが可能になります。
Lua スクリプトでのさらなる最適化
以下の Lua スクリプトは、期限切れのメッセージがある場合、それを削除して返します。
なければ空文字列を返します。
String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit' , 0, 1)\n" +
"if #resultArray > 0 then\n" +
" if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then\n" +
" return resultArray[1]\n" +
" else\n" +
" return ''\n" +
" end\n" +
"else\n" +
" return ''\n" +
"end";
jedis.eval(luaScript, ScriptOutputType.VALUE, new String[]{key}, String.valueOf(System.currentTimeMillis()));
Redis 遅延キューのメリット
Redis を使った遅延キューには以下の利点があります:
- Redis の
zset
は高性能なスコア順ソートをサポートしている - 操作はすべてメモリ上で行われるため、非常に高速
- Redis はクラスタ構成が可能で、メッセージ量が多い場合でも処理速度と可用性を向上できる
- 永続化機構(AOF や RDB)を持っているため、障害発生時でもデータ復元が可能で、信頼性がある
Redis 遅延キューのデメリット
一方で、Redis による遅延キューには以下のような欠点も存在します:
-
データの永続性と信頼性に課題がある
Redis 自体はメモリベースであり、永続化があっても信頼性は MQ 専用システムに劣る -
リトライメカニズムがない
メッセージ処理時にエラーが発生しても、自動的な再試行機構がないため、リトライ回数や間隔などを自分で実装する必要がある -
ACK メカニズムがない
例えば、メッセージを取得して削除後に処理中にクラッシュすると、そのメッセージは失われてしまう。
専用の MQ システムでは「このメッセージを正しく処理した」と明示的に返さない限り、完了とは見なされないため、信頼性が高い。
もしメッセージの信頼性を重視するのであれば、MQ の使用が推奨されます。
Redisson による遅延キューの実装
Redis ベースの Redisson には、分散遅延キュー構造である RDelayedQueue
という Java オブジェクトがあります。
これは RQueue
インターフェースを拡張しており、指定した遅延時間の後にキューにメッセージを追加することができます。
この機能を使えば、メッセージを一定時間遅らせて送信する、指数的な増加または減少といった再送戦略を実現できます。
RQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// 10秒後に指定のキューにメッセージを送信
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 1分後にメッセージを送信
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
このオブジェクトが不要になった場合、明示的に破棄するべきです。
ただし、Redisson の他オブジェクト全体を閉じる場合は、明示的な破棄は不要です。
RDelayedQueue<String> delayedQueue = ...
delayedQueue.destroy();
とても便利ですよね?
私たちはLeapcell、バックエンド・プロジェクトのホスティングの最適解です。
Leapcellは、Webホスティング、非同期タスク、Redis向けの次世代サーバーレスプラットフォームです:
複数言語サポート
- Node.js、Python、Go、Rustで開発できます。
無制限のプロジェクトデプロイ
- 使用量に応じて料金を支払い、リクエストがなければ料金は発生しません。
比類のないコスト効率
- 使用量に応じた支払い、アイドル時間は課金されません。
- 例: $25で6.94Mリクエスト、平均応答時間60ms。
洗練された開発者体験
- 直感的なUIで簡単に設定できます。
- 完全自動化されたCI/CDパイプラインとGitOps統合。
- 実行可能なインサイトのためのリアルタイムのメトリクスとログ。
簡単なスケーラビリティと高パフォーマンス
- 高い同時実行性を容易に処理するためのオートスケーリング。
- ゼロ運用オーバーヘッド — 構築に集中できます。
Xでフォローする:@LeapcellHQ