48
35

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

【Laravel】 Redis Queue Worker のソースコードリーディング

Last updated at Posted at 2019-01-16

全体の流れ

./artisan queue:work redis

というコマンドを実行したとする。(Supervisor などでデーモナイズすることを想定)

WorkCommand の実行

WorkCommand::handle()WorkCommand::runWorker() という流れでワーカーのループ準備に入る。 --once オプションを渡していないので,Worker::daemon() メソッドが呼ばれることになる。(ここから無限ループ処理に入る)

以下に示す daemon() メソッドがループの中核となる。ここからこの処理を1つずつ追っていく。

/**
 * Listen to the given queue in a loop.
 *
 * @param  string  $connectionName
 * @param  string  $queue
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 */
public function daemon($connectionName, $queue, WorkerOptions $options)
{
    if ($this->supportsAsyncSignals()) {
        $this->listenForSignals();
    }

    $lastRestart = $this->getTimestampOfLastQueueRestart();

    while (true) {
        // Before reserving any jobs, we will make sure this queue is not paused and
        // if it is we will just pause this worker for a given amount of time and
        // make sure we do not need to kill this worker process off completely.
        if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
            $this->pauseWorker($options, $lastRestart);

            continue;
        }

        // First, we will attempt to get the next job off of the queue. We will also
        // register the timeout handler and reset the alarm for this job so it is
        // not stuck in a frozen state forever. Then, we can fire off this job.
        $job = $this->getNextJob(
            $this->manager->connection($connectionName), $queue
        );

        if ($this->supportsAsyncSignals()) {
            $this->registerTimeoutHandler($job, $options);
        }

        // If the daemon should run (not in maintenance mode, etc.), then we can run
        // fire off this job for processing. Otherwise, we will need to sleep the
        // worker so no more jobs are processed until they should be processed.
        if ($job) {
            $this->runJob($job, $connectionName, $options);
        } else {
            $this->sleep($options->sleep);
        }

        // Finally, we will check to see if we have exceeded our memory limits or if
        // the queue should restart based on other indications. If so, we'll stop
        // this worker and let whatever is "monitoring" it restart the process.
        $this->stopIfNecessary($options, $lastRestart, $job);
    }
}

簡単のためにやや不正確になるが,大雑把に全体の流れをまとめると

  1. SIGTERM シグナルハンドラを登録する。
  2. 最も古いジョブをキューから取得する。そして,ジョブの $timeout プロパティまたはワーカーの --timeout 設定値に合わせて SIGALRM シグナルハンドラを登録する。(2回目以降は上書き)
  3. ジョブを取得できたかどうかで分岐。
  • ジョブが取得できた場合, 4 に進む。
  • ジョブが取得できなかった場合, ワーカーの --sleep 設定値ぶんだけスリープし, 5 に進む。
  1. ジョブを実行し,成功したかどうかで分岐。
  • 成功したら 5 に進む。
  • 最大試行回数内で例外が発生もしくはタイムアウトしたら,ジョブをもう一度キューに投入する。例外があればスローする。そして 5 に進む。
  • 最大試行回数を超えて例外が発生したら,その例外をスローし, 5 に進む。
  • 最大試行回数を超えてタイムアウトしたら, MaxAttemptsExceededException をスローし, 5 に進む。
  1. ワーカーの終了判定を行う。SIGTERMを受け取るなどして終了すべきと判定された場合はここで終了し,それ以外は 2 に戻って次のループに入る。

ループの下準備

共通シグナルハンドラの登録

if ($this->supportsAsyncSignals()) {
    $this->listenForSignals();
}
/**
 * Enable async signals for the process.
 *
 * @return void
 */
protected function listenForSignals()
{
    pcntl_async_signals(true);

    pcntl_signal(SIGTERM, function () {
        $this->shouldQuit = true;
    });

    pcntl_signal(SIGUSR2, function () {
        $this->paused = true;
    });

    pcntl_signal(SIGCONT, function () {
        $this->paused = false;
    });
}
  • SIGTERMkill コマンドなどのデフォルトの終了処理であるが,この登録を行っておくことによって, 強制終了シグナルの SIGKILL が飛んでこない限りは Graceful にシャットダウンすることができるようになる。設定された $this->shouldQuit は再起動判定時に利用される。
  • SIGUSR2 はユーザ定義シグナルである。なにやらポーズ処理が入っているように見えるが,これは Laravel Horizon 用のシグナルなので,ワーカーを単独で使用する分には無関係だ。
  • SIGCONT は本来は SIGSTOP のあとにプロセスを再開するためのシグナルだが, ここでは SIGUSR2 に対応する Laravel Horizon 用のシグナル として使われている。

前回の再起動時刻の取得

$lastRestart = $this->getTimestampOfLastQueueRestart();
/**
 * Get the last queue restart timestamp, or null.
 *
 * @return int|null
 */
protected function getTimestampOfLastQueueRestart()
{
    if ($this->cache) {
        return $this->cache->get('illuminate:queue:restart');
    }
}

何やらキャッシュから値を取得しているが,これは RestartCommand によってセットされる値である。要するに,プロセス間通信の手段としてキャッシュストレージを利用しているわけだ。この $lastRestart も再起動判定時に利用される。

<?php

namespace Illuminate\Queue\Console;

use Illuminate\Console\Command;
use Illuminate\Support\InteractsWithTime;

class RestartCommand extends Command
{
    use InteractsWithTime;

    /**
     * The console command name.
     *
     * @var string
     */
    protected $name = 'queue:restart';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Restart queue worker daemons after their current job';

    /**
     * Execute the console command.
     *
     * @return void
     */
    public function handle()
    {
        $this->laravel['cache']->forever('illuminate:queue:restart', $this->currentTime());

        $this->info('Broadcasting queue restart signal.');
    }
}

ループの開始

メンテナンスモード判定

// Before reserving any jobs, we will make sure this queue is not paused and
// if it is we will just pause this worker for a given amount of time and
// make sure we do not need to kill this worker process off completely.
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
    $this->pauseWorker($options, $lastRestart);

    continue;
}
/**
 * Determine if the daemon should process on this iteration.
 *
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @param  string  $connectionName
 * @param  string  $queue
 * @return bool
 */
protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
{
    return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
        $this->paused ||
        $this->events->until(new Events\Looping($connectionName, $queue)) === false);
}

以下の場合に pauseWorker() が実行されることがわかる。大きく注意すべきはメンテナンスモードではオプションを渡さない限りワーカーが動かないことであろうか。

  • ./artisan down によってメンテナンスモードになっていて,かつ起動時に --force オプションが渡されていない場合
  • Laravel Horizon 制御下において SIGUSR2 を受け取った場合
  • Queue::looping() で登録されたリスナーのどれかが false をリターンして以後のリスナー実行を中断した場合
/**
 * Pause the worker for the current loop.
 *
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @param  int  $lastRestart
 * @return void
 */
protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
    $this->sleep($options->sleep > 0 ? $options->sleep : 1);

    $this->stopIfNecessary($options, $lastRestart);
}

pauseWorker() の実行は,スリープ処理が入る点を除けば stopIfNecessary() の実装そのままである。このメソッドはループの最後でも使用されており,後ほど確認する。

ジョブの取得

// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
$job = $this->getNextJob(
    $this->manager->connection($connectionName), $queue
);

もし ---queue=foo,bar のように優先度をつけて対象キューを複数設定していた場合,ここで優先度順にポップ処理が走る。

/**
 * Get the next job from the queue connection.
 *
 * @param  \Illuminate\Contracts\Queue\Queue  $connection
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
 */
protected function getNextJob($connection, $queue)
{
    try {
        foreach (explode(',', $queue) as $queue) {
            if (! is_null($job = $connection->pop($queue))) {
                return $job;
            }
        }
    } catch (Exception $e) {
        /* ... */
    } catch (Throwable $e) {
        /* ... */
    }
}

$connection->pop($queue) は, QueueManager::__call() を通じて RedisQueue::pop() に委譲される。

/**
 * Pop the next job off of the queue.
 *
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
 */
public function pop($queue = null)
{
    $this->migrate($prefixed = $this->getQueue($queue));

    if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
        return;
    }

    [$job, $reserved] = $nextJob;

    if ($reserved) {
        return new RedisJob(
            $this->container, $this, $job,
            $reserved, $this->connectionName, $queue ?: $this->default
        );
    }
}

発火時刻が来た遅延ジョブ・リトライジョブをセットからキューにまとめて移動

/**
 * Migrate any delayed or expired jobs onto the primary queue.
 *
 * @param  string  $queue
 * @return void
 */
protected function migrate($queue)
{
    $this->migrateExpiredJobs($queue.':delayed', $queue);

    if (! is_null($this->retryAfter)) {
        $this->migrateExpiredJobs($queue.':reserved', $queue);
    }
}

この呼出はLuaスクリプトになっており,PHPの模擬変数を使うと以下のように表現できる。

-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', "{$queue}", '-inf', "{$currentTimestamp}")

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call('zremrangebyrank', "{$queue}:delayed", 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call('rpush', "{$queue}", unpack(val, i, math.min(i+99, #val)))
    end
end

return val

queues:キュー名:delayed は発火時刻をスコア,値をジョブペイロードとする ZSET(ソート済みセット型)であり,以下のような処理が行われている。

  1. ZRANGEBYSCORE で現在時刻までにスケジュールされたジョブを全件取得し,変数に入れたあとは ZRANGEBYRANK でクリアする
  2. 変数に入れたジョブを100件ずつ RPUSH でキューに移動する

queues:キュー名:reserved に関しても同様で,こちらは遅延ジョブではなくリトライジョブが対象になる。発火時刻を算出するパラメータとして queue.connections.redis.retry_after で設定した秒数が用いられる。

キューからジョブを1件取得し,例外発生時またはタイムアウト時の再試行を予約

/**
 * Retrieve the next job from the queue.
 *
 * @param  string  $queue
 * @return array
 */
protected function retrieveNextJob($queue)
{
    if (! is_null($this->blockFor)) {
        return $this->blockingPop($queue);
    }

    return $this->getConnection()->eval(
        LuaScripts::pop(), 2, $queue, $queue.':reserved',
        $this->availableAt($this->retryAfter)
    );
}

queue.connections.redis.block_for の設定があるかどうかによって処理が分岐される。

block_forを設定している場合
/**
 * Retrieve the next job by blocking-pop.
 *
 * @param  string  $queue
 * @return array
 */
protected function blockingPop($queue)
{
    $rawBody = $this->getConnection()->blpop($queue, $this->blockFor);

    if (! empty($rawBody)) {
        $payload = json_decode($rawBody[1], true);

        $payload['attempts']++;

        $reserved = json_encode($payload);

        $this->getConnection()->zadd($queue.':reserved', [
            $reserved => $this->availableAt($this->retryAfter),
        ]);

        return [$rawBody[1], $reserved];
    }

    return [null, null];
}
block_forを設定していない場合(Luaスクリプト)
-- Pop the first job off of the queue...
local job = redis.call('lpop', "{$queue}")
local reserved = false

if(job ~= false) then
    -- Increment the attempt count and place job on the reserved queue...
    reserved = cjson.decode(job)
    reserved['attempts'] = reserved['attempts'] + 1
    reserved = cjson.encode(reserved)
    redis.call('zadd', "{$queue}:reserved", "{$this->availableAt($this->retryAfter)}", reserved)
end

return {job, reserved}

どちらもやっていることは同じだが,「ジョブを取得できるまでブロックしてポーリング処理を最適化」ということが実現できない後者のみ,負荷軽減のためにLuaスクリプトによる高速化を行っているのか…?(不明)正直どちらもLuaスクリプト統一でいいような気はする。

さて,ここでまた queues:キュー名:reserved セットが登場している。

  1. ペイロードに含まれる試行回数 "attempts" をインクリメント
  2. リトライ時刻を算出し,それをスコアとしてペイロードを queues:キュー名:reserved セットに追加

リトライ発火時刻は以下のように算出される。要するに, queue.connections.redis.retry_after の値に現在時刻を足しているだけだ。

/**
 * Get the "available at" UNIX timestamp.
 *
 * @param  \DateTimeInterface|\DateInterval|int  $delay
 * @return int
 */
protected function availableAt($delay = 0)
{
    $delay = $this->parseDateInterval($delay);

    return $delay instanceof DateTimeInterface
                        ? $delay->getTimestamp()
                        : Carbon::now()->addSeconds($delay)->getTimestamp();
}
/**
 * If the given value is an interval, convert it to a DateTime instance.
 *
 * @param  \DateTimeInterface|\DateInterval|int  $delay
 * @return \DateTimeInterface|int
 */
protected function parseDateInterval($delay)
{
    if ($delay instanceof DateInterval) {
        $delay = Carbon::now()->add($delay);
    }

    return $delay;
}

これによって,ジョブ実行中にHTTPリクエストがハングしたりバグで無限ループに陥ったりしても,前述の仕組みと組み合わせて,プロセスを強制終了後に勝手に再試行してくれるようになるのだ。

ジョブ取得時の例外対応

先ほどは省略していた部分を見てみよう。

/**
 * Get the next job from the queue connection.
 *
 * @param  \Illuminate\Contracts\Queue\Queue  $connection
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
 */
protected function getNextJob($connection, $queue)
{
    try {
        /* ... */
    } catch (Exception $e) {
        $this->exceptions->report($e);

        $this->stopWorkerIfLostConnection($e);

        $this->sleep(1);
    } catch (Throwable $e) {
        $this->exceptions->report($e = new FatalThrowableError($e));

        $this->stopWorkerIfLostConnection($e);

        $this->sleep(1);
    }
}
/**
 * Stop the worker if we have lost connection to a database.
 *
 * @param  \Throwable  $e
 * @return void
 */
protected function stopWorkerIfLostConnection($e)
{
    if ($this->causedByLostConnection($e)) {
        $this->shouldQuit = true;
    }
}

以下のような処理が行われている。

  1. エラーレポートを行う
  2. データベースコネクションが切断されたことが原因の例外であれば,次のループでの終了を予約
    (Redis ドライバを使っている今回は無関係)
  3. 1秒間スリープ

SIGALRM を利用してタイムアウトハンドラを登録

if ($this->supportsAsyncSignals()) {
    $this->registerTimeoutHandler($job, $options);
}

最初に設定した SIGTERM に加えて,サイクル1回ごとに SIGALRM ハンドラも繰り返し上書き登録されていく。ジョブの $timeout 設定またはワーカーの --timeout オプションからタイムアウト秒数を拾い,ハンドラとして SIGKILL による強制終了処理を登録している。

/**
 * Register the worker timeout handler.
 *
 * @param  \Illuminate\Contracts\Queue\Job|null  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 */
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
    // We will register a signal handler for the alarm signal so that we can kill this
    // process if it is running too long because it has frozen. This uses the async
    // signals supported in recent versions of PHP to accomplish it conveniently.
    pcntl_signal(SIGALRM, function () {
        $this->kill(1);
    });

    pcntl_alarm(
        max($this->timeoutForJob($job, $options), 0)
    );
}
/**
 * Kill the process.
 *
 * @param  int  $status
 * @return void
 */
public function kill($status = 0)
{
    $this->events->dispatch(new Events\WorkerStopping($status));

    if (extension_loaded('posix')) {
        posix_kill(getmypid(), SIGKILL);
    }

    exit($status);
}
/**
 * Get the appropriate timeout for the given job.
 *
 * @param  \Illuminate\Contracts\Queue\Job|null  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return int
 */
protected function timeoutForJob($job, WorkerOptions $options)
{
    return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout;
}

ジョブの実行 (またはスリープ)

さて,これだけ下準備してようやく実行に入れる。

// If the daemon should run (not in maintenance mode, etc.), then we can run
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job) {
    $this->runJob($job, $connectionName, $options);
} else {
    $this->sleep($options->sleep);
}
/**
 * Process the given job.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  string  $connectionName
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 */
protected function runJob($job, $connectionName, WorkerOptions $options)
{
    try {
        return $this->process($connectionName, $job, $options);
    } catch (Exception $e) {
        /* ... */
    } catch (Throwable $e) {
        /* ... */
    }
}
/**
 * Process the given job from the queue.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 *
 * @throws \Throwable
 */
public function process($connectionName, $job, WorkerOptions $options)
{
    try {
        // First we will raise the before job event and determine if the job has already ran
        // over its maximum attempt limits, which could primarily happen when this job is
        // continually timing out and not actually throwing any exceptions from itself.
        $this->raiseBeforeJobEvent($connectionName, $job);

        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
            $connectionName, $job, (int) $options->maxTries
        );

        // Here we will fire off the job and let it process. We will catch any exceptions so
        // they can be reported to the developers logs, etc. Once the job is finished the
        // proper events will be fired to let any listeners know this job has finished.
        $job->fire();

        $this->raiseAfterJobEvent($connectionName, $job);
    } catch (Exception $e) {
        /* ... */
    } catch (Throwable $e) {
        /* ... */
    }
}

実際にジョブを実行しているのは

$job->fire();

の部分であるが,これは キュー・イベント・ブロードキャストに関する補足資料 で既に解説済みであるため大部分は割愛する。

ジョブが成功したとき

CallQueuedHandler の実装を見ると,ジョブを実行し終わったときに $job->delete() が実行されているのがわかる。

/**
 * Handle the queued job.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  array  $data
 * @return void
 */
public function call(Job $job, array $data)
{
    try {
        $command = $this->setJobInstanceIfNecessary(
            $job, unserialize($data['command'])
        );
    } catch (ModelNotFoundException $e) {
        return $this->handleModelNotFound($job, $e);
    }

    $this->dispatcher->dispatchNow(
        $command, $this->resolveHandler($job, $command)
    );

    if (! $job->hasFailed() && ! $job->isReleased()) {
        $this->ensureNextJobInChainIsDispatched($command);
    }

    if (! $job->isDeletedOrReleased()) {
        $job->delete();
    }
}

基底抽象クラス Job を見ると

/**
 * Delete the job from the queue.
 *
 * @return void
 */
public function delete()
{
    $this->deleted = true;
}

という実装になっているが, これは RedisJob によって以下のように継承されている。

/**
 * Delete the job from the queue.
 *
 * @return void
 */
public function delete()
{
    parent::delete();

    $this->redis->deleteReserved($this->queue, $this);
}
/**
 * Delete a reserved job from the queue.
 *
 * @param  string  $queue
 * @param  \Illuminate\Queue\Jobs\RedisJob  $job
 * @return void
 */
public function deleteReserved($queue, $job)
{
    $this->getConnection()->zrem($this->getQueue($queue).':reserved', $job->getReservedJob());
}

これにより,成功したときだけ,あらかじめ予約しておいた例外発生時・タイムアウト時の再試行を無効化している。

ジョブ実行中に例外が発生したとき

先ほどは省略していた部分を見てみよう。

/**
 * Process the given job.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  string  $connectionName
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 */
protected function runJob($job, $connectionName, WorkerOptions $options)
{
    try {
        return $this->process($connectionName, $job, $options);
    } catch (Exception $e) {
        $this->exceptions->report($e);

        $this->stopWorkerIfLostConnection($e);
    } catch (Throwable $e) {
        $this->exceptions->report($e = new FatalThrowableError($e));

        $this->stopWorkerIfLostConnection($e);
    }
}
/**
 * Process the given job from the queue.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 *
 * @throws \Throwable
 */
public function process($connectionName, $job, WorkerOptions $options)
{
    try {
        /* ... */
        $job->fire();
        /* ... */
    } catch (Exception $e) {
        $this->handleJobException($connectionName, $job, $options, $e);
    } catch (Throwable $e) {
        $this->handleJobException(
            $connectionName, $job, $options, new FatalThrowableError($e)
        );
    }
}

大きく見ると,以下の流れになる。

  1. handleJobException() メソッドで例外をハンドルする
    (このメソッドの中では必ず再スローされる)
  2. データベースコネクションが切断されたことが原因の例外であれば,次のループでの終了を予約
    (今回はジョブ実行に関するものなので,キュードライバにデータベースを使用していなくても関係がある)

では handleJobException() メソッドを見ていこう。ここから先は複雑なので,気合を入れて読んでいきたい。

/**
 * Handle an exception that occurred while the job was running.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @param  \Exception  $e
 * @return void
 *
 * @throws \Exception
 */
protected function handleJobException($connectionName, $job, WorkerOptions $options, $e)
{
    try {
        // First, we will go ahead and mark the job as failed if it will exceed the maximum
        // attempts it is allowed to run the next time we process it. If so we will just
        // go ahead and mark it as failed now so we do not have to release this again.
        if (! $job->hasFailed()) {
            $this->markJobAsFailedIfWillExceedMaxAttempts(
                $connectionName, $job, (int) $options->maxTries, $e
            );
        }

        $this->raiseExceptionOccurredJobEvent(
            $connectionName, $job, $e
        );
    } finally {
        // If we catch an exception, we will attempt to release the job back onto the queue
        // so it is not lost entirely. This'll let the job be retried at a later time by
        // another listener (or this same one). We will re-throw this exception after.
        if (! $job->isDeleted() && ! $job->isReleased() && ! $job->hasFailed()) {
            $job->release($options->delay);
        }
    }

    throw $e;
}
/**
 * Mark the given job as failed if it has exceeded the maximum allowed attempts.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  int  $maxTries
 * @param  \Exception  $e
 * @return void
 */
protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
{
    $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

    if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) {
        $this->failJob($connectionName, $job, $e);
    }

    if ($maxTries > 0 && $job->attempts() >= $maxTries) {
        $this->failJob($connectionName, $job, $e);
    }
}
/**
 * Mark the given job as failed and raise the relevant event.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Exception  $e
 * @return void
 */
protected function failJob($connectionName, $job, $e)
{
    return FailingJob::handle($connectionName, $job, $e);
}

FailingJob::handle() メソッドは以下のように実装されている。

/**
 * Delete the job, call the "failed" method, and raise the failed job event.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Queue\Jobs\Job  $job
 * @param  \Exception $e
 * @return void
 */
public static function handle($connectionName, $job, $e = null)
{
    $job->markAsFailed();

    if ($job->isDeleted()) {
        return;
    }

    try {
        // If the job has failed, we will delete it, call the "failed" method and then call
        // an event indicating the job has failed so it can be logged if needed. This is
        // to allow every developer to better keep monitor of their failed queue jobs.
        $job->delete();

        $job->failed($e);
    } finally {
        static::events()->dispatch(new JobFailed(
            $connectionName, $job, $e ?: new ManuallyFailedException
        ));
    }
}

まとめると,以下のような流れになる。

  1. 以下に該当する場合にジョブに失敗フラグを立てて,キューからジョブを削除する。
  • 今回の試行が最大試行回数目となるとき
  • アプリケーションジョブの $timeoutAt プロパティまたは retryUntil() メソッドで返された**時刻がきてしまい,これ以上の再試行がアプリケーション的に「無意味」「有効期限が過ぎているので必ず失敗」などと判断されるとき
    時間を指定する $timeout あるいは --timeout とは別物)**
  1. Queue::exceptionOccurred() で登録されたリスナーを実行する。ここでジョブの削除,強制的に失敗フラグを立てる,といったことも行うことができる。

  2. ジョブがまだ失敗しておらず,キューからも削除されておらず, queues:キュー名:reserved セットに残っている場合に,queue.connections.redis.retry_after 秒数ぶんだけ遅延させるために queues:キュー名:delayed セットに移動する。

$job->release($options->delay) が初見だと何をやっているのか分かりづらいが,Luaスクリプトの中身を見れば一目瞭然である。Laravel はこの処理を「リリース」と呼んでいるらしい。

-- Remove the job from the current queue...
redis.call('zrem', "{$queue}:reserved", "{$job->getReservedJob()}")

-- Add the job onto the "delayed" queue...
redis.call('zadd', "{$queue}:delayed", "{$job->availableAt($options->delay)}", "{$job->getReservedJob()}")

return true

ジョブがタイムアウトしたとき

SIGALRM ハンドラにより

$this->kill(1);

が実行され,ワーカーはそのまま強制終了される。ポイントは**再起動時にタイムアウトジョブがどうなるか?**という点である。注目すべきは process() メソッドにあるこの記述だ。

/**
 * Process the given job from the queue.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 *
 * @throws \Throwable
 */
public function process($connectionName, $job, WorkerOptions $options)
{
    try {

        /* ... */

        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
            $connectionName, $job, (int) $options->maxTries
        );

        $job->fire();

        /* ... */

    } catch (Exception $e) {
        /* ... */
    } catch (Throwable $e) {
        /* ... */
    }
}
/**
 * Mark the given job as failed if it has exceeded the maximum allowed attempts.
 *
 * This will likely be because the job previously exceeded a timeout.
 *
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  int  $maxTries
 * @return void
 */
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
    $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
    $timeoutAt = $job->timeoutAt();

    if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) {
        return;
    }

    if (! $timeoutAt && ($maxTries === 0 || $job->attempts() <= $maxTries)) {
        return;
    }

    $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
        $job->resolveName().' has been attempted too many times or run too long. The job may have previously timed out.'
    ));

    throw $e;
}

さっきと似たようなメソッドがあるが,微妙に名前が異なる。

  • markJobAsFailedIfWillExceedMaxAttempts()

    今回の試行が最大試行回数目となるとき,ジョブに失敗フラグを立てる。例外スローは呼び出し元に任せる。
  • markJobAsFailedIfAlreadyExceedsMaxAttempts()
    前回の試行で最大試行回数目であったとき,ジョブに失敗フラグを立てて, MaxAttemptsExceededException をスローする。

タイムアウト時は即座にプロセスをキルして,再開時に後始末を任せることにしているため,このような設計になっている。

ワーカーの終了判定

最後に,このままループを継続するか,プロセスを終了するかの判定が行われる。

// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$this->stopIfNecessary($options, $lastRestart, $job);
/**
 * Stop the process if necessary.
 *
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @param  int  $lastRestart
 * @param  mixed  $job
 */
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null)
{
    if ($this->shouldQuit) {
        $this->stop();
    } elseif ($this->memoryExceeded($options->memory)) {
        $this->stop(12);
    } elseif ($this->queueShouldRestart($lastRestart)) {
        $this->stop();
    } elseif ($options->stopWhenEmpty && is_null($job)) {
        $this->stop();
    }
}

終了されるのは以下のパターン。

  • SIGTERM を受け取っていたとき
  • データベースコネクションが切断されたとき
    • プロセスを起動したまま長時間データベース通信を何も行わないと,TCPコネクションが切断されてしまう。
    • とはいえ,Laravel はデータベースサービス側で切断を検知して再接続処理はやってくれていたはず。なぜこっちで再起動する必要が…?(謎
  • メモリ使用量がワーカーの --memory 設定値を超えたとき
    • php.ini の memory_limit よりも余裕を持たせておくと,メモリ不足で身動きできなくなる前に余裕を持って再起動できる。メモリリーク対策として非常に有用。
    • プロセス終了ステータスに 12 を使っている理由は不明。 Laravel Horizon 関係か?
  • ./artisan illuminate:queue:restart が実行されていたとき
    • 最初に取得しておいた $lastRestart と違う値が取得され,終了すべきと判定される。
  • --stop-when-empty が有効でかつ,すべてのジョブ消化が終わったとき
    • 特殊な環境でのバッチ処理向けか?動かし続けるWebサービス向けではない気がする。
/**
 * Determine if the queue worker should restart.
 *
 * @param  int|null  $lastRestart
 * @return bool
 */
protected function queueShouldRestart($lastRestart)
{
    return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}
/**
 * Determine if the memory limit has been exceeded.
 *
 * @param  int   $memoryLimit
 * @return bool
 */
public function memoryExceeded($memoryLimit)
{
    return (memory_get_usage(true) / 1024 / 1024) >= $memoryLimit;
}
/**
 * Stop listening and bail out of the script.
 *
 * @param  int  $status
 * @return void
 */
public function stop($status = 0)
{
    $this->events->dispatch(new Events\WorkerStopping($status));

    exit($status);
}

お疲れ様でした。フローチャートでも書こうかなと思ったけど,細かいこと描こうとすると肥大化しそうなので今回は見送りかな…

48
35
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
48
35

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?