我ながらアレなタイトルだな...
変更履歴
追記23-05-16に、@albireo さんから頂いたコメントに対応する記述を追加しました!
本記事の趣旨
そこそこ量のList化されたデータをSQLServerにInsertするに当たり、
- SqlBulkCopyの高速性の恩恵に預かりながら、
- データを加工したり・しなかったりしつつ、
- お手軽に使えるように!
...を実現しようというお話です。
お手軽度合いはSqlConnectionの拡張メソッドに、
- 対象テーブル名
- List化されたデータ
- 対象テーブルの列数
- 列に対するマッピングのデリゲート処理
...を渡すと実行してくれるという感じです。
そもそもSqlBulkCopyとは?
C#とSQLServerの組み合わせでデータのInsertをメチャ早(※)でやってくれます。
トランザクションも問題なく機能します。
※私環境でテストテーブル(数値キー・文字列の2項目)に100万レコード追加(※※)が約3秒...
※※これは1~100万を返すイテレーターを使った適当Readerによる結果です。本ソースの性能は後述します。
先に参考情報
- じんぐるさんのブログ - SqlBulkCopy + IDataReader を利用した IEnumerable の高効率なバルク挿入
- noxiさんのブログ - .NET CoreとSqlBulkCopy
※お二人のブログでSqlBulkCopyの詳細は説明され尽くしています。
本記事はデータ加工が主な観点となります。
いきなりコード提示
中身の詳細はどうでも良いから動けばOKな方は、下記を気を付けて(もちろん自己責任で!)お使い下さい。
※デリゲート以外はほぼまんま、じんぐるさんコードです... m(__)m
/// <summary>
/// BulkInsert時のデリゲート定義
/// </summary>
/// <typeparam name="T">コレクションの対象データ型</typeparam>
/// <param name="currentItem">コレクションの対象データ</param>
/// <param name="fieldNo">書込みテーブル列番号(0オリジン)</param>
/// <returns>最終的な書込みデータ</returns>
public delegate object SqlBulkInsertDelegate<T>(T currentItem, int fieldNo);
/// <summary>
/// BulkInsert用のDataReaderクラス
/// </summary>
/// <typeparam name="T">対象データ型</typeparam>
public class SqlBulkInsertReader<T> : IDataReader
{
/// <summary>
/// 型情報の内部キャッシュ用クラス
/// </summary>
/// <typeparam name="U"></typeparam>
private static class PropertyInfoCache<U>
{
public static PropertyInfo[] Instances { get; }
static PropertyInfoCache()
=> Instances = typeof(U).GetProperties(BindingFlags.Instance | BindingFlags.Public);
}
private IEnumerator<T> enumerator;
private SqlBulkInsertDelegate<T> insertDelegate;
private int delegateFieldCount;
/// <summary>
/// コンストラクタ(データ対テーブルが1対1用)
/// </summary>
/// <param name="enumerator">書込みデータEnumerator</param>
public SqlBulkInsertReader(IEnumerator<T> enumerator) : this(enumerator, 0, null)
{
}
/// <summary>
/// コンストラクタ(Insertデリゲート用)
/// </summary>
/// <param name="enumerator">書込みデータEnumerator</param>
/// <param name="fieldCount">書込みテーブル列数</param>
/// <param name="insertDelegate">Insert時デリゲート(現在データ,書込み列番号(0オリジン))</param>
public SqlBulkInsertReader(IEnumerator<T> enumerator, int fieldCount, SqlBulkInsertDelegate<T> insertDelegate)
{
this.enumerator = enumerator;
this.insertDelegate = insertDelegate;
delegateFieldCount = fieldCount;
}
public void Dispose() => this.enumerator.Dispose();
/// <summary>
/// 書込みテーブル列数
/// </summary>
/// <remarks>
/// デリゲートを指定しない場合、データのPublicインスタンスの定義数なのでデータ定義とテーブル列数が1対1な必要あり。
/// </remarks>
public int FieldCount => insertDelegate != null ? delegateFieldCount : PropertyInfoCache<T>.Instances.Length;
/// <summary>
/// 書込みデータを取得します。
/// </summary>
/// <param name="fieldNo">書込みテーブル列番号(0オリジン)</param>
/// <returns>書込みデータ</returns>
/// <remarks>
/// SqlBulkCopyが書き込むデータを読み込む際に取得するデータ、
/// つまり我々開発者視点では"書込みデータ"。
///
/// 1データ毎にFieldCount回、呼ばれる。
///
/// デリゲート指定では現在データと書込みテーブル列番号をデリゲートに渡して任せる、
/// 未指定の場合は現在データと書込みテーブル列番号が1:1の想定。
/// </remarks>
public object GetValue(int fieldNo)
{
if (insertDelegate != null)
{
//delegate指定あり
return insertDelegate(this.enumerator.Current, fieldNo);
}
else
{
//1対1割当
var prop = PropertyInfoCache<T>.Instances[fieldNo];
return prop.GetValue(this.enumerator.Current);
}
}
//NotImplementedExceptionを投げる部分は長いので↓に折り畳んで入れました。
}
NotImplementedException投げる部分
public object this[int i] => throw new NotImplementedException();
public object this[string name] => throw new NotImplementedException();
public int Depth => throw new NotImplementedException();
public bool IsClosed => throw new NotImplementedException();
public int RecordsAffected => throw new NotImplementedException();
public void Close()
{
throw new NotImplementedException();
}
public bool GetBoolean(int i)
{
throw new NotImplementedException();
}
public byte GetByte(int i)
{
throw new NotImplementedException();
}
public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
{
throw new NotImplementedException();
}
public char GetChar(int i)
{
throw new NotImplementedException();
}
public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
{
throw new NotImplementedException();
}
public IDataReader GetData(int i)
{
throw new NotImplementedException();
}
public string GetDataTypeName(int i)
{
throw new NotImplementedException();
}
public DateTime GetDateTime(int i)
{
throw new NotImplementedException();
}
public decimal GetDecimal(int i)
{
throw new NotImplementedException();
}
public double GetDouble(int i)
{
throw new NotImplementedException();
}
public Type GetFieldType(int i)
{
throw new NotImplementedException();
}
public float GetFloat(int i)
{
throw new NotImplementedException();
}
public Guid GetGuid(int i)
{
throw new NotImplementedException();
}
public short GetInt16(int i)
{
throw new NotImplementedException();
}
public int GetInt32(int i)
{
throw new NotImplementedException();
}
public long GetInt64(int i)
{
throw new NotImplementedException();
}
public string GetName(int i)
{
throw new NotImplementedException();
}
public int GetOrdinal(string name)
{
//Console.WriteLine($"GetOrdinal: {name}");
throw new NotImplementedException();
}
public DataTable GetSchemaTable()
{
throw new NotImplementedException();
}
public string GetString(int i)
{
throw new NotImplementedException();
}
public int GetValues(object[] values)
{
throw new NotImplementedException();
}
public bool IsDBNull(int i)
{
throw new NotImplementedException();
}
public bool NextResult()
{
throw new NotImplementedException();
}
※SqlConnectionExtensionsは、各々適当に自社に合うように直して下さい...
public static class SqlConnectionExtensions
{
/// <summary>
/// BulkInsertAsync(デリゲート)
/// </summary>
/// <typeparam name="T">データ型</typeparam>
/// <param name="connection">コネクション</param>
/// <param name="targetTableName">対象テーブル名</param>
/// <param name="data">書き込みデータ</param>
/// <param name="fieldCount">書き込みテーブルの列数</param>
/// <param name="insertDelegate">insert時のデリゲート処理</param>
/// <param name="tr">トランザクション</param>
/// <param name="options">SqlBulkCopyのオプション</param>
/// <param name="timeout">タイムアウト指定</param>
/// <param name="cancellationToken">キャンセルトークン</param>
/// <returns>データ件数</returns>
public static async ValueTask<int> BulkInsertAsync<T>(this SqlConnection connection,
string targetTableName, IEnumerable<T> data, int fieldCount, SqlBulkInsertDelegate<T> insertDelegate,
SqlTransaction tr = null, SqlBulkCopyOptions options = default, int? timeout = null,
CancellationToken cancellationToken = default)
{
using (var executor = new SqlBulkCopy(connection, options, tr))
{
executor.DestinationTableName = targetTableName;
// タイムアウトを指定できるようにしておくと優しそう
executor.BulkCopyTimeout = timeout ?? executor.BulkCopyTimeout;
// データを流し込む
using (var reader = new SqlBulkInsertReader<T>(data.GetEnumerator(), fieldCount, insertDelegate))
await executor.WriteToServerAsync(reader, cancellationToken);
// 影響した行数 (= 流し込んだ件数) を返すのが一般的
return data.Count();
//return executor.RowsCopied; //私環境ではRowsCopiedない...
}
}
/// <summary>
/// BulkInsertAsync(1対1)
/// </summary>
/// <typeparam name="T">データ型</typeparam>
/// <param name="connection">コネクション</param>
/// <param name="targetTableName">対象テーブル名</param>
/// <param name="data">書き込みデータ</param>
/// <param name="tr">トランザクション</param>
/// <param name="options">SqlBulkCopyのオプション</param>
/// <param name="timeout">タイムアウト指定</param>
/// <param name="cancellationToken">キャンセルトークン</param>
/// <returns>データ件数</returns>
public static ValueTask<int> BulkInsertAsync<T>(this SqlConnection connection,
string targetTableName, IEnumerable<T> data, SqlTransaction tr = null, SqlBulkCopyOptions options = default,
int? timeout = null, CancellationToken cancellationToken = default)
{
return BulkInsertAsync(connection, targetTableName, data, 0, null, tr, options, timeout, cancellationToken);
}
}
使い方は、生成したコネクション(conn)の拡張したBulkInsertAsyncに
- 対象テーブル名
- データリスト
- 対象テーブルの列数(※)
- Insertデリゲート処理(※)
...を渡します。
またオプション指定でトランザクション、SqlBulkCopyOptions、タイムアウト時間、CancellationTokenも渡せます。
※対象テーブルとデータが1対1なら省略可能な拡張あります。その場合はまんま、じんぐるさんソースなので、ここでの説明は行いません。
conn.BulkInsertAsync("TEST", data, 3, (currentData, fieldNo) =>
{
switch (fieldNo)
{
case 0: return currentData.FieldInt;
case 1: return currentData.FieldStr + "加工!";
case 2: return currentData.FieldDateTime;
}
throw new Exception($"fieldNo定義漏れ: {fieldNo}");
});
これでInsertデリゲート処理でcurrentDataやswitchのケース毎に、好きな内容を書き込む事が出来ます。
気を付ける点は、
-
対象テーブルの列数を正しく記述する
間違えると正しくテーブルに書かれなかったり例外が発生したりします。
-
デリゲート処理に時間の掛かる処理は書かない
いくらSqlBulkCopyが高速でもデリゲート処理に時間が掛かれば当然その分、足を引っ張られます。
(呼び出し回数 = レコード数 x 列数)
即値等はデリゲート前に準備しておきます。 -
書き込む列に合った値を返す
DateTime型ならDateTime、Bit型ならbool等、適切な値を返します。
(NULLの場合はDBNull.Value)
※私もデータ型の全種類を試した訳ではないです...
...といった所でしょうか。
動けばOK勢の皆様はここまでで充分かと思います。お疲れ様でした!
以下は詳細です。
作成動機 - SqlBulkCopyは扱い難い
詳細は、先述の参考情報で説明し尽くされているのでご覧頂くとして、とにかく扱いが面倒です。
また実際の業務では、データを加工しながら登録したい事もあろうかと思います。
例えば、
-
データとテーブルが1対1じゃない
データ側に過剰な情報を保持していたり、不足していたりする場合があろうかと思います。
この場合は、SqlBulkCopy標準の機能でSqlBulkCopyColumnMappingCollectionにソース側とテーブル側の列番号や列名でマッピングを指定する事は可能です。
...が、↑の拡張に別途マッピングコレクションのオプションを生やして渡すのも何だかだし、デリゲート処理の方が同じ事が可能&加工の自由度も高いしという事で、見送りました。
-
テーブル書き込み前に加工したい
仮に1対1だったとしても、やっぱり加工したい事があろうかと思います。
自分の場合は、取得済みの既存データリストの反対伝票を作成するイメージで数値をマイナス化&更新日・更新者を差し替えながら登録したいというケースがありました。
...トニカクカワイイとにかくデータを弄りたかったと言ってるだけですね(^^;
色々悩んだ(SqlBulkCopyColumnMappingCollection → 即値も指定できる自作マッピングコレクション(没)と試行錯誤の)末に、Insertデリゲート処理が一番、見た目も使い勝手も良さげという結果に落ち着きました。
そして気になる性能は?
手元環境で11列・26,000件強のデータで試した所、
- 1件毎Insert:約20秒
- BulkInsert :約0.3秒
...と、やはり圧倒的な速度!!
前述通り、デリゲート処理を可能な限り軽くするのがポイントです。
最後にやっと解説
ソースのコメントに、まぁまぁガッツリ書いてあるのでポイントだけ。
SqlBulkInsertReader.FieldCountの結果が、SqlBulkCopyからデータを吸い上げられる際に1データ毎にSqlBulkInsertReader.GetValueが呼ばれる回数になります。
仮にデータ5件・列数3の場合、GetValueが合計15回呼ばれます(=デリゲート処理も15回)。
1件目:fieldNo 0~2
2件目:fieldNo 0~2
...
5件目:fieldNo 0~2
SqlBulkInsertReader.GetValueの結果(object)が、最終的にテーブルに書かれる内容になります。
なのでデリゲート処理に、データ・fieldNoに応じた処理を書いてあげれば好きな内容を書き放題という訳です!
ここで純粋なIDataReaderとしては書き込み先テーブルの列数は不明(どこにもそんな情報持ってない)、だったら開発者は列数を把握しているんだから自作のSqlBulkInsertReaderにはコンストラクタで列数・デリゲートをセットで渡して上げれば良いと割り切りました。
仮に、渡す列数を間違えると何が起こるのでしょうか?
-
少なく指定した場合(上例で3の所を2)
case 0,1 だけがやってきて、case 2が通る事はありません。
テーブルも3列目への書き込みがスキップされます。 -
多く指定した場合(上例で3の所を4)
System.InvalidOperationException: 指定された ColumnMapping はソースまたはターゲットのいずれの列とも一致しません。
...の例外が発生します。
なので列数、デリゲート処理には充分にご注意下さい。
またよ~~~くテストしてから実戦投入して下さいね!
public class SqlBulkInsertReader<T> : IDataReader
{
private SqlBulkInsertDelegate<T> insertDelegate;
private int delegateFieldCount;
/// <summary>
/// コンストラクタ(Insertデリゲート用)
/// </summary>
/// <param name="enumerator">書込みデータEnumerator</param>
/// <param name="fieldCount">書込みテーブル列数</param>
/// <param name="insertDelegate">Insert時デリゲート(現在データ,書込み列番号(0オリジン))</param>
public SqlBulkInsertReader(IEnumerator<T> enumerator, int fieldCount, SqlBulkInsertDelegate<T> insertDelegate)
{
this.enumerator = enumerator;
this.insertDelegate = insertDelegate;
delegateFieldCount = fieldCount;
}
/// <summary>
/// 書込みテーブル列数
/// </summary>
/// <remarks>
/// デリゲートを指定しない場合、データのPublicインスタンスの定義数なのでデータ定義とテーブル列数が1対1な必要あり。
/// </remarks>
public int FieldCount => insertDelegate != null ? delegateFieldCount : PropertyInfoCache<T>.Instances.Length;
/// <summary>
/// 書込みデータを取得します。
/// </summary>
/// <param name="fieldNo">書込みテーブル列番号(0オリジン)</param>
/// <returns>書込みデータ</returns>
/// <remarks>
/// SqlBulkCopyが書き込むデータを読み込む際に取得するデータ、
/// つまり我々開発者視点では"書込みデータ"。
///
/// 1データ毎にFieldCount回、呼ばれる。
///
/// デリゲート指定では現在データと書込みテーブル列番号をデリゲートに渡して任せる、
/// 未指定の場合は現在データと書込みテーブル列番号が1:1の想定。
/// </remarks>
public object GetValue(int fieldNo)
{
if (insertDelegate != null)
{
//delegate指定あり
return insertDelegate(this.enumerator.Current, fieldNo);
}
else
{
//1対1割当
var prop = PropertyInfoCache<T>.Instances[fieldNo];
return prop.GetValue(this.enumerator.Current);
}
}
以上、久しぶりのC#記事でした!
追記23-05-16
@albireo さんから、テーブル列情報のディクショナリ(以下、列情報DIC)を利用するご提案を頂きました!
本記事ソースをベースに手を入れて行きたいと思います!
先ずはSqlConnectionExtensionsクラスに、列情報DICを取得する拡張メソッドとBulkInsertAsync(列情報DIC版)の拡張メソッドを生やします。
public static class SqlConnectionExtensions
{
/// <summary>
/// テーブルの列情報DICを取得
/// </summary>
/// <param name="connection">コネクション</param>
/// <param name="tableName">テーブル名</param>
/// <returns>列情報DIC(列順序,(列名,型))</returns>
public static Dictionary<int, (string, Type)> GetTableColumnInfo(this SqlConnection connection, string tableName)
{
var command = new SqlCommand($"SELECT * FROM {tableName} WHERE 1 = 0", connection);
using (var reader = command.ExecuteReader())
{
return reader.GetColumnSchema().ToDictionary(
col => col.ColumnOrdinal ?? 0,
col => (col.ColumnName, col.DataType!));
}
}
}
/// <summary>
/// BulkInsertAsync(列情報DIC,デリゲート(オプション))
/// </summary>
/// <typeparam name="T">データ型</typeparam>
/// <param name="connection">コネクション</param>
/// <param name="targetTableName">対象テーブル名</param>
/// <param name="data">書き込みデータ</param>
/// <param name="fieldDic">列情報DIC</param>
/// <param name="insertDelegate">insert時のデリゲート処理(オプション)</param>
/// <param name="tr">トランザクション</param>
/// <param name="options">SqlBulkCopyのオプション</param>
/// <param name="timeout">タイムアウト指定</param>
/// <param name="cancellationToken">キャンセルトークン</param>
/// <returns>データ件数</returns>
public static async ValueTask<int> BulkInsertAsync<T>(this SqlConnection connection,
string targetTableName, IEnumerable<T> data, Dictionary<int, (string, Type)> fieldDic,
SqlBulkInsertDelegate<T> insertDelegate = null,
SqlTransaction tr = null, SqlBulkCopyOptions options = default, int? timeout = null,
CancellationToken cancellationToken = default)
{
using (var executor = new SqlBulkCopy(connection, options, tr))
{
executor.DestinationTableName = targetTableName;
// タイムアウトを指定できるようにしておくと優しそう
executor.BulkCopyTimeout = timeout ?? executor.BulkCopyTimeout;
// データを流し込む
using (var reader = new SqlBulkInsertReader<T>(data.GetEnumerator(), fieldDic, insertDelegate))
await executor.WriteToServerAsync(reader, cancellationToken);
// 影響した行数 (= 流し込んだ件数) を返すのが一般的
return data.Count();
}
}
これでお手軽に列情報DICが取得できるようになりました。
因みに手元環境では10msちょい~20ms弱で取得できました。
次にSqlBulkInsertReaderクラスを列情報DIC対応にします。
今回はGetValueで、
- デリゲート処理を(存在すれば)実行
処理なし or 処理結果NULLの場合、
- 列処理DICが存在する場合は列名で、なければ1対1モードで値を取得
...という流れにします。
記事本文でデリゲート処理について(テーブルへの書き込み値が)NULLの場合はDBNull.Valueを返そうと言ってますが、普通のNULLを返しても問題ありませんでした。
...が、今回はデリゲート処理の結果のDBNull.ValueとNULLを分けた方が都合が良いので、引き続き明示的にNULLとする場合はDBNull.Valueを利用します。
そして列処理DICを受け取るコンストラクタを生やします。
→このパターンの場合、デリゲート処理はあってもなくても良いのでオプション化。
元ソースに手を入れた所だけ記述します。
public class SqlBulkInsertReader<T> : IDataReader
{
//private static class PropertyInfoCache<U>の下に追加
//PropertyInfoCacheを辞書化
private Dictionary<string, PropertyInfo> propertyDic;
//コンストラクタの列情報DICをセットする
private Dictionary<int, (string, Type)> fieldDic;
/// <summary>
/// コンストラクタ(列情報DIC,Insertデリゲート(オプション)用)
/// </summary>
public SqlBulkInsertReader(IEnumerator<T> enumerator, Dictionary<int, (string, Type)> fieldDic,
SqlBulkInsertDelegate<T> insertDelegate = null) : this(enumerator)
{
//enumerator = Enumerator;
this.insertDelegate = insertDelegate;
this.fieldDic = fieldDic;
//PropertyInfoCacheを辞書化
//(辞書を利用するのは列情報DICが存在する時だけ)
propertyDic = PropertyInfoCache<T>.Instances.ToDictionary(prop => prop.Name.ToUpper(), prop => prop);
}
/// <summary>
/// 書込みテーブル列数
/// </summary>
/// <remarks>
/// 列情報辞書もデリゲートも指定しない場合、データのPublicインスタンスの定義数なのでデータ定義とテーブル列数が1対1な必要あり。
/// </remarks>
public int FieldCount => fieldDic != null ? fieldDic.Count
: insertDelegate != null ? delegateFieldCount
: PropertyInfoCache<T>.Instances.Length;
//流れを変更
public object GetValue(int fieldNo)
{
object r = null;
if (insertDelegate != null)
{
//デリゲート処理あり、値の取得を試みる
r = insertDelegate(this.enumerator.Current, fieldNo);
}
if (r == null)
{
//値がNULL(デリゲートなしor取得しなかった想定)
if (fieldDic != null)
{
//列情報DICあり、列名で取得を試みる
var prop = propertyDic[fieldDic[fieldNo].Item1.ToUpper()];
//当初ここでPropertyInfoCache<T>.Instances配列からwhereで
//取得していたが、まぁまぁ遅かったので素直に辞書化...
//prop=NULLの場合、変換失敗の例外投げても良いかも。
//今回はNULLのまま突き進みます。
r = prop?.GetValue(this.enumerator.Current);
}
else
{
//1対1割当
var prop = PropertyInfoCache<T>.Instances[fieldNo];
return prop.GetValue(this.enumerator.Current);
}
}
//r=NULLの場合、ここで例外投げても良いかも。
return r;
}
}
利用コード例です。
記事本文ではデリゲート処理に全フィールドを定義する必要がありましたが、テーブル列とデータのフィールドが同名なら列情報DICが使えるので加工したい列だけを記述して、他はNULLを返すようにします。
(加工不要ならデリゲート処理も不要)
var fieldDic = conn.GetTableColumnInfo("TEST");
await conn.BulkInsertAsync("TEST", data, fieldDic, (currentData, fieldNo) =>
{
switch (fieldNo)
{
case 1: return currentData.FieldStr + "加工!";
}
return null;
});
記事本文と同じ環境(11列・26,000件強のデータ)でデリゲート処理なしで試した所、
- 列情報DIC版 :約0.45秒
...と、全てデリゲート側で処理するよりは若干遅くなりますが、それでも充分に高速かと思います!
テーブル列とデータフィールド名が基本一致している場合、テーブル構造変化時に加工不要な場合はメンテナンスフリーというのは実に大きなメリットですね!
※ソースのコメントで書いたプロパティ配列からwhere抽出だと約0.9秒...
今回は元記事のままデリゲート処理の構造は弄らない形としましたが、必要に応じて列情報DICの中身を渡しても良いだろうし、列情報とデータ構造のミスマッチの扱いも色々なパターンが考えられると思います。
#例えば列数を渡すオプションを廃止して、常に列情報DICを渡すという判断もアリだと思います。生成に20ms弱ですからね!
デリゲート処理に時間の掛かる処理は書かない事だけ気を付けつつ、ご自身に合ったバラ色の高速Insert生活を楽しんで頂ければと思います♪
@albireo さん、誠にありがとうございました m(__)m