はじめに
URL https://github.com/kiichi54321/ParallelCountLib/
C#で作成した並列化したカウントクラスです。
MapReduceアルゴリズムを元ネタにしています。
Core i7 で、ハードディスクにある、6GBのデータの組み合わせカウントをシングルスレッドでは65分のところを、20分程度で終わらせることができました。
3分の1程度の速度UPでしょうか。RamドライブやSSDのドライブを使うともっと早くなり、10分程度で出来ました。
並列化バンザイ。
用途
やっていることはhadoopの真似事なのですが、1PCを前提にしています。
そのため、インフラの整備が必要ありません。一つのPCで行えます。
ほどほどの大きさのビックデータの分析にいいと思います。数G程度のデータ。
Ramディスクに入るぐらいの大きさのデータ、SSDドライブに入る程度の大きさのデータがいいでしょう。
使い方
1.分割ファイルの作成
FileDivisionByKeyクラスで巨大ファイルを分割。通常のトランザクションデータなら、ユーザIDをKeyとしてソートする。
大量にファイルを分割するのは、スレッドで複数のファイルを読むこむためと、データを小さくすることでソートを軽くするために行う。
2.数え方を定義(通常はIntCountを使えばいいのでスルーしていい)
ICountDataStructを継承したクラスを作る。
通常のただ数えるだけならIntCountを使えばいいのでスルーしていい
同じKeyのものがあった時の処理を定義している。MapReduceにおいてはReduce処理に相当。
定義次第では、ベクトルとかも扱える。まだ作っていないけど。
3.ReadDataクラスの作成
BaseReadDataクラスを継承した、データを読み込むためのクラスを作る。
GroupByKeyFuncを設定して、まとめるKeyを設定する。
その中で、ReadLinesAction()をオーバーライドして、蓄積されたReadLinesからOnAddCountを叩いて数え上げるデータを生成する。
4.ParallelCountの起動
今まで作ったReadData、CountDataを設定して ParallelCountをNewする。
作成するスレッド数は初期設定で6です。CPUやメモリーなどをかんがみて設定してください。
CPUのコア数以上にスレッドを設定しても遅くなるし、スレッドを増やすとメモリー使用量も増えます。
Runで、分割ファイルを最終出力ファイルを設定して、実行開始。待つ。
結果はタブ区切りのKeyValueデータで返ってきます。
ファイル分割のサンプル
void Sample()
{
//カンマ区切りのデータであること前提
FileDivisionByKey file = new FileDivisionByKey();
//分割先の設定
file.GetHashFunc = (n) =>
{
//初めの列を使い、それがLong型なので変換して、それを120の余りをハッシュに設定。
var s = n.Split(',').First();
long value;
if (long.TryParse(s, out value))
{
var a = long.Parse(s) % 120;
return a.ToString();
}
return "-1";
};
//初めの列を一番目のソートキーにする
file.GetKeyFunc = (n) =>
{
return n.Split(',').FirstOrDefault();
};
//二番目の列を2番目のソートキーにする
file.GetSubKeyFunc = (n) =>
{
return n.Split(',').ElementAtOrDefault(1);
};
file.FileNameHeader = "Division";
file.Folder = "Data";
file.Run("DataSource.txt");
//これで、データを120個に分割し、それぞれがソートされたデータが作られる。
//重たい処理なので、実行時はスレッドにするといいと思う。
}
ReadDataクラスのサンプル
購買履歴データをユーザごとにまとめて、購買の共起関係を調べることを仮定しています。
public class SampleReadData : BaseReadData<IntCount>
{
public override string GetGroupByKey(string line)
{
//カンマ区切りの初めの列をキーとして設定。
return line.Split(',').FirstOrDefault();
}
public override void ReadLinesAction()
{
//カンマ区切りの3番目の列を集計に使用。
List<string> list = new List<string>();
//this.ReadLinesにはGetGroupByKeyでグループ化されたデータが複数入っている。
foreach (var item in this.ReadLines)
{
var data = item.Split(',').ElementAtOrDefault(2);
if (data != null) list.Add(data);
}
//単純集計
foreach (var item in list)
{
//OnAddCountで数え上げ。第一引数は、ハッシュ。
this.OnAddCount(item, item, IntCount.Default);
}
list = list.Distinct().OrderBy(n=>n).ToList();
//組み合わせ生成(共起関係の集計)
for (int i = 0; i < list.Count - 1; i++)
{
for (int l = i + 1; l < list.Count; l++)
{
//カウントするのをOnAddCountで設定。
this.OnAddCount(list[i], list[i] + "_" + list[l], IntCount.Default);
}
}
}
}
実行例
void Run()
{
var files = System.IO.Directory.GetFiles("Data").Where(n => n.Contains("sorted"));
using (ParallelCount<IntCount,SampleReadData> paraCount = new ParallelCount<IntCount, SampleReadData>())
{
paraCount.ThreadNum = 4;
paraCount.ReportAction = (n) => { System.Console.WriteLine(n); };
paraCount.Run("result.txt", files);
}
}