はじめに
今までの経過からSqlBulkCopyを使い同時に複数のテーブルに出力することができた。
サンプルは下記の通り。
これで、1つのテーブルから分解してデータ分析しやすいようにDWHを構築ができる。
SSISは便利でサンプルコードのようなことが簡単にでき並列で処理できるが、複雑な処理を作成するときに処理が複雑になる。
その点、通常のコンソールプログラムで作れば下記のメリットがある。
- 共通処理をClass Libraryにできる
- テストができる
- バージョン管理できる
- ログに出力できる
サンプルコード
クラスのDataReaderExtensions とEnumerableDataReaderは「IEnumerable から IDataReader へ変換」を参照のこと。
サンプルは入院日、退院日の情報を基に、日別に展開している。
サンプルは簡素化のため、診療科、病棟など省略しているが、日別に展開することにより、その日にいた病棟、診療科を参照できる。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.Data.SqlClient;
using System.Reflection;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;
using SampleTPL.Properties;
namespace SampleTPL
{
static class DataReaderExtensions ...
internal class EnumerableDataReader<TSource> : IDataReader ...
class Program
{
class DataReaderEnumerable<TSource>
{
private readonly string _sql;
private readonly string _connectString;
private readonly List<PropertyInfo> listProp = new List<PropertyInfo>();
public DataReaderEnumerable(string sql, string connectString)
{
_sql = sql;
_connectString = connectString;
foreach (var prop in typeof(TSource).GetProperties())
{
listProp.Add(prop);
}
}
public IEnumerable<TSource> AsEnumerable()
{
using (var conn = new SqlConnection(_connectString))
{
conn.Open();
var cmd = new SqlCommand(_sql, conn);
using (var rd = cmd.ExecuteReader())
{
while (rd.Read())
{
TSource cls = Activator.CreateInstance<TSource>();
foreach (var fld in listProp)
fld.SetValue(cls, rd[fld.Name]);
yield return cls;
}
}
}
}
}
class OutBuffer<T>
{
private BufferBlock<T> buf = new BufferBlock<T>();
private bool isEnd = false;
private int cntPost = 0;
public int Count { get { return cntPost; } }
public void Post(T val)
{
buf.Post(val);
cntPost++;
if (buf.Count > 10000)
{
Thread.Sleep(10);
}
}
public void Complite()
{
isEnd = true;
}
public IEnumerable<T> AsEnumerable()
{
var tsLimit = new TimeSpan(0, 0, 0, 0, 500);
while (!isEnd || buf.Count > 0)
{
T val;
try
{
val = buf.Receive(tsLimit);
}
catch (Exception ex)
{
Console.WriteLine("msg:{0} time:{1}",
ex.Message, DateTime.Now);
continue;
}
yield return val;
}
}
}
class 入院履歴Base
{
public string 患者番号 { get; set; }
public string 入院日 { get; set; }
public string 退院日 { get; set; }
}
class 入院単位
{
public int 入院単位ID { get; set; }
public string 患者番号 { get; set; }
public string 入院日 { get; set; }
public string 退院日 { get; set; }
}
class 入院日別
{
public int 入院日別ID { get; set; }
public int 入院単位ID { get; set; }
public string 患者番号 { get; set; }
public string 日付 { get; set; }
public byte 新入院件数 { get; set; }
public byte 延件数 { get; set; }
public byte 退院件数 { get; set; }
}
static void Main(string[] args)
{
string con = Settings.Default.SQLConnectionString;
Console.WriteLine("出力開始 Time:{0}", DateTime.Now);
var buf1 = new OutBuffer<入院単位>();
var buf2 = new OutBuffer<入院日別>();
var reader = Task.Run(() =>
{
int id = 1;
int idDay = 1;
string sql = "select * from 入院履歴Base";
var rd = new DataReaderEnumerable<入院履歴Base>(sql, con);
foreach (var row in rd.AsEnumerable())
{
buf1.Post(new 入院単位 {
入院単位ID = id,
患者番号 = row.患者番号,
入院日 = row.入院日,
退院日 = row.退院日
});
var start = DateTime.ParseExact(row.入院日, "yyyyMMdd", null);
// 退院日が99999999は入院中として、現在の日付をセット
var end = row.退院日 == "99999999" ?
DateTime.Now.Date :
DateTime.ParseExact(row.退院日, "yyyyMMdd", null);
for (var cur = start; cur <= end; cur = cur.AddDays(1) )
{
buf2.Post(new 入院日別
{
入院日別ID = idDay,
入院単位ID = id,
患者番号 = row.患者番号,
日付 = cur.ToString("yyyyMMdd"),
新入院件数 = (byte)(start == cur ? 1 : 0),
延件数 = 1,
退院件数 = (byte)(end == cur ? 1 : 0),
});
idDay++;
}
id++;
}
buf1.Complite();
buf2.Complite();
Console.WriteLine("読み込み終了 Time:{0}", DateTime.Now);
});
var writer1 = Task.Run(() =>
{
using (var bc = new SqlBulkCopy(con))
{
bc.DestinationTableName = "dbo.入院単位";
bc.BulkCopyTimeout = 10000;
bc.WriteToServer(buf1.AsEnumerable().AsDataReader());
}
Console.WriteLine("BulkCopy End 入院単位");
});
var writer2 = Task.Run(() =>
{
using (var bc = new SqlBulkCopy(con))
{
bc.DestinationTableName = "dbo.入院日別";
bc.BulkCopyTimeout = 10000;
bc.WriteToServer(buf2.AsEnumerable().AsDataReader());
}
Console.WriteLine("BulkCopy End 入院日別");
});
Task.WaitAll(reader, writer1, writer2);
Console.WriteLine("出力終了 Time:{0}", DateTime.Now);
Console.WriteLine("入院単位 件数:{0}", buf1.Count);
Console.WriteLine("入院日別 件数:{0}", buf2.Count);
Console.ReadKey();
}
}
}
結果
テーブル | 件数 |
---|---|
入院履歴Base | 12,452 |
入院単位 | 12,452 |
入院日別 | 365,768 |
キャッシュがきいている状態で3秒で完了。