LoginSignup
0
0

More than 1 year has passed since last update.

LaravelでAWS SNSを使ったファンアウトパターンを実現したいとき

Last updated at Posted at 2023-03-25

課題

Laravelではメッセージキューを利用する場合、それを処理するワーカーもLaravelで実装されている必要があるのですが
AWS SQSとSNSを利用したファンアウトパターンを実現しようとすると、ペイロード形式が異なるため処理ができないという問題がありました。

Laravelのワーカーが想定している形式:

  {
    "displayName": "Job\\TestJob",
    "job": "Illuminate\\Queue\\CallQueuedHandler@call",
    "maxTries": null,
    "delay": null,
    "timeout": null,
    "timeoutAt": null,
    "data": {
      "commandName": "Job\\TestJob",
      "command": "O:89:\"Job\\TestJob\":9:{s:101:\"\u0000Job\\TestJob\u0000id\";i:123;s:95:\"\u0000Job\\TestJob\u0000type\";s:4:\"type\";s:10:\"connection\";s:34:\"test-queue\";s:5:\"queue\";N;s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:5:\"delay\";N;s:10:\"middleware\";a:0:{}s:7:\"chained\";a:0:{}}"
    }
  }

SNSのメッセージ形式:
(パブリッシャーで指定するペイロードは Message に含まれている)

  {
    "Type": "Notification",
    "MessageId": "8cb29634-1558-4e4d-9cb7-45a5fe740d97",
    "TopicArn": "arn:aws:sns:ap-northeast-1:000000000000:test-topic",
    "Message": "{\"job\":\"Jobs\\\\TestJob\",\"data\":{\"id\":12345,\"type\":\"hogeType\"}}",
    "Timestamp": "2022-04-11T02:01:02.168Z",
    "SignatureVersion": "1",
    "Signature": "EXAMPLEpH+..",
    "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
    "MessageAttributes": {
      "status": {
        "Type": "String",
        "Value": "hoge"
      }
    }
  }

アプローチ

SNS側の仕様を変えることは難しそうで、パブリッシャー(Laravelアプリケーション)側で打つ手はないので
コンシューマ(Laravelワーカー)側で対応を取ろうと考えました。
個人開発のプラグインもありましたが、今後の保守性を考慮して、参考にしつつ自前で実装しました。
(Laravelのバージョンは6系です)

まず、失敗するケースです。
普通にキューの接続情報を書きます。

config/queue.php
        'test-job' => [
            'driver'           => 'sqs',
            // 以下接続情報は省略
        ],

次に、ワーカープロセスを起動するコマンドを叩きます

php artisan queue:work test-job

この時、パブリッシャ側は TestJob的なジョブクラスをペイロードとしてtest-jobで指定されたキューにメッセージングしているとします。
App -> SQS <- Worker という構図なら、ワーカーはペイロードから、実行するジョブがTestJobであると判定し、実行してくれます。

しかしこれがファンアウトパターンを取っていたとき、設定ファイルのdriverにsqsが設定されていると、Laravelが用意している SqsConnectorが使用され、メッセージ形式が異なる問題が発生します。

なのでまず、 QueueServiceProviderを継承した独自のサービスプロバイダを作成し、キュー接続情報を登録するメソッドをオーバーライドします。

class OriginalQueueServiceProvider extends QueueServiceProvider
{
    public function registerConnectors($manager)
    {
        $manager->addConnector('sqs-sns-queue-driver', function () {
            return new SqsSnsConnector(TestJob::class, true);
        });
    }
}
config/queue.php
        'test-job' => [
            'driver'           => 'sqs-sns-queue-driver',
            // 以下接続情報は省略
        ],

次に、ワーカープロセスを起動するコマンドを叩きます

php artisan queue:work test-job

そうするとconfigファイルを読み取って sqs-sns-queue-driver を使用してSQSからキューを取得します
この時使われる SqsSnsConnectorはこのようになっています

class SqsSnsConnector extends SqsConnector
{
    private string $job_class;
    private bool $is_fifo;

    public function __construct(
        string $job_class,
        bool $is_fifo = false
    ) {
        $this->job_class = $job_class;
        $this->is_fifo = $is_fifo;
    }

    public function connect(array $config)
    {
        $config = $this->getDefaultConfiguration($config);

        if ($config['key'] && $config['secret']) {
            $config['credentials'] = Arr::only($config, ['key', 'secret', 'token']);
        }

        if ($this->is_fifo) {
            return new SqsSnsFifoQueue(
                new SqsClient($config),
                $config['queue'],
                $this->job_class,
                Arr::get($config, 'prefix', ''),
                $config['message-group-id'] ?? ''
            );
        } else {
            return new SqsSnsQueue(
                new SqsClient($config),
                $config['queue'],
                $this->job_class,
                Arr::get($config, 'prefix', '')
            );
        }
    }
}

FIFOが云々は後ほど説明します。
SqsConnectorを継承して、親クラスではSqsQueueを返すところを、独自のキュークラスを返すようにしています。

SqsQueueクラスはこのようになっています。

class SqsSnsQueue extends SqsQueue
{
    private string $job_class;

    public function __construct(SqsClient $sqs, $default, string $job_class, $prefix = '')
    {
        parent::__construct($sqs, $default, $prefix);
        $this->job_class = $job_class;
    }

    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);

        $response = $this->sqs->receiveMessage([
            'QueueUrl'       => $queue,
            'AttributeNames' => ['ApproximateReceiveCount'],
        ]);

        if (is_array($response['Messages']) && count($response['Messages']) > 0) {
            $body = json_decode($response['Messages'][0]['Body'], true);

            if (isset($body['Type']) && $body['Type'] === 'Notification') {
                return new SqsSnsJob(
                    $this->container,
                    $this->sqs,
                    $response['Messages'][0],
                    $this->connectionName,
                    $queue,
                    $this->job_class
                );
            } else {
                return new SqsJob(
                    $this->container,
                    $this->sqs,
                    $response['Messages'][0],
                    $this->connectionName,
                    $queue
                );
            }
        }
    }
}

SqsQueueクラスはAWSのSDKを経由してAPIを呼び出すインターフェース的なクラスです
これを継承して、取り出したキューのBodyがSNSのメッセージの形式であれば、またさらに独自のジョブクラスを返却します

class SqsSnsJob extends SqsJob
{
    public function __construct(
        Container $container,
        SqsClient $sqs,
        array $job,
        ?string $connection_name,
        ?string $queue,
        string $job_class
    ) {
        parent::__construct($container, $sqs, $job, $connection_name, $queue);

        $this->job = $this->resolveSnsSubscription($this->job, $job_class);
    }

    protected function resolveSnsSubscription(array $job, string $job_class): array
    {
        $body = json_decode($job['Body'], true);

        $commandName = $job_class;

        $instance = $this->makeCommand($commandName, $body);

        $command = serialize($instance);

        $job['Body'] = json_encode([
            'maxTries'    => $instance->tries ?? null,
            'delay'       => $this->getJobRetryDelay($instance),
            'timeout'     => $instance->timeout ?? null,
            'timeoutAt'   => $this->getJobExpiration($instance),
            'uuid'        => $body['MessageId'],
            'displayName' => $commandName,
            'job'         => CallQueuedHandler::class . '@call',
            'data'        => compact('commandName', 'command'),
        ]);

        return $job;
    }

    protected function makeCommand(string $command_name, array $body)
    {
        $data = json_decode($body['Message'], true);

        return $this->container->make($command_name, $data);
    }

    public function getJobRetryDelay($job)
    {
        if (!method_exists($job, 'retryAfter') && ! isset($job->retryAfter)) {
            return null;
        }

        $delay = $job->retryAfter ?? $job->retryAfter();

        return $delay instanceof DateTimeInterface
            ? $this->secondsUntil($delay) : $delay;
    }

    public function getJobExpiration($job)
    {
        if (!method_exists($job, 'retryUntil') && ! isset($job->timeoutAt)) {
            return null;
        }

        $expiration = $job->timeoutAt ?? $job->retryUntil();

        return $expiration instanceof DateTimeInterface
            ? $expiration->getTimestamp() : $expiration;
    }
}

resolveSnsSubscription()で、Laravelワーカーが処理できるペイロード形式に変換しています。
最大試行回数とか期限あたりはQueue::createObjectPayload()を参考にしています。

これでSNS経由でもワーカーが処理できるようになりました。

ちなみに SqsSnsConnectorでFIFOかどうかをチェックしていたのは、LaravelのQueueクラスがFIFOキューの仕様に則っておらず
そのまま使用するとMessageGroupIDを指定しろと怒られるので
自前でMessageGroupIDをペイロードに含めてメッセージングするようなFIFO用のQueue拡張クラスを作成しており
FIFOかどうかによって返却するインスタンスを出し分けるためです。

もうちょっといい実装ができそうではありますが、参考になれば幸いです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0