課題
Laravelではメッセージキューを利用する場合、それを処理するワーカーもLaravelで実装されている必要があるのですが
AWS SQSとSNSを利用したファンアウトパターンを実現しようとすると、ペイロード形式が異なるため処理ができないという問題がありました。
Laravelのワーカーが想定している形式:
{
"displayName": "Job\\TestJob",
"job": "Illuminate\\Queue\\CallQueuedHandler@call",
"maxTries": null,
"delay": null,
"timeout": null,
"timeoutAt": null,
"data": {
"commandName": "Job\\TestJob",
"command": "O:89:\"Job\\TestJob\":9:{s:101:\"\u0000Job\\TestJob\u0000id\";i:123;s:95:\"\u0000Job\\TestJob\u0000type\";s:4:\"type\";s:10:\"connection\";s:34:\"test-queue\";s:5:\"queue\";N;s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";N;s:10:\"middleware\";a:0:{}s:7:\"chained\";a:0:{}}"
}
}
SNSのメッセージ形式:
(パブリッシャーで指定するペイロードは Message
に含まれている)
{
"Type": "Notification",
"MessageId": "8cb29634-1558-4e4d-9cb7-45a5fe740d97",
"TopicArn": "arn:aws:sns:ap-northeast-1:000000000000:test-topic",
"Message": "{\"job\":\"Jobs\\\\TestJob\",\"data\":{\"id\":12345,\"type\":\"hogeType\"}}",
"Timestamp": "2022-04-11T02:01:02.168Z",
"SignatureVersion": "1",
"Signature": "EXAMPLEpH+..",
"SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
"MessageAttributes": {
"status": {
"Type": "String",
"Value": "hoge"
}
}
}
アプローチ
SNS側の仕様を変えることは難しそうで、パブリッシャー(Laravelアプリケーション)側で打つ手はないので
コンシューマ(Laravelワーカー)側で対応を取ろうと考えました。
個人開発のプラグインもありましたが、今後の保守性を考慮して、参考にしつつ自前で実装しました。
(Laravelのバージョンは6系です)
まず、失敗するケースです。
普通にキューの接続情報を書きます。
'test-job' => [
'driver' => 'sqs',
// 以下接続情報は省略
],
次に、ワーカープロセスを起動するコマンドを叩きます
php artisan queue:work test-job
この時、パブリッシャ側は TestJob
的なジョブクラスをペイロードとしてtest-job
で指定されたキューにメッセージングしているとします。
App -> SQS <- Worker という構図なら、ワーカーはペイロードから、実行するジョブがTestJob
であると判定し、実行してくれます。
しかしこれがファンアウトパターンを取っていたとき、設定ファイルのdriverにsqs
が設定されていると、Laravelが用意している SqsConnector
が使用され、メッセージ形式が異なる問題が発生します。
なのでまず、 QueueServiceProvider
を継承した独自のサービスプロバイダを作成し、キュー接続情報を登録するメソッドをオーバーライドします。
class OriginalQueueServiceProvider extends QueueServiceProvider
{
public function registerConnectors($manager)
{
$manager->addConnector('sqs-sns-queue-driver', function () {
return new SqsSnsConnector(TestJob::class, true);
});
}
}
'test-job' => [
'driver' => 'sqs-sns-queue-driver',
// 以下接続情報は省略
],
次に、ワーカープロセスを起動するコマンドを叩きます
php artisan queue:work test-job
そうするとconfigファイルを読み取って sqs-sns-queue-driver
を使用してSQSからキューを取得します
この時使われる SqsSnsConnector
はこのようになっています
class SqsSnsConnector extends SqsConnector
{
private string $job_class;
private bool $is_fifo;
public function __construct(
string $job_class,
bool $is_fifo = false
) {
$this->job_class = $job_class;
$this->is_fifo = $is_fifo;
}
public function connect(array $config)
{
$config = $this->getDefaultConfiguration($config);
if ($config['key'] && $config['secret']) {
$config['credentials'] = Arr::only($config, ['key', 'secret', 'token']);
}
if ($this->is_fifo) {
return new SqsSnsFifoQueue(
new SqsClient($config),
$config['queue'],
$this->job_class,
Arr::get($config, 'prefix', ''),
$config['message-group-id'] ?? ''
);
} else {
return new SqsSnsQueue(
new SqsClient($config),
$config['queue'],
$this->job_class,
Arr::get($config, 'prefix', '')
);
}
}
}
FIFOが云々は後ほど説明します。
SqsConnector
を継承して、親クラスではSqsQueue
を返すところを、独自のキュークラスを返すようにしています。
SqsQueue
クラスはこのようになっています。
class SqsSnsQueue extends SqsQueue
{
private string $job_class;
public function __construct(SqsClient $sqs, $default, string $job_class, $prefix = '')
{
parent::__construct($sqs, $default, $prefix);
$this->job_class = $job_class;
}
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
$response = $this->sqs->receiveMessage([
'QueueUrl' => $queue,
'AttributeNames' => ['ApproximateReceiveCount'],
]);
if (is_array($response['Messages']) && count($response['Messages']) > 0) {
$body = json_decode($response['Messages'][0]['Body'], true);
if (isset($body['Type']) && $body['Type'] === 'Notification') {
return new SqsSnsJob(
$this->container,
$this->sqs,
$response['Messages'][0],
$this->connectionName,
$queue,
$this->job_class
);
} else {
return new SqsJob(
$this->container,
$this->sqs,
$response['Messages'][0],
$this->connectionName,
$queue
);
}
}
}
}
SqsQueue
クラスはAWSのSDKを経由してAPIを呼び出すインターフェース的なクラスです
これを継承して、取り出したキューのBodyがSNSのメッセージの形式であれば、またさらに独自のジョブクラスを返却します
class SqsSnsJob extends SqsJob
{
public function __construct(
Container $container,
SqsClient $sqs,
array $job,
?string $connection_name,
?string $queue,
string $job_class
) {
parent::__construct($container, $sqs, $job, $connection_name, $queue);
$this->job = $this->resolveSnsSubscription($this->job, $job_class);
}
protected function resolveSnsSubscription(array $job, string $job_class): array
{
$body = json_decode($job['Body'], true);
$commandName = $job_class;
$instance = $this->makeCommand($commandName, $body);
$command = serialize($instance);
$job['Body'] = json_encode([
'maxTries' => $instance->tries ?? null,
'delay' => $this->getJobRetryDelay($instance),
'timeout' => $instance->timeout ?? null,
'timeoutAt' => $this->getJobExpiration($instance),
'uuid' => $body['MessageId'],
'displayName' => $commandName,
'job' => CallQueuedHandler::class . '@call',
'data' => compact('commandName', 'command'),
]);
return $job;
}
protected function makeCommand(string $command_name, array $body)
{
$data = json_decode($body['Message'], true);
return $this->container->make($command_name, $data);
}
public function getJobRetryDelay($job)
{
if (!method_exists($job, 'retryAfter') && ! isset($job->retryAfter)) {
return null;
}
$delay = $job->retryAfter ?? $job->retryAfter();
return $delay instanceof DateTimeInterface
? $this->secondsUntil($delay) : $delay;
}
public function getJobExpiration($job)
{
if (!method_exists($job, 'retryUntil') && ! isset($job->timeoutAt)) {
return null;
}
$expiration = $job->timeoutAt ?? $job->retryUntil();
return $expiration instanceof DateTimeInterface
? $expiration->getTimestamp() : $expiration;
}
}
resolveSnsSubscription()
で、Laravelワーカーが処理できるペイロード形式に変換しています。
最大試行回数とか期限あたりはQueue::createObjectPayload()
を参考にしています。
これでSNS経由でもワーカーが処理できるようになりました。
ちなみに SqsSnsConnector
でFIFOかどうかをチェックしていたのは、LaravelのQueueクラスがFIFOキューの仕様に則っておらず
そのまま使用するとMessageGroupIDを指定しろと怒られるので
自前でMessageGroupIDをペイロードに含めてメッセージングするようなFIFO用のQueue拡張クラスを作成しており
FIFOかどうかによって返却するインスタンスを出し分けるためです。
もうちょっといい実装ができそうではありますが、参考になれば幸いです。