31
34

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

TPL Dataflowを使って複数のスレッドからの結果を一つのファイルに書き込む

Last updated at Posted at 2015-10-09

#はじめに

微妙に知られているようで知られていないマイクロソフトが作っているライブラリーに
System.Threading.Tasks.Dataflowがあります。
ライブラリーの取得は、Nugetで行います。

日本語ドキュメントは、「MSDN」と「未確認飛行」ぐらいというものなのですが、使ってみるとかなり便利です。

C#における非同期処理は、Task、Rx、async/await 等色々ありますが、これも一つの柱である思います。

#TPL Dataflowとは

データフロープログラミングとアクターモデルを組み合わせた非同期処理をするためのライブラリーです。
非同期データフローを実現するために、アクターモデルを採用しているというものです。
実際使ってみると、データフローのためのライブラリーというよりも、アクターモデルのおまけとしてデータフローが作れるという感じで、どんどん使っていると、データフローになっているという感じのものです。
アクターモデルについての解説は、このスライドがいいです。→アクターモデルについて

#TPL Dataflow を使うためのusingの指定

これからのソースコードには、定番のLINQやTaskのUsingに加えて、このusingの指定があります。

using System.Threading.Tasks.Dataflow;

これは拡張メソッドがあるので必ず初めに加えてください。(ややはまるポイント)

#複数のスレッドから一つのファイルに書き込むの困難さ

今回示す事例は、「複数のスレッドからの結果を一つのファイルに書き込む」です。これは、やや難易度が高い問題です。なぜならば、複数のスレッドから一つのファイルに対して書き込むことができないからです。ファイルへの書き込みは、必ず一つのスレッドから行わなければなりません。そのため、書き込みを一つのスレッドで集中して行うというプログラミングをする必要があります。
TPL Dataflowは、このような問題を容易にプログラミングする仕組みを提供します。

#1 シンプルなアクターモデル

これはfileWriteBlockに複数のスレッドからPostするという形です。
アクターモデルとして基本的な振る舞いです。

filename

[TestMethod]
public void TestMethod1()
{
    //ファイルに書き込むためのActionBlock
    ActionBlock<string> fileWriteBlock = new ActionBlock<string>((n) => {
        using (var f = System.IO.File.AppendText("test.txt"))
        {
            f.WriteLine(n);
        }
    });

    //複数のスレッドからfileWriteBlockにPostする。
    var tasks = Enumerable.Range(0, 5).Select(n => Task.Factory.StartNew(() => {
        for (int i = 0; i < 500; i++)
        {
            fileWriteBlock.Post(n + "_" + i);
        } })).ToArray();
    Task.WaitAll(tasks);
    //fileWriteBlockのすべてを実行させる。
    fileWriteBlock.Complete();
    //fileWriteBlockが終わるのを待つ
    fileWriteBlock.Completion.Wait();
}

このようにとてもシンプルに作ることができます。
しかし、これはとても遅いです。なぜならば、一行書くたびに、毎回ファイルのOpen、Closeをしているため、数が多くなれば、それだけ時間がかかるようになります。(私の環境では13秒かかりました。)
実際には、別のところでファイルハンドルを開き、どこかのタイミングでCloseすれば問題ないのですが、まぁ、話の展開的にこうします。

これをましな実行速度にします。要は、Open、Closeの回数を減らせばいいのです。

#2 BatchBlockを使い、データの蓄積を行う
BatchBlockは、Postがある指定数に達した段階で次のブロックにデータを送ります。
batchBlockとfileWriteBlockをLinkToでつなげます。
DataflowLinkOptions() { PropagateCompletion = true }をすることがポイントで、batchBlockをComplete()したあと、fileWriteBlockもComplete()します。
batchBlockで、指定個数に溜まりきっていないデータを最後に実行するために、Complete()を必ず実行します。

filename
[TestMethod]
public void TestMethod2()
{
    //100個たまったら、次のブロックに送る
    BatchBlock<string> batchBlock = new BatchBlock<string>(100);
    //ファイルに書き込むためのActionBlock
    ActionBlock<string[]> fileWriteBlock2 = new ActionBlock<string[]>((n) => {
        using (var f = System.IO.File.AppendText("test2.txt"))
        {
            n.ToList().ForEach(m => f.WriteLine(m));
        }
    });
  //blockをつなげる。
    batchBlock.LinkTo(fileWriteBlock2, new DataflowLinkOptions() { PropagateCompletion = true });

    //複数のスレッドからfileWriteBlockにPostする。
    var tasks = Enumerable.Range(0, 5).Select(n => Task.Factory.StartNew(() => {
        for (int i = 0; i < 500; i++)
        {
            batchBlock.Post(n + "_" + i);
        }
    })).ToArray();
    Task.WaitAll(tasks);
    //溜まりきっていないもの送る
    batchBlock.Complete();
    //fileWriteBlock2が終わるのを待つ。
    fileWriteBlock2.Completion.Wait();
}

実行すると、0.14秒になりました。期待通りの性能ですかね。

#3 すべてをデータフローで表現してみる。

TransformManyBlockを使い、数値から複数の文字列を生成することで、今まで、Taskを生成していた並列入力をデータフローで表現します。

filename
[TestMethod]
public void TestMethod3()
{
    BufferBlock<int> inputBlock = new BufferBlock<int>();
    
    //並列数5で実行させる。
    TransformManyBlock<int, string> transBlock = new TransformManyBlock<int, string>((n) =>
     {
         return Enumerable.Range(0, 500).Select(m => n + "_" + m);
     },new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 5 });

    //100個たまったら、次のブロックに送る
    BatchBlock<string> batchBlock = new BatchBlock<string>(100);
    //ファイルに書き込むためのActionBlock
    ActionBlock<string[]> fileWriteBlock2 = new ActionBlock<string[]>((n) => {
        using (var f = System.IO.File.AppendText("test3.txt"))
        {
            n.ToList().ForEach(m => f.WriteLine(m));
        }
    });

    //blockをつなげる。
    inputBlock.LinkTo(transBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    transBlock.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    batchBlock.LinkTo(fileWriteBlock2, new DataflowLinkOptions() { PropagateCompletion = true });

    //inputBlockに入力する。
    for (int i = 0; i < 5; i++)
    {
        inputBlock.SendAsync(i);
    }
    //実行
    inputBlock.Complete();
    fileWriteBlock2.Completion.Wait();
}

Postでは、並列実行してくれなかったのでSendAsyncで送り、それを実行させています。
MaxDegreeOfParallelismの指定であっさり並列実行数を指定できるのがポイントです。すごいですね。
ちなみに、ExecutionDataflowBlockOptions.Unbounded と指定すると、限界いっぱいまで、並列実行を行うみたいです。

#まとめ
TPL Dataflowは、データフローとアクターモデルが融合したライブラリーです。示したように、少ない記述で、並列実行のためのプログラミングをバグを少なく作ることができるようになるものです。これは、関数型プログラミングの良さをC#に取り入れる仕組みでもあります。もともと、データフロープログラミングは、並列実行、関数型プログラミングからの方法論です。データフロープログラミングは、データが主役になるので、データの処理には、最適です。ちなみに、LINQが便利に感じるのも、同様に、データが主役になっているためです。データ駆動型ってやつですね。TPL Dataflowは、LINQほどクールに書けないのが問題ですけど。
一応、欠点としては、メッセージを投げるため、オーバーヘッドがあり、最速を考えるなら、普通に書いたほうがおそらく速いです。非同期処理の闇と戦う必要がありますけどね。そんなのゴメンだという人は使うといいと思います。私もゴメンです。

そういうわけで、TPL Dataflowの日本語ドキュメントはMSDNぐらいで、それが最強みたいな感じなのですが、とても使えるライブラリーだと思います。

31
34
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
31
34

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?