本記事は、サムザップ Advent Calendar 2019 #1 の12/14の記事です
#モチベーション
EloquentORMでの水平分割を実装する機会があり、結構難航したので記事にまとめてみました。
Eloquentの水平分割はほんと情報が少ないので参考までに。
#前提
方針としては以下を掲げて作業を開始しました。
- なるべくlumenまたはEloquentの機能に寄せる
- なるべく現状の使い勝手(メソッドの呼び出し方等)のまま使えるようにする
- バルクインサート系については別途実装しているので一旦考えない
- userテーブルは分割せず、それ以外のものをuser_idをキーにして分割する
#実装
簡潔に
まずは簡潔に実装した内容をまとめました。
ざっくりとした実装方針のみ知りたい方向け。
- insert、update、delete時
- それぞれ、xxxingというイベントを発行するので、そこにフックしてuser_idからconnectionを判別、nullならエラーを吐く。
- 参考 : Eloquentモデルのイベントをフックする4つの方法
- select時
- Builderでは、getModelsメソッドのオーバーライドをし、条件節にuser_idが存在すればそれに応じたconnectionを、存在しなければ全てのconnectionに対して処理を実行し、結合した結果を返す。
- 参考 : illuminate/database/Eloquent/Builder.php#L514:L525
前提知識
まず、Eloquentは大まかに下記の3つから構成されています
Builderが2種類あってなんともわかりづらいですね。
それぞれの役割はこんな感じです。
- Eloquent\Model
- EloquentをORMたらしめているもの。
- DBの1レコードをオブジェクトとして表現する。
- Eloquent\Builder
- ModelとQuery\Builderを繋ぐ役割。
- Query\Builderに機能を追加して拡張したり、ModelやCollectionにマッピングしたりしてくれている。
- Query\Builder
- その名の通り、QueryをBuildしてくれている。
- 実際にQueryの形にマッピングしたり、クエリを発行したりしている。
- 今回管理しなければいけないConnectionの根本を持ってる。
わかりづらいので以下では
- Eloquent\BuilderをBuilder
- Query\BuilderをQuery
と呼ぶこととします。
検討したもの
今回実装するにあたって検討した方針は下記になります。
- select系
- Modelのグローバルスコープを使用する方法
- BuilderのgetModelsメソッドをオーバーライドする方法 ← 採用
- insert、update、delete系
- BuilderのsetModelメソッドをオーバーライドする方法
- Modelのeventを使用した方法 ← 採用
最終的な実装
シャーディング方法
今回はuser_idをキーにしてそれをシャード数で剰余算することで分割を行いました。
また、シャーディンググループという概念を追加することで、テーブルごとにシャーディング数を変更できるようにしました。
設定は下記のようなイメージ。
$db_shard_list = [
// これがシャーディンググループ
'default' => [
'mysql_1',
'mysql_2',
'mysql_3',
]
];
//~~~~~~~~~~~~~~~~~ 省略 ~~~~~~~~~~~~~~~~~~~~~
$connections = [
'mysql' => $mysql_setting,
];
foreach (collect($db_shard_list)->flatten()->unique()->toArray() as $dbname) {
$connections[$dbname] = $db_trivials + $db_setting;
$connections[$dbname]['host'] = $dbname;
$connections[$dbname]['database'] = $dbname;
}
return [
'default' => 'mysql',
'migrations' => 'migrations',
'connections' => $connections,
'db_shard_list' => $db_shard_list,
];
Connectionの判別、Migration
Connectionの判別は、Managerを作り、そこで一括して行なっています。
<?php
namespace App\Services;
class ShardingManager
{
// $shard_key、$shard_groupからconnection_nameの判別
public static function referConnectionName(int $shard_key, string $shard_group): string
{
$db_list = self::getShardList($shard_group);
$index = $shard_key % count($db_list);
return $db_list[$index];
}
// $shard_groupから、シャードグループの取得
public static function getShardList(?string $shard_group = null): array
{
$db_list = config('database.db_shard_list');
if (!key_exists('default', $db_list) || empty($db_list['default'])) {
$db_list['default'] = [config('database.default')];
}
if (is_null($shard_group)) {
return collect($db_list)->flatten()->unique()->toArray();
}
if (!key_exists($shard_group, $db_list)) {
throw new \Exception("shard group not found");
}
$db_list = $db_list[$shard_group];
if (empty($db_list)) {
throw new \Exception("shard list is empty");
}
return $db_list;
}
// 複数の$shard_keyから[ 'connection_name' => [ ...ids ]]の形にして返す
public static function referConnectionNameList(array $shard_keys, string $shard_group): array
{
$connections = [];
if (empty($shard_keys)) {
//シャーディングキーが指定されていなければ全てのコネクションを検索する
$connections = self::getShardList($shard_group);
} else {
//条件節に含まれるシャーディングキーから、検索が必要なコネクションを割り出す
$connections = collect($shard_keys)->reduce(function (array $carry, int $id) use ($shard_group) {
$connection = self::referConnectionName($id, $shard_group);
if (!in_array($connection, $carry)) {
$carry[] = $connection;
}
return $carry;
}, []);
}
return $connections;
}
// $datasetに含まれている$shard_keyから、[ 'connection_name' => $dataset ]の形にして返す
public static function parseBulkInsertData(array $dataset, string $shard_key, string $shard_group): array
{
$parsed = [];
foreach ($dataset as $row) {
if (!key_exists($shard_key, $row)) {
throw new \Exception($shard_key . " is not set in data to bulk insert");
}
$name = self::referConnectionName($row[$shard_key], $shard_group);
$parsed[$name][] = $row;
}
return $parsed;
}
}
Migrationに関しては、クラス1つ作ってラップしているだけです。
<?php
namespace App\Console\Commands;
use App\Services\ShardingManager;
use Illuminate\Database\Migrations\Migration;
abstract class ShardMigration extends Migration
{
public function up()
{
$db_shard_list = ShardingManager::getShardList();
foreach ($db_shard_list as $db) {
$this->connection = $db;
$this->upForSharding();
}
}
public function down()
{
$db_shard_list = ShardingManager::getShardList();
foreach ($db_shard_list as $db) {
$this->connection = $db;
$this->downForSharding();
}
}
abstract protected function upForSharding();
abstract protected function downForSharding();
}
select系
まず、Model側でnewEloquentBuilderを上書きします。
ついでに、is_shardingとshard_groupのプロパティ、getterを追加しておきます。
ここで一旦自分のプロパティに入れる形にしている理由は後述します。
<?php
namespace App\Models;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\SoftDeletes;
class BaseModel extends Model
{
use SoftDeletes;
protected $is_sharding = true;
protected $shard_key = "user_id";
protected $shard_group = "default";
/** @var ExtendedBuilder $builder */
public $builder;
/**
* @return bool
*/
public function isSharding(): bool
{
return $this->is_sharding;
}
/**
* @return string
*/
public function getShardKey(): string
{
return $this->shard_key;
}
/**
* @return string
*/
public function getShardGroup(): string
{
return $this->shard_group;
}
/**
* @param \Illuminate\Database\Query\Builder $query
* @return ExtendedBuilder|\Illuminate\Database\Eloquent\Builder|Model
*/
public function newEloquentBuilder($query)
{
$this->builder = new ExtendedBuilder($query);
return $this->builder;
}
}
ここは主にWhere系のメソッドのオーバーライドとgetModelsのオーバーライド、集計関数系のオーバーライドを行いました。
where系のメソッドでは、shard_keyを抜いて、その後getModelsの際にそのselectに必要なconnecitonを割り出しています。
集計関数はもう地道に書くしかなかったですね、、、、。
あと、findに関しては一意に特定できなくなるため禁止しています。
<?php
namespace App\Models;
use App\Services\ShardingManager;
use Closure;
use Illuminate\Database\Eloquent\Builder;
use Illuminate\Database\Eloquent\Model;
class ExtendedBuilder extends Builder
{
protected $shard_keys = [];
/*************************:
* クエリ実行時の分散、集約
*************************/
/**
* @param int $perPage
* @param array $columns
* @param string $pageName
* @param null $page
* @return \Illuminate\Contracts\Pagination\LengthAwarePaginator|LengthAwarePaginator
*/
public function paginate($perPage = null, $columns = ['*'], $pageName = 'page', $page = null)
{
if (!$this->model->isSharding()) {
return parent::paginate(...func_get_args());
}
$page = $page ?: Paginator::resolveCurrentPage($pageName);
$shard_num = count(ShardingManager::referConnectionNameList($this->shard_keys, $this->model->getShardGroup()));
$perPage = $perPage ?: $this->model->getPerPage();
$perRow = round($perPage / $shard_num);
$results = ($total = $this->count())
? $this->forPage($page, $perRow)->get($columns)->sortBy('created_at')
: $this->model->newCollection();
return $this->paginator($results, $total, $perPage, $page, [
'path' => Paginator::resolveCurrentPath(),
'pageName' => $pageName,
]);
}
/**
* Get the hydrated models without eager loading.
*
* @param array $columns
* @return \Illuminate\Database\Eloquent\Model[]|static[]
*/
public function getModels($columns = ['*'])
{
if (!$this->model->isSharding()) {
return $this->model->hydrate(
$this->query->get($columns)->all()
)->all();
}
$connections = ShardingManager::referConnectionNameList($this->shard_keys, $this->model->getShardGroup());
$result = [];
// 分割したコネクションに対してそれぞれクエリを実行、結果をマージ
foreach ($connections as $name) {
$connection = $this->model->setConnection($name)->getConnection();
$this->query->connection = $connection;
$result = array_merge($result, $this->query->get($columns)->all());
}
return $this->model->hydrate(
$result
)->all();
}
/**************************:
* 集計関数実行時の分散、集約
**************************/
/**
* @param string $column
* @return int
*/
public function sum($column): int
{
if (!$this->model->isSharding()) {
return parent::sum($column);
}
$connections = ShardingManager::referConnectionNameList($this->shard_keys, $this->model->getShardGroup());
$result = [];
// 分割したコネクションに対してそれぞれクエリを実行、結果をマージ
foreach ($connections as $name) {
$connection = $this->model->setConnection($name)->getConnection();
$this->query->connection = $connection;
$result[] = parent::sum($column);
}
return array_sum($result);
}
/**
* @param string $columns
* @return int
*/
public function count($columns = '*'): int
{
if (!$this->model->isSharding()) {
return parent::count($columns);
}
$connections = ShardingManager::referConnectionNameList($this->shard_keys, $this->model->getShardGroup());
$result = [];
// 分割したコネクションに対してそれぞれクエリを実行、結果をマージ
foreach ($connections as $name) {
$connection = $this->model->setConnection($name)->getConnection();
$this->query->connection = $connection;
$result[] = parent::count($columns);
}
return array_sum($result);
}
/**
* @param string $column
* @return int
*/
public function avg($column): int
{
if (!$this->model->isSharding()) {
return parent::avg($column);
}
$connections = ShardingManager::referConnectionNameList($this->shard_keys, $this->model->getShardGroup());
$count = [];
$sum = [];
// 分割したコネクションに対してそれぞれクエリを実行、結果をマージ
foreach ($connections as $name) {
$connection = $this->model->setConnection($name)->getConnection();
$this->query->connection = $connection;
$count[] = parent::count($column);
$sum[] = parent::sum($column);
}
return array_sum($sum) / array_sum($count);
}
/**
* @param string $column
* @return int|mixed
*/
public function average($column)
{
return $this->avg($column);
}
/**
* @param string $column
* @return mixed
*/
public function min($column)
{
if (!$this->model->isSharding()) {
return parent::min($column);
}
$connections = ShardingManager::referConnectionNameList($this->shard_keys, $this->model->getShardGroup());
$result = [];
// 分割したコネクションに対してそれぞれクエリを実行、結果をマージ
foreach ($connections as $name) {
$connection = $this->model->setConnection($name)->getConnection();
$this->query->connection = $connection;
$result[] = parent::min($column);
}
return min($result);
}
/**
* @param string $column
* @return mixed
*/
public function max($column)
{
if (!$this->model->isSharding()) {
return parent::max($column);
}
$connections = ShardingManager::referConnectionNameList($this->shard_keys, $this->model->getShardGroup());
$result = [];
// 分割したコネクションに対してそれぞれクエリを実行、結果をマージ
foreach ($connections as $name) {
$connection = $this->model->setConnection($name)->getConnection();
$this->query->connection = $connection;
$result[] = parent::max($column);
}
return max($result);
}
/*************************:
* 検索の際のshard_key取得
*************************/
/**
* @param array|Closure|string $column
* @param null $operator
* @param null $value
* @param string $boolean
* @return Builder
* @throws \Exception
*/
public function where($column, $operator = null, $value = null, $boolean = 'and')
{
if (!$this->model->isSharding()) {
return parent::where(...func_get_args());
}
if ($column instanceof Closure || $value instanceof Closure) {
throw new \Exception("can not use closure in argument");
}
if (is_array($column)) {
foreach ($column as $row) {
if ($row[0] == $this->model->getShardKey() && $row[1] == '=') {
$this->shard_keys[] = $row[2];
}
}
} else {
if ($column == $this->model->getShardKey()) {
if (is_null($value)) {
$this->shard_keys[] = $operator;
} else if ($operator == '=') {
$this->shard_keys[] = $value;
}
}
}
return parent::where(...func_get_args());
}
/**
* @param string $column
* @param mixed $values
* @param string $boolean
* @param bool $not
* @return Builder
*/
public function whereIn($column, $values, $boolean = 'and', $not = false)
{
if (!$this->model->isSharding()) {
return parent::whereIn($column, $values, $boolean, $not);
}
if ($column == $this->model->getShardKey()) {
$this->shard_keys = array_merge($this->shard_keys, $values);
}
return parent::whereIn($column, $values, $boolean, $not);
}
/***********************:
* 禁止するメソッド
***********************/
/**
* @param mixed $id
* @param array $columns
* @return Builder|Builder[]|\Illuminate\Database\Eloquent\Collection|Model|null
* @throws \Exception
*/
public function find($id, $columns = ['*'])
{
if ($this->model->isSharding()) {
throw new \Exception("can not call find to sharding table");
}
return parent::find($id, $columns);
}
}
insert、update、delete系
ModelのEventに対する紐付けを行い、その内部でconnectionの書き換えを行なっています。
その際に、modelがBuilderを所持していないため、書き換えが行えませんでした。
newEloquentBuilderメソッドのオーバーライドを行なった際に自身のプロパティに所持する形にしたのはそのためです。
また、同じようにBuilder->queryのアクセスができなかったため、protected → publicに変更しています。
これがイベントの登録部分。
<?php
namespace App\Models;
use App\Events\ModelEvent;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\SoftDeletes;
class BaseModel extends Model
{
//~~~~~~~~~~~~~~~~~~~~~~~省略~~~~~~~~~~~~~~~~~~~~~~~~
protected $dispatchesEvents = [
'saving' => ModelEvent::class,
'creating' => ModelEvent::class,
'updating' => ModelEvent::class,
'deleting' => ModelEvent::class,
];
//~~~~~~~~~~~~~~~~~~~~~~~省略~~~~~~~~~~~~~~~~~~~~~~~~
実際のイベントでの処理。
ほんとはリスナー使ってやった方が綺麗だけど、これだけのためにServiceProviderで登録させるのもったいなかったのでイベント単体で使用しました。
<?php
namespace App\Events;
use App\Models\BaseModel;
use App\Services\ShardingManager;
use Illuminate\Queue\SerializesModels;
class ModelEvent extends Event
{
use SerializesModels;
public function __construct(BaseModel $model)
{
if (!$model->isSharding()) {
$model->beginTransactionIfNotBegin();
return true;
}
if (is_null($model->{$model->getShardKey()})) {
throw new \Exception("can not execute query without " . $model->getShardKey());
}
$name = ShardingManager::referConnectionName($model->{$model->getShardKey()}, $model->getShardGroup());
$connection = $model->setConnection($name)->getConnection();
$model->beginTransactionIfNotBegin();
// deleteだと、最後の最後にQuery\Builderが生成される為、回避
if (is_null($model->builder)) {
return true;
}
$model->builder->query->connection = $connection;
}
}
この形にした際に、ModelのSaveメソッドだけは、対応できなかった( 参照 : Eloquent/Model.php)ため、saveメソッドをオーバーライドしました。
<?php
namespace App\Models;
use App\Events\ModelEvent;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\SoftDeletes;
class BaseModel extends Model
{
//~~~~~~~~~~~~~~~~~~~~~~~省略~~~~~~~~~~~~~~~~~~~~~~~~
/**
* Save the model to the database.
*
* @param array $options
* @return bool
*/
public function save(array $options = [])
{
$this->newModelQuery();
if ($this->fireModelEvent('saving') === false) {
return false;
}
if ($this->exists) {
$saved = $this->isDirty() ?
$this->performUpdate($this->builder) : true;
} else {
$saved = $this->performInsert($this->builder);
if (!$this->getConnectionName() &&
$connection = $this->builder->getConnection()) {
$this->setConnection($connection->getName());
}
}
if ($saved) {
$this->finishSave($options);
}
return $saved;
}
//~~~~~~~~~~~~~~~~~~~~~~~省略~~~~~~~~~~~~~~~~~~~~~~~~
また、update、deleteはwhereでチェインされて呼ばれる可能性があるため、Builder側でオーバーライドします。
namespace App\Models;
use App\Services\ShardingManager;
use Closure;
use \Illuminate\Database\Eloquent\Builder;
use Illuminate\Database\Eloquent\Model;
class ExtendedBuilder extends Builder
{
//~~~~~~~~~~~~~~~~~~~~~~~省略~~~~~~~~~~~~~~~~~~~~~~~~
/**
* @return float|int
*/
public function delete()
{
if (!$this->model->isSharding()) {
return parent::delete();
}
$connections = ShardingManager::referConnectionNameList($this->shard_keys, $this->model->getShardGroup());
$result = [];
// 分割したコネクションに対してそれぞれクエリを実行、結果をマージ
foreach ($connections as $name) {
$connection = $this->model->setConnection($name)->getConnection();
$this->model->beginTransactionIfNotBegin();
$this->query->connection = $connection;
$result[] = parent::delete();
}
return array_sum($result);
}
/**
* @return int
*/
public function forceDelete(): int
{
if (!$this->model->isSharding()) {
return parent::forceDelete();
}
$connections = ShardingManager::referConnectionNameList($this->shard_keys, $this->model->getShardGroup());
$result = [];
// 分割したコネクションに対してそれぞれクエリを実行、結果をマージ
foreach ($connections as $name) {
$connection = $this->model->setConnection($name)->getConnection();
$this->model->beginTransactionIfNotBegin();
$this->query->connection = $connection;
$result[] = parent::forceDelete();
}
return array_sum($result);
}
/**
* @param array $values
* @return float|int
*/
public function update(array $values)
{
if (!$this->model->isSharding()) {
return parent::update($values);
}
$connections = ShardingManager::referConnectionNameList($this->shard_keys, $this->model->getShardGroup());
$result = [];
// 分割したコネクションに対してそれぞれクエリを実行、結果をマージ
foreach ($connections as $name) {
$connection = $this->model->setConnection($name)->getConnection();
$this->model->beginTransactionIfNotBegin();
$this->query->connection = $connection;
$result[] = parent::update($values);
}
return array_sum($result);
}
//~~~~~~~~~~~~~~~~~~~~~~~省略~~~~~~~~~~~~~~~~~~~~~~~~
}
上記でupdate(の大元にいるsave)メソッド、deleteメソッドをオーバーライドした影響で、Modelから直接update、deleteを呼ばれた際に分割しているとwhere句でのshard_keyの指定ができず、全てのconnectionにupdate、deleteが流れてしまうため、Model側でsaveメソッドにフラグを折るロジックの追加と、deleteメソッドのオーバーライドをします。
<?php
namespace App\Models;
use App\Events\ModelEvent;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\SoftDeletes;
class BaseModel extends Model
{
//~~~~~~~~~~~~~~~~~~~~~~~省略~~~~~~~~~~~~~~~~~~~~~~~~
/**
* Save the model to the database.
*
* @param array $options
* @return bool
*/
public function save(array $options = [])
{
$this->newModelQuery();
if ($this->fireModelEvent('saving') === false) {
return false;
}
if ($this->exists) {
$old_is_sharding = $this->is_sharding; //追記部分
$this->is_sharding = false; //追記部分
$saved = $this->isDirty() ?
$this->performUpdate($this->builder) : true;
$this->is_sharding = $old_is_sharding; //追記部分
} else {
$saved = $this->performInsert($this->builder);
if (!$this->getConnectionName() &&
$connection = $this->builder->getConnection()) {
$this->setConnection($connection->getName());
}
}
if ($saved) {
$this->finishSave($options);
}
return $saved;
}
/**
* Delete the model from the database.
*
* @return bool|null
*
* @throws \Exception
*/
public function delete()
{
if (is_null($this->getKeyName())) {
throw new Exception('No primary key defined on model.');
}
if (!$this->exists) {
return;
}
if ($this->fireModelEvent('deleting') === false) {
return false;
}
$this->touchOwners();
$old_is_sharding = $this->is_sharding;
$this->is_sharding = false;
$this->performDeleteOnModel();
$this->is_sharding = $old_is_sharding;
$this->fireModelEvent('deleted', false);
return true;
}
//~~~~~~~~~~~~~~~~~~~~~~~省略~~~~~~~~~~~~~~~~~~~~~~~~
以上で水平分割に使用したコードは全部です。
感想
Eloquent自体はかなり便利で拡張しやすいですが、いざ複雑なことをやろうとすると結構難しいな、という印象です。
何より水平分割をしようとした際のドキュメントの少なさには困らされました。
ただ、拡張しやすい機能はたくさんあるので、複雑ですが、理解して使いこなせるとスッキリ書けるようになるなと思います。
この記事が、同じこと試みている人の一助になれば幸いです。
おまけ
artisanコマンドを使用したDBのcreate
<?php
namespace App\Console\Commands;
class CreateDatabase extends BaseCommand
{
protected $signature = 'db:create';
protected $description = 'create database if not exists';
public function __construct()
{
parent::__construct();
}
public function handle()
{
$connections = config('database.connections');
foreach ($connections as $db) {
$database = $db['database'];
try {
$pdo = $this->getPDOConnection($db['host'], $db['port'], $db['username'], $db['password']);
$pdo->exec(sprintf(
'CREATE DATABASE IF NOT EXISTS %s CHARACTER SET %s COLLATE %s;',
$database,
$db['charset'],
$db['collation']
));
$this->info(sprintf('Successfully created %s database', $database));
} catch (\PDOException $exception) {
$this->error(sprintf('Failed to create %s database, %s', $database, $exception->getMessage()));
}
}
}
/**
* @param $host
* @param $port
* @param $username
* @param $password
* @return \PDO
*/
private function getPDOConnection($host, $port, $username, $password)
{
return new \PDO(sprintf('mysql:host=%s;port=%d;', $host, $port), $username, $password);
}
}
artisanコマンドを使用したDBのdrop
<?php
namespace App\Console\Commands;
class DropDatabase extends BaseCommand
{
protected $signature = 'db:drop';
protected $description = 'drop database';
public function __construct()
{
parent::__construct();
}
public function handle()
{
$connections = config('database.connections');
foreach ($connections as $db) {
$database = $db['database'];
try {
$pdo = $this->getPDOConnection($db['host'], $db['port'], $db['username'], $db['password']);
$pdo->exec(sprintf(
'DROP DATABASE %s ;',
$database
));
$this->info(sprintf('Successfully created %s database', $database));
} catch (\PDOException $exception) {
$this->error(sprintf('Failed to create %s database, %s', $database, $exception->getMessage()));
}
}
}
/**
* @param $host
* @param $port
* @param $username
* @param $password
* @return \PDO
*/
private function getPDOConnection($host, $port, $username, $password)
{
return new \PDO(sprintf('mysql:host=%s;port=%d;', $host, $port), $username, $password);
}
}
明日は、@Gaku_Ishiiさんの記事です。