全体の流れ
./artisan queue:work redis
というコマンドを実行したとする。(Supervisor などでデーモナイズすることを想定)
WorkCommand
の実行
WorkCommand::handle()
→ WorkCommand::runWorker()
という流れでワーカーのループ準備に入る。 --once
オプションを渡していないので,Worker::daemon()
メソッドが呼ばれることになる。(ここから無限ループ処理に入る)
以下に示す daemon()
メソッドがループの中核となる。ここからこの処理を1つずつ追っていく。
/**
* Listen to the given queue in a loop.
*
* @param string $connectionName
* @param string $queue
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
public function daemon($connectionName, $queue, WorkerOptions $options)
{
if ($this->supportsAsyncSignals()) {
$this->listenForSignals();
}
$lastRestart = $this->getTimestampOfLastQueueRestart();
while (true) {
// Before reserving any jobs, we will make sure this queue is not paused and
// if it is we will just pause this worker for a given amount of time and
// make sure we do not need to kill this worker process off completely.
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$this->pauseWorker($options, $lastRestart);
continue;
}
// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
if ($this->supportsAsyncSignals()) {
$this->registerTimeoutHandler($job, $options);
}
// If the daemon should run (not in maintenance mode, etc.), then we can run
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job) {
$this->runJob($job, $connectionName, $options);
} else {
$this->sleep($options->sleep);
}
// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$this->stopIfNecessary($options, $lastRestart, $job);
}
}
簡単のためにやや不正確になるが,大雑把に全体の流れをまとめると
-
SIGTERM
シグナルハンドラを登録する。 - 最も古いジョブをキューから取得する。そして,ジョブの
$timeout
プロパティまたはワーカーの--timeout
設定値に合わせてSIGALRM
シグナルハンドラを登録する。(2回目以降は上書き) - ジョブを取得できたかどうかで分岐。
- ジョブが取得できた場合, 4 に進む。
- ジョブが取得できなかった場合, ワーカーの
--sleep
設定値ぶんだけスリープし, 5 に進む。
- ジョブを実行し,成功したかどうかで分岐。
- 成功したら 5 に進む。
- 最大試行回数内で例外が発生もしくはタイムアウトしたら,ジョブをもう一度キューに投入する。例外があればスローする。そして 5 に進む。
- 最大試行回数を超えて例外が発生したら,その例外をスローし, 5 に進む。
- 最大試行回数を超えてタイムアウトしたら,
MaxAttemptsExceededException
をスローし, 5 に進む。
- ワーカーの終了判定を行う。
SIGTERM
を受け取るなどして終了すべきと判定された場合はここで終了し,それ以外は 2 に戻って次のループに入る。
ループの下準備
共通シグナルハンドラの登録
if ($this->supportsAsyncSignals()) {
$this->listenForSignals();
}
/**
* Enable async signals for the process.
*
* @return void
*/
protected function listenForSignals()
{
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () {
$this->shouldQuit = true;
});
pcntl_signal(SIGUSR2, function () {
$this->paused = true;
});
pcntl_signal(SIGCONT, function () {
$this->paused = false;
});
}
-
SIGTERM
はkill
コマンドなどのデフォルトの終了処理であるが,この登録を行っておくことによって, 強制終了シグナルのSIGKILL
が飛んでこない限りは Graceful にシャットダウンすることができるようになる。設定された$this->shouldQuit
は再起動判定時に利用される。 -
SIGUSR2
はユーザ定義シグナルである。なにやらポーズ処理が入っているように見えるが,これは Laravel Horizon 用のシグナルなので,ワーカーを単独で使用する分には無関係だ。 -
SIGCONT
は本来はSIGSTOP
のあとにプロセスを再開するためのシグナルだが, ここではSIGUSR2
に対応する Laravel Horizon 用のシグナル として使われている。
前回の再起動時刻の取得
$lastRestart = $this->getTimestampOfLastQueueRestart();
/**
* Get the last queue restart timestamp, or null.
*
* @return int|null
*/
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache) {
return $this->cache->get('illuminate:queue:restart');
}
}
何やらキャッシュから値を取得しているが,これは RestartCommand
によってセットされる値である。要するに,プロセス間通信の手段としてキャッシュストレージを利用しているわけだ。この $lastRestart
も再起動判定時に利用される。
<?php
namespace Illuminate\Queue\Console;
use Illuminate\Console\Command;
use Illuminate\Support\InteractsWithTime;
class RestartCommand extends Command
{
use InteractsWithTime;
/**
* The console command name.
*
* @var string
*/
protected $name = 'queue:restart';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Restart queue worker daemons after their current job';
/**
* Execute the console command.
*
* @return void
*/
public function handle()
{
$this->laravel['cache']->forever('illuminate:queue:restart', $this->currentTime());
$this->info('Broadcasting queue restart signal.');
}
}
ループの開始
メンテナンスモード判定
// Before reserving any jobs, we will make sure this queue is not paused and
// if it is we will just pause this worker for a given amount of time and
// make sure we do not need to kill this worker process off completely.
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$this->pauseWorker($options, $lastRestart);
continue;
}
/**
* Determine if the daemon should process on this iteration.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param string $connectionName
* @param string $queue
* @return bool
*/
protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
{
return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
$this->paused ||
$this->events->until(new Events\Looping($connectionName, $queue)) === false);
}
以下の場合に pauseWorker()
が実行されることがわかる。大きく注意すべきはメンテナンスモードではオプションを渡さない限りワーカーが動かないことであろうか。
./artisan down
によってメンテナンスモードになっていて,かつ起動時に--force
オプションが渡されていない場合- Laravel Horizon 制御下において
SIGUSR2
を受け取った場合 -
Queue::looping()
で登録されたリスナーのどれかがfalse
をリターンして以後のリスナー実行を中断した場合
/**
* Pause the worker for the current loop.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param int $lastRestart
* @return void
*/
protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
$this->sleep($options->sleep > 0 ? $options->sleep : 1);
$this->stopIfNecessary($options, $lastRestart);
}
pauseWorker()
の実行は,スリープ処理が入る点を除けば stopIfNecessary()
の実装そのままである。このメソッドはループの最後でも使用されており,後ほど確認する。
ジョブの取得
// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
もし ---queue=foo,bar
のように優先度をつけて対象キューを複数設定していた場合,ここで優先度順にポップ処理が走る。
/**
* Get the next job from the queue connection.
*
* @param \Illuminate\Contracts\Queue\Queue $connection
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
protected function getNextJob($connection, $queue)
{
try {
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
}
}
} catch (Exception $e) {
/* ... */
} catch (Throwable $e) {
/* ... */
}
}
$connection->pop($queue)
は, QueueManager::__call()
を通じて RedisQueue::pop()
に委譲される。
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$this->migrate($prefixed = $this->getQueue($queue));
if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
return;
}
[$job, $reserved] = $nextJob;
if ($reserved) {
return new RedisJob(
$this->container, $this, $job,
$reserved, $this->connectionName, $queue ?: $this->default
);
}
}
発火時刻が来た遅延ジョブ・リトライジョブをセットからキューにまとめて移動
/**
* Migrate any delayed or expired jobs onto the primary queue.
*
* @param string $queue
* @return void
*/
protected function migrate($queue)
{
$this->migrateExpiredJobs($queue.':delayed', $queue);
if (! is_null($this->retryAfter)) {
$this->migrateExpiredJobs($queue.':reserved', $queue);
}
}
この呼出はLuaスクリプトになっており,PHPの模擬変数を使うと以下のように表現できる。
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', "{$queue}", '-inf', "{$currentTimestamp}")
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', "{$queue}:delayed", 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', "{$queue}", unpack(val, i, math.min(i+99, #val)))
end
end
return val
queues:キュー名:delayed
は発火時刻をスコア,値をジョブペイロードとする ZSET
(ソート済みセット型)であり,以下のような処理が行われている。
-
ZRANGEBYSCORE
で現在時刻までにスケジュールされたジョブを全件取得し,変数に入れたあとはZRANGEBYRANK
でクリアする - 変数に入れたジョブを100件ずつ
RPUSH
でキューに移動する
queues:キュー名:reserved
に関しても同様で,こちらは遅延ジョブではなくリトライジョブが対象になる。発火時刻を算出するパラメータとして queue.connections.redis.retry_after
で設定した秒数が用いられる。
キューからジョブを1件取得し,例外発生時またはタイムアウト時の再試行を予約
/**
* Retrieve the next job from the queue.
*
* @param string $queue
* @return array
*/
protected function retrieveNextJob($queue)
{
if (! is_null($this->blockFor)) {
return $this->blockingPop($queue);
}
return $this->getConnection()->eval(
LuaScripts::pop(), 2, $queue, $queue.':reserved',
$this->availableAt($this->retryAfter)
);
}
queue.connections.redis.block_for
の設定があるかどうかによって処理が分岐される。
block_for
を設定している場合/**
* Retrieve the next job by blocking-pop.
*
* @param string $queue
* @return array
*/
protected function blockingPop($queue)
{
$rawBody = $this->getConnection()->blpop($queue, $this->blockFor);
if (! empty($rawBody)) {
$payload = json_decode($rawBody[1], true);
$payload['attempts']++;
$reserved = json_encode($payload);
$this->getConnection()->zadd($queue.':reserved', [
$reserved => $this->availableAt($this->retryAfter),
]);
return [$rawBody[1], $reserved];
}
return [null, null];
}
block_for
を設定していない場合(Luaスクリプト)-- Pop the first job off of the queue...
local job = redis.call('lpop', "{$queue}")
local reserved = false
if(job ~= false) then
-- Increment the attempt count and place job on the reserved queue...
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', "{$queue}:reserved", "{$this->availableAt($this->retryAfter)}", reserved)
end
return {job, reserved}
どちらもやっていることは同じだが,「ジョブを取得できるまでブロックしてポーリング処理を最適化」ということが実現できない後者のみ,負荷軽減のためにLuaスクリプトによる高速化を行っているのか…?(不明)正直どちらもLuaスクリプト統一でいいような気はする。
さて,ここでまた queues:キュー名:reserved
セットが登場している。
- ペイロードに含まれる試行回数
"attempts"
をインクリメント - リトライ時刻を算出し,それをスコアとしてペイロードを
queues:キュー名:reserved
セットに追加
リトライ発火時刻は以下のように算出される。要するに, queue.connections.redis.retry_after
の値に現在時刻を足しているだけだ。
/**
* Get the "available at" UNIX timestamp.
*
* @param \DateTimeInterface|\DateInterval|int $delay
* @return int
*/
protected function availableAt($delay = 0)
{
$delay = $this->parseDateInterval($delay);
return $delay instanceof DateTimeInterface
? $delay->getTimestamp()
: Carbon::now()->addSeconds($delay)->getTimestamp();
}
/**
* If the given value is an interval, convert it to a DateTime instance.
*
* @param \DateTimeInterface|\DateInterval|int $delay
* @return \DateTimeInterface|int
*/
protected function parseDateInterval($delay)
{
if ($delay instanceof DateInterval) {
$delay = Carbon::now()->add($delay);
}
return $delay;
}
これによって,ジョブ実行中にHTTPリクエストがハングしたりバグで無限ループに陥ったりしても,前述の仕組みと組み合わせて,プロセスを強制終了後に勝手に再試行してくれるようになるのだ。
ジョブ取得時の例外対応
先ほどは省略していた部分を見てみよう。
/**
* Get the next job from the queue connection.
*
* @param \Illuminate\Contracts\Queue\Queue $connection
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
protected function getNextJob($connection, $queue)
{
try {
/* ... */
} catch (Exception $e) {
$this->exceptions->report($e);
$this->stopWorkerIfLostConnection($e);
$this->sleep(1);
} catch (Throwable $e) {
$this->exceptions->report($e = new FatalThrowableError($e));
$this->stopWorkerIfLostConnection($e);
$this->sleep(1);
}
}
/**
* Stop the worker if we have lost connection to a database.
*
* @param \Throwable $e
* @return void
*/
protected function stopWorkerIfLostConnection($e)
{
if ($this->causedByLostConnection($e)) {
$this->shouldQuit = true;
}
}
以下のような処理が行われている。
- エラーレポートを行う
- データベースコネクションが切断されたことが原因の例外であれば,次のループでの終了を予約
(Redis ドライバを使っている今回は無関係) - 1秒間スリープ
SIGALRM
を利用してタイムアウトハンドラを登録
if ($this->supportsAsyncSignals()) {
$this->registerTimeoutHandler($job, $options);
}
最初に設定した SIGTERM
に加えて,サイクル1回ごとに SIGALRM
ハンドラも繰り返し上書き登録されていく。ジョブの $timeout
設定またはワーカーの --timeout
オプションからタイムアウト秒数を拾い,ハンドラとして SIGKILL
による強制終了処理を登録している。
/**
* Register the worker timeout handler.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
// We will register a signal handler for the alarm signal so that we can kill this
// process if it is running too long because it has frozen. This uses the async
// signals supported in recent versions of PHP to accomplish it conveniently.
pcntl_signal(SIGALRM, function () {
$this->kill(1);
});
pcntl_alarm(
max($this->timeoutForJob($job, $options), 0)
);
}
/**
* Kill the process.
*
* @param int $status
* @return void
*/
public function kill($status = 0)
{
$this->events->dispatch(new Events\WorkerStopping($status));
if (extension_loaded('posix')) {
posix_kill(getmypid(), SIGKILL);
}
exit($status);
}
/**
* Get the appropriate timeout for the given job.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return int
*/
protected function timeoutForJob($job, WorkerOptions $options)
{
return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout;
}
ジョブの実行 (またはスリープ)
さて,これだけ下準備してようやく実行に入れる。
// If the daemon should run (not in maintenance mode, etc.), then we can run
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job) {
$this->runJob($job, $connectionName, $options);
} else {
$this->sleep($options->sleep);
}
/**
* Process the given job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param string $connectionName
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function runJob($job, $connectionName, WorkerOptions $options)
{
try {
return $this->process($connectionName, $job, $options);
} catch (Exception $e) {
/* ... */
} catch (Throwable $e) {
/* ... */
}
}
/**
* Process the given job from the queue.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*
* @throws \Throwable
*/
public function process($connectionName, $job, WorkerOptions $options)
{
try {
// First we will raise the before job event and determine if the job has already ran
// over its maximum attempt limits, which could primarily happen when this job is
// continually timing out and not actually throwing any exceptions from itself.
$this->raiseBeforeJobEvent($connectionName, $job);
$this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
$connectionName, $job, (int) $options->maxTries
);
// Here we will fire off the job and let it process. We will catch any exceptions so
// they can be reported to the developers logs, etc. Once the job is finished the
// proper events will be fired to let any listeners know this job has finished.
$job->fire();
$this->raiseAfterJobEvent($connectionName, $job);
} catch (Exception $e) {
/* ... */
} catch (Throwable $e) {
/* ... */
}
}
実際にジョブを実行しているのは
$job->fire();
の部分であるが,これは キュー・イベント・ブロードキャストに関する補足資料 で既に解説済みであるため大部分は割愛する。
ジョブが成功したとき
CallQueuedHandler
の実装を見ると,ジョブを実行し終わったときに $job->delete()
が実行されているのがわかる。
/**
* Handle the queued job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param array $data
* @return void
*/
public function call(Job $job, array $data)
{
try {
$command = $this->setJobInstanceIfNecessary(
$job, unserialize($data['command'])
);
} catch (ModelNotFoundException $e) {
return $this->handleModelNotFound($job, $e);
}
$this->dispatcher->dispatchNow(
$command, $this->resolveHandler($job, $command)
);
if (! $job->hasFailed() && ! $job->isReleased()) {
$this->ensureNextJobInChainIsDispatched($command);
}
if (! $job->isDeletedOrReleased()) {
$job->delete();
}
}
基底抽象クラス Job
を見ると
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
$this->deleted = true;
}
という実装になっているが, これは RedisJob
によって以下のように継承されている。
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->redis->deleteReserved($this->queue, $this);
}
/**
* Delete a reserved job from the queue.
*
* @param string $queue
* @param \Illuminate\Queue\Jobs\RedisJob $job
* @return void
*/
public function deleteReserved($queue, $job)
{
$this->getConnection()->zrem($this->getQueue($queue).':reserved', $job->getReservedJob());
}
これにより,成功したときだけ,あらかじめ予約しておいた例外発生時・タイムアウト時の再試行を無効化している。
ジョブ実行中に例外が発生したとき
先ほどは省略していた部分を見てみよう。
/**
* Process the given job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param string $connectionName
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function runJob($job, $connectionName, WorkerOptions $options)
{
try {
return $this->process($connectionName, $job, $options);
} catch (Exception $e) {
$this->exceptions->report($e);
$this->stopWorkerIfLostConnection($e);
} catch (Throwable $e) {
$this->exceptions->report($e = new FatalThrowableError($e));
$this->stopWorkerIfLostConnection($e);
}
}
/**
* Process the given job from the queue.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*
* @throws \Throwable
*/
public function process($connectionName, $job, WorkerOptions $options)
{
try {
/* ... */
$job->fire();
/* ... */
} catch (Exception $e) {
$this->handleJobException($connectionName, $job, $options, $e);
} catch (Throwable $e) {
$this->handleJobException(
$connectionName, $job, $options, new FatalThrowableError($e)
);
}
}
大きく見ると,以下の流れになる。
-
handleJobException()
メソッドで例外をハンドルする
(このメソッドの中では必ず再スローされる) - データベースコネクションが切断されたことが原因の例外であれば,次のループでの終了を予約
(今回はジョブ実行に関するものなので,キュードライバにデータベースを使用していなくても関係がある)
では handleJobException()
メソッドを見ていこう。ここから先は複雑なので,気合を入れて読んでいきたい。
/**
* Handle an exception that occurred while the job was running.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Illuminate\Queue\WorkerOptions $options
* @param \Exception $e
* @return void
*
* @throws \Exception
*/
protected function handleJobException($connectionName, $job, WorkerOptions $options, $e)
{
try {
// First, we will go ahead and mark the job as failed if it will exceed the maximum
// attempts it is allowed to run the next time we process it. If so we will just
// go ahead and mark it as failed now so we do not have to release this again.
if (! $job->hasFailed()) {
$this->markJobAsFailedIfWillExceedMaxAttempts(
$connectionName, $job, (int) $options->maxTries, $e
);
}
$this->raiseExceptionOccurredJobEvent(
$connectionName, $job, $e
);
} finally {
// If we catch an exception, we will attempt to release the job back onto the queue
// so it is not lost entirely. This'll let the job be retried at a later time by
// another listener (or this same one). We will re-throw this exception after.
if (! $job->isDeleted() && ! $job->isReleased() && ! $job->hasFailed()) {
$job->release($options->delay);
}
}
throw $e;
}
/**
* Mark the given job as failed if it has exceeded the maximum allowed attempts.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $maxTries
* @param \Exception $e
* @return void
*/
protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
{
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) {
$this->failJob($connectionName, $job, $e);
}
if ($maxTries > 0 && $job->attempts() >= $maxTries) {
$this->failJob($connectionName, $job, $e);
}
}
/**
* Mark the given job as failed and raise the relevant event.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Exception $e
* @return void
*/
protected function failJob($connectionName, $job, $e)
{
return FailingJob::handle($connectionName, $job, $e);
}
FailingJob::handle()
メソッドは以下のように実装されている。
/**
* Delete the job, call the "failed" method, and raise the failed job event.
*
* @param string $connectionName
* @param \Illuminate\Queue\Jobs\Job $job
* @param \Exception $e
* @return void
*/
public static function handle($connectionName, $job, $e = null)
{
$job->markAsFailed();
if ($job->isDeleted()) {
return;
}
try {
// If the job has failed, we will delete it, call the "failed" method and then call
// an event indicating the job has failed so it can be logged if needed. This is
// to allow every developer to better keep monitor of their failed queue jobs.
$job->delete();
$job->failed($e);
} finally {
static::events()->dispatch(new JobFailed(
$connectionName, $job, $e ?: new ManuallyFailedException
));
}
}
まとめると,以下のような流れになる。
- 以下に該当する場合にジョブに失敗フラグを立てて,キューからジョブを削除する。
- 今回の試行が最大試行回数目となるとき
-
アプリケーションジョブの
$timeoutAt
プロパティまたはretryUntil()
メソッドで返された**時刻がきてしまい,これ以上の再試行がアプリケーション的に「無意味」「有効期限が過ぎているので必ず失敗」などと判断されるとき
(時間を指定する$timeout
あるいは--timeout
とは別物)**
-
Queue::exceptionOccurred()
で登録されたリスナーを実行する。ここでジョブの削除,強制的に失敗フラグを立てる,といったことも行うことができる。 -
ジョブがまだ失敗しておらず,キューからも削除されておらず,
queues:キュー名:reserved
セットに残っている場合に,queue.connections.redis.retry_after
秒数ぶんだけ遅延させるためにqueues:キュー名:delayed
セットに移動する。
$job->release($options->delay)
が初見だと何をやっているのか分かりづらいが,Luaスクリプトの中身を見れば一目瞭然である。Laravel はこの処理を「リリース」と呼んでいるらしい。
-- Remove the job from the current queue...
redis.call('zrem', "{$queue}:reserved", "{$job->getReservedJob()}")
-- Add the job onto the "delayed" queue...
redis.call('zadd', "{$queue}:delayed", "{$job->availableAt($options->delay)}", "{$job->getReservedJob()}")
return true
ジョブがタイムアウトしたとき
SIGALRM
ハンドラにより
$this->kill(1);
が実行され,ワーカーはそのまま強制終了される。ポイントは**再起動時にタイムアウトジョブがどうなるか?**という点である。注目すべきは process()
メソッドにあるこの記述だ。
/**
* Process the given job from the queue.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*
* @throws \Throwable
*/
public function process($connectionName, $job, WorkerOptions $options)
{
try {
/* ... */
$this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
$connectionName, $job, (int) $options->maxTries
);
$job->fire();
/* ... */
} catch (Exception $e) {
/* ... */
} catch (Throwable $e) {
/* ... */
}
}
/**
* Mark the given job as failed if it has exceeded the maximum allowed attempts.
*
* This will likely be because the job previously exceeded a timeout.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $maxTries
* @return void
*/
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
$timeoutAt = $job->timeoutAt();
if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) {
return;
}
if (! $timeoutAt && ($maxTries === 0 || $job->attempts() <= $maxTries)) {
return;
}
$this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
$job->resolveName().' has been attempted too many times or run too long. The job may have previously timed out.'
));
throw $e;
}
さっきと似たようなメソッドがあるが,微妙に名前が異なる。
-
markJobAsFailedIfWillExceedMaxAttempts()
→
今回の試行が最大試行回数目となるとき,ジョブに失敗フラグを立てる。例外スローは呼び出し元に任せる。 -
markJobAsFailedIfAlreadyExceedsMaxAttempts()
→ 前回の試行で最大試行回数目であったとき,ジョブに失敗フラグを立てて,MaxAttemptsExceededException
をスローする。
タイムアウト時は即座にプロセスをキルして,再開時に後始末を任せることにしているため,このような設計になっている。
ワーカーの終了判定
最後に,このままループを継続するか,プロセスを終了するかの判定が行われる。
// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$this->stopIfNecessary($options, $lastRestart, $job);
/**
* Stop the process if necessary.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param int $lastRestart
* @param mixed $job
*/
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null)
{
if ($this->shouldQuit) {
$this->stop();
} elseif ($this->memoryExceeded($options->memory)) {
$this->stop(12);
} elseif ($this->queueShouldRestart($lastRestart)) {
$this->stop();
} elseif ($options->stopWhenEmpty && is_null($job)) {
$this->stop();
}
}
終了されるのは以下のパターン。
SIGTERM
を受け取っていたとき-
データベースコネクションが切断されたとき
- プロセスを起動したまま長時間データベース通信を何も行わないと,TCPコネクションが切断されてしまう。
- とはいえ,Laravel はデータベースサービス側で切断を検知して再接続処理はやってくれていたはず。なぜこっちで再起動する必要が…?(謎
-
メモリ使用量がワーカーの
--memory
設定値を超えたとき- php.ini の memory_limit よりも余裕を持たせておくと,メモリ不足で身動きできなくなる前に余裕を持って再起動できる。メモリリーク対策として非常に有用。
- プロセス終了ステータスに 12 を使っている理由は不明。 Laravel Horizon 関係か?
-
./artisan illuminate:queue:restart
が実行されていたとき- 最初に取得しておいた
$lastRestart
と違う値が取得され,終了すべきと判定される。
- 最初に取得しておいた
-
--stop-when-empty
が有効でかつ,すべてのジョブ消化が終わったとき- 特殊な環境でのバッチ処理向けか?動かし続けるWebサービス向けではない気がする。
/**
* Determine if the queue worker should restart.
*
* @param int|null $lastRestart
* @return bool
*/
protected function queueShouldRestart($lastRestart)
{
return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}
/**
* Determine if the memory limit has been exceeded.
*
* @param int $memoryLimit
* @return bool
*/
public function memoryExceeded($memoryLimit)
{
return (memory_get_usage(true) / 1024 / 1024) >= $memoryLimit;
}
/**
* Stop listening and bail out of the script.
*
* @param int $status
* @return void
*/
public function stop($status = 0)
{
$this->events->dispatch(new Events\WorkerStopping($status));
exit($status);
}
お疲れ様でした。フローチャートでも書こうかなと思ったけど,細かいこと描こうとすると肥大化しそうなので今回は見送りかな…