LoginSignup
7

More than 5 years have passed since last update.

SqlBulkCopy を利用した複数テーブル同時出力

Last updated at Posted at 2015-02-17

はじめに

今までの経過からSqlBulkCopyを使い同時に複数のテーブルに出力することができた。
サンプルは下記の通り。

これで、1つのテーブルから分解してデータ分析しやすいようにDWHを構築ができる。
SSISは便利でサンプルコードのようなことが簡単にでき並列で処理できるが、複雑な処理を作成するときに処理が複雑になる。
その点、通常のコンソールプログラムで作れば下記のメリットがある。

  • 共通処理をClass Libraryにできる
  • テストができる
  • バージョン管理できる
  • ログに出力できる

サンプルコード

クラスのDataReaderExtensions とEnumerableDataReaderは「IEnumerable から IDataReader へ変換」を参照のこと。
サンプルは入院日、退院日の情報を基に、日別に展開している。
サンプルは簡素化のため、診療科、病棟など省略しているが、日別に展開することにより、その日にいた病棟、診療科を参照できる。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.Data.SqlClient;
using System.Reflection;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;
using SampleTPL.Properties;

namespace SampleTPL
{
    static class DataReaderExtensions ...

    internal class EnumerableDataReader<TSource> : IDataReader ...

    class Program
    {

        class DataReaderEnumerable<TSource>
        {
            private readonly string _sql;
            private readonly string _connectString;
            private readonly List<PropertyInfo> listProp = new List<PropertyInfo>();

            public DataReaderEnumerable(string sql, string connectString)
            {
                _sql = sql;
                _connectString = connectString;
                foreach (var prop in typeof(TSource).GetProperties())
                {
                    listProp.Add(prop);
                }
            }

            public IEnumerable<TSource> AsEnumerable()
            {
                using (var conn = new SqlConnection(_connectString))
                {
                    conn.Open();
                    var cmd = new SqlCommand(_sql, conn);
                    using (var rd = cmd.ExecuteReader())
                    {
                        while (rd.Read())
                        {
                            TSource cls = Activator.CreateInstance<TSource>();
                            foreach (var fld in listProp)
                                fld.SetValue(cls, rd[fld.Name]);
                            yield return cls;
                        }
                    }
                }
            }
        }

        class OutBuffer<T>
        {
            private BufferBlock<T> buf = new BufferBlock<T>();
            private bool isEnd = false;
            private int cntPost = 0;

            public int Count { get { return cntPost; } }

            public void Post(T val)
            {
                buf.Post(val);
                cntPost++;
                if (buf.Count > 10000)
                {
                    Thread.Sleep(10);
                }
            }

            public void Complite()
            {
                isEnd = true;
            }

            public IEnumerable<T> AsEnumerable()
            {
                var tsLimit = new TimeSpan(0, 0, 0, 0, 500);
                while (!isEnd || buf.Count > 0)
                {
                    T val;
                    try
                    {
                        val = buf.Receive(tsLimit);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("msg:{0} time:{1}", 
                            ex.Message, DateTime.Now);
                        continue;
                    }
                    yield return val;
                }
            }
        }

        class 入院履歴Base
        {
            public string 患者番号 { get; set; }
            public string 入院日 { get; set; }
            public string 退院日 { get; set; }
        }

        class 入院単位
        {
            public int 入院単位ID { get; set; }
            public string 患者番号 { get; set; }
            public string 入院日 { get; set; }
            public string 退院日 { get; set; }
        }

        class 入院日別
        {
            public int 入院日別ID { get; set; }
            public int 入院単位ID { get; set; }
            public string 患者番号 { get; set; }
            public string 日付 { get; set; }
            public byte 新入院件数 { get; set; }
            public byte 延件数 { get; set; }
            public byte 退院件数 { get; set; }
        }

        static void Main(string[] args)
        {
            string con = Settings.Default.SQLConnectionString;

            Console.WriteLine("出力開始 Time:{0}", DateTime.Now);

            var buf1 = new OutBuffer<入院単位>();
            var buf2 = new OutBuffer<入院日別>();

            var reader = Task.Run(() =>
            {
                int id = 1;
                int idDay = 1;
                string sql = "select * from 入院履歴Base";
                var rd = new DataReaderEnumerable<入院履歴Base>(sql, con);
                foreach (var row in rd.AsEnumerable())
                {
                    buf1.Post(new 入院単位 { 
                        入院単位ID = id,
                        患者番号 = row.患者番号,
                        入院日 = row.入院日,
                        退院日 = row.退院日
                    });
                    var start = DateTime.ParseExact(row.入院日, "yyyyMMdd", null);
                    // 退院日が99999999は入院中として、現在の日付をセット
                    var end = row.退院日 == "99999999" ?
                        DateTime.Now.Date :
                        DateTime.ParseExact(row.退院日, "yyyyMMdd", null);
                    for (var cur = start; cur <= end; cur = cur.AddDays(1) )
                    {
                        buf2.Post(new 入院日別
                        {
                            入院日別ID = idDay,
                            入院単位ID = id,
                            患者番号 = row.患者番号,
                            日付 = cur.ToString("yyyyMMdd"),
                            新入院件数 = (byte)(start == cur ? 1 : 0),
                            延件数 = 1,
                            退院件数 = (byte)(end == cur ? 1 : 0),
                        });
                        idDay++;
                    }
                    id++;
                }
                buf1.Complite();
                buf2.Complite();
                Console.WriteLine("読み込み終了 Time:{0}", DateTime.Now);
            });

            var writer1 = Task.Run(() =>
            {
                using (var bc = new SqlBulkCopy(con))
                {
                    bc.DestinationTableName = "dbo.入院単位";
                    bc.BulkCopyTimeout = 10000;
                    bc.WriteToServer(buf1.AsEnumerable().AsDataReader());
                }
                Console.WriteLine("BulkCopy End 入院単位");
            });

            var writer2 = Task.Run(() =>
            {
                using (var bc = new SqlBulkCopy(con))
                {
                    bc.DestinationTableName = "dbo.入院日別";
                    bc.BulkCopyTimeout = 10000;
                    bc.WriteToServer(buf2.AsEnumerable().AsDataReader());
                }
                Console.WriteLine("BulkCopy End 入院日別");
            });

            Task.WaitAll(reader, writer1, writer2);

            Console.WriteLine("出力終了 Time:{0}", DateTime.Now);
            Console.WriteLine("入院単位 件数:{0}", buf1.Count);
            Console.WriteLine("入院日別 件数:{0}", buf2.Count);
            Console.ReadKey();

        }
    }
}

結果

テーブル 件数
入院履歴Base 12,452
入院単位 12,452
入院日別 365,768

キャッシュがきいている状態で3秒で完了。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7