はじめに
Amazon SQSでFIFOキューを使用するワーカーを実装したとき、期待通りに動かない事象が発生したので、注意すべきことをまとめます。
目的を検討する
まず、本当にFIFOキューでいいのか? を検討しましょう。
FIFOキューの特徴は
- 配信はExactly Onceである
- 標準キューはAt least once(1回以上、0回はなし)
- 順序が保証される(First-In-First-Out)
- ただし、同じMessageGroupIdを指定した場合、その中で順序が保証される
- かわりに並列処理できなくなる(直列化)ことに注意
- ただし、同じMessageGroupIdを指定した場合、その中で順序が保証される
- キューに投入した時点で重複排除される
- 設定によっては、中身を見て自動処理してくれる
- 同じMessageDeduplicationIdを指定すると重複扱いになる
もしも、複数回実行チェックを自前で実装できるのであれば、標準キューの方がスループットも高く、コストも低いです。また、順序が保証されることは並列処理できないこととトレードオフなので、性能要件がある場合には問題となります。
実装する
古いバージョンのLaravelでは、既存クラスに拡張が必要です。
https://qiita.com/suzumurakk/items/d23e39c322c44c99f0ed
Laravel 12からは標準クラスで対応できるようです。
https://readouble.com/laravel/12.x/ja/queues.html#sqs-fifo-and-fair-queues
以下のコードは旧バージョンでのサンプルになります。
キュー投入側で注意すること
キューにデータを投入する、図の「スケジューラー」側の確認事項をまとめます。
一意のMessageGroupIdを設定する
同じMessageGroupIdを指定した場合、一度呼び出したものがすべて処理されない限り、ほかのワーカーの処理は進みません。
https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/interleaving-multiple-ordered-message-groups.html
たとえばDIで初期化したときに値を設定してしまうと、その値がずっと使われ続けるため、同じMessageGroupIdになり、直列化されます。
ただしその場合でも、スケジューラーで稼働する処理の中でDIが動く場合、スケジューラーの実行単位で直列化されるので、スケジューラーごとに並列稼働します。しかし単一スケジューラーでは直列で動きます。
この挙動が非常にわかりにくい。怪しい挙動がある場合、このあたりをチェックしてください。
class MailSqsFifoServiceProvider extends \Illuminate\Support\ServiceProvider
{
public function register()
{
// 起動時のみ呼ばれる
$this->app->afterResolving('queue', function ($manager) {
$manager->addConnector('mail', function () {
return new SqsFifoConnector;
});
});
}
}
class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector
{
public function connect($config): SqsFifoQueue
{
$config = $this->getDefaultConfiguration($config);
if (!empty($config['key']) && !empty($config['secret']) && !empty($config['token'])) {
$config['credentials'] = Arr::only($config, ['key', 'secret', 'token']);
}
// このuniqid()は起動時刻
return new SqsFifoQueue(
new SqsClient($config), $config['queue'], $config['prefix'] ?? '', uniqid()
);
}
}
class SqsFifoQueue extends \Illuminate\Queue\SqsQueue
{
protected mixed $message_group_id;
public function __construct($sqs, $default, $prefix = '', $message_group_id = null)
{
parent::__construct($sqs, $default, $prefix);
$this->message_group_id = $message_group_id;
}
...
}
一意のMessageDeduplicationIdを設定する
同じMessageDeduplicationIdを指定すると、FIFOキュー側で無視されます。しかも、エラーにはなりません。
サイレントに無視される形になります。
この値を uniqid() などを使って生成したとき、意図せず重複排除され、キュー投入に失敗することがあります。しかし同じMessageGroupIdを使ったときは直列で稼働し、直列稼働ならuniqid()の結果もほぼ間違いなく異なるため、気づかないことが多いです。
並列稼働するようになって初めて問題が発生します。
class SqsFifoQueue extends \Illuminate\Queue\SqsQueue
{
public function pushRaw($payload, $queue = null, $options = [])
{
// microsecが重複すると、同じ値になる
$messageDeduplicationId = uniqid();
try {
$result = $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $payload,
'MessageGroupId' => $this->message_group_id,
'MessageDeduplicationId' => $messageDeduplicationId,
])->get('MessageId');
return $result;
} catch (\Exception $e) {
$message = $e->getMessage();
return false;
}
}
}
ビジネスロジック内で、一意のIDを取得しましょう。先述のMessageGroupIdと同じ値を使えば、並列処理もできます。
キュー処理側で注意すること
キューを処理する、図の「ワーカー」側の注意事項をまとめます。
プログラム側の直列処理機能
FIFOキューは直列実行を容易にしてくれますが、Laravelなどのフレームワーク内で直列実行を保証することもできます。
LaravelではShouldBeUniqueインタフェースを実装することで一意性が保証されます。内部的にはRedisなどのキャッシュを使ってロックを取って処理しているようです。
https://readouble.com/laravel/12.x/ja/queues.html#unique-jobs
上記のマニュアル上では「一意」「unique」という言葉が多発しています。しかしこの「一意」とは「ひとつのジョブのみが稼働していること」であり、「重複排除」の文脈で使われる「一意」とは異なるので注意してください。
APIで動作確認する
これらの実装を動作確認するには、実データを見るのが一番です。
$ aws sqs receive-message --queue-url https://sqs.ap-northeast-1.amazonaws.com/000000000000/xxxxx.fifo --attribute-names All --message-attribute-names All --max-number-of-messages 2
{
"Messages": [
{
"MessageId": "3e076147-e38d-4c0d-b3d1-3c8ee01b15c5",
"ReceiptHandle": "...",
"MD5OfBody": "2bdee2fba4432efee35ef4ddbf15d684",
"Body": "...",
"Attributes": {
"SenderId": "AROAT5CDXIPKRPC4UA265:530a68ce26e744f5a2c70462f4a6704f",
"ApproximateFirstReceiveTimestamp": "1759737991098",
"ApproximateReceiveCount": "2",
"SentTimestamp": "1759737727202",
"SequenceNumber": "2287846757998148112384",
"MessageDeduplicationId": "101322146",
"MessageGroupId": "68e3777f181f2" // ここが重複=配信が直列
}
},
{
"MessageId": "588a0325-2f23-4c60-9654-801fdccc1be2",
"ReceiptHandle": "...",
"MD5OfBody": "71b0beda8ca3502ccb368bd162016152",
"Body": "...",
"Attributes": {
"SenderId": "AROAT5CDXIPKRPC4UA265:530a68ce26e744f5a2c70462f4a6704f",
"ApproximateFirstReceiveTimestamp": "1759738313929",
"ApproximateReceiveCount": "1",
"SentTimestamp": "1759737727250",
"SequenceNumber": "2287846757998160400384",
"MessageDeduplicationId": "101322147",
"MessageGroupId": "68e3777f181f2" // ここが重複=配信が直列
}
}
]
}
MessageGroupIdが重複しているため、これらは直列に配信されます。
つまりメッセージを1個のみ処理するとき、1個目が処理完了するまで、どのワーカーも2個目を取りに行くことができません。
たとえばチケットサイトで、標準キューに入った順に整理番号を割り振るとすると、購入順と整理番号が一致しないことになるでしょう。この場合はFIFOキューで、同じMessageGroupIdを使った直列処理の方が好ましい動作となります。
しかし、そもそも購入を受け付けておいてキュー順で処理すること自体、申込成功したと思ったら失敗した(処理順によって在庫が切れた)ことが起こりうるため、微妙な挙動です。購入時に整理番号を発行して、後続の処理だけキューで行うのがよいです。
ログで確認する
Cloudwatch logsにワーカーの開始・終了を記録している場合は、ログの挙動を眺めてみてください。
ワーカーが複数台あって、並列処理されている場合は、複数ワーカーからのログが入り乱れるはずです。それがなく、
- ワーカー処理終了の直後に別ワーカーのログが入る
- しばらくそのワーカーの処理ログが続いて、また別ワーカーのログが入る
以上のような挙動を長く繰り返している場合、そのワーカーは並列処理されていない可能性が高いです。
同様に、通常より多くの台数のワーカーを起動して稼働させてもスループットが向上しない場合は、おそらく直列処理で動いています。
まとめ
なんとなくお分かりかと思いますが、このドキュメントは、既存FIFOキューを並列稼働させようとしたときにハマったことから作られています。
キュー・ワーカーの構成で、何をやってもスループットが出ないとき、「そのワーカーは本当に並列稼働していますか?」ということを、まず疑ってみましょう。