10
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

要件

IBMホストサーバから巨大な固定長テキストファイルがきます。例えば、2000万件契約データ、1レコード1000バイト、合計サイズ18.6GBです。このファイルは、担当組織別で50000件のCSVファイルに振り分けます。このような膨大なデータを処理するときにどのようなバグや課題が発生するのか今回の記事で紹介したいです。※以下のプログラムはefwのjsイベントの書き方です。

サンプルデータ

要件どおりのテストデータを手動で作成することは手間がかかるため、windows環境で取り扱いできるサンプルにして、文字コードはIBMCp930とCp939ではなくMS932、レコードサイズは1000バイトではなく20バイト、レコード件数も100にします。このサンプルファイルを使って様々なデータの処理方法を探りましょう。
image.png
サンプルファイルがサイズ小さいため、エディターで開くことができます。10バイトがID、10バイトが名称のように繰り返しです。

例1) 各種制限を考慮しない無邪気な場合

var ary=new BinaryReader(
    "text&csv/myText.txt",//読み取るファイル
    [10,10],//項目ごとのバイト数
    ["MS932","MS932"],//項目ごとの文字コード
    20//1つレイアウトのバイト数
).readAllLines();//全部レコードを一括で読み取る
for(var i=0;i<ary.length;i++){
    //IDで保存先を特定する。
    var writer= new CSVWriter("text&csv/seperated/"+ary[i][0]+".csv", ",", "\"", "MS932");
    writer.writeLine(ary[i]);//レコードを書き込む
    writer.close();
}

この例で問題になるのは、readAllLinesです。一気にデータファイル全体をメモリに保存するため、データファイルサイズがギガバイトになるとメモリオーバーする恐れがあります。確かに、2千万行はまだjavaの配列制限を超えていないし、すごいサーバーならメモリに格納できる(かもしれない)。
だが職人精神で改善しましょう!※プログラマは職人といえるかしら?

例2) 1件ずつ処理を行う慎重派の場合

new BinaryReader(
    "text&csv/myText.txt",//読み取るファイル
    [10,10],//項目ごとのバイト数
    ["MS932","MS932"],//項目ごとの文字コード
    20//1つレイアウトのバイト数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
    //IDで保存先を特定する。
    var writer= new CSVWriter("text&csv/seperated/"+fields[0]+".csv", ",", "\"", "MS932");
    writer.writeLine(fields);//レコードを書き込む
    writer.close();
});

loopAllLinesで1件ずつ読み込んでその都度ファイルに書き込みます。そのためメモリの圧迫は絶対に発生しません。しかし、読み込みと書き込みは混ざり込んで発生するため、ハードディスクIOはボトルネックになりそうです。

例3) ロット別でIOを分ける場合

ロット(lot)という言葉は、鋼鉄生産時、一回で大量に同じ品質のものを作ることを意味しています。以下のソースではロット数を10にしていますが、実際のプログラムではロット数を20万行つまり200MBぐらいのサイズにしています。

var buffer=[];//ロット処理のバッファー
new BinaryReader(
    "text&csv/myText.txt",//読み取るファイル
    [10,10],//項目ごとのバイト数
    ["MS932","MS932"],//項目ごとの文字コード
    20//1つレイアウトのバイト数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
    buffer.push(fields);
    if (index % 10 == 0){//ロット数に達すかどうか判断
        saveBuffer();//ロットを保存する
    }
});
saveBuffer();//ロット数未満の残データを保存する
//------以下はバッファー保存用の内部関数
function saveBuffer(){
    for (var i=0;i<buffer.length;i++){
        //IDで保存先を特定する。
        var writer= new CSVWriter("text&csv/seperated/"+buffer[i][0]+".csv", ",", "\"", "MS932");
        writer.writeLine(buffer[i]);//レコードを書き込む
        writer.close();
    }
    buffer=[];//バッファーを初期化する
}

読み込みと書き込みの分離を実現できます。だが、書き込み時一回のファイル開閉で1行データしか保存しません。無駄があるといえます。

例4) ライターの使いまわし例

例3をベースに、ライターの重複利用を考慮します。

var buffer=[];//ロット処理のバッファー
var writers={};//ライターを格納するマップ
new BinaryReader(
    "text&csv/myText.txt",//読み取るファイル
    [10,10],//項目ごとのバイト数
    ["MS932","MS932"],//項目ごとの文字コード
    20//1つレイアウトのバイト数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
    buffer.push(fields);
    if (index % 10 == 0){//ロット数に達すかどうか判断
        saveBuffer();//ロットを保存する
    }
});
saveBuffer();//ロット数未満の残データを保存する
saveWriters();//ライターを一括で閉じる
//------以下はバッファー保存用の内部関数
function saveBuffer(){
    for (var i=0;i<buffer.length;i++){
        //IDで保存先を特定する。
        var writer=writers[buffer[i][0]];
        if (writer==null){
            writer=new CSVWriter("text&csv/seperated/"+buffer[i][0]+".csv", ",", "\"", "MS932");
            writers[buffer[i][0]]=writer;
        }
        writer.writeLine(buffer[i]);//レコードを書き込む
    }
    buffer=[];//バッファーを初期化する
}
//--------ライターを一括で閉じる関数
function saveWriters(){
    for(var key in writers){
        if (key=="debug")continue;
        writers[key].close();
    }
}

一つのライターは複数行データの保存に使うことができ、効率がよくなります。だが、計算してみると、組織別のCSVファイルが5万件あるため、処理の最後に5万件のCSVファイルを同時に開くことになり、バグの発生リスクがあります。具体的な制限数は見つかっていませんが、テストで数千ファイル同時に開ける程度だとわかりました。

例5) バッファーの配列をID別に分ける例

ライターを常時開くことができないため、バッファー配列を分割して保存しやすいようにします。

var buffer={};//ロット処理のバッファーマップ、ID別の配列を格納する
new BinaryReader(
    "text&csv/myText.txt",//読み取るファイル
    [10,10],//項目ごとのバイト数
    ["MS932","MS932"],//項目ごとの文字コード
    20//1つレイアウトのバイト数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
    //もしID別の配列がまだ存在しない場合、その配列を初期化する
    if (buffer[fields[0]]==null)buffer[fields[0]]=[];
    buffer[fields[0]].push(fields);
    if (index % 10 == 0){//ロット数に達すかどうか判断
        saveBuffer();//ロットを保存する
    }
});
saveBuffer();//ロット数未満の残データを保存する
//------以下はバッファー保存用の内部関数
function saveBuffer(){
    for (var key in buffer){
        if (key=="debug")continue;
        var ary=buffer[key];
        var writer=new CSVWriter("text&csv/seperated/"+key+".csv", ",", "\"", "MS932");
        for(var i=0;i<ary.length;i++){
             writer.writeLine(ary[i]);//レコードを書き込む
        }
        writer.close();
    }
    buffer={};//バッファーを初期化する
}

同時に開くファイル数の問題を回避できています。これで特に問題はありませんが、職人精神でさらなる改善を続けます。

例6) マルチスレッドの例

読み込みを複数スレッドで行うように試します。

var buffer={};//ロット処理のバッファーマップ、ID別の配列を格納する
var hasDataFlag=false;//データ有無フラグ
var lot=0;
do{
    hasDataFlag=false;//初期値false
    var threads = new Threads(2);
    threads.add({from:0+lot*10 ,run:makeCsvBuffer});
    threads.add({from:5+lot*10 ,run:makeCsvBuffer});
    threads.run();//マルチスレッドを実行する
    saveBuffer();//バッファーを保存する。データある場合、hasDataFlagをtrueにする
    lot++;
}while(hasDataFlag);
//------以下はCSVバッファーを作成する関数
function makeCsvBuffer(){
    new BinaryReader(
        "text&csv/myText.txt",//読み取るファイル
        [10,10],//項目ごとのバイト数
        ["MS932","MS932"],//項目ごとの文字コード
        20,//1つレイアウトのバイト数
        this.from,//読み込み開始レコード番号
        5//読み込み件数、ロット件数/スレッド数
    ).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
        //もしID別の配列がまだ存在しない場合、その配列を初期化する
        helloTextCSVThread_submit.mylocker.lock();//ロックする
            if (buffer[fields[0]]==null)buffer[fields[0]]=[];
            buffer[fields[0]].push(fields);
        helloTextCSVThread_submit.mylocker.unlock();//ロック解除する
    });
}
//------以下はバッファー保存用の内部関数
function saveBuffer(){
    for (var key in buffer){
        if (key=="debug")continue;
        var ary=buffer[key];
        var writer=new CSVWriter("text&csv/seperated/"+key+".csv", ",", "\"", "MS932");
        for(var i=0;i<ary.length;i++){
             writer.writeLine(ary[i]);//レコードを書き込む
        }
        writer.close();
        hasDataFlag=true;
    }
    buffer={};//バッファーを初期化する
}

マルチスレッドでbuffer変数を操作するため、lockerを使って同期します。そうしないと、スレッドAがbufferに配列を追加する途中で、スレッドBがキーなしと判断して、さらに配列を追加する操作を行うと、スレッドAの追加データがなくなります。この効果は2つのスレッドでも見えます。

実際のプロジェクトで試した結果ですが、時間短縮の効果は例5よりわずか10%程度です。原因の推測は以下の通りです。
・そもそも読み込み処理がはやいです。
・bufferを操作する際の同期のせいでマルチスレッドは台無しになります。
・当時のプログラムにDB取込機能もあります。それの割合が結構大きいため、読み込みのマルチスレッドが思ったより効果がないように見えます。

あったほうが良いですが、保守性を考慮するとどうかと疑うレベルです。

今回のサンプルは以下のリンクからダウンロードできます。

利用するjarファイルです。

<dependency>
    <groupId>io.github.efwgrp</groupId>
    <artifactId>efw</artifactId>
    <version>4.07.000</version>
</dependency>

jdk15以上の場合、関連jarが必要です。

<dependency>
    <groupId>org.openjdk.nashorn</groupId>
    <artifactId>nashorn-core</artifactId>
    <version>15.4</version>
</dependency>
10
1
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?