大容量CSVのテストデータとして国税庁 法人番号公表サイトのデータを使用して、取込処理を作成していきます。
Unicodeの全国のデータをダウンロードすると2025.01.31のデータで558万件データがあります。
容量も1.2GBなのでいい感じのテストデータです。
ダウンロードが終わったらstorage\app\private
ディレクトリに保存します。
CSVの内容
// サンプルデータ
1,1000012160153,01,1,2018-04-02,2015-10-05,"釧路検察審査会",,101,"北海道","釧路市","柏木町4-7",,01,206,0850824,,,,,,,2015-10-05,1,"Kushiro Committee for the Inquest of Prosecution","Hokkaido","4-7, Kashiwagicho, Kushiro shi",,"クシロケンサツシンサカイ",0
内容を記載したドキュメントが不足しているので、だいたい分かる情報をまとめるとおそらく下記のような内容になります。
列番号 | 内容 | サンプルデータ |
---|---|---|
1 | 法人番号 | 1000012160153 |
6 | 法人名 | 釧路検察審査会 |
9 | 都道府県 | 北海道 |
10 | 市区町村 | 釧路市 |
11 | 番地 | 柏木町4-7 |
13 | 都道府県コード | 01 |
14 | 市区町村コード | 206 |
15 | 郵便番号 | 0850824 |
28 | 法人名(カナ) | クシロケンサツシンサカイ |
テーブルの作成
php artisan make:model -m Company
public function up(): void
{
Schema::create('companies', function (Blueprint $table) {
$table->id();
$table->string('corporate_number')->unique();
$table->string('name');
$table->string('name_kana')->nullable(); #法人名(カナ)がない場合がある為
$table->string('postal_code');
$table->string('prefecture');
$table->string('city');
$table->string('address');
$table->datetimes();
});
}
class Company extends Model
{
protected $fillable = [
'corporate_number',
'name',
'name_kana',
'postal_code',
'prefecture',
'city',
'address'
];
}
コマンドクラスの作成
php artisan make:command ImportCompany
<?php
namespace App\Console\Commands;
use App\Models\Company;
use Carbon\Carbon;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\{
DB,
Storage
};
class ImportCompany extends Command
{
protected $signature = 'import:company';
private mixed $stream;
private array $insertParams = [];
private int $totalCount = 0;
private Carbon $dateTime;
const BATCH_SIZE = 5000;
const CORPORATE_NUMBER = 1;
const NAME = 6;
const CITY = 10;
const ADDRESS = 11;
const PREFECTURE = 13;
const POSTAL_CODE = 15;
const NAME_KANA = 28;
public function handle()
{
$this->dateTime = now();
$this->stream = Storage::readStream('00_zenkoku_all_20250131.csv');
while ($data = fgetcsv($this->stream, 0, ',')) {
$companyParam = [];
$companyParam['name'] = $data[self::NAME];
$companyParam['name_kana'] = $data[self::NAME_KANA];
$companyParam['postal_code'] = $data[self::POSTAL_CODE];
$companyParam['corporate_number'] = $data[self::CORPORATE_NUMBER];
$companyParam['prefecture'] = $data[self::PREFECTURE];
$companyParam['city'] = $data[self::CITY];
$companyParam['address'] = $data[self::ADDRESS];
$companyParam['created_at'] = $this->dateTime;
$companyParam['updated_at'] = $this->dateTime;
$this->insertParams[] = $companyParam;
++$this->totalCount;
if ($this->totalCount % self::BATCH_SIZE === 0) {
$this->bulkInsert();
}
}
if (!empty($this->insertParams)) {
$this->bulkInsert();
}
fclose($this->stream);
}
private function bulkInsert()
{
DB::beginTransaction();
try {
Company::insert($this->insertParams);
DB::commit();
} catch(\Exception $e) {
DB::rollBack();
} finally {
$this->insertParams = [];
}
}
}
php artisan import:company
これで一旦処理が完成しました。
処理のポイント
1. Storage::readStream
を使用する
大容量CSVを読み込む際、Storage::get()
等で取得することで一気にCSVを読み込んでしまいます。そうすることで確実に変数のメモリ不足のエラーが発生します。
Streamを使用することで1行ずつ読み込んでいくことができるのでお勧めです。
2. バルクインサートを意識する
Model::create()
ではその都度DBにクエリが走ります。
insert
メソッドでは配列を渡すことで、1度のクエリで複数同時にインサートができます。
このバルクインサートの粒度はサーバーのスペックや配列(カラム)の要素数次第で変更することをお勧めします。
定数BATCH_SIZE
の値を増減させて対応させてください。
3. $insertParams
の要素数の取得
メンバ変数に$totalCount
を宣言して対応しています。
count()
メソッドで$insertParams
内の要素数を数えるよりもパフォーマンスが上がります。
もっと細かい点を挙げると、インクリメントでカウントをとっていますが、前置インクリメントの方が後置インクリメントより処理速度が早いです。
注意点
1. システムカラムについて
insert()
メソッドではcreated_at
, updated_at
が自動で作成されません。
その為、手動で設定する必要があります。
2. BATCH_SIZE
の上限について
メモリ以外にも、各DBエンジンにはプレースホルダーの上限数があります。
それを超えないくらいの粒度で行う必要があります。
3. fclose()
について
処理の最後にfclose()
でストリームを閉じています。
Commandクラスでは処理終了時にPHPのプロセス自体終了する為、無かったとしても一部例外を除いて問題は起きません。
ただ、一部例外として、S3などの外部ストレージから取得する際、fclose()
で閉じられなければ、外部ストレージ側は完全にリクエストが終了したか分からず接続を維持し続けてしまう可能性が出てきます。
処理実行中に例外やエラーが発生した場合、上記のCommandクラスではその問題が発生します。
その為、リスク管理として$stream
を取得する所から処理全体をtry
文で囲み、finally
ブロックでfclose()
することがベストプラクティスです。
try {
$this->stream = Storage::readStream('00_zenkoku_all_20250131.csv');
# ...
# ...
# ...
} finaly {
# 正常終了でも例外発生でもストリームを閉じる
fclose($this->stream);
}
ベンチマーク測定
LaravelのCommandクラスのベンチマークを測定する新しめのライブラリが公開されていたので、そちらを使用して測定してみたいと思います。
※ PHP8.3を要求されます。
インストール
composer require christophrumpel/artisan-benchmark
実行結果
BATCH_SIZE
が5000の場合
php artisan benchmark import:company
⚡ TIME: 2m 6s MEM: 2960.36MB SQL: 1117
結果は2分6秒でした。メモリ使用量も出力してくれるのはありがたいですね。
ベンチマークを測るだけかと思いきや、コマンドクラスの処理自体はしっかり実行されるのでその点だけ注意してください。
差分取込について
DBの設計自体が変わるリプレイスのデータ移管作業や、システムをスケールアップする場合の移管作業などを実務で行う場合、サービスを長期間止めない為、メンテナンス前にその時点までのデータ(以降:初回データ)と、メンテナンス中に残りのデータ(以降:差分データ)を渡され、移管していくという状況になると思います。
その際、差分データが初回のデータが含まれた状態で渡される場合もあるので、その場合に対応できるコマンドを作成します。
要件
- 初回データに含まれていたデータは新しくレコードを作成しない
- 差分データは新しくレコードを作成する
- 複数回呼び出せる処理にする
- リプレイスのメンテナンス期間中では、DBの移行から始まり、そこから最終テストを行い、リリースという流れになるので、ここが終わらなければ全て遅れる為、実行速度も意識する
再現方法
国税庁 法人番号公表サイトの北海道のCSVをダウンロードします。
北海道のCSVから取り込み、その後に全国のCSVを取り込みます。
既にある北海道のデータは作成せずに、他のデータを取り込んでいきます。
テストに使う北海道のCSVもstorage\app\private
ディレクトリに保存します。
コマンドクラスの作成
最初に作成したクラスに手を加えます。
<?php
namespace App\Console\Commands;
use App\Models\Company;
use Carbon\Carbon;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\{
DB,
Storage
};
class ImportCompany extends Command
{
protected $signature = 'import:company {--path=}';
private mixed $stream;
private array $insertParams = [];
private array $updateParams = [];
private int $totalCount = 0;
private Carbon $dateTime;
private array $companyIds;
const BATCH_SIZE = 5000;
const CORPORATE_NUMBER = 1;
const NAME = 6;
const CITY = 10;
const ADDRESS = 11;
const PREFECTURE = 13;
const POSTAL_CODE = 15;
const NAME_KANA = 28;
public function handle()
{
if(is_null($this->option('path'))){
$this->error('--pathオプションを指定してください。');
return;
}
$this->dateTime = now();
$this->stream = Storage::readStream($this->option('path'));
if(is_null($this->stream)){
$this->error('CSVファイルが見つかりませんでした。');
return;
}
# インポート済みかを判別する存在確認用の連想配列
$this->companyIds = DB::table('companies')
->pluck('id', 'corporate_number')
->toArray();
while ($data = fgetcsv($this->stream, 0, ',')) {
$companyParam = [];
$companyParam['name'] = $data[self::NAME];
$companyParam['name_kana'] = $data[self::NAME_KANA];
$companyParam['postal_code'] = $data[self::POSTAL_CODE];
$companyParam['corporate_number'] = $data[self::CORPORATE_NUMBER];
$companyParam['prefecture'] = $data[self::PREFECTURE];
$companyParam['city'] = $data[self::CITY];
$companyParam['address'] = $data[self::ADDRESS];
# 法人番号が存在確認用の連想配列のキーにあるかどうか
if (isset($this->companyIds[$data[self::CORPORATE_NUMBER]])) {
# 既に取込済みなのでidカラムの値を追加
$companyParam['id'] = $this->companyIds[$data[self::CORPORATE_NUMBER]];
# アップデート用の配列に追加
$this->updateParams[] = $companyParam;
} else {
# 取込んでいないデータなのでシステムカラムに値を追加
$companyParam['created_at'] = $this->dateTime;
$companyParam['updated_at'] = $this->dateTime;
# インサート用の配列に追加
$this->insertParams[] = $companyParam;
}
++$this->totalCount;
if ($this->totalCount % self::BATCH_SIZE === 0) {
$this->bulkSave();
}
}
if (!empty($this->insertParams) || !empty($this->updateParams)) {
$this->bulkSave();
}
fclose($this->stream);
}
private function bulkSave()
{
DB::beginTransaction();
try {
Company::insert($this->insertParams);
# id,法人番号を検索対象にしてバルクアップデート
Company::upsert($this->updateParams, ['id', 'corporate_number']);
DB::commit();
} catch(\Exception $e) {
DB::rollBack();
} finally {
$this->insertParams = [];
$this->updateParams = [];
}
}
}
実行
初回データ(北海道のみのデータ)の取込
php artisan import:company --path=01_hokkaido_all_20250131.csv
差分データ(全国のデータ)の取込
ddev artisan import:company --path=00_zenkoku_all_20250131.csv
処理のポイント
1. 存在確認用の配列について
$this->companyIds = DB::table('companies')
->pluck('id', 'corporate_number')
->toArray();
# 中身
# [
# 1000011000005 => 1241513
# 1000012010003 => 1241514
# 1000012010011 => 1241515
# 1000012010028 => 1241516
# ...
# ]
上記の処理でCSV内の一意な値(今回は法人番号)のキーとテーブルのIDがバリューの連想配列が出来上がります。
この配列はテーブル内に既にデータが存在するかを判別する為に使用します。
この配列を作成する際の注意点になります。
上記の処理は下記の処理と出力される中身が同じです。
# Eloquentの場合(ダメな例)
Company::query()
->get()
->pluck('id', 'corporate_number')
->toArray();
ですが、Eloquentとクエリビルダでは処理速度が大幅に変わります。
EloquentではModelインスタンスに1度マッピングする処理が入る為、オーバーヘッドが発生します。
データ量やデータ構造によっては処理速度に10倍以上の差が出ます。
2. 存在確認について
今回はisset()
関数を使用していますが、PHPでは配列に値が含まれているかを確認する関数としてin_array()
関数もあります。
なぜin_array()
ではなくisset()
を使用するかというと、PHPの連想配列にはハッシュアルゴリズムが実装されており、高速な検索が可能になっているからです。
したがって配列ではなく値自体を検索しにいくin_array()
は使用しないでください。
こちらは100倍では済まないくらい遅くなります。
## ダメな例
if (in_array($data[self::CORPORATE_NUMBER], array_keys($this->companyIds)))
3. upsertメソッド
insert()
同様、複数同時にアップデートする際のメソッドがupsert()
になります。
「insert()
もupsert()
もEloquentは遅いっていう癖に、クエリビルダじゃなくてCompanyからしてるじゃんYo。」ってなりますが、複数挿入時のメソッドでは大きな差は発生しません。
upsert()
はシステムカラムのupdated_at
を自動で更新します。
4. switchはなるべく避ける
本来はinsert()
,upsert()
をbulkSave()
内でEnumなどを元にswitchするのがベストだとも思えますが、処理は遅くなります。
insert()
,upsert()
共に空の配列で行われたとしても例外は発生しない為、速度を重視するのであればまとめて書く方がメリットはあります。
5. どうしてバルクアップデートしてるの?という点
既にあるレコードは別にupdateかけない方が早いよねというのはもちろんそうです。
ただ、実際のDB移管作業で、初回データ取込後に取り込んだテーブルの情報が変わっているなんて事が考えられます。
-- 1月1日に初回データを抽出、取込を実施
-- 1月10日にメンテナンス及び差分データを抽出、取込を実施
という移管スケジュールだった場合、初回データ取込時に取り込んだユーザーが1月3日にメールアドレスを変更したりあるかと思います。その場合に対応させる為です。
ベンチマーク測定
悲報、benchmarkライブラリ Commandの引数オプションに対応していませんでした。
その為、処理が終了する前に下記コードを追加して実行速度を測定します。
結果は秒単位の処理速度、処理したデータ数が出力されます。
$this->info(number_format($this->dateTime->diffInSeconds(now())) . '秒');
$this->info(number_format($this->totalCount) . '件');
初回データ(北海道)取込
5秒
227,423件
初回データ(北海道)取込後の状態に差分データ(全国)の取込
118秒
5,583,642件
早いのは早いのでいいのですが、差分取込しないバージョンのCommandクラスより速度が上がっちゃいました。
新規作成するデータが、北海道の22万件分減った事が原因かと考えられます。
初回データ(北海道)取込後の状態に、差分データ(全国)の取込。その状態の後さらにもう1度全国データを取込
174秒
5,583,642件
伸びました。
原因はただ1つでcompaniesテーブル内に558万件とデータが増えたことにより、$companyIds
の取得に時間がかかっている為です。
なお、全国データの取込は2回目なのでデータは増えません。
初回データ(北海道)取込後の状態に、差分データ(全国)の取込。その状態の後さらにもう1度全国データを取込。その状態で、さらにクエリビルダではなくEloquentを使った場合
処理のポイントでダメな例として挙げたEloquentを使用して存在確認用の配列を作成する処理を作成します。
$this->companyIds = Company::query()
->get()
->pluck('id', 'corporate_number')
->toArray();
1,092秒
5,583,642件
オソスンギ
大量データを取得する際はEloquentよりクエリビルダの使用をお勧めします。
他言語に比べ処理が遅いと言われるPHPですが、その中でも最適化できたと考えています。
Laravelでの移管作業の参考にして頂けると幸いです。