要件
IBMホストサーバから巨大な固定長テキストファイルがきます。例えば、2000万件契約データ、1レコード1000バイト、合計サイズ18.6GBです。このファイルは、担当組織別で50000件のCSVファイルに振り分けます。このような尋常ではない処理にどういうバッグまたはモンスターとぶつかるか今回の記事で紹介したいです。※以下のプログラムはefwのjsイベントの書き方です。
サンプルデータ
要件どおりのテストデータはなかなか手動で作れないから、windows環境で取り扱いできるサンプルのため、文字コードはIBMCp930とCp939ではなくMS932、レコードサイズは1000バイトではなく20バイト、レコード件数も100にします。このサンプルファイルを使っていろいろのやり方を探りましょう。
サンプルファイルがサイズ小さいから、エディターで開けます。10バイトがID、10バイトが名称のように繰り返しです。
例1、各種制限を考慮せず無邪気な例
var ary=new BinaryReader(
"text&csv/myText.txt",//読み取るファイル
[10,10],//項目ごとのバイト数
["MS932","MS932"],//項目ごとの文字コード
20//1つレイアウトのバイト数
).readAllLines();//全部レコードを一括で読み取る
for(var i=0;i<ary.length;i++){
//IDで保存先を特定する。
var writer= new CSVWriter("text&csv/seperated/"+ary[i][0]+".csv", ",", "\"", "MS932");
writer.writeLine(ary[i]);//レコードを書き込む
writer.close();
}
この例に問題になるのは、readAllLinesです。一気にデータファイル全体をメモリに持ち込んでいるから、データファイルサイズがギガバイトになるとメモリオーバーの恐れです。確かに、2千万行はまだjavaの配列制限を超えていないし、すごいサーバーならメモリに格納できる(かもしれない)し、だが職人精神で改善しましょう!※プログラマは職人といえるかしら?
例2、1件ずつ処理の慎重派の例
new BinaryReader(
"text&csv/myText.txt",//読み取るファイル
[10,10],//項目ごとのバイト数
["MS932","MS932"],//項目ごとの文字コード
20//1つレイアウトのバイト数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
//IDで保存先を特定する。
var writer= new CSVWriter("text&csv/seperated/"+fields[0]+".csv", ",", "\"", "MS932");
writer.writeLine(fields);//レコードを書き込む
writer.close();
});
loopAllLinesで1件ずつ読み込んで1件ずつファイルに書き込みます。メモリの圧迫は絶対発生しないです。だが、読み込みと書き込みは混ざり込んで発生するから、ハードディスクIOはボトルネックになりそうです。
例3、ロット別でIOを分ける例
ロット(lot)という言葉は、鋼鉄生産に一回で大量に同じ品質なものを作るを指す意味です。その意味を借りて使います。以下のソースにはロット数は10にしていますが、実際のプログラムにロット数は20万行つまり200MBぐらいのサイズにしています。
var buffer=[];//ロット処理のバッファー
new BinaryReader(
"text&csv/myText.txt",//読み取るファイル
[10,10],//項目ごとのバイト数
["MS932","MS932"],//項目ごとの文字コード
20//1つレイアウトのバイト数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
buffer.push(fields);
if (index % 10 == 0){//ロット数に達すかどうか判断
saveBuffer();//ロットを保存する
}
});
saveBuffer();//ロット数未満の残データを保存する
//------以下はバッファー保存用の内部関数
function saveBuffer(){
for (var i=0;i<buffer.length;i++){
//IDで保存先を特定する。
var writer= new CSVWriter("text&csv/seperated/"+buffer[i][0]+".csv", ",", "\"", "MS932");
writer.writeLine(buffer[i]);//レコードを書き込む
writer.close();
}
buffer=[];//バッファーを初期化する
}
読み込みと書き込みの分離を実現できす。だが、書き込み時一回のファイル開閉で1行データしか保存しません。どうももったいないですね。
例4、ライターの使いまわし例
例3をベースに、ライターの重複利用を考慮します。
var buffer=[];//ロット処理のバッファー
var writers={};//ライターを格納するマップ
new BinaryReader(
"text&csv/myText.txt",//読み取るファイル
[10,10],//項目ごとのバイト数
["MS932","MS932"],//項目ごとの文字コード
20//1つレイアウトのバイト数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
buffer.push(fields);
if (index % 10 == 0){//ロット数に達すかどうか判断
saveBuffer();//ロットを保存する
}
});
saveBuffer();//ロット数未満の残データを保存する
saveWriters();//ライターを一括で閉じる
//------以下はバッファー保存用の内部関数
function saveBuffer(){
for (var i=0;i<buffer.length;i++){
//IDで保存先を特定する。
var writer=writers[buffer[i][0]];
if (writer==null){
writer=new CSVWriter("text&csv/seperated/"+buffer[i][0]+".csv", ",", "\"", "MS932");
writers[buffer[i][0]]=writer;
}
writer.writeLine(buffer[i]);//レコードを書き込む
}
buffer=[];//バッファーを初期化する
}
//--------ライターを一括で閉じる関数
function saveWriters(){
for(var key in writers){
if (key=="debug")continue;
writers[key].close();
}
}
一つのライターは複数行データの保存に使えて効率がよくなります。だが、計算してみると、組織別5万件があるから、処理の最後に5万CSVファイルを同時開くことになり、危ない気ですね。具体的な制限数を見つかっていないですが、テストで数千ファイル同時開ける程度とわかりました。
例5、バッファーの配列をID別に分ける例
ライターを開きばなしできないから、バッファー配列を分割して保存しやすいようにします。
var buffer={};//ロット処理のバッファーマップ、ID別の配列を格納する
new BinaryReader(
"text&csv/myText.txt",//読み取るファイル
[10,10],//項目ごとのバイト数
["MS932","MS932"],//項目ごとの文字コード
20//1つレイアウトのバイト数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
//もしID別の配列がまだ存在しない場合、その配列を初期化する
if (buffer[fields[0]]==null)buffer[fields[0]]=[];
buffer[fields[0]].push(fields);
if (index % 10 == 0){//ロット数に達すかどうか判断
saveBuffer();//ロットを保存する
}
});
saveBuffer();//ロット数未満の残データを保存する
//------以下はバッファー保存用の内部関数
function saveBuffer(){
for (var key in buffer){
if (key=="debug")continue;
var ary=buffer[key];
var writer=new CSVWriter("text&csv/seperated/"+key+".csv", ",", "\"", "MS932");
for(var i=0;i<ary.length;i++){
writer.writeLine(ary[i]);//レコードを書き込む
}
writer.close();
}
buffer={};//バッファーを初期化する
}
同時開くファイル数の問題を回避できています。特に問題がありません。これでよいです。だが、職人精神でさらに改善(改悪か)を続けます。
例6、マルチスレッドの例
読み込みを複数スレッドで行うように試します。
var buffer={};//ロット処理のバッファーマップ、ID別の配列を格納する
var hasDataFlag=false;//データ有無フラグ
var lot=0;
do{
hasDataFlag=false;//初期値false
var threads = new Threads(2);
threads.add({from:0+lot*10 ,run:makeCsvBuffer});
threads.add({from:5+lot*10 ,run:makeCsvBuffer});
threads.run();//マルチスレッドを実行する
saveBuffer();//バッファーを保存する。データある場合、hasDataFlagをtrueにする
lot++;
}while(hasDataFlag);
//------以下はCSVバッファーを作成する関数
function makeCsvBuffer(){
new BinaryReader(
"text&csv/myText.txt",//読み取るファイル
[10,10],//項目ごとのバイト数
["MS932","MS932"],//項目ごとの文字コード
20,//1つレイアウトのバイト数
this.from,//読み込み開始レコード番号
5//読み込み件数、ロット件数/スレッド数
).loopAllLines(function(fields,index){//全部レコードを1件ずつ読み取る
//もしID別の配列がまだ存在しない場合、その配列を初期化する
helloTextCSVThread_submit.mylocker.lock();//ロックする
if (buffer[fields[0]]==null)buffer[fields[0]]=[];
buffer[fields[0]].push(fields);
helloTextCSVThread_submit.mylocker.unlock();//ロック解除する
});
}
//------以下はバッファー保存用の内部関数
function saveBuffer(){
for (var key in buffer){
if (key=="debug")continue;
var ary=buffer[key];
var writer=new CSVWriter("text&csv/seperated/"+key+".csv", ",", "\"", "MS932");
for(var i=0;i<ary.length;i++){
writer.writeLine(ary[i]);//レコードを書き込む
}
writer.close();
hasDataFlag=true;
}
buffer={};//バッファーを初期化する
}
マルチスレッドでbuffer変数を操作するから、lockerを使って同期します。そうしないと、スレッドAがbufferに配列を追加する途中で、スレッドBがキーのなしと判断してまだ配列を追加する操作を行うと、スレッドAの追加データがなくなります。この効果は2つのスレッドでも見えます。
実際のプロジェクトに試した結果ですが、時間短縮の効果は例5よりわずか10%程度です。推測の原因は以下です。
・そもそも読み込み処理ははやいです。
・bufferを操作する際の同期のせいでマルチスレッドは台無しになります。
・当時のプログラムにDB取込機能もあります。それの割合が結構大きいですから、読み込みのマルチスレッドが思うより効果がないように見えます。
なしよりましですが、保守性を考慮するとどうかなレベルです。
今回のサンプルは以下のリンクからダウンロードできます。
利用するjarファイルです。
<dependency>
<groupId>io.github.efwgrp</groupId>
<artifactId>efw</artifactId>
<version>4.07.000</version>
</dependency>
jdk15以上の場合、関連jarが必要です。
<dependency>
<groupId>org.openjdk.nashorn</groupId>
<artifactId>nashorn-core</artifactId>
<version>15.4</version>
</dependency>