こんにちは! ニアです。
Laravelには、Amazon SQSと連携して処理を実行させる機能があるのですが、
そのAmazon SQSと互換性のあるインターフェースを持つ、**ElasticMQ**というScala製のアプリがあったので、
Laravelと連携させてみました。
1. この記事で扱っている、Laravelの環境
- Laravel 5.8
- PHP(fpm版) 7.3
- nginx 1.17.3
- MySQL 5.7
- Redis 5.0
- ElasticMQ 0.14.10
- OpenJDK(ランタイム) 11
2. ElasticMQのコンテナを追加
2.1. ディレクトリ構成
※DockerでLaravelの環境を構築する方法はこちら -> DockerでLaravelの開発環境を構築してみよう(&一緒に使うと便利なDockerイメージを紹介)
GitHub -> https://github.com/Nia-TN1012/docker-laravel
.
├── docker
│ ├── docker-compose.yml
│ ├── .env
│ ├── elasticmq # 今回追加します。
│ │ └── conf
│ │ └── custom.conf
│ ├── mysql
│ │ └── init
│ │ └── grant.sh
│ ├── nginx
│ │ ├── conf
│ │ │ └── laravel.conf
│ │ └── logs
│ └── php-fpm
│ └── Dockerfile
└── laravel
2.2. ElasticMQの設定ファイル
https://github.com/softwaremill/elasticmq を参考に、ElasticMQの設定ファイルを作成します。
include classpath( "application.conf" )
node-address {
protocol = http
host = localhost
port = 9324
context-path = ""
}
rest-sqs {
enabled = true
bind-port = 9324
bind-hostname = "0.0.0.0"
sqs-limits = strict
}
generate-node-address = false
# ここでElasticMQに作成するキューを定義します。
queues {
# http://elasticmq:9324/queue/laravel
laravel {
defaultVisibilityTimeout = 10 seconds
delay = 5 seconds
receiveMessageWait = 0 seconds
deadLettersQueue {
name = "laravel-dead-letters"
maxReceiveCount = 3
}
}
# http://elasticmq:9324/queue/laravel-dead-letters
laravel-dead-letters { }
}
2.3. docker-compose.yml
version: "3"
services:
# php-fpm
php-fpm:
image: laravel-php-fpm:7
build: ./php-fpm/
depends_on:
- mysql
- redis
volumes:
- ../laravel:/var/www/html
# nginx
nginx:
image: nginx:mainline-alpine
depends_on:
- php-fpm
ports:
- 80:80
volumes:
- ./nginx/conf/laravel.conf:/etc/nginx/conf.d/default.conf:ro
- ../laravel:/var/www/html
- ./nginx/logs:/var/log/nginx
# MySQL
mysql:
image: mysql:5.7
environment:
MYSQL_ROOT_PASSWORD: password
MYSQL_DATABASE: laravel
MYSQL_USER: laravel_user
MYSQL_PASSWORD: laravel_pass
ports:
- 3306:3306
volumes:
- ./mysql/init:/docker-entrypoint-initdb.d:ro
- mysql-data:/var/lib/mysql
# Redis
redis:
image: redis:alpine
volumes:
- redis-data:/data
# (↓今回追加分)
# ElasticMQ
elasticmq:
image: softwaremill/elasticmq
volumes:
# ElasticMQの設定ファイル
- ./elasticmq/conf/custom.conf://opt/elasticmq.conf:ro
volumes:
mysql-data:
driver: local
redis-data:
driver: local
2.4. php-fpmコンテナにaws/aws-sdk-php
をインストール
php-fpmコンテナからcomposer require
コマンドを実行し、aws/aws-sdk-phpをインストールします。
$ docker-compose exec php-fpm composer require aws/aws-sdk-php
3. ElasticMQコンテナの起動
$ docker-compose up -d
Creating laravel_elasticmq_1 ... done
3.1. ElasticMQのAPIにアクセス
では早速、ElasticMQのAPIにアクセスしてみましょう。
php-fpmコンテナからphp -a
コマンドで、インタラクティブシェル(REPL)に入ります。
$ docker-compose exec php-fpm php -a
AWS SDKのSQLクライアントオブジェクト作成すると、そこからElasticMQのAPIにアクセスできます。
require 'vendor/autoload.php';
// Amazon SQSクライアントの作成
$client = new Aws\Sqs\SqsClient( [
'endpoint' => "http://elasticmq:9324", // ElasticMQのエンドポイント
'region' => "us-east-1", // リージョンは適当でOKです。
'credentials' => false, // credentialsはfalseでOKです。
'version' => "latest" // バージョン
] );
// ElasticMQで作成したキューの一覧を取得します。
echo $client->listQueues();
// 実行結果①
// laravelキューのURL情報を取得します。
echo $client->getQueueUrl( ['QueueName' => "laravel"] );
// 実行結果②
{
/* キューのURL一覧 */
"QueueUrls": [
"http:\/\/localhost:9324\/queue\/laravel",
"http:\/\/localhost:9324\/queue\/laravel-dead-letters"
],
/* メタデータ */
"@metadata": {
"statusCode": 200,
"effectiveUri": "http:\/\/elasticmq:9324",
"headers": {
"server": "akka-http\/10.1.9",
"date": "Mon, 26 Aug 2019 07:42:51 GMT",
"connection": "close",
"content-type": "text\/plain; charset=UTF-8",
"content-length": "483"
},
"transferStats": {
"http": [
[]
]
}
}
}
{
/* キューのURL */
"QueueUrl": "http:\/\/localhost:9324\/queue\/laravel",
"@metadata": {
/* 中略 */
}
}
4. LaravelからElasticMQにアクセスする
今度はLaravelからElasticMQを利用できるようにしてみましょう。
4.1. Laravelの設定ファイルの編集
laravel/config/queue.php
の'connections'内に以下を追加します。
'connections' => [
// 中略
'elasticmq' => [
'driver' => 'sqs',
# ElasticMQは資格情報を使用しないので、
# keyとsecretはnullを、credentialsはfalseを指定します。
'key' => null,
'secret' => null,
'credentials' => false,
'endpoint' => env('ELASTICMQ_ENDPOINT'),
'prefix' => env('SQS_PREFIX'),
'queue' => env('SQS_QUEUE'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
],
],
laravel/.env
を編集します。
ELASTICMQ_ENDPOINT=http://elasticmq:9324 # ElasticMQのエンドポイント
SQS_PREFIX=queue # プレフィックス名
SQS_QUEUE=laravel # キュー名(キューのURLは、${ELASTICMQ_ENDPOINT}/${SQS_PREFIX}/${SQS_QUEUE} になります。)
QUEUE_CONNECTION=elasticmq # キューは'elasticmq'を指定します。
4.2. テーブル作成とMigration
ジョブが失敗した時のエラー情報を格納するテーブルを作成します。
# failed_jobsテーブルを作成するMigrationファイルを作成します。
# (※Laravel6では、はじめから作成されているので不要です。)
$ docker-compose exec php-fpm php artisan queue:failed-table
Migration created successfully!
# Migration
$ docker-compose exec php-fpm php artisan migrate
Migrating: 2019_08_26_060630_create_failed_jobs_table
Migrated: 2019_08_26_060630_create_failed_jobs_table (0.03 seconds)
4.3. ジョブファイルの作成
今回はサンプルとして、10秒待機するだけのジョブを作成します。
$ docker-compose exec php-fpm php artisan make:job TestJob
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class TestJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $param;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct( $param )
{
// パラメーターはここで受け取ります。
$this->param = $param;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
// ここにジョブの内容を記述します。
\Log::info( "Job:{$this->param} started." );
sleep( 10 );
\Log::info( "Job:{$this->param} finished." );
}
}
4.4. ジョブをキューに登録する
php-fpmコンテナからphp artisan tinker
コマンドでTinkerを起動し、先ほど作成したジョブをキューに登録します。
$ docker-compose exec php-fpm php artisan tinker
# TestJobをキューに登録
>>> Queue::push( new App\Jobs\TestJob( "Hoge" ) );
=> "797d7214-f2b5-4feb-b4ba-fe4841af3e57"
>>> exit
Exit: Goodbye
※キューに格納されているメッセージを取得するには、ElasticMQのAPIにアクセスします。
require 'vendor/autoload.php';
$client = new Aws\Sqs\SqsClient( [
'endpoint' => "http://elasticmq:9324",
'region' => "us-east-1",
'credentials' => false,
'version' => "latest"
] );
// laravelキューに入っているメッセージ一覧を取得します。
echo $client->receiveMessage( ['QueueUrl' => "http://elasticmq:9324/queue/laravel"] );
// 実行結果③
{
"Messages": [
/* 先ほどTinkerからキューに登録したTestJobインスタンス */
{
"MessageId": "797d7214-f2b5-4feb-b4ba-fe4841af3e57",
"ReceiptHandle": "797d7214-f2b5-4feb-b4ba-fe4841af3e57#0e4689f3-b9e7-452d-8517-75673ce3434c",
"MD5OfBody": "8ba9014bd4d225cabec95df49e2f264d",
"Body": "{\"displayName\":\"App\\\\Jobs\\\\TestJob\",\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"maxTries\":null,\"delay\":null,\"timeout\":null,\"timeoutAt\":null,\"data\":{\"commandName\":\"App\\\\Jobs\\\\TestJob\",\"command\":\"O:16:\\\"App\\\\Jobs\\\\TestJob\\\":8:{s:8:\\\"\\u0000*\\u0000param\\\";s:4:\\\"Hoge\\\";s:6:\\\"\\u0000*\\u0000job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:15:\\\"chainConnection\\\";N;s:10:\\\"chainQueue\\\";N;s:5:\\\"delay\\\";N;s:7:\\\"chained\\\";a:0:{}}\"}}"
}
],
"@metadata": {
/* 中略 */
}
}
4.5. ジョブを実行
キューに溜まっているジョブを実行するには、php-fpmコンテナからphp artisan queue:work
コマンドを実行します。
$ docker-compose exec php-fpm php artisan queue:work
[2019-08-26 09:11:47][797d7214-f2b5-4feb-b4ba-fe4841af3e57] Processing: App\Jobs\TestJob
[2019-08-26 09:11:58][797d7214-f2b5-4feb-b4ba-fe4841af3e57] Processed: App\Jobs\TestJob
^C # 終了するときは、Ctrl+Cキーを押します。
4.6. php artisan queue:work
専用のコンテナを立てる
ElasticMQにジョブを登録した時、自動的にジョブを順次実行できるように、php artisan queue:work
専用のコンテナを立てます。
# services:
# `php artisan queue:work`専用コンテナ
queue-worker:
image: laravel-php-fpm:7
build: ./php-fpm/
depends_on:
- mysql
- redis
# 起動時のコマンドを `php artisan queue:work` にします。
command: ["php", "artisan", "queue:work"]
# queue-workerコンテナが落ちた時、自動的に再起動します。
restart: always
volumes:
# Laravelのソースファイル
- ../laravel:/var/www/html
$ dcoker-compose up -d
Creating laravel_queue-worker_1 ... done
php-fpmコンテナのTinkerからジョブをキューに登録すると・・・、
>>> Queue::push( new App\Jobs\TestJob( "HogeHoge" ) );
=> "4952d228-7fe9-49d2-a142-e5066375b148"
>>> Queue::push( new App\Jobs\TestJob( "FugaFuga" ) );
=> "8ba5ff4b-909c-48c5-925c-8ff591e95a49"
>>>
queue-workerコンテナはジョブを順次実行します。
# docker-compose logs queue-worker
queue-worker_1 | [2019-08-26 09:25:48][4952d228-7fe9-49d2-a142-e5066375b148] Processing: App\Jobs\TestJob
queue-worker_1 | [2019-08-26 09:25:58][4952d228-7fe9-49d2-a142-e5066375b148] Processed: App\Jobs\TestJob
queue-worker_1 | [2019-08-26 09:26:16][8ba5ff4b-909c-48c5-925c-8ff591e95a49] Processing: App\Jobs\TestJob
queue-worker_1 | [2019-08-26 09:26:26][8ba5ff4b-909c-48c5-925c-8ff591e95a49] Processed: App\Jobs\TestJob
4.7. FIFOキューを使う
ElsaticMQのキューには、標準キューの他にFIFO(First In First Out)キューがあります。
ElasticMQでFIFOキューを使用するには、設定ファイル内のキューの定義にて、fifoオプションをtrueに設定します。
include classpath( "application.conf" )
node-address {
# 中略
}
rest-sqs {
# 中略
}
generate-node-address = false
# ここでElasticMQに作成するキューを定義します。
queues {
# 中略
# http://elasticmq:9324/queue/laravel.fifo
"laravel.fifo" {
defaultVisibilityTimeout = 10 seconds
delay = 5 seconds
receiveMessageWait = 0 seconds
fifo = true # fifoオプションをtrueに設定します。
deadLettersQueue {
name = "laravel.fifo-dead-letters"
maxReceiveCount = 3
}
}
# http://elasticmq:9324/queue/laravel.fifo-dead-letters
"laravel.fifo-dead-letters" { }
}
注: Amazon SQSでFIFOキューを作成する時、キュー名の最後に
.fifo
が必要なのですが、それに倣ってElasticMQで定義する場合、キュー名を"
(ダブルクォーテーション)で囲む必要があります。(ScalaやJavaのTypesafe Configにおける、オブジェクト名の仕様のため)
$ docker-compose restart elasticmq
ただし、このままではキューにジョブを登録しようとした時にBad Requestエラーになってしまうので、
LaravelでAmazon SQSのFIFOキューを使う方法
を参考に、FIFOキュー対応のSQSプロパイダーを作成します。
<?php
namespace App\Services;
class SqsFifoQueue extends \Illuminate\Queue\SqsQueue
{
// MessageGroupIdパラメーターに渡す値(※同一の値であれば、FIFOキューに登録した順序でジョブが実行されます)
protected $message_group_id;
public function __construct($sqs, $default, $prefix = '', $message_group_id = null)
{
parent::__construct($sqs, $default, $prefix);
$this->message_group_id = $message_group_id;
}
public function pushRaw($payload, $queue = null, $options = [])
{
$response = $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $payload,
'MessageGroupId' => $this->message_group_id ?? uniqid(),
'MessageDeduplicationId' => uniqid(),
]);
return $response->get('MessageId');
}
// FIFOキューでは、DelaySecondsパラメーターをサポートしないので、例外をスローします。
public function later($delay, $job, $data = '', $queue = null)
{
throw new \Exception( "Cannot support DelaySeconds for FIFO queues." );
}
}
<?php
namespace App\Services;
use Aws\Sqs\SqsClient;
use Illuminate\Support\Arr;
class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector
{
public function connect($config)
{
$config = $this->getDefaultConfiguration($config);
if ($config['key'] && $config['secret']) {
$config['credentials'] = Arr::only($config, ['key', 'secret']);
}
return new SqsFifoQueue(new SqsClient($config), $config['queue'], $config['prefix'] ?? '', $config['message_group_id']);
}
}
'connections' => [
// 中略
'elasticmqfifo' => [
'driver' => 'sqsfifo',
'key' => null,
'secret' => null,
'credentials' => false,
'endpoint' => env('ELASTICMQ_ENDPOINT'),
'prefix' => env('SQS_PREFIX'),
'queue' => env('SQS_QUEUE'),
// ここでは、message_group_idのデフォルト値を"${SQS_PREFIX}/${SQS_QUEUE}"にしています。
'message_group_id' => env('SQS_MESSAGE_GROUP_ID', env('SQS_PREFIX')."/".env('SQS_QUEUE')),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
],
],
ELASTICMQ_ENDPOINT=http://elasticmq:9324
SQS_PREFIX=queue
SQS_QUEUE=laravel.fifo
QUEUE_CONNECTION=elasticmqfifo
これで、FIFOキューにジョブを登録することができます。
>>> for( $i = 1; $i <= 5; $i++ ) {
... echo "Job_{$i}: ".Queue::push( new App\Jobs\TestJob( "Job_{$i}" ) ).PHP_EOL;
... }
Job_1: bc8fa4e8-3c02-4916-a4a0-40603537f7b9
Job_2: b8e4c344-4c08-4a99-9d5a-c45bc44e46cb
Job_3: 2e01225e-8c6b-47ea-bc9e-b6f6b8ba82f2
Job_4: a3d68f0e-b5fa-4785-9961-0990ba4f3e28
Job_5: 6e2a2681-6aef-48ac-a8ab-05daac167c07
queue-worker_1 | [2019-08-28 04:55:32][bc8fa4e8-3c02-4916-a4a0-40603537f7b9] Processing: App\Jobs\TestJob
queue-worker_1 | [2019-08-28 04:55:33][bc8fa4e8-3c02-4916-a4a0-40603537f7b9] Processed: App\Jobs\TestJob
queue-worker_1 | [2019-08-28 04:55:33][b8e4c344-4c08-4a99-9d5a-c45bc44e46cb] Processing: App\Jobs\TestJob
queue-worker_1 | [2019-08-28 04:55:34][b8e4c344-4c08-4a99-9d5a-c45bc44e46cb] Processed: App\Jobs\TestJob
queue-worker_1 | [2019-08-28 04:55:34][2e01225e-8c6b-47ea-bc9e-b6f6b8ba82f2] Processing: App\Jobs\TestJob
queue-worker_1 | [2019-08-28 04:55:35][2e01225e-8c6b-47ea-bc9e-b6f6b8ba82f2] Processed: App\Jobs\TestJob
queue-worker_1 | [2019-08-28 04:55:35][a3d68f0e-b5fa-4785-9961-0990ba4f3e28] Processing: App\Jobs\TestJob
queue-worker_1 | [2019-08-28 04:55:37][a3d68f0e-b5fa-4785-9961-0990ba4f3e28] Processed: App\Jobs\TestJob
queue-worker_1 | [2019-08-28 04:55:37][6e2a2681-6aef-48ac-a8ab-05daac167c07] Processing: App\Jobs\TestJob
queue-worker_1 | [2019-08-28 04:55:38][6e2a2681-6aef-48ac-a8ab-05daac167c07] Processed: App\Jobs\TestJob