LoginSignup
4
3

More than 5 years have passed since last update.

TPL Dataflowのサンプル

Last updated at Posted at 2015-02-14

はじめに

SQL Serverに大量のデータを変換しながら出力したい場合、1テーブルならIEnumerable から IDataReader へ変換で出来る。
1つのテーブルから複数のテーブルへ出力する場合、効率よく変換する方法はないかと調べたところ、Microsoft TPL Dataflow を見つけた。
なぜ非同期を使うのか?

  1. メモリの負担をかけないためにはIEnumerableを使いたい(それしか知らない)
  2. IEnumurableを使うとテーブルを出力する度にデータを読む(効率がよくない)
  3. 非同期にすることで入力と出力が同時に動かせることが出来る

TPLをどのように使えば良いのか試行錯誤して、非同期でIEnumurableを返すクラスを作成してみた。
これで、IDataReaderに変換して、SqlBulkCopyに渡すことができる。

準備

Microsoft TPL Dataflow はNugetで取得する。

サンプルコード

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;

class DataflowReadWrite
{
    class OutBuffer<T>
    {
        private BufferBlock<T> buf = new BufferBlock<T>();

        private bool isEnd = false;

        public void Post(T val)
        {
            buf.Post(val);
        }

        public void Complite()
        {
            isEnd = true;
        }

        public IEnumerable<T> AsEnumerable()
        {
            // 0.5秒待ってもデータがない場合終了の判断をする
            var tsLimit = new TimeSpan(0, 0, 0, 0, 500);
            while (!isEnd || buf.Count > 0)
            {
                T val;
                try
                {
                    val = buf.Receive(tsLimit);
                    Console.WriteLine("recive val:{0}", val.ToString());
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    continue;
                }
                // yield return はtryの中にはかけない
                yield return val;
            }

        }


    }

    static void Main(string[] args)
    {

        var buf1 = new OutBuffer<int>();
        var ite1 = buf1.AsEnumerable();
        var buf2 = new OutBuffer<int>();
        var ite2 = buf2.AsEnumerable();

        var reder = Task.Run(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                buf1.Post(i);
                buf2.Post(i);
                Thread.Sleep(1000);
                Console.WriteLine("send val:{0}", i);
            }
            buf1.Complite();
            buf2.Complite();

        });
        var writer1 = Task.Run(() =>
        {
            foreach (var val in ite1)
                Console.WriteLine("Write1 val:{0}", val);
        });
        var writer2 = Task.Run(() =>
        {
            foreach (var val in ite2)
                Console.WriteLine("Write2 val:{0}", val);
        });
        Task.WaitAll(reder, writer1, writer2);

    }
}

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3