概要
数億点のCSVファイルを読み込むにあたり高速化をできないかを試した備忘録になります。
行いたい処理としては”X,Y,Z座標が記載されたCSVファイルを読み込み、Double型の座標情報として配列に格納”となります。
結論
今回のケースではAgentを使用し、1億点のCSVファイルを処理するにあたり5割ほどの高速化を行えました。
使用ライブラリ
Agent
C++の同時実行ランタイムの非同期エージェントを使用し、ファイルを読み込む処理と読み込んだ文字列を座標データに変換する処理を並列に行います。(非同期エージェント)
ソースコード
高速化の考え方
1. 1つの読み込みスレッドと複数の変換スレッドを用意します。
2. CSVファイルを1行ずつ読み込み、100万行たまったらunbounded_bufferに投げて読み込みを続けます。
3. 読み込みスレッドはunbounded_bufferを監視し、メッセージを受け取ったら変換処理を行います。
こうすることにより読み込みと変換を並列に行え、かつ時間のかかる変換処理を複数のスレッドで受け持つことができます。
あくまでイメージですが、1マス100msと仮定すると、1スレッドで行うと100 * 4+200 * 4=1200[ms]のところ600[ms]で行える形になります。
ファイルの読み込みAgent
class fileRead_agent : public agent {
public:
fileRead_agent( ITarget<shared_ptr<vector<string>>>& target,string path,int receiveCnt,string sentinel ) :_target( target ),_path(path),_recCnt(receiveCnt),_sentinel(sentinel) {};
protected:
void run() {
std::ifstream inputFile(_path);
//終了条件を設定する
auto endAnnounce = make_shared<vector<string>>();
endAnnounce->push_back( _sentinel );
if (!inputFile.is_open()) {
asend( _target, endAnnounce );//終了条件としてendAnnounceを投げる
done();
}
std::string line;
shared_ptr<vector<string>> buffers = make_shared<vector<string>>();
buffers->reserve( BLOCK_COUNT );
//
while (std::getline(inputFile, line)) {
buffers->push_back( line );
//読み込んだ行がBLOCK_COUNTに達したらunbounded_bufferに送る
if (buffers->size() == BLOCK_COUNT) {
send( _target, buffers );
buffers.reset();//本スコープでは不要になったポインタを開放
buffers = make_shared<vector<string>>();//作り直し
}
}
//BLOCK_COUNTの端数を最後に送る
if (buffers->size() > 0) {
send( _target, buffers );
}
inputFile.close();//入力ファイルを閉じる
//立ち上げたTargetに終了を通知する
for (size_t i = 0; i < _recCnt; i++)
{
send( _target, endAnnounce );
}
done();//読込Agent終了
}
private:
const int BLOCK_COUNT = 1000000;
ITarget<shared_ptr<vector<string>>>& _target;
string _path;//読み込むファイルパス
int _recCnt;
string _sentinel;//終了条件となる文字列
};
文字列の変換Agent
class converter_agent :public agent {
public:
converter_agent( ISource<shared_ptr<vector<string>>>& source,bool usePara,string sentinel ) :_source( source ),_usePara(usePara),_sentinel(sentinel){};
concurrency::concurrent_vector<lineInfo>& GetInfos()& { return _infos; }
protected:
void run() {
shared_ptr<vector<string>> buffers=receive(_source);
if (!buffers) {
done();
return;
}
while (buffers) {
//終了条件が送られていないか調べる
if (buffers->size() == 1 && _sentinel == buffers->at( 0 )) {
done();
return;
}
/*ブロックを並列ループで処理の実験(逆に遅くなったため不採用)
parallel_for_each( buffers->begin(), buffers->end(), [&]( string buf ) {
std::vector<std::string> tokens;
std::istringstream iss( buf );
string token;
while (std::getline( iss, token, ',' )) {
tokens.push_back( token );
}
_infos.push_back( lineInfo( stod( tokens[0] ), stod( tokens[1] ), stod( tokens[2] ) ) );
} );*/
//各行を","で分割し、Doubleに変換
for (auto& buf : *buffers) {
std::vector<std::string> tokens;
std::istringstream iss( buf );
string token;
while (std::getline( iss, token, ',' )) {
tokens.push_back( token );
}
_infos.push_back( lineInfo( stod( tokens[0] ), stod( tokens[1] ), stod( tokens[2] ) ) );
}
buffers = receive( _source );
}
done();
}
private:
ISource<shared_ptr<vector<string>>>& _source;
bool _usePara;
concurrency::concurrent_vector<lineInfo>_infos;
string _sentinel;
};
メイン関数
//1つの読み込みAgentと複数の変換Agentを立てる
void ReadWithMultiAgent(concurrency::concurrent_vector<lineInfo>& allInfos) {
//メッセージをやりとりするbufferを定義
unbounded_buffer<shared_ptr<vector<string>>> buffer;
fileRead_agent reader( buffer, ReadFileName,3,gSentinel );
//変換Agentは3つ用意
converter_agent converter( buffer,false,gSentinel );
converter_agent converter2( buffer,false,gSentinel );
converter_agent converter3( buffer,false,gSentinel );
//各Agentを始動する
reader.start();
converter.start();
converter2.start();
converter3.start();
//すべてのAgentの終了を待機する
agent::wait( &reader );
agent::wait( &converter );
agent::wait( &converter2 );
agent::wait( &converter3 );
//3つの変換Agentから結果を受取り、結合
auto& moveInfos = converter.GetInfos();
allInfos=std::move( moveInfos );
auto& moveInfos_ = converter2.GetInfos();
std::move( moveInfos_.begin(), moveInfos_.end(), std::back_inserter(allInfos) );
auto& moveInfos3 = converter3.GetInfos();
std::move( moveInfos3.begin(), moveInfos3.end(), std::back_inserter(allInfos) );
cout << "AllInfos:" << allInfos.size() << endl;
}
void AgentMain() {
std::chrono::system_clock::time_point start, seek,agent,pAgent; // 型は auto で可
start = std::chrono::system_clock::now(); // 計測開始時間
ReadWithAgent();
agent = std::chrono::system_clock::now();
SeekingGetInfos(ReadFileName);
seek = std::chrono::system_clock::now();
concurrency::concurrent_vector<lineInfo> allInfos;
ReadWithMultiAgent(allInfos);
pAgent = std::chrono::system_clock::now();
cout <<"Agent仕様:"<< std::chrono::duration_cast<std::chrono::milliseconds>(agent - start).count() << endl;
cout <<"1行ずつ処理:"<< std::chrono::duration_cast<std::chrono::milliseconds>(seek - agent).count() << endl;
cout <<"並列Agent仕様:"<< std::chrono::duration_cast<std::chrono::milliseconds>(pAgent - seek).count() << endl;
}
比較結果
1億行のCSVファイルを読み込み配列に格納するまでの時間を計測しました。
- 読み込みと変換Agentを一つずつ使用: 117524[ms]
- 1スレッドで読み込んだ行を都度変換: 117273[ms]
- 読み込みAgent1つと変換Agent3つを使用:44626[ms]
考察
- はじめは1行毎に変換Agentに投げていましたが、オーバーヘッドや待機によりむしろ遅くなっていたため、1000000行を1ブロックとして投げる形となりました。
- また1ブロックを読み込むのに対して変換の方が時間がかかるため、unbounded_bufferがどんどん溜まっていく形になります。
そこで、変換Agentを複数作成 することにより1行ずつ変換を行う処理より高速な変換を実現できました。 - 数パターン試した中で1000000行1ブロックが性能が良かったのですが、もちろんメモリサイズとの兼ね合いやファイルサイズなどから適切な値は変わって来ます。
しかしなるべくブロックを大きくしたほうがオーバーヘッドを小さくすることができ、ファイルが小さければそもそも読み込み時間を小さくするメリットも少ないのでなるべくブロックは大きくするのが良いと現在は考えています。
おまけ
fgetsとgetline
ifstreamのstd::getline()を使用せず、fgetsを使用するだけでも処理速度が大きく変わります。
今回の主題とは外れるため置き換えていませんが、速度を重視するならばfgetsを使用するのが無難なようです。
ParallelForループ
文字列のブロックをParallelFor文を用いて並列に数値変換及び格納を行いましたが、むしろ遅くなりました。
数値変換自体の負荷が小さいため、並列ループのオーバーヘッド及び要素追加の待機時間のデメリットのほうが大きかったと考えられます。
感想
個人の感想で恐縮ですが、Agentの簡単な概要を理解し、ParallelForやUnique_ptrとの兼ね合いが少しわかった気がしたので記事を書いてみました。
補足や感想、こうするともっといいよ!などコメントをいただけるととっても嬉しいです:D
参考
非同期エージェント ライブラリ:https://learn.microsoft.com/ja-jp/cpp/parallel/concrt/asynchronous-agents-library?view=msvc-170