13
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

LaravelをElasticMQ(Amazon SQS互換)と連携してみる

Last updated at Posted at 2019-08-26

こんにちは! ニアです。
Laravelには、Amazon SQSと連携して処理を実行させる機能があるのですが、
そのAmazon SQSと互換性のあるインターフェースを持つ、**ElasticMQ**というScala製のアプリがあったので、
Laravelと連携させてみました。

1. この記事で扱っている、Laravelの環境

  • Laravel 5.8
    • PHP(fpm版) 7.3
    • nginx 1.17.3
    • MySQL 5.7
    • Redis 5.0
  • ElasticMQ 0.14.10
    • OpenJDK(ランタイム) 11

2. ElasticMQのコンテナを追加

2.1. ディレクトリ構成

※DockerでLaravelの環境を構築する方法はこちら -> DockerでLaravelの開発環境を構築してみよう(&一緒に使うと便利なDockerイメージを紹介)
GitHub -> https://github.com/Nia-TN1012/docker-laravel

ディレクトリ構成
.
├── docker
│   ├── docker-compose.yml
│   ├── .env
│   ├── elasticmq       # 今回追加します。
│   │   └── conf
│   │       └── custom.conf
│   ├── mysql
│   │   └── init
│   │       └── grant.sh
│   ├── nginx
│   │   ├── conf
│   │   │   └── laravel.conf
│   │   └── logs
│   └── php-fpm
│       └── Dockerfile
└── laravel

2.2. ElasticMQの設定ファイル

https://github.com/softwaremill/elasticmq を参考に、ElasticMQの設定ファイルを作成します。

custom.conf
include classpath( "application.conf" )

node-address {
    protocol = http
    host = localhost
    port = 9324
    context-path = ""
}

rest-sqs {
    enabled = true
    bind-port = 9324
    bind-hostname = "0.0.0.0"
    sqs-limits = strict
}

generate-node-address = false

# ここでElasticMQに作成するキューを定義します。
queues {
    # http://elasticmq:9324/queue/laravel
    laravel {
        defaultVisibilityTimeout = 10 seconds
        delay = 5 seconds
        receiveMessageWait = 0 seconds
        deadLettersQueue {
            name = "laravel-dead-letters"
            maxReceiveCount = 3
        }
    }
    # http://elasticmq:9324/queue/laravel-dead-letters
    laravel-dead-letters { }
}

2.3. docker-compose.yml

version: "3"
services:
  # php-fpm
  php-fpm:
    image: laravel-php-fpm:7
    build: ./php-fpm/
    depends_on:
      - mysql
      - redis
    volumes:
      - ../laravel:/var/www/html

  # nginx
  nginx:
    image: nginx:mainline-alpine
    depends_on:
      - php-fpm
    ports:
      - 80:80
    volumes:
      - ./nginx/conf/laravel.conf:/etc/nginx/conf.d/default.conf:ro
      - ../laravel:/var/www/html
      - ./nginx/logs:/var/log/nginx

  # MySQL
  mysql:
    image: mysql:5.7
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABASE: laravel
      MYSQL_USER: laravel_user
      MYSQL_PASSWORD: laravel_pass
    ports:
      - 3306:3306
    volumes:
      - ./mysql/init:/docker-entrypoint-initdb.d:ro
      - mysql-data:/var/lib/mysql

  # Redis
  redis:
    image: redis:alpine
    volumes:
      - redis-data:/data

  # (↓今回追加分)
  # ElasticMQ
  elasticmq:
    image: softwaremill/elasticmq
    volumes:
      # ElasticMQの設定ファイル
      - ./elasticmq/conf/custom.conf://opt/elasticmq.conf:ro

volumes: 
  mysql-data:
    driver: local
  redis-data:
    driver: local

2.4. php-fpmコンテナにaws/aws-sdk-phpをインストール

php-fpmコンテナからcomposer requireコマンドを実行し、aws/aws-sdk-phpをインストールします。

$ docker-compose exec php-fpm composer require aws/aws-sdk-php

3. ElasticMQコンテナの起動

$ docker-compose up -d
Creating laravel_elasticmq_1 ... done

3.1. ElasticMQのAPIにアクセス

では早速、ElasticMQのAPIにアクセスしてみましょう。

php-fpmコンテナからphp -aコマンドで、インタラクティブシェル(REPL)に入ります。

$ docker-compose exec php-fpm php -a

AWS SDKのSQLクライアントオブジェクト作成すると、そこからElasticMQのAPIにアクセスできます。

php-fpmコンテナのPHPのREPL内
require 'vendor/autoload.php';

// Amazon SQSクライアントの作成
$client = new Aws\Sqs\SqsClient( [
    'endpoint' => "http://elasticmq:9324",      // ElasticMQのエンドポイント
    'region' => "us-east-1",                    // リージョンは適当でOKです。
    'credentials' => false,                     // credentialsはfalseでOKです。
    'version' => "latest"                       // バージョン
] );

// ElasticMQで作成したキューの一覧を取得します。
echo $client->listQueues();
// 実行結果①

// laravelキューのURL情報を取得します。
echo $client->getQueueUrl( ['QueueName' => "laravel"] );
// 実行結果②
実行結果①
{
    /* キューのURL一覧 */
    "QueueUrls": [
        "http:\/\/localhost:9324\/queue\/laravel",
        "http:\/\/localhost:9324\/queue\/laravel-dead-letters"
    ],
    /* メタデータ */
    "@metadata": {
        "statusCode": 200,
        "effectiveUri": "http:\/\/elasticmq:9324",
        "headers": {
            "server": "akka-http\/10.1.9",
            "date": "Mon, 26 Aug 2019 07:42:51 GMT",
            "connection": "close",
            "content-type": "text\/plain; charset=UTF-8",
            "content-length": "483"
        },
        "transferStats": {
            "http": [
                []
            ]
        }
    }
}
実行結果②
{
    /* キューのURL */
    "QueueUrl": "http:\/\/localhost:9324\/queue\/laravel",
    "@metadata": {
        /* 中略 */
    }
}

4. LaravelからElasticMQにアクセスする

今度はLaravelからElasticMQを利用できるようにしてみましょう。

4.1. Laravelの設定ファイルの編集

laravel/config/queue.phpの'connections'内に以下を追加します。

laravel/config/queue.php
'connections' => [

    // 中略

    'elasticmq' => [
        'driver' => 'sqs',
        # ElasticMQは資格情報を使用しないので、
        # keyとsecretはnullを、credentialsはfalseを指定します。
        'key' => null,
        'secret' => null,
        'credentials' => false,
        'endpoint' => env('ELASTICMQ_ENDPOINT'),
        'prefix' => env('SQS_PREFIX'),
        'queue' => env('SQS_QUEUE'),
        'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    ],

],

laravel/.envを編集します。

laravel/.env
ELASTICMQ_ENDPOINT=http://elasticmq:9324        # ElasticMQのエンドポイント
SQS_PREFIX=queue                                # プレフィックス名
SQS_QUEUE=laravel                               # キュー名(キューのURLは、${ELASTICMQ_ENDPOINT}/${SQS_PREFIX}/${SQS_QUEUE} になります。)
QUEUE_CONNECTION=elasticmq                      # キューは'elasticmq'を指定します。

4.2. テーブル作成とMigration

ジョブが失敗した時のエラー情報を格納するテーブルを作成します。

# failed_jobsテーブルを作成するMigrationファイルを作成します。
# (※Laravel6では、はじめから作成されているので不要です。)
$ docker-compose exec php-fpm php artisan queue:failed-table
Migration created successfully!

# Migration
$ docker-compose exec php-fpm php artisan migrate
Migrating: 2019_08_26_060630_create_failed_jobs_table
Migrated:  2019_08_26_060630_create_failed_jobs_table (0.03 seconds)

4.3. ジョブファイルの作成

今回はサンプルとして、10秒待機するだけのジョブを作成します。

TestJobクラスの作成
$ docker-compose exec php-fpm php artisan make:job TestJob
app/Jobs/TestJob.php
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class TestJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $param;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct( $param )
    {
        // パラメーターはここで受け取ります。
        $this->param = $param;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        // ここにジョブの内容を記述します。
        \Log::info( "Job:{$this->param} started." );
        sleep( 10 );
        \Log::info( "Job:{$this->param} finished." );
    }
}

4.4. ジョブをキューに登録する

php-fpmコンテナからphp artisan tinkerコマンドでTinkerを起動し、先ほど作成したジョブをキューに登録します。

Tinkerの起動
$ docker-compose exec php-fpm php artisan tinker
php-fpmコンテナのTinker内
# TestJobをキューに登録
>>> Queue::push( new App\Jobs\TestJob( "Hoge" ) );
=> "797d7214-f2b5-4feb-b4ba-fe4841af3e57"
>>> exit
Exit:  Goodbye

※キューに格納されているメッセージを取得するには、ElasticMQのAPIにアクセスします。

php-fpmコンテナPHPのREPL内
require 'vendor/autoload.php';

$client = new Aws\Sqs\SqsClient( [
    'endpoint' => "http://elasticmq:9324",
    'region' => "us-east-1",
    'credentials' => false,
    'version' => "latest"
] );

// laravelキューに入っているメッセージ一覧を取得します。
echo $client->receiveMessage( ['QueueUrl' => "http://elasticmq:9324/queue/laravel"] );
// 実行結果③
実行結果③

{
    "Messages": [
        /* 先ほどTinkerからキューに登録したTestJobインスタンス */
        {
            "MessageId": "797d7214-f2b5-4feb-b4ba-fe4841af3e57",
            "ReceiptHandle": "797d7214-f2b5-4feb-b4ba-fe4841af3e57#0e4689f3-b9e7-452d-8517-75673ce3434c",
            "MD5OfBody": "8ba9014bd4d225cabec95df49e2f264d",
            "Body": "{\"displayName\":\"App\\\\Jobs\\\\TestJob\",\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"maxTries\":null,\"delay\":null,\"timeout\":null,\"timeoutAt\":null,\"data\":{\"commandName\":\"App\\\\Jobs\\\\TestJob\",\"command\":\"O:16:\\\"App\\\\Jobs\\\\TestJob\\\":8:{s:8:\\\"\\u0000*\\u0000param\\\";s:4:\\\"Hoge\\\";s:6:\\\"\\u0000*\\u0000job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:15:\\\"chainConnection\\\";N;s:10:\\\"chainQueue\\\";N;s:5:\\\"delay\\\";N;s:7:\\\"chained\\\";a:0:{}}\"}}"
        }
    ],
    "@metadata": {
        /* 中略 */
    }
}

4.5. ジョブを実行

キューに溜まっているジョブを実行するには、php-fpmコンテナからphp artisan queue:workコマンドを実行します。

$ docker-compose exec php-fpm php artisan queue:work
[2019-08-26 09:11:47][797d7214-f2b5-4feb-b4ba-fe4841af3e57] Processing: App\Jobs\TestJob
[2019-08-26 09:11:58][797d7214-f2b5-4feb-b4ba-fe4841af3e57] Processed:  App\Jobs\TestJob
^C  # 終了するときは、Ctrl+Cキーを押します。

4.6. php artisan queue:work専用のコンテナを立てる

ElasticMQにジョブを登録した時、自動的にジョブを順次実行できるように、php artisan queue:work専用のコンテナを立てます。

docker-compose.yml
# services:
  # `php artisan queue:work`専用コンテナ
  queue-worker:
    image: laravel-php-fpm:7
    build: ./php-fpm/
    depends_on:
      - mysql
      - redis
    # 起動時のコマンドを `php artisan queue:work` にします。
    command: ["php", "artisan", "queue:work"]
    # queue-workerコンテナが落ちた時、自動的に再起動します。
    restart: always
    volumes:
      # Laravelのソースファイル
      - ../laravel:/var/www/html
$ dcoker-compose up -d
Creating laravel_queue-worker_1 ... done

php-fpmコンテナのTinkerからジョブをキューに登録すると・・・、

php-fpmコンテナのTinker内
>>> Queue::push( new App\Jobs\TestJob( "HogeHoge" ) );
=> "4952d228-7fe9-49d2-a142-e5066375b148"
>>> Queue::push( new App\Jobs\TestJob( "FugaFuga" ) );
=> "8ba5ff4b-909c-48c5-925c-8ff591e95a49"
>>>

queue-workerコンテナはジョブを順次実行します。

queue-wokerコンテナのコンソール出力
# docker-compose logs queue-worker
queue-worker_1  | [2019-08-26 09:25:48][4952d228-7fe9-49d2-a142-e5066375b148] Processing: App\Jobs\TestJob
queue-worker_1  | [2019-08-26 09:25:58][4952d228-7fe9-49d2-a142-e5066375b148] Processed:  App\Jobs\TestJob
queue-worker_1  | [2019-08-26 09:26:16][8ba5ff4b-909c-48c5-925c-8ff591e95a49] Processing: App\Jobs\TestJob
queue-worker_1  | [2019-08-26 09:26:26][8ba5ff4b-909c-48c5-925c-8ff591e95a49] Processed:  App\Jobs\TestJob

4.7. FIFOキューを使う

ElsaticMQのキューには、標準キューの他にFIFO(First In First Out)キューがあります。

ElasticMQでFIFOキューを使用するには、設定ファイル内のキューの定義にて、fifoオプションをtrueに設定します。

custom.conf
include classpath( "application.conf" )

node-address {
    # 中略
}

rest-sqs {
    # 中略
}

generate-node-address = false

# ここでElasticMQに作成するキューを定義します。
queues {
    # 中略

    # http://elasticmq:9324/queue/laravel.fifo
    "laravel.fifo" {
        defaultVisibilityTimeout = 10 seconds
        delay = 5 seconds
        receiveMessageWait = 0 seconds
        fifo = true        # fifoオプションをtrueに設定します。
        deadLettersQueue {
            name = "laravel.fifo-dead-letters"
            maxReceiveCount = 3
        }
    }
    # http://elasticmq:9324/queue/laravel.fifo-dead-letters
    "laravel.fifo-dead-letters" { }
}

注: Amazon SQSでFIFOキューを作成する時、キュー名の最後に.fifoが必要なのですが、それに倣ってElasticMQで定義する場合、キュー名を"(ダブルクォーテーション)で囲む必要があります。(ScalaやJavaのTypesafe Configにおける、オブジェクト名の仕様のため)

ElasticMQコンテナを再起動
$ docker-compose restart elasticmq

ただし、このままではキューにジョブを登録しようとした時にBad Requestエラーになってしまうので、
LaravelでAmazon SQSのFIFOキューを使う方法
を参考に、FIFOキュー対応のSQSプロパイダーを作成します。

laravel/app/Services/SqsFifoQueue.php
<?php

namespace App\Services;

class SqsFifoQueue extends \Illuminate\Queue\SqsQueue
{
    // MessageGroupIdパラメーターに渡す値(※同一の値であれば、FIFOキューに登録した順序でジョブが実行されます)
    protected $message_group_id;

    public function __construct($sqs, $default, $prefix = '', $message_group_id = null)
    {
        parent::__construct($sqs, $default, $prefix);
        $this->message_group_id = $message_group_id;
    }


    public function pushRaw($payload, $queue = null, $options = [])
    {
        $response = $this->sqs->sendMessage([
            'QueueUrl' => $this->getQueue($queue),
            'MessageBody' => $payload,
            'MessageGroupId' => $this->message_group_id ?? uniqid(),
            'MessageDeduplicationId' => uniqid(),
        ]);

        return $response->get('MessageId');
    }

    // FIFOキューでは、DelaySecondsパラメーターをサポートしないので、例外をスローします。
    public function later($delay, $job, $data = '', $queue = null)
    {
        throw new \Exception( "Cannot support DelaySeconds for FIFO queues." );
    }
}
laravel/app/Services/SqsFifoConnector.php
<?php

namespace App\Services;

use Aws\Sqs\SqsClient;
use Illuminate\Support\Arr;

class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector
{
    public function connect($config)
    {
        $config = $this->getDefaultConfiguration($config);

        if ($config['key'] && $config['secret']) {
            $config['credentials'] = Arr::only($config, ['key', 'secret']);
        }

        return new SqsFifoQueue(new SqsClient($config), $config['queue'], $config['prefix'] ?? '', $config['message_group_id']);
    }
}
laravel/config/queue.php
'connections' => [

    // 中略

    'elasticmqfifo' => [
        'driver' => 'sqsfifo',
        'key' => null,
        'secret' => null,
        'credentials' => false,
        'endpoint' => env('ELASTICMQ_ENDPOINT'),
        'prefix' => env('SQS_PREFIX'),
        'queue' => env('SQS_QUEUE'),
        // ここでは、message_group_idのデフォルト値を"${SQS_PREFIX}/${SQS_QUEUE}"にしています。
        'message_group_id' => env('SQS_MESSAGE_GROUP_ID', env('SQS_PREFIX')."/".env('SQS_QUEUE')),
        'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    ],

],
laravel/.env
ELASTICMQ_ENDPOINT=http://elasticmq:9324
SQS_PREFIX=queue
SQS_QUEUE=laravel.fifo
QUEUE_CONNECTION=elasticmqfifo

これで、FIFOキューにジョブを登録することができます。

php-fpmコンテナのTinker内
>>> for( $i = 1; $i <= 5; $i++ ) {
... echo "Job_{$i}: ".Queue::push( new App\Jobs\TestJob( "Job_{$i}" ) ).PHP_EOL;
... }
Job_1: bc8fa4e8-3c02-4916-a4a0-40603537f7b9
Job_2: b8e4c344-4c08-4a99-9d5a-c45bc44e46cb
Job_3: 2e01225e-8c6b-47ea-bc9e-b6f6b8ba82f2
Job_4: a3d68f0e-b5fa-4785-9961-0990ba4f3e28
Job_5: 6e2a2681-6aef-48ac-a8ab-05daac167c07
queue-wokerコンテナのコンソール出力
queue-worker_1  | [2019-08-28 04:55:32][bc8fa4e8-3c02-4916-a4a0-40603537f7b9] Processing: App\Jobs\TestJob
queue-worker_1  | [2019-08-28 04:55:33][bc8fa4e8-3c02-4916-a4a0-40603537f7b9] Processed:  App\Jobs\TestJob
queue-worker_1  | [2019-08-28 04:55:33][b8e4c344-4c08-4a99-9d5a-c45bc44e46cb] Processing: App\Jobs\TestJob
queue-worker_1  | [2019-08-28 04:55:34][b8e4c344-4c08-4a99-9d5a-c45bc44e46cb] Processed:  App\Jobs\TestJob
queue-worker_1  | [2019-08-28 04:55:34][2e01225e-8c6b-47ea-bc9e-b6f6b8ba82f2] Processing: App\Jobs\TestJob
queue-worker_1  | [2019-08-28 04:55:35][2e01225e-8c6b-47ea-bc9e-b6f6b8ba82f2] Processed:  App\Jobs\TestJob
queue-worker_1  | [2019-08-28 04:55:35][a3d68f0e-b5fa-4785-9961-0990ba4f3e28] Processing: App\Jobs\TestJob
queue-worker_1  | [2019-08-28 04:55:37][a3d68f0e-b5fa-4785-9961-0990ba4f3e28] Processed:  App\Jobs\TestJob
queue-worker_1  | [2019-08-28 04:55:37][6e2a2681-6aef-48ac-a8ab-05daac167c07] Processing: App\Jobs\TestJob
queue-worker_1  | [2019-08-28 04:55:38][6e2a2681-6aef-48ac-a8ab-05daac167c07] Processed:  App\Jobs\TestJob

参考サイト

13
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?