59
47

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Mastering Laravel/Lumen Queue

Last updated at Posted at 2017-12-12

オープンロジアドベントカレンダー13日目です。がんばって書いてみたよ :innocent:
ちなみに12月13日は俺の誕生日だよ:birthday: 祝ってくれてええんやで :tada:

はじめに

LaravelLumen は複雑なシステムにも利用することができるとても優れたフレームワークです。

個人的にLaravel/Lumenにおいて重要な機能のひとつがQueueです。

今回は、Queueにフォーカスして、Laravel/Lumenの内部でどのように処理が行われているのかを追っかけたいと思います。

要点としては、標準のQueueは比較的シンプルで若干物足りないのですが、ちゃんと拡張するためのAPIが用意されているので、それらを利用することで、大抵の目的を達することはできますよ、ということをお伝えしたいと思っています。

対象の環境

今回は以下の環境を元に説明したいと思います。

Laravel/Lumen ともに中の処理の多くは共通なので、基本的には一度理解すれば応用が効きます。
Laravel/Lumen での差異についても文中に可能な限り記載してますので、Laravelしかわからない、Lumenしかわからないという方どちらも参考になるかと思います。

Queueを構成する要素

一口にQueueといっても、単一のクラスや機能のことを指すのではなく、複数の要素から構成された機能群のことを指します。

  • Queue
    • Job
    • Listener
    • Worker
  • Bus
    • Dispatcher

細かくは他にもあるのですが、大きくこれらのことを理解できれば、Queueには勝ったも同然です。

Queueという単語は一般用語なので様々なモノを指しますが、今回の文中のQueueが指すものは、上記のQueueやBusといったLaravelにおけるQueueの機能のことを指してると思ってください。

Queueを使って何ができるのか

まずは、Queueをそれほど利用していない方に向けて、Queueというものがいかに優れているか、というのを説明したいと思います。
若干ポエム味があるので、「Queueは便利だよね。知ってる知ってる。」という方は読み飛ばしてください。

使いみちその1. 処理の非同期化

まず、最も簡単なメリットとして処理を非同期で実行することができるということがあげられます。

たとえば、大量のレコードを更新する処理であったり、大きなCSVデータをパースする処理であったりと、HTTPリクエストを元に非常に時間のかかる処理を行う場合、タイムアウトを伸ばすのにも限界がありますし、ユーザー体験的にも良いものではありません。

そこで、リクエスト自体は受けつけ、処理を非同期で実行し、結果はPush通知なりポーリングなり、何らかの方法で非同期に返す、といった対応をすることで、サーバー側の負荷コントロールや、ユーザー体験の改善など、様々なメリットを享受できます。

Queueを利用することで、処理をJobという単位にまとめ、バックグラウンドでWorkerが処理し、結果を通知する、といったことが簡単に実装できます。

たとえば以下のようなフローです。
大きなCSVを元にデータの登録処理を行い、その結果を非同期で受け取る、というものです。

diagram-3889143951074083208.png

ユーザー体験的に良いのは、タイムアウトを気にせずに処理できるというところですが、サーバー側の負荷軽減にも役立ちますし、負荷に対する対策を行いやすいアーキテクチャです。

サーバー側の負荷に対して非同期化することで得られるメリットとは

  • 処理数の上限がWorkerの数に限られる
    • 100個のリクエストが同時にきたとしても、Queueに積まれるだけで、処理は1つずつ行われる
  • パフォーマンスをあげたい場合
    • 時間あたりの処理数を伸ばしたい場合、Workerを増やせばよい
    • (DBなどの共有リソースを除き)Workerは比較的簡単に増やすことができる

このように、非同期化したい場合にQueueを使う、というのが最初の動機としては多いのではないでしょうか。

使いみちその2. 同期的に処理することができる

先の使いみちとは全く逆のことですが、同期的に処理することができる点も、Queueの魅力です。

大きい処理は非同期化することでユーザー体験を向上させることができますが、全ての処理を非同期化することは現実的ではありません。
たとえば 100ms かかる処理の場合を考えてみましょう。

WEBサーバーで同期的に処理する場合

  • HTTPリクエスト: 30ms
  • WEBサーバーが処理する時間: 100ms
  • HTTPレスポンス: 30ms

合計: 160ms

これを非同期化すると、そこに様々な遅延が発生します

非同期で処理する場合

  • HTTPリクエスト: 30ms
  • WEBサーバーがキューに登録する時間: 30ms
  • HTTPレスポンス: 30ms
  • WorkerがキューからJobを拾うまでの時間: 300ms
  • Workerが処理する時間: 100ms
  • Workerが結果をPush通知システムに送る時間: 50ms
  • Push通知システムがユーザーに通知するまでの時間: 100ms

合計: 640ms

各時間は適当なので、実際にこの程度の差になる場合もあれば、数秒単位で遅いことも多いです。
これは一連の処理の中で発生する通信の数や、各システムの初期化コストなど、分担したことで発生する時間が必ず存在します。

つまり、何がなんでもすべてを非同期化することで、ハッピーになれるかというと、そうではない、という点は認識しておいてください。

そこで、Queueですが、処理をJobという単位にまとめています。
たとえば以下のようなJobを作ります。

app/Jobs/CreateHoge.php
class CreateHoge extends Job
{
    /** @var array */
    private $hogeData;

    public function __construct(array $hogeData)
    {
        $this->hogeData = $hogeData;
    }

    public function handle(HogeRepository $hogeRepository)
    {
        // Hogeモデルのインスタンスを生成
        $hoge = new Hoge();
        $hoge->name = $this->hogeData['name'];
        $hoge->description = $this->hogeData['description'];

        // Hogeモデルを保存する
        $hogeRepository->store($hoge);

        // HogeモデルのIDを返す
        return $hoge->id;
    }
}

この例は$hogeDataを元に Hoge というモデルを作成、保存している処理を行うJobになっています。
このJobを呼び出すには、例えばコントローラなどで、以下のように dispatch します

app/Http/Controllers/HogeController.php
class HogeController extends Controller
{
  public function create(Request $request)
  {
     $this->dispatch(new CreateHoge($request->get('hoge')));
     return [];
  }
}

new CreateHoge() でJobのインスタンスを生成し、dispatch()関数でQueueに載せます。
このように dispatch() することで非同期でのJob実行を行うことができます。

しかし、今回のような非常に軽量な処理の場合、同期的に処理したい場合がほとんどですし、
非同期で実行する場合にはJobの実行結果を取得できません

return $hoge->id; と、新しく作成した Hoge のIDを返しているのですが、上記のようにdispatchした場合にはその結果を取得することができないのです。

  • dispatch() では 同期的にJobを実行することができない
  • 非同期実行の場合には、結果をHTTPレスポンスに含めることができない

というわけで、同期的にJobを実行したいのですが、どのようにすればよいのでしょうか。

たとえば、CreateHoge Jobの処理を別のServiceなどに移管して、Controllerからそれを呼び出せるようにする、というのも確かに手段として全く無いわけではないですが、Jobにおいてはそのような煩雑なことをする必要がありません。

同期的にJobを実行する dispatchNow()

dispatch() というヘルパ関数(Laravel/Lumen)や、そのdispatch()を呼び出すController::dispatch()メソッド (laravel/Lumen) とは別に dispatch_now() あるいは dispatchNow() という関数/メソッドが存在します。

Laravelにおいては dispatch_now() ヘルパ関数が最初から定義されているので知っている方もいるかと思いますが、Lumenにおいてはヘルパ関数が定義されていないので、ご存知無い方も多いのではないでしょうか。

結論から先にいうと、 dispatch() の代わりに dispatch_now() を使うことで非同期に作られたJobを同期的に実行することができます

先の例でいうと、以下のような感じです。

app/Http/Controllers/HogeController.php
class HogeController extends Controller
{
  public function create(Request $request)
  {
     $hogeId = $this->dispatchNow(new CreateHoge($request->get('hoge')));
     return ['id' => $hogeId];
  }
}

dispatchの代わりにdispatchNowを使っています。
同期的に結果も取得でき、そのままHTTPレスポンスに新しく保存したモデルのIDを返すことができました。

この dispatchdispatch_now の実装がどうなっているかは後で詳しくみてみましょう。

さて、このように同期的に実行するか、非同期で実行するか、というのは実行時に選択することができ、処理を Job という単位にまとめられる点が、Queueの最も魅力的な部分です

使いみちその3. リーダブルな大きさに処理を区切る

ファットコントローラやファットモデルなど、サービスの成長とともに肥大化していくことはままありますが、いわゆるControllerに書いていたような処理や、ドメインロジックを、単一のJobというファイル(クラス)に閉じ込めることができます。

この単一のファイルの中に閉じ込める、というのが存外に重要で、特定の処理が肥大化した場合に、そのJob以外に影響を与えません。ControllerやModelの特定のaction/methodが肥大化して、関係ない他の部分が読みづらくなる、といったことが起きません。

これはJobが目指したものとは特に関係のない、副作用的な性質だとは思うのですが、この一点だけとっても処理をJobにまとめていく価値はあると思っています。

では、Job自体が肥大化するのを防ぐにはどうしたらよいのか。

Jobを分割すればOKです。

肥大化したJobというのは往々にして、複数の小さい処理をたくさん呼んでしまっているものです。
その小さい処理を、小さいJobとして切り出しておくことで、再利用性も高まりますし、可読性もあがります。

たとえば次のような肥大化したJobがあったとします。

app/Jobs/FatJob.php
class FatJob extends Job {
  public function handle()
  {
    //略
    return DB::transaction(function(){
      $bar = $this->fatProcessA($foo);
      //略
      return $this->fatProcessB($bar);
    });
  }
  private function fatProcessA($foo)
  {
  }
  private function fatProcessB($bar)
  {
  }
}

このJobの「とても肥大化した処理A」と「とても肥大化した処理B」を切り出してみましょう。

app/Jobs/FatJob.php
class FatJob extends Job {
  public function handle()
  {
    return DB::transaction(function() {
      //略
      $bar = $this->dispatchNow(new SmartJobA($foo));
      //略
      return $this->dispatchNow(new SmartJobB($bar));
    });
  }
}
app/Jobs/SmartJobA.php
class SmartJobA extends Job {
  public function __construct($foo)
  {
    $this->foo = $foo;
  }
  public function handle()
  {
    DB::transaction(function() {
      //とても肥大化した処理A
    });
  }
}
app/Jobs/SmartJobB.php
class SmartJobB extends Job {
  public function __construct($bar)
  {
    $this->bar = $bar;
  }
  public function handle()
  {
    DB::transaction(function() {
      //とても肥大化した処理B
    });
  }
}

肥大化した private メソッドをJobという別のファイルに切り出しただけではあるのですが、これはとてもよいアプローチだと考えています。

  • dispatch(new SmartJobA())のように、切り出したJob単体での実行ができるので、再利用性が非常に高い
    • 切り出したJobをその部分だけ別途、非同期で実行する、といったこともできる
  • ServiceやModelといった部分に共通化する必要が無いので、それらが肥大化することを抑えられる
  • Jobを別に切り出したとしても、トランザクションは同一のものを利用できるので処理に影響が無い

dispatchNow ではなく dispatch を使う場合にはトランザクション(というか処理系)が別になるといったことを考慮する必要が出てきますが、 dispatchNow を使っている間は、細かいこと気にしなくてバッチグーです。

というわけで、多くの処理を Job という単位にまとめることで、使い勝手も良く、可読性も高い状況を維持することができます。

標準のQueueの不満点

長々とQueueのメリットを書きましたが、標準のQueueのままでは辛いことがいくつかあります。

  • Jobの実行結果が残らない
  • 大きい引数をJobに渡すとキューのストレージが肥大化する
  • コンテキスト情報をJobに渡しづらい

他にもあるかもしれませんが、大きなところだとこれらは標準のQueueの機能では解決してくれません。

Jobの実行結果が残らない

dispatchNow() の場合はJobの実行結果を受け取ることができますが、 dispatch の場合に返ってくる値はJobIDと呼ばれる、Jobをキューストレージ(RedisやSQS、DBなど)に保存した際に発行されるIDが返ってくるだけです。

このIDがわかればJobの実行結果を取得できるのか、というとそうではなく、あくまでキューストレージのIDでしかなく、このIDのデータはWorkerが処理するタイミングで削除されるため、あとからそのIDで問い合わせても既にデータが無い状態になります。

diagram-7417746117248357530.png

一連の処理として、Workerが実行結果をPush通知システム経由でクライアントに伝える、といったことが可能ですが、通知のペイロードのサイズには限界があります。
例えばPusherの場合は1KB以上のペイロードを載せることができません。

これではせっかく非同期にしたのに、結果をユーザーに伝えることができないので困ってしまいます。

もちろんこれには解決手段がありますが、標準ではこのような動作になってしまう、という点を覚えておいてください。

大きい引数をJobに渡すとキューのストレージが肥大化する

Jobをdispatchしたときに、RedisやSQSといったキューストレージにJobの情報が残るわけですが、このときに内部では、簡単にいえば serialize($job) といったことが行われています。

dispatch() するControllerと、それを実行するWorkerとの間で処理を実行するには、シリアライズが必要なのは当然です。(コードベースがズレると目も当てられませんが :sob:)

つまり、たとえば dispatch( new SomethingJob( HogeModel::all() ) のような大量のモデル情報をコンストラクタに突っ込んでdispatchするとどうなるかというと、大量のモデルすべてがシリアライズされたものがキューストレージに保存されるわけです。

基本的にはシリアライズすると、シリアライズ前よりデータ量が肥大化しますので、数KB程度ならまだ問題ないかもしれませんが、何も考えずに利用すると数MBのデータを毎回キューストレージに書き込んでる、といったことが起きないとも限りません。

これはパフォーマンス低下の原因にもなりますし、シリアライズ・デシリアライズのコストも馬鹿になりません。

なので、基本的にはプロパティにはできるだけプリミティブな値を保存するといったことは念頭においてください。
intやstringといったプリミティブな値であれば、容量を圧迫することもあまり無いでしょう。

先の例でいえば、 HogeModel::all() ではなく HogeModel::all()->pluck('id') のようにIDを渡して、Jobのhandle()の中で再度HogeModel::whereIn('id', $this->ids)->get() し直すことで、このペイロードの肥大化を防ぐことができます。

ただ、用途によっては、Job実行時ではなく、dispatch時のデータを元に処理したい、といったことであったり、パフォーマンス上の都合で再度fetchし直すのが得策でなかったり、プロパティのサイズを気にするなんて本質的ではない部分を考慮して開発したくなかったり、まぁいろいろあると思います。

その場合は、Jobをシリアライズ/デシリアライズするときに工夫すればいいのですが、なかなか面倒な感じではあるので、これについては自分は諦めています。

Jobは基本的には短時間のうちにワーカーが拾って処理してくれるはずなので、多少肥大化したところでどうということは無いよね、という印象です。

基本的にはやはり、Jobのコンストラクタで設定するプロパティは、できるだけプリミティブな値を保持することを推奨します。

コンテキスト情報をJobに渡しづらい

たとえばControllerからdispatchNowで同期的にJobを実行する場合、そのJobがどのユーザーからのリクエストによる実行なのかを知ることは簡単です。

class HogeJob extends Job
{
  public function handle(Request $request)
  {
    $user = $request->user();
  }
}

こういったJobはHTTPリクエストに依存しているので、アンチパターンなJobになりますが、可能か不可能かでいえば可能です。

しかし非同期に実行した場合は Request が空っぽなので、こういったことはできません。
そのためこういったことを行う場合には、

class HogeJob extends Job
{
  private $userId;
  public function __construct(User $user)
  {
    $this->userId = $user->id;
  }
  public function handle()
  {
    $user = User::find($this->userId);
  }
}

のようにコンストラクタにUserを渡してあげる感じになります。

「Jobにユーザー情報なんか要らんでしょ」と言われればそのとおりなのですが、あとで説明する「Jobの実行ログを残したい」といった用途において、「誰が」「いつ」リクエストしたJobを実行しているのかといった情報がほしいことがあります。

しかしながらdispatchしたWEBサーバーと、Jobを実行するWorkerとではコンテキスト情報の共有までは当然行わないので、実行時にコンテキスト情報を取得することができません。

これに関しても、ContextをWorker実行時に引き回すための拡張方法がありますので、解決することができます。

不満点を解決したい

実際にJobを使って開発を行うと、上記のような不満点にぶつかることと思います。
きっと将来においてはある程度標準で解決してくれることもあるでしょうが、現時点では不満が残ります。

これらを解決するためにも、まずはQueueの処理がどのようになっているのかを理解しましょう。

Queueの処理を理解する

もちろんすべてのコードを見て、理解することができればそれに越したことは無いのですが、実際に押さえておくべきポイントはそれほど多くありません。

  • dispatch と dispatchNow の処理の差異
  • 非同期JobをWorkerが実行するまでの流れ
  • Dispatcher::pipeThrough() による拡張
  • Dispatcher::dispatch() の拡張
  • Jobの正体

ここですべてを詳細に説明するとあまりにも膨大になり、読む方も書く方も辛いので、要点だけ説明します。

1. dispatch() すると何が起きるのか

何度も登場した dispatch() ですが、この実態は

function dispatch($job)
{
    return app(\Illuminate\Contracts\Bus\Dispatcher::class)->dispatch($job);
}

実際のソース: Laravel / Lumen

という、 Dispatcherdispatch() を呼び出しているだけです。

余談
ポイントとしては Dispatcher の実装ではなく Contract に紐付いている点です。

Dispatcherの実装である Illuminate\Bus\Dispatcher が標準で使われるわけですが、あくまでContractに紐付いているので、これを簡単に拡張することができます。

というわけで、このDispatcher::dispatch()の中身を見てみましょう。

public function dispatch($command)
{
    if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
        return $this->dispatchToQueue($command);
    } else {
        return $this->dispatchNow($command);
    }
}

protected function commandShouldBeQueued($command)
{
    return $command instanceof ShouldQueue;
}

実際のソース: Dispatcher.php

  • queueResolverが設定してあり、ShouldQueueがimplementsされるJobなら dispatchToQueue
  • そうでない場合は dispatchNow を呼び出す

といった非常にシンプルなメソッドになっています。

説明していませんでしたが、この ShouldQueue があるJobは非同期で実行することができ、これを実装していないと同期的にしか実行することができません。(dispatchToQueueを呼び出すことで可能ではありますが)

現在のバージョンにおいては、たとえばLumenの基底JobクラスではデフォルトでShouldQueuをimplementsしていますし、基底クラスがないLaravelにおいてもphp artisan make:jobコマンドで生成されるJobはデフォルトでShouldQueueをimplementsしています。

queueResolverについても、QUEUE_DRIVERの設定があれば基本的にはデフォルトで設定されているので、実質的にはキューが利用可能な環境において dispatch を呼び出すと、非同期実行になります。

2. dispatchNow で実行されるまで

それでは先に同期的に実行される dispatchNow について読んでみましょう。

public function dispatchNow($command, $handler = null)
{
    if ($handler || $handler = $this->getCommandHandler($command)) {
        $callback = function ($command) use ($handler) {
            return $handler->handle($command);
        };
    } else {
        $callback = function ($command) {
            return $this->container->call([$command, 'handle']);
        };
    }
    return $this->pipeline->send($command)->through($this->pipes)->then($callback);
}

実際のソース: Dispatcher.php

通常の呼び出しでは $handler が定義されることは無いので、

return $this->pipeline->send($command)->through($this->pipes)->then(function ($command) {
    return $this->container->call([$command, 'handle']);
});

というコードが実行されることになります。

デフォルトだと$this->pipesには何も登録されていないので、実質的に $command->handle() が実行されるだけになります。

ただ、これがPipeline経由で実行されている、というのが非常に重要で、これによってスマートにJobの実行前後の処理をカスタマイズすることができます

急に登場したPipelineですが、ルータのMiddlewareの処理などでも使われている、非常に優れたクラスです。

HttpMiddlwareでは以下のようなコードはおなじみだと思います。

class SomethingMiddleware
{
  public function handle($request, Closure $next)
  {
    // 事前処理を定義
    \Log::info('リクエストしてきたユーザーの名前: ' . $request->user()->name);
    $start = microtime(true);

    // 処理を実行
    $response = $next($request);

    // 事後処理を定義
    $end = microtime(true);
    \Log::info('レスポンスにかかった時間: ' . ($end - $start) . '秒');
    return $response;
  }
}

このように事前処理や事後処理をスマートに定義することができます。
もちろん事前処理だけ、事後処理だけ、といったことも定義可能です。

この機能を利用することで、Jobの実行前・実行後の処理を実装することが可能で、これによって先述の不満をいくつか解消することができます。これについては後述します。

ともかく、dispatchNow()が実行されることで、Pipelineを経由してJobが実行され、結果を返すことができているわけです。

3. dispatchToQueueでキューに積まれるまで

それでは非同期の場合の処理である diespatchToQueue を見てみましょう。
このメソッドでは、Jobの実行は行わず、キューストレージにJobを積むとこまでがお仕事になっています。

public function dispatchToQueue($command)
{
    $connection = $command->connection ?? null;
    $queue = call_user_func($this->queueResolver, $connection);
    if (! $queue instanceof Queue) {
        throw new RuntimeException('Queue resolver did not return a Queue implementation.');
    }
    if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    } else {
        return $this->pushCommandToQueue($queue, $command);
    }
}

protected function pushCommandToQueue($queue, $command)
{
    if (isset($command->queue, $command->delay)) {
        return $queue->laterOn($command->queue, $command->delay, $command);
    }
    if (isset($command->queue)) {
        return $queue->pushOn($command->queue, $command);
    }
    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }
    return $queue->push($command);
}

実際のソースコード: Dispatcher.php

Jobに queue メソッドを実装している場合にはそれば呼び出されますが、基本的には pushCommandToQueue メソッドで処理されます。

pushCommandToQueue の中ではJobにqueueプロパティが設定されていれば、それを利用し、delayプロパティが設定されてれば遅延実行を登録します。
laterOn pushOn later push いずれもキューストレージ(RedisやSQS)に登録するための処理なので、ここでキューストレージに積まれていることがわかります。

return $queue->push($command) の結果として、キューストレージのID(JobID)を受け取ることができているわけです。

4. キューストレージに積まれたJobを実行するには

キューストレージに積まれたJobを実行するには、Workerを起動します。

細かいオプションは公式のドキュメントを見てもらうとして、

php artisan queue:work

といったコマンドを実行します。
このコマンドが何をしているかというと、Workerクラスを実行しています。
呼び出し部分は以下のようになっています。

return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
    $connection, $queue, $this->gatherWorkerOptions()
);

実際のソース: WorkCommand.php

ここで呼び出されている runNextJob メソッドによってキューからJobを取り出し、Jobを実行しています。
Workerの中身を見ていきましょう。

public function runNextJob($connectionName, $queue, WorkerOptions $options)
{
    $job = $this->getNextJob($this->manager->connection($connectionName), $queue);
    if ($job) {
        return $this->runJob($job, $connectionName, $options);
    }
    $this->sleep($options->sleep);
}

protected function runJob($job, $connectionName, WorkerOptions $options)
{
    try {
        return $this->process($connectionName, $job, $options);
    } catch (Exception $e) {
      //略
    }
}

public function process($connectionName, $job, WorkerOptions $options)
{
    try {
        $this->raiseBeforeJobEvent($connectionName, $job);
        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
            $connectionName, $job, (int) $options->maxTries
        );
        $job->fire();
        $this->raiseAfterJobEvent($connectionName, $job);
    } catch (Exception $e) {
        //略
    }
}

実際のソース: Worker.php

  • runNextJob() の中で getNextJob() をしてキューからJobを取得
  • runJob() にJobを渡すと中で process() を実行
  • process() の中で $job->fire() を実行

といった流れです。

重要なのは $job->fire() で、これがJobを実行する実際の中身になります。

普段のJobの実装時に handle を実装することはあっても fire を実装することはありませんが、このfireとは何なのか、次にこれを見ていきましょう。

5. Job::fire() でJobのhandleメソッドが呼ばれるまで

さて、 $job->fire() が呼びだれていますが、この $job というのは、いわゆる我々が実装したJobではありません。
では、この $job が何なのかというと、 RedisJobSqsJob といった、各種キューストレージに対応した「キューストレージにJobを保存し、キューストレージから取り出されるもの」のことです。

基本的な実装は基底クラスのJob.php に実装されています。

$job->fire() で実行されるのも、このJob.phpに実装されているfireメソッドです。

public function fire()
{
    $payload = $this->payload();
    list($class, $method) = JobName::parse($payload['job']);
    ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}

public function payload()
{
    return json_decode($this->getRawBody(), true);
}

protected function resolve($class)
{
    return $this->container->make($class);
}

実際のソース: Job.php

キューストレージに保存されていたJSONをパースして、クラス名やメソッドが定義されている 'job' と、シリアライズされた 'data' を取得します。
この payload に何が入っているかは、 Queue クラスを見てみましょう。

protected function createObjectPayload($job)
{
    return [
        'displayName' => $this->getDisplayName($job),
        'job' => 'Illuminate\Queue\CallQueuedHandler@call',
        'maxTries' => $job->tries ?? null,
        'timeout' => $job->timeout ?? null,
        'timeoutAt' => $this->getJobExpiration($job),
        'data' => [
            'commandName' => get_class($job),
            'command' => serialize(clone $job),
        ],
    ];
}

実際のソース: Queue.php

つまり、Job::fire()の中では、

$class = 'Illuminate\Queue\CallQueuedHandler';
$method = 'call';
$payload['data'] = [
    'commandName' => 実装したJobのクラス名,
    'command' => シリアライズされたJob,
];

といった形になり、我々が実装したJobのhandleメソッドではなく、 CallQueuedHandler::call() が呼び出されているのです。

ようやく核心に近づいてきました。後少しでJobが実行されます。

それでは CallQueueHandler::call() の中身を見ていきましょう。

public function call(Job $job, array $data)
{
    try {
        $command = $this->setJobInstanceIfNecessary($job, unserialize($data['command']));
    } catch (ModelNotFoundException $e) {
        return $this->handleModelNotFound($job, $e);
    }

    $this->dispatcher->dispatchNow(
        $command, $this->resolveHandler($job, $command)
    );

    if (! $job->hasFailed() && ! $job->isReleased()) {
        $this->ensureNextJobInChainIsDispatched($command);
    }
    if (! $job->isDeletedOrReleased()) {
        $job->delete();
    }
}

実際のソース: CallQueueHandler.php

なんと、最終的には dispatchNow() が呼び出されていることがわかります。

さぁ、これで非同期Jobが実行されるまでの流れがつかめたので、おさらいです。

  1. dispatch()すると、シリアライズされたJobがキューストレージに積まれる
  2. WorkerがキューストレージからJobの情報を引っ張ってくる
  3. Jobの情報をデシリアライズして、Jobのインスタンスに変換し、dispatchNow()を実行する

これであなたもQueueマスター見習いです

ここまでの処理を理解できていれば、Queueの8割はあなたのものです。

なんとなく使ってる dispatch()dispatchNow() の先に何が行われているのか、もうわかったも同然です。

というわけでこの記事もようやく終盤に差し掛かりました。

最後に、途中で述べた不満点を解消する方法を説明して、Queueの拡張性の高さを感じてください。

  • Jobの実行結果が残らない
  • 大きい引数をJobに渡すとキューのストレージが肥大化する
  • コンテキスト情報をJobに渡しづらい

この3つの不満点をそれぞれ解決していきます。

解決編: Jobの実行結果が残らない

実行結果を残したい、といったときの解決方法は、DispatcherPipelineを使って解決できます。
Dispatcher::pipeThrough()というメソッドがあり、ここにJobの実行前後の処理を定義することができます。

Dispatcherはシングルトンですし、どのタイミングでどこで定義しても大丈夫なのですが、おそらく最も適しているのは BusServiceProviderboot メソッドでしょう。

以下のようなBusServiceProviderを定義してみます。

<?php

namespace App\Providers;

use Illuminate\Bus\BusServiceProvider as Original;
use Illuminate\Contracts\Bus\Dispatcher;

class BusServiceProvider extends Original
{
    public function boot(Dispatcher $dispatcher)
    {
        $dispatcher->pipeThrough(array_map(function (string $middlewareClassName) {
            return app($middlewareClassName);
        }, [
            \App\Jobs\Middleware\LogJobProcess::class,
        ]));
    }
}

Laravelの場合はBusServiceProviderの拡張は簡単ですが、Lumenの場合はApplicationクラス自体を拡張し、registerBusBindingsを書き換える必要があります。これについては末尾に記載します。

App\Jobs\Middleware\LogJobProcess という自作のMiddlewareをDispatcherのpipesの中に登録しているわけです。

app/Jobs/Middleware/LogJobProcess.php
<?php
namespace App\Jobs\Middleware;
class LogJobProcess
{
    public function handle($command, \Closure $next)
    {
        $startTime = microtime(true);
        $result = $next($command);
        $finishTime = microtime(true);

        // 実行結果をログに残す
        \Log::info(implode("\t", [
            get_class($command),
            $finishTime - $startTime,
            is_string($result) ? $result : serialize($result),
        ]));

        return $result;
    }
}

とりあえずの例として、Logを使ってログファイルに結果を書き込むようなMiddlewareを作ってみました。
実際の業務ではもう少し複雑なものを利用しています。

たとえばこのMiddlewareだと、前半で紹介した「Jobの結果を後から取得する」といったことはできません。JobのIDがわからないからです。

ではどのようにするかというと、 Dispatcher::dispatch()Job を拡張して、JobにユニークなIDを持たせてることで解決できます。

JobにユニークなIDをもたせる

ポイントは3つです。

  1. Job自体に dispatchId などの何らかのIDをセット・ゲットするためのインターフェースを用意する
  2. dispatchするタイミングで、dispatchIdを生成、セットする
  3. 非同期の場合に、標準のJobIDではなく、dispatchIDを返す

dispatchID という名前は適当なものなので、 $jikkouId でも $hidoukiId でもなんでも好きな名前にしてください。

app/Jobs/Job.php
<?php
namespace App\Jobs;
abstract class Job implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    private $dispatchId;

    public function getDispatchId()
    {
        return $this->dispatchId;
    }
    public function setDispatchId(string $dispatchId)
    {
        $this->dispatchId = $dispatchId;
    }
}

このような各Jobクラスの基底クラスとして、オレオレJobクラスを実装してあげます。
Lumenの場合はデフォルトで用意されていますが、Laravelはデフォルトでは無いので適当に作ってあげてください。

次に、Dispatcherを拡張します。

以下のように、標準のDispatcherを継承したApp\Dispatcherを作り、これをBusServiceProviderで登録してください。

app/Dispatcher.php
namespace App;
use App\Jobs\Job;
use Illuminate\Bus\Dispatcher as Original;
use Ramsey\Uuid\Uuid;
class Dispatcher extends Original
{
    public function dispatchToQueue($command)
    {
        if (!$command instanceof Job | !$command->getDispatchId()) {
            $command->setDispatchId(Uuid::uuid4()->toString());
        }
        parent::dispatchToQueue($command);
        // デフォルトのUIDの
        return $command instanceof Job ? $command->getDispatchId() : null;
    }
}
BusServiceProvicerの例
<?php

namespace App\Providers;

use App\Dispatcher;
use Illuminate\Bus\BusServiceProvider as Original;
use Illuminate\Contracts\Bus\Dispatcher as DispatcherContract;
use Illuminate\Contracts\Queue\Factory as QueueFactoryContract;
use Illuminate\Contracts\Bus\QueueingDispatcher as QueueingDispatcherContract;

class BusServiceProvider extends Original
{
    public function register()
    {
        $this->app->singleton(Dispatcher::class, function ($app) {
            return new Dispatcher($app, function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
            });
        });
        $this->app->alias(Dispatcher::class, DispatcherContract::class);
        $this->app->alias(Dispatcher::class, QueueingDispatcherContract::class);
    }
    public function boot(){ /* 略 */ }
}

実際はもう少し複雑な処理をいれたりするのですが、概要としてはこんなところです。

parent::dispatch() parent::dispatchNow() する前に、 $command に大して dispatchId という何らかのユニークな値を設定してあげることで、Jobがキューに積まれる際に、dispatchIdも一緒にシリアライズされるので、Workerでの実行時にもそのdispatchIdを取得することができるようになります。

どういうことかというと、先程の LogJobProcess Middlewareで、JobのdispatchIDと併せて実行結果を保存することができるようになり、そうすることによって後から結果を取得する、といったことが可能になります。

たとえば、以下のような、Jobの実行結果をキャッシュに保存しておくMiddlewareを追加することができます。

app/Jobs/Middleware/SaveResultCache.php
<?php
namespace App\Jobs\Middleware;
use App\Jobs\Job;
class SaveResultCache
{
    public function handle($command, \Closure $next)
    {
        $result = $next($command);
        
        $dispatchId = $command instanceof Job ? $command->getDispatchId() : null;
        if ($dispatchId) {
            // 1日間結果をキャッシュに保存しておく
            Cache::put('JobResult@' . $dispatchId, $result, 60 * 24);
        }

        return $result;
    }
}

こうすると、Jobの結果を後から取得する、といったことが可能になります。
以下のようなControllerを作ればいいわけです。

public function getJobResult($dispatchId)
{
    return Cache::get('JobResult@' . $dispatchId);
}

このレベルまで使いこなせれば、もうほぼほぼQueueマスターを名乗っていいのでは、と思います。

ただ、上記のまま使うと、DispatchIDを推測されると、Jobの結果を他の人が推測できちゃったりするので、実際はこのあと説明するコンテキストなどと組み合わせて、Jobをdispatchしたユーザー以外にはその結果を返さないようにする、などの工夫が必要です。

解決編: 大きい引数をJobに渡すとキューのストレージが肥大化する

結局自分は面倒で諦めちゃったのですが、ようはserialize/deserializeのときに、工夫してあげればOKです。

とはいえそこまで簡単な話ではなくて、Jobは SerializeModels という各種プロパティをシリアライズするTraitをuseしていて、このtraitが __sleep()__wakeup() でいろいろとゴニョゴニョしています。
最初は Serializable インターフェースをimplementsして serialize() unserialize() を実装すればいいのかなぁと思ってたのですが、Serializableを使うと__sleep __wakeup が呼び出されないので、SerializesModels を活かしつつ serialize() unserialize() するのが難しいなぁという印象でした。

もちろん SerializesModels の処理をコピペったりuseしたりしつつ、ゴニョゴニョすることで、なんとかなりはするのですが、そこまでして解決したい問題でもないな!というのが最終的な結論でした。

どうしても軽量化したい・・・!という人は、 SerializesModels の実装を参考に Serializable を実装して、たとえば gzencode() / gzdecode() といったgzip系の関数など用いたりすると解決できるのかなぁと思ってます。

解決編: コンテキスト情報をJobに渡しづらい

さっきのDispatchIDの拡張と同じように、コンテキスト情報をJobにセットしてあげればOKです。

app/Dispatcher.php
namespace App;
use App\Jobs\Job;
use Illuminate\Bus\Dispatcher as Original;
use Illuminate\Http\Request;
use Ramsey\Uuid\Uuid;
class Dispatcher extends Original
{
    public function dispatchToQueue($command)
    {
        if ($command instanceof Job && !$command->getDispatchId()) {
            $command->setDispatchId(Uuid::uuid4()->toString());
        }
        if ($command instanceof Job && !$command->getUserId() && app(Request::class)->user()) {
            $command->setUserId(app(Request::class)->user()->id);
        }
        parent::dispatchNow($command);
        return $command->getDispatchId();
    }
}
app/Jobs/Job.php
<?php
namespace App\Jobs;
abstract class Job implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    private $dispatchId;
    private $userId;

    public function getDispatchId()
    {
        return $this->dispatchId;
    }
    public function setDispatchId(string $dispatchId)
    {
        $this->dispatchId = $dispatchId;
    }
    public function getUserId()
    {
        return $this->userId;
    }
    public function setUserId(int $userId)
    {
        $this->userId = $userId;
    }
}
app/Jobs/Middleware/LogJobProcess.php
<?php
namespace App\Jobs\Middleware;
use App\Jobs\Job;
class SaveResultCache
{
    public function handle($command, \Closure $next)
    {
        $result = $next($command);
        
        $dispatchId = $command instanceof Job ? $command->getDispatchId() : null;
        $userId = $command instanceof Job ? $command->getUserId() : null;
        if ($dispatchId && $userId) {
            // 1日間結果をキャッシュに保存しておく
            Cache::put('JobResult@' . $userId . '@' . $dispatchId, $result, 60 * 24);
        }

        return $result;
    }
}
public function getJobResult($dispatchId, Request $request)
{
    return Cache::get('JobResult@' . $request->user()->id . '@' . $dispatchId);
}

先程の例を改修すると上記のようになります。

実際にはJobがJobをdispatchするケースなどを考慮すると、もっと複雑なことをやらなきゃならないのですが、概要としてはこんなところです。

まとめ

今回ご紹介した部分で、Queueに関する重要な部分の多くを理解できたかと思います。

もちろん、今回ご紹介できなかった部分も多いですし、たとえば非同期のListenerの処理などはまた少しクセがあったりします。
しかしながら重要な点についてはご紹介できたかなと思いますので、これを基礎知識として、Queueの世界に潜ってもらえれば幸いです。

最後に要点をまとめると、以下になります。

  • Jobは便利
  • Queueは拡張できる

なんとたったの2行でまとまりました。

長々と書いたので最後まで読んでくれた方は、本当にお疲れ様でした。
それでは良いQueueライフを :pray:

おまけ:LumenでのBusServiceProviderの拡張

app/Application.php
<?php
namespace App;
use App\Providers\BusServiceProvider;
use Illuminate\Contracts\Bus\Dispatcher;
use Laravel\Lumen\Application as LumenApplication;

class Application extends LumenApplication
{
    /**
     * @see LumenApplication::registerBusBindings();
     */
    protected function registerBusBindings()
    {
        $this->singleton(Dispatcher::class, function () {
            $this->register(BusServiceProvider::class);

            return $this->make(Dispatcher::class);
        });
    }
}

このような Laravel\Lumen\Application を拡張した App\Application クラスを用意しておき、
bootstrap/app.php の中で Laravel\Lumen\Application を初期化しているところを、

bootstrap/app.php
$app = new App\Application(dirname(__DIR__) . '/');

このように書き換えます。
他の Laravel\Lumen\Application 内でbindされている諸々も、同様の方法で App\Application を拡張する(各Bindingメソッドをオーバーライドする)ことで実現することができます。

59
47
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
59
47

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?