0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Laravel】558万行のCSVを2分でDBに取り込むCommandクラスの作り方

Last updated at Posted at 2025-02-15

大容量CSVのテストデータとして国税庁 法人番号公表サイトのデータを使用して、取込処理を作成していきます。
Unicodeの全国のデータをダウンロードすると2025.01.31のデータで558万件データがあります。
容量も1.2GBなのでいい感じのテストデータです。

ダウンロードが終わったらstorage\app\privateディレクトリに保存します。

CSVの内容

00_zenkoku_all_20250131.csv
// サンプルデータ
1,1000012160153,01,1,2018-04-02,2015-10-05,"釧路検察審査会",,101,"北海道","釧路市","柏木町4-7",,01,206,0850824,,,,,,,2015-10-05,1,"Kushiro Committee for the Inquest of Prosecution","Hokkaido","4-7, Kashiwagicho, Kushiro shi",,"クシロケンサツシンサカイ",0

内容を記載したドキュメントが不足しているので、だいたい分かる情報をまとめるとおそらく下記のような内容になります。

列番号 内容 サンプルデータ
1 法人番号 1000012160153
6 法人名 釧路検察審査会
9 都道府県 北海道
10 市区町村 釧路市
11 番地 柏木町4-7
13 都道府県コード 01
14 市区町村コード 206
15 郵便番号 0850824
28 法人名(カナ) クシロケンサツシンサカイ

テーブルの作成

composer
    php artisan make:model -m Company
xxxx_xx_xx_xxxxx_create_companies_table.php
    public function up(): void
    {
        Schema::create('companies', function (Blueprint $table) {
            $table->id();
            $table->string('corporate_number')->unique();
            $table->string('name');
            $table->string('name_kana')->nullable(); #法人名(カナ)がない場合がある為
            $table->string('postal_code');
            $table->string('prefecture');
            $table->string('city');
            $table->string('address');
            $table->datetimes();
        });
    }
Company.php

class Company extends Model
{
    protected $fillable = [
        'corporate_number',
        'name',
        'name_kana',
        'postal_code',
        'prefecture',
        'city',
        'address'
    ];
}

コマンドクラスの作成

composer
    php artisan make:command ImportCompany
app/Console/Commands/ImportCompany
<?php

namespace App\Console\Commands;

use App\Models\Company;
use Carbon\Carbon;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\{
    DB,
    Storage
};

class ImportCompany extends Command
{
    protected $signature = 'import:company';

    private mixed $stream;
    private array $insertParams = [];
    private int $totalCount = 0;
    private Carbon $dateTime;

    const BATCH_SIZE = 5000;

    const CORPORATE_NUMBER = 1;
    const NAME = 6;
    const CITY = 10;
    const ADDRESS = 11;
    const PREFECTURE = 13;
    const POSTAL_CODE = 15;
    const NAME_KANA = 28;
    
    public function handle()
    {
        $this->dateTime = now();

        $this->stream = Storage::readStream('00_zenkoku_all_20250131.csv');

        while ($data = fgetcsv($this->stream, 0, ',')) {
            $companyParam = [];
            
            $companyParam['name']             = $data[self::NAME];
            $companyParam['name_kana']        = $data[self::NAME_KANA];
            $companyParam['postal_code']      = $data[self::POSTAL_CODE];
            $companyParam['corporate_number'] = $data[self::CORPORATE_NUMBER];
            $companyParam['prefecture']       = $data[self::PREFECTURE];
            $companyParam['city']             = $data[self::CITY];
            $companyParam['address']          = $data[self::ADDRESS];
            $companyParam['created_at']       = $this->dateTime;
            $companyParam['updated_at']       = $this->dateTime;

            $this->insertParams[] = $companyParam;
            ++$this->totalCount;

            if ($this->totalCount % self::BATCH_SIZE === 0) {
                $this->bulkInsert();
            }
        }
        if (!empty($this->insertParams)) {
            $this->bulkInsert();
        }

        fclose($this->stream);
    }

    private function bulkInsert()
    {
        DB::beginTransaction();
        try {
            Company::insert($this->insertParams);
            DB::commit();
        } catch(\Exception $e) {
            DB::rollBack();
        } finally {
            $this->insertParams = [];
        }
    }
}

composer
    php artisan import:company

これで一旦処理が完成しました。

処理のポイント

1. Storage::readStreamを使用する
大容量CSVを読み込む際、Storage::get()等で取得することで一気にCSVを読み込んでしまいます。そうすることで確実に変数のメモリ不足のエラーが発生します。
Streamを使用することで1行ずつ読み込んでいくことができるのでお勧めです。

2. バルクインサートを意識する
Model::create()ではその都度DBにクエリが走ります。
insertメソッドでは配列を渡すことで、1度のクエリで複数同時にインサートができます。
このバルクインサートの粒度はサーバーのスペックや配列(カラム)の要素数次第で変更することをお勧めします。
定数BATCH_SIZEの値を増減させて対応させてください。

3. $insertParamsの要素数の取得
メンバ変数に$totalCountを宣言して対応しています。
count()メソッドで$insertParams内の要素数を数えるよりもパフォーマンスが上がります。
もっと細かい点を挙げると、インクリメントでカウントをとっていますが、前置インクリメントの方が後置インクリメントより処理速度が早いです。

注意点

1. システムカラムについて
insert()メソッドではcreated_at, updated_atが自動で作成されません。
その為、手動で設定する必要があります。

2. BATCH_SIZEの上限について
メモリ以外にも、各DBエンジンにはプレースホルダーの上限数があります。
それを超えないくらいの粒度で行う必要があります。

3. fclose()について
処理の最後にfclose()でストリームを閉じています。
Commandクラスでは処理終了時にPHPのプロセス自体終了する為、無かったとしても一部例外を除いて問題は起きません。
ただ、一部例外として、S3などの外部ストレージから取得する際、fclose()で閉じられなければ、外部ストレージ側は完全にリクエストが終了したか分からず接続を維持し続けてしまう可能性が出てきます。
処理実行中に例外やエラーが発生した場合、上記のCommandクラスではその問題が発生します。
その為、リスク管理として$streamを取得する所から処理全体をtry文で囲み、finallyブロックでfclose()することがベストプラクティスです。

app/Console/Commands/ImportCompany
        try {
            $this->stream = Storage::readStream('00_zenkoku_all_20250131.csv');
            # ...
            # ...
            # ...
        } finaly {
            # 正常終了でも例外発生でもストリームを閉じる
            fclose($this->stream);
        }

ベンチマーク測定

LaravelのCommandクラスのベンチマークを測定する新しめのライブラリが公開されていたので、そちらを使用して測定してみたいと思います。
※ PHP8.3を要求されます。

インストール

composer
    composer require christophrumpel/artisan-benchmark

実行結果

BATCH_SIZEが5000の場合

composer
    php artisan benchmark import:company

    ⚡  TIME: 2m 6s   MEM: 2960.36MB   SQL: 1117 

結果は2分6秒でした。メモリ使用量も出力してくれるのはありがたいですね。
ベンチマークを測るだけかと思いきや、コマンドクラスの処理自体はしっかり実行されるのでその点だけ注意してください。

差分取込について

DBの設計自体が変わるリプレイスのデータ移管作業や、システムをスケールアップする場合の移管作業などを実務で行う場合、サービスを長期間止めない為、メンテナンス前にその時点までのデータ(以降:初回データ)と、メンテナンス中に残りのデータ(以降:差分データ)を渡され、移管していくという状況になると思います。
その際、差分データが初回のデータが含まれた状態で渡される場合もあるので、その場合に対応できるコマンドを作成します。

要件

  • 初回データに含まれていたデータは新しくレコードを作成しない
  • 差分データは新しくレコードを作成する
  • 複数回呼び出せる処理にする
  • リプレイスのメンテナンス期間中では、DBの移行から始まり、そこから最終テストを行い、リリースという流れになるので、ここが終わらなければ全て遅れる為、実行速度も意識する

再現方法

国税庁 法人番号公表サイトの北海道のCSVをダウンロードします。
北海道のCSVから取り込み、その後に全国のCSVを取り込みます。
既にある北海道のデータは作成せずに、他のデータを取り込んでいきます。
テストに使う北海道のCSVもstorage\app\privateディレクトリに保存します。

コマンドクラスの作成

最初に作成したクラスに手を加えます。

app/Console/Commands/ImportCompany.php
<?php

namespace App\Console\Commands;

use App\Models\Company;
use Carbon\Carbon;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\{
    DB,
    Storage
};

class ImportCompany extends Command
{
    protected $signature = 'import:company {--path=}';

    private mixed $stream;
    private array $insertParams = [];
    private array $updateParams = [];
    private int $totalCount = 0;
    private Carbon $dateTime;

    private array $companyIds;

    const BATCH_SIZE = 5000;

    const CORPORATE_NUMBER = 1;
    const NAME = 6;
    const CITY = 10;
    const ADDRESS = 11;
    const PREFECTURE = 13;
    const POSTAL_CODE = 15;
    const NAME_KANA = 28;

    public function handle()
    {
        if(is_null($this->option('path'))){
            $this->error('--pathオプションを指定してください。');
            return;
        }
        
        $this->dateTime = now();
        $this->stream = Storage::readStream($this->option('path'));

        if(is_null($this->stream)){
            $this->error('CSVファイルが見つかりませんでした。');
            return;
        }

        # インポート済みかを判別する存在確認用の連想配列
        $this->companyIds = DB::table('companies')
            ->pluck('id', 'corporate_number')
            ->toArray();

        while ($data = fgetcsv($this->stream, 0, ',')) {
            $companyParam = [];

            $companyParam['name']             = $data[self::NAME];
            $companyParam['name_kana']        = $data[self::NAME_KANA];
            $companyParam['postal_code']      = $data[self::POSTAL_CODE];
            $companyParam['corporate_number'] = $data[self::CORPORATE_NUMBER];
            $companyParam['prefecture']       = $data[self::PREFECTURE];
            $companyParam['city']             = $data[self::CITY];
            $companyParam['address']          = $data[self::ADDRESS];

            # 法人番号が存在確認用の連想配列のキーにあるかどうか
            if (isset($this->companyIds[$data[self::CORPORATE_NUMBER]])) {
                # 既に取込済みなのでidカラムの値を追加
                $companyParam['id'] = $this->companyIds[$data[self::CORPORATE_NUMBER]];
                # アップデート用の配列に追加
                $this->updateParams[] = $companyParam;
            } else {
                # 取込んでいないデータなのでシステムカラムに値を追加
                $companyParam['created_at'] = $this->dateTime;
                $companyParam['updated_at'] = $this->dateTime;
                # インサート用の配列に追加
                $this->insertParams[] = $companyParam;
            }

            ++$this->totalCount;

            if ($this->totalCount % self::BATCH_SIZE === 0) {
                $this->bulkSave();
            }
        }

        if (!empty($this->insertParams) || !empty($this->updateParams)) {
            $this->bulkSave();
        }

        fclose($this->stream);
    }

    private function bulkSave()
    {
        DB::beginTransaction();
        try {
            Company::insert($this->insertParams);
            # id,法人番号を検索対象にしてバルクアップデート
            Company::upsert($this->updateParams, ['id', 'corporate_number']);
            DB::commit();
        } catch(\Exception $e) {
            DB::rollBack();
        } finally {
            $this->insertParams = [];
            $this->updateParams = [];
        }
    }
}

実行

初回データ(北海道のみのデータ)の取込

composer
    php artisan import:company --path=01_hokkaido_all_20250131.csv

差分データ(全国のデータ)の取込

composer
    ddev artisan import:company --path=00_zenkoku_all_20250131.csv

処理のポイント

1. 存在確認用の配列について

        $this->companyIds = DB::table('companies')
            ->pluck('id', 'corporate_number')
            ->toArray();
        # 中身
#        [
#          1000011000005 => 1241513
#          1000012010003 => 1241514
#          1000012010011 => 1241515
#          1000012010028 => 1241516
#          ...
#        ]

上記の処理でCSV内の一意な値(今回は法人番号)のキーとテーブルのIDがバリューの連想配列が出来上がります。
この配列はテーブル内に既にデータが存在するかを判別する為に使用します。
この配列を作成する際の注意点になります。
上記の処理は下記の処理と出力される中身が同じです。

    # Eloquentの場合(ダメな例)
    Company::query()
        ->get()
        ->pluck('id', 'corporate_number')
        ->toArray();

ですが、Eloquentとクエリビルダでは処理速度が大幅に変わります。
EloquentではModelインスタンスに1度マッピングする処理が入る為、オーバーヘッドが発生します。
データ量やデータ構造によっては処理速度に10倍以上の差が出ます。

2. 存在確認について

今回はisset()関数を使用していますが、PHPでは配列に値が含まれているかを確認する関数としてin_array()関数もあります。
なぜin_array()ではなくisset()を使用するかというと、PHPの連想配列にはハッシュアルゴリズムが実装されており、高速な検索が可能になっているからです。
したがって配列ではなく値自体を検索しにいくin_array()は使用しないでください。
こちらは100倍では済まないくらい遅くなります。

## ダメな例
if (in_array($data[self::CORPORATE_NUMBER], array_keys($this->companyIds)))

3. upsertメソッド

insert()同様、複数同時にアップデートする際のメソッドがupsert()になります。
insert()upsert()もEloquentは遅いっていう癖に、クエリビルダじゃなくてCompanyからしてるじゃんYo。」ってなりますが、複数挿入時のメソッドでは大きな差は発生しません。
upsert()はシステムカラムのupdated_atを自動で更新します。

4. switchはなるべく避ける

本来はinsert(),upsert()bulkSave()内でEnumなどを元にswitchするのがベストだとも思えますが、処理は遅くなります。
insert(),upsert()共に空の配列で行われたとしても例外は発生しない為、速度を重視するのであればまとめて書く方がメリットはあります。

5. どうしてバルクアップデートしてるの?という点

既にあるレコードは別にupdateかけない方が早いよねというのはもちろんそうです。
ただ、実際のDB移管作業で、初回データ取込後に取り込んだテーブルの情報が変わっているなんて事が考えられます。
-- 1月1日に初回データを抽出、取込を実施
-- 1月10日にメンテナンス及び差分データを抽出、取込を実施
という移管スケジュールだった場合、初回データ取込時に取り込んだユーザーが1月3日にメールアドレスを変更したりあるかと思います。その場合に対応させる為です。

ベンチマーク測定

悲報、benchmarkライブラリ Commandの引数オプションに対応していませんでした。
その為、処理が終了する前に下記コードを追加して実行速度を測定します。
結果は秒単位の処理速度、処理したデータ数が出力されます。

app/Console/Commands/ImportCompany.php
        $this->info(number_format($this->dateTime->diffInSeconds(now())) . '秒');
        $this->info(number_format($this->totalCount) . '件');

初回データ(北海道)取込

実行結果
5秒
227,423件

初回データ(北海道)取込後の状態に差分データ(全国)の取込

実行結果
118秒
5,583,642件

早いのは早いのでいいのですが、差分取込しないバージョンのCommandクラスより速度が上がっちゃいました。
新規作成するデータが、北海道の22万件分減った事が原因かと考えられます。

初回データ(北海道)取込後の状態に、差分データ(全国)の取込。その状態の後さらにもう1度全国データを取込

実行結果
174秒
5,583,642件

伸びました。
原因はただ1つでcompaniesテーブル内に558万件とデータが増えたことにより、$companyIdsの取得に時間がかかっている為です。
なお、全国データの取込は2回目なのでデータは増えません。

初回データ(北海道)取込後の状態に、差分データ(全国)の取込。その状態の後さらにもう1度全国データを取込。その状態で、さらにクエリビルダではなくEloquentを使った場合

処理のポイントでダメな例として挙げたEloquentを使用して存在確認用の配列を作成する処理を作成します。

    $this->companyIds = Company::query()
        ->get()
        ->pluck('id', 'corporate_number')
        ->toArray();
実行結果
1,092秒
5,583,642件

オソスンギ

大量データを取得する際はEloquentよりクエリビルダの使用をお勧めします。

他言語に比べ処理が遅いと言われるPHPですが、その中でも最適化できたと考えています。
Laravelでの移管作業の参考にして頂けると幸いです。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?