前に、こんな記事を書いたけど、
http://qiita.com/kiichi54321/items/f07e56a99d236f12a3cf
自分で再び使おうと思ったら、やりにくくて仕方なかったから、新たに作った。
ソース https://github.com/kiichi54321/ParallelCountLib/
使い方。例:TinySegmenterを使い、テキストファイルを一行づつ、文章を分かち書きしたものを集計。
TinySegmenterDotNet.TinySegmenter tinySegmenter = new TinySegmenterDotNet.TinySegmenter();
var dic = ParallelCountLib.ParallelCount.RunTextFile("sample.txt",
(n) =>
{
return tinySegmenter.Segment(n);
});
dic.Save(@"result.txt", 20);
これだけ。
6個のスレッドを使いそれぞれで集計し、それをまとめ上げを行います。
カウント処理は、ロックが発生するため、複数のスレッドで数えて、まとめるのがいいです。
普通にシングルスレッドでForeachを回すより、3分の1から4分の1のスピードアップになります。(core i7を使用)
IEnumerable がソースとしてあればいいので、対象がファイルでなくても集計したいという時に便利かと思います。
とはいえ、このIEnumerableは、スレッド対応してないとダメみたい。
当たり前ですが、ファイルをRamディスクなりSSDなりに置くと、速くなります。
特徴としては、逐次読み出しみたいなことをしていて、一度にすべてのデータを読み込むなんていうことはしません。
とはいえ、初期設定で、かなりの量を読み込む設定になっています。readRange パラメータがそれ。
そのため、初期設定のままでは、小さいListでは、並列化の効果は発揮しないと思います。
また、副産物的にこんな拡張メソッドも作ってみました。わりと便利。返り値があるので、さらにメソッドチェーンをつなげることもできます。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ParallelCount.Extend
{
public static class Parallel
{
private static int threadNum = 6;
public static int ThreadNum
{
get { return threadNum; }
set { threadNum = value; }
}
public static IEnumerable<T> ForEach<T>(this IEnumerable<T> source, Action<T> action)
{
ConcurrentStack<T> stack = new ConcurrentStack<T>(source);
List<System.Threading.Tasks.Task<List<T>>> tasks = new List<System.Threading.Tasks.Task<List<T>>>();
for (int i = 0; i < ThreadNum; i++)
{
var task = System.Threading.Tasks.Task.Factory.StartNew<List<T>>((n) =>
{
List<T> list = new List<T>();
while (true)
{
T s;
if (stack.TryPop(out s) == false)
{
break;
}
action(s);
list.Add(s);
}
return list;
}, System.Threading.Tasks.TaskCreationOptions.LongRunning);
if (task != null) tasks.Add(task);
}
System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
return tasks.SelectMany(n => n.Result);
}
public static IEnumerable<T> ForEach<Source, T>(this IEnumerable<Source> source, Func<Source, T> func)
{
ConcurrentStack<Source> stack = new ConcurrentStack<Source>(source);
List<System.Threading.Tasks.Task<List<T>>> tasks = new List<System.Threading.Tasks.Task<List<T>>>();
for (int i = 0; i < ThreadNum; i++)
{
var task = System.Threading.Tasks.Task.Factory.StartNew<List<T>>((n) =>
{
List<T> list = new List<T>();
while (true)
{
Source s;
if (stack.TryPop(out s) == false)
{
break;
}
list.Add(func(s));
}
return list;
}, System.Threading.Tasks.TaskCreationOptions.LongRunning);
if (task != null) tasks.Add(task);
}
System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
return tasks.SelectMany(n => n.Result);
}
}
}
stack.TryPopが並列処理的にやや重たい処理のため、並列化する内容が重たくないと効果は発揮しづらいと思う。
そのため、初めからデータを分割しちゃった方がいいかなぁとか思う。
とはいえ、本家のParallelForeachを適切に使えばいいんじゃね?みたいな感じもする。.....まぁいいか。