はじめに
業務用のアプリを作ってるとCSVファイルからデータベースに何かを入れたり更新するというのは日常的に出てくる要望です
みんな大好きなMicrosoft Excelで編集できるというのが一番のメリットでしょうか
MongoDBの場合、無加工でもいい状態ならmongoimportでもある程度はまかなえたりしますが通常何も考えずにExcelから書き出されるCSVファイルというのは文字コードがShift JISだったり文字列ゆえの日付や時刻データのハンドリング、あるいはアプリケーション固有のデータに落とし込むなどの何かしらの変換が必要で、そのためのプログラムを書くなんていうのもまたよくある話です
今回のお題
サンプルデータとしてなんちゃって個人情報さんから全フィールド入り5000件を6回生成して合体させた30000件のCSVを食わせてMongoDBに突っ込むというスクリプトを作ってみます
CSVファイルを変換しつつ読み込む
CSVファイルの文字コードがShift JISでデータベースはUTF-8というのはよくある話です
さらにCSVファイルの中には半角カナ、全角英数字が入り乱れているのも非開発部門から来るファイルによくみられる特徴です
PHPの場合、mb_convert_encoding()で文字コードを変換してあげて、mb_convert_kana()を使って全角半角の統一などの変換をすることになるのですが、ファイルが巨大な場合は一度全部ファイルの内容をメモリに読み込んで変換する方法はなるべくとりたくありません
1行ずつ読み込んで都度変換・・・というのもクオート処理もろもろで面倒くさいことこの上なくなります
PHPにはfgetcsv()という行ベースのシーケンシャルアクセスでCSVを処理するにはもってこいの関数があるのですが、マルチバイト環境では地雷原のようなものでした(fgetcsvでぐぐるとハマった先人たちの苦闘の記録がたくさんあります)
そんな苦闘の中から福音がもたらされることになります
ストリームフィルタを使って等価的にマルチバイト系の変換をしながらfgetcsv()で読み込めばいいのです
実装は下記にあります
今回はGitHubからzipとしてダウンロードしてメインのファイル直下にsrcフォルダ以下のStreamディレクトリを配置しました
composerで入れることもできるらしいのですが何やらひと手間かかる模様です
参考→ fgetcsvを使ってSJISのCSVをロードするとき
BulkWriteを使ってみる
MongoDBにはBulkWriteというコレクションに対してINSERT/UPDATE/DELETE/UPSERTをバッチ的に行える機能があります
今回はPHP7 + MongoDB driver + mongodb/mongo-php-libraryでこのBulkWriteを使ってみます
mongodb-php-libraryでのBulkWriteの使い方はこちらで説明されています
参考→ BulkWrite
実際に動かす
愚直にINSERTしていくバージョンとBulkWriteバージョンのPHPソースと必要なライブラリをそろえるためのcomposer.jsonを準備します
今回は進捗状況を表示するためにguiguiboy/PHP-CLI-Progress-Barを利用しました
{
"mongodb/mongodb": "^1.0.0",
"guiguiboy/php-cli-progress-bar": "dev-master"
}
通常INSERT版
<?php
require_once dirname(__FILE__) . '/vendor/autoload.php';
require_once dirname(__FILE__) . '/Stream/Filter/Mbstring.php';
stream_filter_register('convert.mbstring.*','Stream_Filter_Mbstring');
$mongoClient = new MongoDB\Client('mongodb://localhost:27017');
$db = $mongoClient->selectDatabase('nanchatte');
$collection = $db->selectCollection('persons');
$collection->drop();
sleep(1);
$fp = fopen('nanchatte.csv', 'r');
for($lines=0; fgets($fp); $lines ++);
rewind($fp);
print "$lines lines.\n";
$bar = new \ProgressBar\Manager(0, $lines);
stream_filter_append($fp,'convert.mbstring.encoding.SJIS-win:UTF-8');
stream_filter_append($fp,'convert.mbstring.kana.KVas:UTF-8');
$time_start = microtime(true);
while ($columns = fgetcsv($fp, 4096)){
$collection->insertOne([
'name' => $columns[0],
'kana' => $columns[1],
'email' => $columns[2],
'gender' => $columns[3],
'age' => $columns[4],
'birth' => $columns[5],
'bride' => $columns[6],
'blood' => $columns[7],
'prefecture' => $columns[8],
'tel1' => $columns[9],
'tel2' => $columns[10],
'carrier' => $columns[11],
'curry' => $columns[12]
]);
$bar->advance();
}
$time_end = microtime(true);
$time = $time_end - $time_start;
echo $time . " seconds\n";
BulkWrite版
<?php
require_once dirname(__FILE__) . '/vendor/autoload.php';
require_once dirname(__FILE__) . '/Stream/Filter/Mbstring.php';
stream_filter_register('convert.mbstring.*','Stream_Filter_Mbstring');
$mongoClient = new MongoDB\Client('mongodb://localhost:27017');
$db = $mongoClient->selectDatabase('nanchatte');
$collection = $db->selectCollection('persons');
$collection->drop();
sleep(1);
$fp = fopen('nanchatte.csv', 'r');
for($lines=0; fgets($fp); $lines ++);
rewind($fp);
print "$lines lines.\n";
$bar = new \ProgressBar\Manager(0, $lines);
stream_filter_append($fp,'convert.mbstring.encoding.SJIS-win:UTF-8');
stream_filter_append($fp,'convert.mbstring.kana.KVas:UTF-8');
$time_start = microtime(true);
$bulks = [];
while ($columns = fgetcsv($fp, 4096)){
$bulks[] = [
'insertOne' => [[
'name' => $columns[0],
'kana' => $columns[1],
'email' => $columns[2],
'gender' => $columns[3],
'age' => $columns[4],
'birth' => $columns[5],
'bride' => $columns[6],
'blood' => $columns[7],
'prefecture' => $columns[8],
'tel1' => $columns[9],
'tel2' => $columns[10],
'carrier' => $columns[11],
'curry' => $columns[12]
]]
];
// 1000件ずつ書き込んでおいたほうが安全らしい
if(count($bulks) > 999){
$collection->bulkWrite($bulks);
$bulks = [];
}
$bar->advance();
}
// 残りのBulkwriteを処理する
if(count($bulks) > 0){
$collection->bulkWrite($bulks);
}
$time_end = microtime(true);
$time = $time_end - $time_start;
echo $time . " seconds\n";
同一スペックのマシンで3回ほど流してみた結果(単位は秒)
通常INSERT | Bulk Write | |
---|---|---|
1回目 | 40.45 | 24.52 |
2回目 | 41.43 | 26.05 |
3回目 | 40.09 | 25.15 |
実務でも日次バッチで巨大な20万行ほどのCSVファイル食わせる処理を書いていたのですが、もともとのCSVファイルの処理方法のマズさもあって(たぶんメモリ全読み込みのせいでスワップしてたと思われる)えらく時間がかかっていたのですが、ストリームフィルタ読みとBulkWrite書き込みに変更したところ劇的に速くなりました