LoginSignup
2
1

要件

IBMホストサーバから巨大な固定長テキストファイルがきます。例えば、2000万件契約データ、1レコード1000バイト、合計サイズ18.6GBです。このファイルは、担当組織別で50000件のCSVファイルに振り分けます。このような尋常ではない処理にどういうバッグまたはモンスターとぶつかるか今回の記事で紹介したいです。※以下のプログラムはefwのjsイベントの書き方です。

サンプルデータ

要件どおりのテストデータはなかなか手動で作れないから、windows環境で取り扱いできるサンプルのため、文字コードはIBMCp930とCp939ではなくMS932、レコードサイズは1000バイトではなく20バイト、レコード件数も100にします。このサンプルファイルを使っていろいろのやり方を探りましょう。
image.png
サンプルファイルがサイズ小さいから、エディターで開けます。10バイトがID、10バイトが名称のように繰り返しです。

例1、各種制限を考慮せず無邪気な例

var ary=new BinaryReader(
    "text&csv/myText.txt",//読み取るファイル
    [10,10],//項目ごとのバイト数
    ["MS932","MS932"],//項目ごとの文字コード
    20//1つレイアウトのバイト数
).readAllLines();//全部レコードを一括で読み取る
for(var i=0;i<ary.length;i++){
    //IDで保存先を特定する。
    var writer= new CSVWriter("text&csv/seperated/"+ary[i][0]+".csv", ",", "\"", "MS932");
    writer.writeLine(ary[i]);//レコードを書き込む
    writer.close();
}

この例に問題になるのは、readAllLinesです。一気にデータファイル全体をメモリに持ち込んでいるから、データファイルサイズがギガバイトになるとメモリオーバーの恐れです。確かに、2千万行はまだjavaの配列制限を超えていないし、すごいサーバーならメモリに格納できる(かもしれない)し、だが職人精神で改善しましょう!※プログラマは職人といえるかしら?

例2、1件ずつ処理の慎重派の例

new BinaryReader(
    "text&csv/myText.txt",//読み取るファイル
    [10,10],//項目ごとのバイト数
    ["MS932","MS932"],//項目ごとの文字コード
    20//1つレイアウトのバイト数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
    //IDで保存先を特定する。
    var writer= new CSVWriter("text&csv/seperated/"+fields[0]+".csv", ",", "\"", "MS932");
    writer.writeLine(fields);//レコードを書き込む
    writer.close();
});

loopAllLinesで1件ずつ読み込んで1件ずつファイルに書き込みます。メモリの圧迫は絶対発生しないです。だが、読み込みと書き込みは混ざり込んで発生するから、ハードディスクIOはボトルネックになりそうです。

例3、ロット別でIOを分ける例

ロット(lot)という言葉は、鋼鉄生産に一回で大量に同じ品質なものを作るを指す意味です。その意味を借りて使います。以下のソースにはロット数は10にしていますが、実際のプログラムにロット数は20万行つまり200MBぐらいのサイズにしています。

var buffer=[];//ロット処理のバッファー
new BinaryReader(
    "text&csv/myText.txt",//読み取るファイル
    [10,10],//項目ごとのバイト数
    ["MS932","MS932"],//項目ごとの文字コード
    20//1つレイアウトのバイト数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
    buffer.push(fields);
    if (index % 10 == 0){//ロット数に達すかどうか判断
        saveBuffer();//ロットを保存する
    }
});
saveBuffer();//ロット数未満の残データを保存する
//------以下はバッファー保存用の内部関数
function saveBuffer(){
    for (var i=0;i<buffer.length;i++){
        //IDで保存先を特定する。
        var writer= new CSVWriter("text&csv/seperated/"+buffer[i][0]+".csv", ",", "\"", "MS932");
        writer.writeLine(buffer[i]);//レコードを書き込む
        writer.close();
    }
    buffer=[];//バッファーを初期化する
}

読み込みと書き込みの分離を実現できす。だが、書き込み時一回のファイル開閉で1行データしか保存しません。どうももったいないですね。

例4、ライターの使いまわし例

例3をベースに、ライターの重複利用を考慮します。

var buffer=[];//ロット処理のバッファー
var writers={};//ライターを格納するマップ
new BinaryReader(
    "text&csv/myText.txt",//読み取るファイル
    [10,10],//項目ごとのバイト数
    ["MS932","MS932"],//項目ごとの文字コード
    20//1つレイアウトのバイト数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
    buffer.push(fields);
    if (index % 10 == 0){//ロット数に達すかどうか判断
        saveBuffer();//ロットを保存する
    }
});
saveBuffer();//ロット数未満の残データを保存する
saveWriters();//ライターを一括で閉じる
//------以下はバッファー保存用の内部関数
function saveBuffer(){
    for (var i=0;i<buffer.length;i++){
        //IDで保存先を特定する。
        var writer=writers[buffer[i][0]];
        if (writer==null){
            writer=new CSVWriter("text&csv/seperated/"+buffer[i][0]+".csv", ",", "\"", "MS932");
            writers[buffer[i][0]]=writer;
        }
        writer.writeLine(buffer[i]);//レコードを書き込む
    }
    buffer=[];//バッファーを初期化する
}
//--------ライターを一括で閉じる関数
function saveWriters(){
    for(var key in writers){
        if (key=="debug")continue;
        writers[key].close();
    }
}

一つのライターは複数行データの保存に使えて効率がよくなります。だが、計算してみると、組織別5万件があるから、処理の最後に5万CSVファイルを同時開くことになり、危ない気ですね。具体的な制限数を見つかっていないですが、テストで数千ファイル同時開ける程度とわかりました。

例5、バッファーの配列をID別に分ける例

ライターを開きばなしできないから、バッファー配列を分割して保存しやすいようにします。

var buffer={};//ロット処理のバッファーマップ、ID別の配列を格納する
new BinaryReader(
    "text&csv/myText.txt",//読み取るファイル
    [10,10],//項目ごとのバイト数
    ["MS932","MS932"],//項目ごとの文字コード
    20//1つレイアウトのバイト数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
    //もしID別の配列がまだ存在しない場合、その配列を初期化する
    if (buffer[fields[0]]==null)buffer[fields[0]]=[];
    buffer[fields[0]].push(fields);
    if (index % 10 == 0){//ロット数に達すかどうか判断
        saveBuffer();//ロットを保存する
    }
});
saveBuffer();//ロット数未満の残データを保存する
//------以下はバッファー保存用の内部関数
function saveBuffer(){
    for (var key in buffer){
        if (key=="debug")continue;
        var ary=buffer[key];
        var writer=new CSVWriter("text&csv/seperated/"+key+".csv", ",", "\"", "MS932");
        for(var i=0;i<ary.length;i++){
             writer.writeLine(ary[i]);//レコードを書き込む
        }
        writer.close();
    }
    buffer={};//バッファーを初期化する
}

同時開くファイル数の問題を回避できています。特に問題がありません。これでよいです。だが、職人精神でさらに改善(改悪か)を続けます。

例6、マルチスレッドの例

読み込みを複数スレッドで行うように試します。

var buffer={};//ロット処理のバッファーマップ、ID別の配列を格納する
var hasDataFlag=false;//データ有無フラグ
var lot=0;
do{
    hasDataFlag=false;//初期値false
    var threads = new Threads(2);
    threads.add({from:0+lot*10 ,run:makeCsvBuffer});
    threads.add({from:5+lot*10 ,run:makeCsvBuffer});
    threads.run();//マルチスレッドを実行する
    saveBuffer();//バッファーを保存する。データある場合、hasDataFlagをtrueにする
    lot++;
}while(hasDataFlag);
//------以下はCSVバッファーを作成する関数
function makeCsvBuffer(){
    new BinaryReader(
        "text&csv/myText.txt",//読み取るファイル
        [10,10],//項目ごとのバイト数
        ["MS932","MS932"],//項目ごとの文字コード
        20,//1つレイアウトのバイト数
        this.from,//読み込み開始レコード番号
        5//読み込み件数、ロット件数/スレッド数
    ).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
        //もしID別の配列がまだ存在しない場合、その配列を初期化する
        helloTextCSVThread_submit.mylocker.lock();//ロックする
            if (buffer[fields[0]]==null)buffer[fields[0]]=[];
            buffer[fields[0]].push(fields);
        helloTextCSVThread_submit.mylocker.unlock();//ロック解除する
    });
}
//------以下はバッファー保存用の内部関数
function saveBuffer(){
    for (var key in buffer){
        if (key=="debug")continue;
        var ary=buffer[key];
        var writer=new CSVWriter("text&csv/seperated/"+key+".csv", ",", "\"", "MS932");
        for(var i=0;i<ary.length;i++){
             writer.writeLine(ary[i]);//レコードを書き込む
        }
        writer.close();
        hasDataFlag=true;
    }
    buffer={};//バッファーを初期化する
}

マルチスレッドでbuffer変数を操作するから、lockerを使って同期します。そうしないと、スレッドAがbufferに配列を追加する途中で、スレッドBがキーのなしと判断してまだ配列を追加する操作を行うと、スレッドAの追加データがなくなります。この効果は2つのスレッドでも見えます。

実際のプロジェクトに試した結果ですが、時間短縮の効果は例5よりわずか10%程度です。推測の原因は以下です。
・そもそも読み込み処理ははやいです。
・bufferを操作する際の同期のせいでマルチスレッドは台無しになります。
・当時のプログラムにDB取込機能もあります。それの割合が結構大きいですから、読み込みのマルチスレッドが思うより効果がないように見えます。

なしよりましですが、保守性を考慮するとどうかなレベルです。

今回のサンプルは以下のリンクからダウンロードできます。

利用するjarファイルです。

<dependency>
    <groupId>io.github.efwgrp</groupId>
    <artifactId>efw</artifactId>
    <version>4.07.000</version>
</dependency>

jdk15以上の場合、関連jarが必要です。

<dependency>
    <groupId>org.openjdk.nashorn</groupId>
    <artifactId>nashorn-core</artifactId>
    <version>15.4</version>
</dependency>
2
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1