8
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

MongoDBにCSVから大量のデータを流し込む

Last updated at Posted at 2016-08-23

はじめに

業務用のアプリを作ってるとCSVファイルからデータベースに何かを入れたり更新するというのは日常的に出てくる要望です
みんな大好きなMicrosoft Excelで編集できるというのが一番のメリットでしょうか

MongoDBの場合、無加工でもいい状態ならmongoimportでもある程度はまかなえたりしますが通常何も考えずにExcelから書き出されるCSVファイルというのは文字コードがShift JISだったり文字列ゆえの日付や時刻データのハンドリング、あるいはアプリケーション固有のデータに落とし込むなどの何かしらの変換が必要で、そのためのプログラムを書くなんていうのもまたよくある話です

今回のお題

サンプルデータとしてなんちゃって個人情報さんから全フィールド入り5000件を6回生成して合体させた30000件のCSVを食わせてMongoDBに突っ込むというスクリプトを作ってみます

CSVファイルを変換しつつ読み込む

CSVファイルの文字コードがShift JISでデータベースはUTF-8というのはよくある話です
さらにCSVファイルの中には半角カナ、全角英数字が入り乱れているのも非開発部門から来るファイルによくみられる特徴です

PHPの場合、mb_convert_encoding()で文字コードを変換してあげて、mb_convert_kana()を使って全角半角の統一などの変換をすることになるのですが、ファイルが巨大な場合は一度全部ファイルの内容をメモリに読み込んで変換する方法はなるべくとりたくありません

1行ずつ読み込んで都度変換・・・というのもクオート処理もろもろで面倒くさいことこの上なくなります

PHPにはfgetcsv()という行ベースのシーケンシャルアクセスでCSVを処理するにはもってこいの関数があるのですが、マルチバイト環境では地雷原のようなものでした(fgetcsvでぐぐるとハマった先人たちの苦闘の記録がたくさんあります)

そんな苦闘の中から福音がもたらされることになります
ストリームフィルタを使って等価的にマルチバイト系の変換をしながらfgetcsv()で読み込めばいいのです

実装は下記にあります

今回はGitHubからzipとしてダウンロードしてメインのファイル直下にsrcフォルダ以下のStreamディレクトリを配置しました
composerで入れることもできるらしいのですが何やらひと手間かかる模様です

参考→ fgetcsvを使ってSJISのCSVをロードするとき

BulkWriteを使ってみる

MongoDBにはBulkWriteというコレクションに対してINSERT/UPDATE/DELETE/UPSERTをバッチ的に行える機能があります
今回はPHP7 + MongoDB driver + mongodb/mongo-php-libraryでこのBulkWriteを使ってみます

mongodb-php-libraryでのBulkWriteの使い方はこちらで説明されています

参考→ BulkWrite

実際に動かす

愚直にINSERTしていくバージョンとBulkWriteバージョンのPHPソースと必要なライブラリをそろえるためのcomposer.jsonを準備します
今回は進捗状況を表示するためにguiguiboy/PHP-CLI-Progress-Barを利用しました

参考→PHPのCLIスクリプトでプログレスバー

composer.json
{
    "mongodb/mongodb": "^1.0.0",
    "guiguiboy/php-cli-progress-bar": "dev-master"
}

通常INSERT版

linewriter.php
<?php

require_once dirname(__FILE__) . '/vendor/autoload.php';
require_once dirname(__FILE__) . '/Stream/Filter/Mbstring.php';

stream_filter_register('convert.mbstring.*','Stream_Filter_Mbstring');

$mongoClient = new MongoDB\Client('mongodb://localhost:27017');
$db = $mongoClient->selectDatabase('nanchatte');
$collection = $db->selectCollection('persons');
$collection->drop();

sleep(1);

$fp = fopen('nanchatte.csv', 'r');
for($lines=0; fgets($fp); $lines ++);
rewind($fp);
print "$lines lines.\n";
$bar = new \ProgressBar\Manager(0, $lines);


stream_filter_append($fp,'convert.mbstring.encoding.SJIS-win:UTF-8');
stream_filter_append($fp,'convert.mbstring.kana.KVas:UTF-8');

$time_start = microtime(true);
while ($columns = fgetcsv($fp, 4096)){
    $collection->insertOne([
        'name' => $columns[0],
        'kana' => $columns[1],
        'email' => $columns[2],
        'gender' => $columns[3],
        'age' => $columns[4],
        'birth' => $columns[5],
        'bride' => $columns[6],
        'blood' => $columns[7],
        'prefecture' => $columns[8],
        'tel1' => $columns[9],
        'tel2' => $columns[10],
        'carrier' => $columns[11],
        'curry' => $columns[12]
    ]);
    $bar->advance();
}
$time_end = microtime(true);
$time = $time_end - $time_start;

echo $time . " seconds\n";

BulkWrite版

bulkwriter.php
<?php

require_once dirname(__FILE__) . '/vendor/autoload.php';
require_once dirname(__FILE__) . '/Stream/Filter/Mbstring.php';

stream_filter_register('convert.mbstring.*','Stream_Filter_Mbstring');

$mongoClient = new MongoDB\Client('mongodb://localhost:27017');
$db = $mongoClient->selectDatabase('nanchatte');
$collection = $db->selectCollection('persons');
$collection->drop();

sleep(1);

$fp = fopen('nanchatte.csv', 'r');
for($lines=0; fgets($fp); $lines ++);
rewind($fp);
print "$lines lines.\n";
$bar = new \ProgressBar\Manager(0, $lines);


stream_filter_append($fp,'convert.mbstring.encoding.SJIS-win:UTF-8');
stream_filter_append($fp,'convert.mbstring.kana.KVas:UTF-8');

$time_start = microtime(true);
$bulks = [];
while ($columns = fgetcsv($fp, 4096)){
    $bulks[] = [
        'insertOne' => [[
            'name' => $columns[0],
            'kana' => $columns[1],
            'email' => $columns[2],
            'gender' => $columns[3],
            'age' => $columns[4],
            'birth' => $columns[5],
            'bride' => $columns[6],
            'blood' => $columns[7],
            'prefecture' => $columns[8],
            'tel1' => $columns[9],
            'tel2' => $columns[10],
            'carrier' => $columns[11],
            'curry' => $columns[12]
        ]]
    ];
    // 1000件ずつ書き込んでおいたほうが安全らしい
    if(count($bulks) > 999){
        $collection->bulkWrite($bulks);
        $bulks = [];
    }
    $bar->advance();
}

// 残りのBulkwriteを処理する
if(count($bulks) > 0){
    $collection->bulkWrite($bulks);
}

$time_end = microtime(true);
$time = $time_end - $time_start;

echo $time . " seconds\n";

同一スペックのマシンで3回ほど流してみた結果(単位は秒)

通常INSERT Bulk Write
1回目 40.45 24.52
2回目 41.43 26.05
3回目 40.09 25.15

実務でも日次バッチで巨大な20万行ほどのCSVファイル食わせる処理を書いていたのですが、もともとのCSVファイルの処理方法のマズさもあって(たぶんメモリ全読み込みのせいでスワップしてたと思われる)えらく時間がかかっていたのですが、ストリームフィルタ読みとBulkWrite書き込みに変更したところ劇的に速くなりました

8
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?