SSISのデータフロータスクでエラー行をリダイレクトしてファイルに出力しようとすると、エラーの情報が貧相でそのままでは意味が解りにくい。
ErrorCode:エラーコードを表す数値。
ErrorColumn:エラー列のIDを表す数値。別名LineageId。入力、出力列の情報のID。フロータスクが複数ある場合、重複することも。
特にErrorColumnを列名に変換SQL Sever 2014以前は力技で変換する必要がある。
以下のサイトを参考に実施した。
http://sorrell.github.io/2015/09/14/Getting-SSIS-LineageIDs.html
私の環境ではフロータスクが複数あったため、辞書を3重の入れ子にしました。
スクリプトタスクを追加
スクリプトタスクを追加して、以下のコードを追記。
* その際、ソリューションエクスプローラーの参照で「Microsoft.SqlServer.DTSPipelineWrap」 を追加してください。
* ユーザー変数 User::execsObj,User::lineageIds をObject型として追加して、スクリプトタスクでReadWriteVariablesで指定する。
#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using System.Windows.Forms;
using System.Collections.Generic;
using System.IO;
using System.Linq;
#endregion
namespace ST_9c114207400e475ba28541b989f48749
{
/// <summary>
/// ScriptMain is the entry point class of the script. Do not change the name, attributes,
/// or parent of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
Dictionary<string, Dictionary<string,Dictionary<int,string>>> lineageIds = new Dictionary<string, Dictionary<string, Dictionary<int, string>>>();
public void Main()
{
// TODO: Add your code here
Dts.Variables["User::execsObj"].Value = ((Package)Dts.Variables["User::execsObj"].Parent).Executables;
Executables execs = (Executables)Dts.Variables["User::execsObj"].Value;
ReadExecutables(execs);
Dts.Variables["User::lineageIds"].Value = lineageIds;
WriteErrorCodeToFile(lineageIds);
Dts.TaskResult = (int)ScriptResults.Success;
}
private void ReadExecutables(Executables executables)
{
foreach (Executable pkgExecutable in executables)
{
if (object.ReferenceEquals(pkgExecutable.GetType(), typeof(Microsoft.SqlServer.Dts.Runtime.TaskHost)))
{
TaskHost pkgExecTaskHost = (TaskHost)pkgExecutable;
if (pkgExecTaskHost.CreationName.StartsWith("SSIS.Pipeline"))
{
ProcessDataFlowTask(pkgExecTaskHost);
}
}
else if (object.ReferenceEquals(pkgExecutable.GetType(), typeof(Microsoft.SqlServer.Dts.Runtime.ForEachLoop)))
{
// Recurse into FELCs
ReadExecutables(((ForEachLoop)pkgExecutable).Executables);
}
}
}
private void ProcessDataFlowTask(TaskHost currentDataFlowTask)
{
Dictionary<string, Dictionary<int, string>> componentAndColumnDic = new Dictionary<string,Dictionary<int,string>>();
MainPipe currentDataFlow = (MainPipe)currentDataFlowTask.InnerObject;
foreach (IDTSComponentMetaData100 currentComponent in currentDataFlow.ComponentMetaDataCollection)
{
Dictionary<int, string> columnIdAndcolumnNameDic = new Dictionary<int, string>();
// Get the inputs in the component.
if (currentComponent.InputCollection.Count > 0)
{
foreach (IDTSInput100 currentInput in currentComponent.InputCollection)
{
foreach (IDTSInputColumn100 currentInputColumn in currentInput.InputColumnCollection)
{
columnIdAndcolumnNameDic.Add(currentInputColumn.ID, currentInputColumn.Name);
}
}
}
// Get the outputss in the component.
if (currentComponent.OutputCollection.Count > 0)
{
foreach (IDTSOutput100 currentOutput in currentComponent.OutputCollection)
{
foreach (IDTSOutputColumn100 currentOutputColumn in currentOutput.OutputColumnCollection)
{
columnIdAndcolumnNameDic.Add(currentOutputColumn.ID, currentOutputColumn.Name);
}
}
}
componentAndColumnDic.Add(currentComponent.Name, columnIdAndcolumnNameDic);
}
lineageIds.Add(currentDataFlowTask.Name, componentAndColumnDic);
}
/// <summary>
/// エラーIDとエラー情報をファイルに書き出す。
/// </summary>
/// <param name="lineageIds"></param>
private void WriteErrorCodeToFile(Dictionary<string, Dictionary<string, Dictionary<int, string>>> lineageIds) {
// Just proof of concept to see the results before you dedicate your time to the solution
// Delete this code in your actual implementation
using (StreamWriter writetext = new StreamWriter(@"D:\temp\write.txt", true))
{
foreach (KeyValuePair<string, Dictionary<string, Dictionary<int, string>>> flowPair in lineageIds)
{
writetext.WriteLine("-------flow:" + flowPair.Key + "-------");
foreach (KeyValuePair<string, Dictionary<int, string>> componentPair in flowPair.Value)
{
writetext.WriteLine("-------component:" + componentPair.Key + "-------");
foreach (KeyValuePair<int, string> columnPair in componentPair.Value)
{
writetext.WriteLine("columnid:" + columnPair.Key + ",columnname" + columnPair.Value);
}
writetext.WriteLine();
}
}
}
}
フロータスクのエラー行をリダイレクトしたフローで使う。
あとはフロータスクで、エラー行をリダイレクトしたフローの中で、スクリプトコンポ―ネントを追加して、スクリプト内で辞書を参照して、出力に列を追加します。
- スクリプトとコンポーネントを変換として追加
- ReadOnlyVariablesにSystem::TaskName,User::lineageIdsの2つを追加
- 出力列にErrorColumnNameとErrorDescriptionを追加
public override void 入力0_ProcessInputRow(入力0Buffer Row)
{
/*
* Add your code here
*/
//User::lineageIdsに登録した列ID、列名の辞書を参照して、列名を取得します。
//User::lineageIdsは入れ子の辞書で、タスク名、コンポート名を指定することで、列情報の辞書を取得できます。
Dictionary<string, Dictionary<string, Dictionary<int, string>>> lineageIds = null;
if (Variables.lineageIds is Dictionary<string,Dictionary<string,Dictionary<int, string>>>)
{
lineageIds = (Dictionary<string, Dictionary<string, Dictionary<int, string>>>)Variables.lineageIds;
}
//現在のタスクの、現在のコンポーネントの名前から列IDを取得
//コンポート名はハードコーティングしています。#本当はしたくない。
Dictionary<int, string> currentLineageIds = lineageIds[Variables.TaskName]["データ変換"];
if (currentLineageIds == null) {
bool cancel = false;
ComponentMetaData.FireError(10, this.ComponentMetaData.Name, "エラーコードの辞書変数 User::linageIdsが空です。", "", 0, out cancel);
}
int? colNum = Row.ErrorColumn;
if (colNum.HasValue && (currentLineageIds != null)&¤tLineageIds.ContainsKey(colNum.Value))
{
Row.ErrorColumnName = currentLineageIds[colNum.Value];
}else{
Row.ErrorColumnName = "Row error";
}
Row.ErrorDescription = this.ComponentMetaData.GetErrorDescription(Row.ErrorCode).TrimEnd();
}
あとは取得したエラー情報をフラットファイル等に吐き出すと、わかりやすいエラー情報を取得することができます。
スクリプト コンポーネント内で、自分の1つ前のコンポーネント名を取得する方法がわかる方いらっしゃったら教えてください。