41
26

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Laravelでキューを使った並列処理

Last updated at Posted at 2020-05-07

はじめに

業務でLaravelのデータベースキューを使用して並列処理を実装することがあったのでノウハウ記録用に記事にします。

検証環境について

この記事の内容を実現するにはLaravelと何かしらのデータベースが必要になります。
今回はDockerで環境構築を行い、以下の内容で解説していきます。
Laravelとデータベースの接続設定は済んでいる前提です。

phpコンテナ

バージョン
Debian 10.0
php 7.3.7
Laravel 5.8.29

mysqlコンテナ

バージョン
Debian 9.5
mysql 8.0.13

キュー関連テーブル作成と.envの修正

まずデータベースキューを使用するための準備を行います。
(この辺りはよく記事を見かけるので、解説は割愛)

  1. phpコンテナにログイン

  2. php artisan queue:tableでキュー処理用テーブルのマイグレーションファイルを作成

  3. php artisan queue:failed-tableでキュー処理失敗用テーブルのマイグレーションファイルを作成

  4. php artisan migrateでマイグレーション実行

  5. Laravelの.envのQUEUE_CONNECTIONを以下のように修正

    QUEUE_CONNECTION=database
    

これだけで準備完了!

キューを複数起動する

次にジョブを処理させるためのキューを起動します。
Laravelでphp artisan queue:work --queue={ 任意のキュー名 }コマンドで起動できますが、本番環境での使用を前提にsupervisorをインストールしてデーモン化したいと思います。

phpコンテナにログインし、apt-get install -y supervisorでsupervisorをインストールします。

インストール後にservice supervisor startで起動し、service supervisor statusで状態確認します。

以下が返ってくればOK。

supervisord is running

次に/etc/supervisor/conf.dにsupervisorの設定ファイルを作成します。
設定ファイルはこんな感じ。

sample.conf
[program:sample_queA]
command=php /var/www/artisan queue:work --tries=1 --queue=sample_queA
process_name=%(program_name)s_%(process_num)02d
numprocs=1
autostart=true
autorestart=true
user=root
redirect_stderr=true
stdout_logfile=/var/log/supervisor/sample_queA.log

[program:sample_queB]
command=php /var/www/artisan queue:work --tries=1 --queue=sample_queB
process_name=%(program_name)s_%(process_num)02d
numprocs=1
autostart=true
autorestart=true
user=root
redirect_stderr=true
stdout_logfile=/var/log/supervisor/sample_queB.log

[program:sample_queC]
command=php /var/www/artisan queue:work --tries=1 --queue=sample_queC
process_name=%(program_name)s_%(process_num)02d
numprocs=1
autostart=true
autorestart=true
user=root
redirect_stderr=true
stdout_logfile=/var/log/supervisor/sample_queC.log

sample_queAsample_queBsample_queCの3つのキューを起動するよう設定します。

各設定値の解説

  • command・・・実行コマンドを指定します。ここにキュー起動コマンドを設定します。/var/www/の部分は自分の環境のLaravelプロジェクトまでのパスを指定する必要があるので注意してください。
  • process_name・・・プロセス名です。
  • numprocs・・・起動プロセス数です。
  • autostart・・・trueでsupervisor起動時に自動的に起動します。
  • autorestart・・・trueでプロセス終了時に自動的に再起動します。
  • user・・・起動ユーザーです。指定されたコマンドもこのユーザーでの実行扱いとなります。
  • redirect_stderr・・・trueで標準エラーを出力します。
  • stdout_logfile・・・ログ出力先。

confファイル修正後は必ずsupervisorctl rereadで設定の再読み込みを行います。
supervisorctl statusで状態を確認します。

こんな感じで先ほど設定した3つのキューの起動が確認できればOK。

sample_queA:sample_queA_00       RUNNING   pid 463, uptime 0:13:02
sample_queB:sample_queB_00       RUNNING   pid 462, uptime 0:13:02
sample_queC:sample_queC_00       RUNNING   pid 460, uptime 0:13:02

ここまで手動でやりましたが、インストールなどをDockerfileで定義、confファイルを予め準備しておいてvolumesでマウントしておくとさらに便利です。

ジョブの作成

引き続きphpコンテナ内で以下のコマンドを実行して3つのジョブクラスを作成します。
php artisan make:job SampleJobA
php artisan make:job SampleJobB
php artisan make:job SampleJobC

ジョブは、以下の内容にしておきます。
1秒おきにログ出力するだけです。

例.SampleJobA.php
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Support\Facades\Log;

class SampleJobA implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    private $jobType;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct($jobType)
    {
        $this->jobType = $jobType;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        Log::info($this->jobType . "start");

        for ($i = 0; $i < 10; $i++) {
            Log::info("JobA:" . $i);
            sleep(1);
        }

        Log::info($this->jobType . "end");
    }
}

SampleJobB.phpSampleJobC.phpもループの中のログ出力内容だけ変えて同じように修正しておきます。

ジョブクラス修正後は、service supervisor stopservice supervisor startで再起動しないと修正が反映されないので注意。

あとはコントローラーやサービスクラスからジョブ発行(dispatch)するだけです。

use App\Jobs\SampleJobA;
use App\Jobs\SampleJobB;
use App\Jobs\SampleJobC;

(省略)

// ジョブAをキューAに向けて発行
SampleJobA::dispatch('jobA')->onQueue('sample_queA')->delay(now());
// ジョブBをキューBに向けて発行
SampleJobB::dispatch('jobB')->onQueue('sample_queB')->delay(now());
// ジョブCをキューCに向けて発行
SampleJobC::dispatch('jobC')->onQueue('sample_queC')->delay(now());

解説

  • dispatchにはパラメータを設定してジョブクラスに渡すことができます。サンプルでは、$jobTypeにセットされてログ出力で使ってます。
  • onQueueのパラメータで実行するキューを指定できます。
  • delayで遅延ディスパッチ扱いとなり、非同期での処理が実現できます。実行タイミングをパラメータで指定することが可能です。この例だとnow()を指定しているのですぐに実行です。

ジョブの実行

それでは作成したジョブを動かしてみます。
こんな感じでボタンでジョブA〜Cを発行する簡単な画面を作成して順番に押してみます。
ボタンを押すとコントローラーが呼ばれ、先に解説したジョブを発行します。

スクリーンショット 2020-05-07 18.47.28.png

A、B、Cの順番でボタンを押すとLaravelログの出力は以下のようになりました。

[2020-05-07 18:50:57] local.INFO: jobAstart  
[2020-05-07 18:50:57] local.INFO: JobA:0  
[2020-05-07 18:50:58] local.INFO: jobBstart  
[2020-05-07 18:50:58] local.INFO: jobCstart  
[2020-05-07 18:50:58] local.INFO: JobC:0  
[2020-05-07 18:50:58] local.INFO: JobB:0  
[2020-05-07 18:50:58] local.INFO: JobA:1  
[2020-05-07 18:50:59] local.INFO: JobC:1  
[2020-05-07 18:50:59] local.INFO: JobB:1  
[2020-05-07 18:50:59] local.INFO: JobA:2  
[2020-05-07 18:51:00] local.INFO: JobC:2  
[2020-05-07 18:51:00] local.INFO: JobB:2  
[2020-05-07 18:51:00] local.INFO: JobA:3  
[2020-05-07 18:51:01] local.INFO: JobC:3  
[2020-05-07 18:51:01] local.INFO: JobB:3  
[2020-05-07 18:51:01] local.INFO: JobA:4  
[2020-05-07 18:51:02] local.INFO: JobB:4  
[2020-05-07 18:51:02] local.INFO: JobC:4  
[2020-05-07 18:51:02] local.INFO: JobA:5  
[2020-05-07 18:51:03] local.INFO: JobB:5  
[2020-05-07 18:51:03] local.INFO: JobC:5  
[2020-05-07 18:51:03] local.INFO: JobA:6  
[2020-05-07 18:51:04] local.INFO: JobC:6  
[2020-05-07 18:51:04] local.INFO: JobB:6  
[2020-05-07 18:51:04] local.INFO: JobA:7  
[2020-05-07 18:51:05] local.INFO: JobC:7  
[2020-05-07 18:51:05] local.INFO: JobB:7  
[2020-05-07 18:51:05] local.INFO: JobA:8  
[2020-05-07 18:51:06] local.INFO: JobC:8  
[2020-05-07 18:51:06] local.INFO: JobB:8  
[2020-05-07 18:51:06] local.INFO: JobA:9  
[2020-05-07 18:51:07] local.INFO: JobC:9  
[2020-05-07 18:51:07] local.INFO: JobB:9  
[2020-05-07 18:51:07] local.INFO: jobAend  
[2020-05-07 18:51:08] local.INFO: jobCend  
[2020-05-07 18:51:08] local.INFO: jobBend

完全に並列ではありませんが、3つのジョブが同時に動いているのが確認できます。
今回はボタンがトリガーとなってジョブ発行していますが、バッチ等でスケジューリングした時間で発行したり、APIがcallされたタイミングで発行したり色んな方法で使えると思います。

気づいたこと

  • ジョブ処理終了までjobsテーブルに対象のレコードが残り、処理終了と同時にレコードが消える
  • $this->dispatchで自分自身の再dispatchも可能(新しいインスタンスとなるため、パラメータとかは再度渡す必要あり) ← 命名:再帰dispatch

ハマったとこ

最初に遅延ディスパッチが指定した時間に動かなかった。
原因は、config/app.phptimezoneがUTCのままだったため。。
jobsテーブルでセットされている実行予定時間を確認して気がつきました。
以下のように修正すればOK。

app.php
'timezone' => 'Asia/Tokyo',

設計例

自分の開発していた案件では、以下の2つのAPIがあり、

  • API甲・・・POSTで処理要求を行い、払い出されたIDをクエリとして付けてGETで定期的にポーリングしてステータス取得する
  • API乙・・・API甲とは全く違うAPI。でも使い方は、API甲と同じ。

ジョブとキューをこんな感じで設計しました。

ジョブ

名称 内容
ジョブ1 API甲をPOSTでCall
ジョブ2 API甲をGETでCall
ジョブ3 API甲終了後に実行する処理
ジョブ4 API乙をPOSTでCall
ジョブ5 API乙をGETでCall
ジョブ6 API乙終了後に実行する処理

キュー

名称 内容
キューA ジョブ1とジョブ2処理用
キューB ジョブ3処理用
キューC ジョブ4とジョブ5処理用
キューD ジョブ6処理用

処理フロー(エラー処理については省略)
トリガーを受けてジョブ1キューAに向けて発行

ジョブ1からジョブ2キューAに向けて発行

ジョブ2API甲のステータスが"完了"になるまでジョブ2(自分自身)をキューAに向けて一定間隔で発行し続ける

ジョブ2API甲のステータスが"完了"になったらジョブ3キューBに向けて発行

終了

この一連を流れをAPI乙の分も平行して動かしていた感じです。

絵があった方がよかった・・・

まとめ

今回は解説用にジョブの実装はログ出力だけでしたが、本来は時間がかかる処理に対して非同期となるよう設計するのが一般的かと思います。
どう設計するかは必要とされてるシステムの要件によって様々ですが、この記事が何かのご参考になれば幸いです。

41
26
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
41
26

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?