2019-12-31 追記: ライブラリ化しました!インストール&サービスプロバイダに1個登録するだけで簡単に使えるようになっています!
概要
MySQL や Aurora などで,マスタスレーブ構成にして
-
SELECT
はスレーブに対して実行 -
INSERT
UPDATE
DELETE
はマスタに対して実行
という設定にすることはよくある。 Laravel はフレームワークレベルでこの設定をサポートしており,自動的に振り分けを行なってくれる。さらに設定で sticky
オプションを有効化している場合, 自分が INSERT
UPDATE
DELETE
を実行した同一HTTPリクエスト上では以後の SELECT
をマスタに対して行ってくれる。
ところがこの「同一HTTPリクエスト上」というのがかなりネックで,次のリクエストが連続して飛んでくる場合,マスタからスレーブへの同期が完了していない場合が多い。そのため,以後5秒間のリクエストにまで sticky
を適用したい。
現時点 (v5.7) の素の Laravel ではここまでの面倒は見てくれないので,自前で機能を実装する。
実装
おおまかな実装方針
- Laravel は
INSERT
UPDATE
DELETE
を実行したときに自動的にConnection::recordsHaveBeenModified()
というメソッドを呼んでいる。これによってプロパティConnection::$recordsModified
が書き換えられる。sticky
設定の場合,Connection::$recordsModified
を見てマスタを見るかどうかの判定が為される。 - このメソッドを継承して直接処理を書きなぐると保守性が下がるので,「イベント」「イベントリスナ」「HTTPミドルウェア」の3つに分解して実装を行う。
- バックグラウンドワーカーでは,デーモンモードで動いている場合プロセスがリフレッシュされないため,何もしないと前のジョブの影響を受けてしまう。そのため,以下のような実装を採る。
- バックグラウンドワーカー起動と同時に
Connection::recordsHaveBeenModified()
が呼ばれるようにする - 処理するジョブに,そのジョブのトリガーとなった処理と連続性を保証する必要がない場合のみ,
Connection::unmodifiedSession()
という一時的に変更が無かったことにするメソッドを呼ばせる
- バックグラウンドワーカー起動と同時に
コード
イベント
<?php
declare(strict_types=1);
namespace App\Events;
/**
* Class RecordsHaveBeenModified
*
* データベースのレコードが更新されたことを示します。
*/
class RecordsHaveBeenModified
{
}
継承コネクション
ここで実装したメソッドは DB
ファサードから使用することができます。
<?php
declare(strict_types=1);
namespace App\Database;
use App\Events\RecordsHaveBeenModified;
use Illuminate\Database\MySqlConnection as BaseMySqlConnection;
use Illuminate\Support\Facades\Event;
/**
* Class MySqlConnection
*/
class MySqlConnection extends BaseMySqlConnection
{
/**
* INSERT / UPDATE / DELETE されたことを示します。
* イベントは発火しません。
*
* @param bool $value
*/
public function recordsHaveBeenModifiedWithoutEvent($value = true): void
{
parent::recordsHaveBeenModified($value);
}
/**
* INSERT / UPDATE / DELETE されたことを示します。
* true にセットされたタイミングで RecordsHaveBeenModified が発火します。
*
* @param bool $value
*/
public function recordsHaveBeenModified($value = true): void
{
if (!$this->recordsModified) {
$this->recordsHaveBeenModifiedWithoutEvent($value);
if ($value) {
Event::dispatch(new RecordsHaveBeenModified());
}
}
}
/**
* 一時的に変更が無かったと見なし,クロージャの中で更に変更が
* 走らない限りは必ずスレーブから読み出せることを保証します。
* """連続性の無い""" バックグラウンドジョブで使用することを想定しています。
*
* @param \Closure $closure
* @param array ...$args
* @return mixed
*/
public function unmodifiedSession(\Closure $closure, ...$args)
{
$currentState = $this->recordsModified;
try {
$this->recordsModified = false;
return $closure(...$args);
} finally {
$this->recordsHaveBeenModifiedWithoutEvent($currentState);
}
}
}
<?php
declare(strict_types=1);
namespace App\Providers;
use App\Database\MySqlConnection;
use Illuminate\Database\Connection;
use Illuminate\Support\ServiceProvider;
class DatabaseServiceProvider extends ServiceProvider
{
public function boot(): void
{
Connection::resolverFor('mysql', function (...$parameters) {
return new MySqlConnection(...$parameters);
});
}
}
HTTPミドルウェア
このクラスは2つの使われ方をします。
-
handle()
メソッドはHTTPリクエストを受けるときに呼ばれます。 -
forceReferringToMasterForFiveSeconds()
メソッドは後述のイベントリスナーによって呼ばれます。
<?php
declare(strict_types=1);
namespace App\Http\Middleware;
use App\Database\MySqlConnection;
use App\User;
use Closure;
use Illuminate\Cache\Repository;
use Illuminate\Contracts\Auth\Guard;
use Illuminate\Database\Connection;
use Illuminate\Http\Request;
/**
* Class ShouldReferToMasterWhenRecordsModified
*/
class ShouldReferToMasterWhenRecordsModified
{
/**
* @var Connection|MySqlConnection
*/
protected $db;
/**
* @var Guard
*/
protected $guard;
/**
* @var Repository
*/
protected $cache;
/**
* ShouldReferToMasterWhenRecordsModified constructor.
*
* @param Connection|MySqlConnection $db
*/
public function __construct(Connection $db, Guard $guard, Repository $cache)
{
$this->db = $db;
$this->guard = $guard;
$this->cache = $cache;
}
/**
* リクエストをハンドルします。
*
* @param Request $request
* @param Closure $next
* @return mixed
*/
public function handle(Request $request, Closure $next)
{
if ($this->guard->hasUser() && $this->hasCacheForUser($this->guard->user())) {
$this->db->recordsHaveBeenModifiedWithoutEvent();
}
return $next($request);
}
/**
* 現在ログイン中のユーザに対し,5秒間,
* 後続のリクエストでデータベースマスタを参照することを強制します。
*/
public function forceReferringToMasterForFiveSeconds(): void
{
if ($this->guard->hasUser()) {
$this->cache->set($this->getCacheKeyForUser($this->guard->user()), true, 5 / 60);
}
}
/**
* マスタ参照フラグのキャッシュを保持しているか調べます。
*
* @param User $user
* @return bool
*/
protected function hasCacheForUser(User $user): bool
{
return (bool)$this->cache->get($this->getCacheKeyForUser($user));
}
/**
* キャッシュのキーを取得します。
*
* @param User $user
* @return string
*/
protected function getCacheKeyForUser(User $user): string
{
return "middleware:ShouldReferToMasterWhenRecordsModified/user_id={$user->id}";
}
}
イベントリスナ
handle()
メソッドは RecordsHaveBeenModified
イベントが発火したときに,このイベントリスナが登録済みであれば呼ばれます。
<?php
declare(strict_types=1);
namespace App\Listeners;
use App\Events\RecordsHaveBeenModified;
use App\Http\Middleware\ShouldReferToMasterWhenRecordsModified;
/**
* Class ForceReferringToMaster
*/
class ForceReferringToMaster
{
/**
* @var ShouldReferToMasterWhenRecordsModified
*/
protected $middleware;
/**
* ForceReferringToMaster constructor.
*
* @param ShouldReferToMasterWhenRecordsModified $middleware
*/
public function __construct(ShouldReferToMasterWhenRecordsModified $middleware)
{
$this->middleware = $middleware;
}
/**
* Execute the listener.
*
* @param mixed $event
*/
public function handle(RecordsHaveBeenModified $event): void
{
$this->middleware->forceReferringToMasterForFiveSeconds();
}
}
起動と同時に登録したい場合は EventServiceProvider
を以下のようにします。
<?php
declare(strict_types=1);
namespace App\Providers;
use App;
use App\Events;
use App\Listeners;
use Illuminate\Foundation\Support\Providers\EventServiceProvider as ServiceProvider;
class EventServiceProvider extends ServiceProvider
{
/**
* The event listener mappings for the application.
*
* @var array
*/
protected $listen = [
Events\RecordsHaveBeenModified::class => Listeners\ForceReferringToMaster::class,
];
}
ジョブおよびコマンド
バックグラウンドワーカーの先頭ジョブとして利用します。
<?php
declare(strict_types=1);
namespace App\Jobs;
use App\Database\MySqlConnection;
use Illuminate\Bus\Queueable;
use Illuminate\Database\Connection;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Support\Facades\DB;
/**
* Class MarkRecordsAsModified
*
* キューに投入されるイベントの多くは SerializesModels を使用しています。
* 多くの場合は作成・更新・削除に直ちに反応する必要があり,レイテンシは一切許容されません。
* そのため,ワーカープロセスではデフォルトでマスターのみを見るようにしなければなりません。
* このジョブはワーカーの稼動開始前に先頭ジョブとして仕込んで使用します。
*
* もしワーカープロセスでスレーブを使った負荷分散を行う場合には,
* 明示的にその場所で DB::unmodifiedSession() を利用します。
*
* @codeCoverageIgnore
*/
class MarkRecordsAsModified implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable;
/**
* MarkRecordsAsModified constructor.
*
* @param string $queue
*/
public function __construct(string $queue)
{
$this->queue = $queue;
}
/**
* Execute the job.
*
* @param Connection|MySqlConnection $db
*/
public function handle(Connection $db): void
{
$db->recordsHaveBeenModifiedWithoutEvent();
}
/**
* Get display name of the job.
*
* @return string
*/
public function displayName(): string
{
return static::class . "<queue={$this->queue}>";
}
}
<?php
declare(strict_types=1);
namespace App\Console\Commands\DB;
use App\Jobs\MarkRecordsAsModified;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Bus;
/**
* Class MarkRecordsAsModifiedCommand
*
* @codeCoverageIgnore
*/
class MarkRecordsAsModifiedCommand extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'db:mark-records-as-modified {queue=default}';
/**
* The console command description.
*
* @var string
*/
protected $description = 'ワーカープロセスで DB::recordsHaveBeenModified() を直ちに実行します。';
/**
* Execute the console command.
*/
public function handle(): void
{
Bus::dispatch(new MarkRecordsAsModified($queue));
}
}
使い方
HTTPミドルウェアとして
ユーザを取得する必要があるので,必ず auth
が前に来るようにする。
<?php
declare(strict_types=1);
namespace App\Http;
use Illuminate\Foundation\Http\Kernel as HttpKernel;
class Kernel extends HttpKernel
{
/**
* The application's route middleware.
*
* These middleware may be assigned to groups or used individually.
*
* @var array
*/
protected $routeMiddleware = [
'auth' => \Illuminate\Auth\Middleware\Authenticate::class,
'master' => \App\Http\Middleware\ShouldReferToMasterWhenRecordsModified::class,
// ...
];
/**
* ミドルェアの優先順位をデフォルトから変更します。
*
* @var array
*/
protected $middlewarePriority = [
\Illuminate\Auth\Middleware\Authenticate::class,
\App\Http\Middleware\ShouldReferToMasterWhenRecordsModified::class,
// ...
];
/**
* The application's route middleware groups.
*
* @var array
*/
protected $middlewareGroups = [
'api' => [
'auth',
'master',
],
//
];
}
ワーカーとして
Supervisor 等から起動時に呼ばれるコマンドを以下のようにする。 MarkAsRecordsModified
ジョブのディスパッチ先のキューと,その後に起動されるワーカーが参照するキューを揃えておく。
sh -c '
run php /var/www/html/artisan db:mark-records-as-modified default;
exec run php /var/www/html/artisan queue:work redis --queue=default --sleep=3 --tries=3
'
基本的にワーカーはマスタを見るが,スレーブを見ても問題ないジョブは以下のようにしておくと,可能な範囲で負荷分散することができる。
<?php
declare(strict_types=1);
namespace App\Jobs;
use App;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Support\Facades\DB;
/**
* Class ExampleJob
*/
class ExampleJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Executed the job.
*/
public function handle(): void
{
DB::unmodifiedSession(function () {
//
});
}
}
まとめ
長くなってしまったが,これを実践すると,不整合が発生しない範囲で最大限にスレーブを負荷分散に活用することができる。