はじめに
SQL Serverに大量のデータを変換しながら出力したい場合、1テーブルならIEnumerable から IDataReader へ変換で出来る。
1つのテーブルから複数のテーブルへ出力する場合、効率よく変換する方法はないかと調べたところ、Microsoft TPL Dataflow を見つけた。
なぜ非同期を使うのか?
- メモリの負担をかけないためにはIEnumerableを使いたい(それしか知らない)
- IEnumurableを使うとテーブルを出力する度にデータを読む(効率がよくない)
- 非同期にすることで入力と出力が同時に動かせることが出来る
TPLをどのように使えば良いのか試行錯誤して、非同期でIEnumurableを返すクラスを作成してみた。
これで、IDataReaderに変換して、SqlBulkCopyに渡すことができる。
準備
Microsoft TPL Dataflow はNugetで取得する。
サンプルコード
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;
class DataflowReadWrite
{
class OutBuffer<T>
{
private BufferBlock<T> buf = new BufferBlock<T>();
private bool isEnd = false;
public void Post(T val)
{
buf.Post(val);
}
public void Complite()
{
isEnd = true;
}
public IEnumerable<T> AsEnumerable()
{
// 0.5秒待ってもデータがない場合終了の判断をする
var tsLimit = new TimeSpan(0, 0, 0, 0, 500);
while (!isEnd || buf.Count > 0)
{
T val;
try
{
val = buf.Receive(tsLimit);
Console.WriteLine("recive val:{0}", val.ToString());
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
continue;
}
// yield return はtryの中にはかけない
yield return val;
}
}
}
static void Main(string[] args)
{
var buf1 = new OutBuffer<int>();
var ite1 = buf1.AsEnumerable();
var buf2 = new OutBuffer<int>();
var ite2 = buf2.AsEnumerable();
var reder = Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
buf1.Post(i);
buf2.Post(i);
Thread.Sleep(1000);
Console.WriteLine("send val:{0}", i);
}
buf1.Complite();
buf2.Complite();
});
var writer1 = Task.Run(() =>
{
foreach (var val in ite1)
Console.WriteLine("Write1 val:{0}", val);
});
var writer2 = Task.Run(() =>
{
foreach (var val in ite2)
Console.WriteLine("Write2 val:{0}", val);
});
Task.WaitAll(reder, writer1, writer2);
}
}