今日は、Azure Data Factory の実行結果のエラーメッセージを取得するためのコードを書いてみたので、共有してみたい。
今回作成したサンプルコード
YOUR_XXX
の部分は、Azure にアクセスするための、Service Principle の値になっている。Azure にアクセスするためのアプリケーションに権限を与えてくれるものだ。Create an Azure service principal with Azure CLI 2.0 がおそらくもっとも簡単な方法だ。コードは長いので、個別に解説していこう。
using Microsoft.Azure;
using Microsoft.Azure.Management.DataFactories;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace DataFactory
{
class Program
{
static void Main(string[] args)
{
new Program().DummyAllMessagesAsync();
Console.ReadLine();
}
public async Task DummyAllMessagesAsync()
{
var subscriptionId = "YOUR_SUBSCRIPTION_ID";
var tenantId = "YOUR_TENANT_ID";
var applicationId = "YOUR_CLIENT_ID";
var applicationKey = "YOUR_CLIENT_SECRET";
var tokenCredentails = new TokenCloudCredentials(
subscriptionId,
await GetAuthorizationHeader(tenantId, applicationId, applicationKey)
);
var client = new DataFactoryManagementClient(tokenCredentails);
var dataFactory = "YOUR_DATA_FACTORY_NAME";
var resourceGroup = "YOUR_RESOURCE_GROUP_NAME";
var dataPipeline = "YOUR_PIPELINE_NAME";
var dataset = "YOUR_DATASET_NAME";
// var dataSliceRunListParameters = new DataSliceRunListParameters("2017-08-02T23:00:00.0000000");
var pipelineResponse = await client.Pipelines.GetAsync(resourceGroup, dataFactory, dataPipeline);
var PipelineActivePeriodStartTime = pipelineResponse.Pipeline.Properties.Start ?? DateTime.Now;
var PipelineActivePeriodEndTime = pipelineResponse.Pipeline.Properties.End ?? new DateTime(2099, 12, 32, 23, 59, 0, 0, DateTimeKind.Utc);
// DateTime PipelineActivePeriodStartTime = new DateTime(2017, 8, 2, 22, 41, 0, 0, DateTimeKind.Utc);
// DateTime PipelineActivePeriodEndTime = new DateTime(2017, 8, 4, 22, 57, 0, 0, DateTimeKind.Utc);
var slices = await client.DataSlices.ListAsync(resourceGroup, dataFactory, dataset, new DataSliceListParameters() {
DataSliceRangeStartTime = PipelineActivePeriodStartTime.ConvertToISO8601DateTimeString(),
DataSliceRangeEndTime = PipelineActivePeriodEndTime.ConvertToISO8601DateTimeString()
}
);
var sliceRunStartTimes = slices.DataSlices.Select(s => s.Start);
foreach (var startTime in sliceRunStartTimes)
{
var dataSliceRunListParameters = new DataSliceRunListParameters(startTime.ConvertToISO8601DateTimeString());
DataSliceRunListResponse sliceRuns;
try
{
sliceRuns = await client.DataSliceRuns.ListAsync(resourceGroup, dataFactory, dataset, dataSliceRunListParameters);
foreach (var sliceRun in sliceRuns.DataSliceRuns)
{
GetDataSliceRunSummary(sliceRun);
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
}
private void GetDataSliceRunSummary(DataSliceRun run)
{
var properties = typeof(DataSliceRun).GetProperties(BindingFlags.Instance | BindingFlags.Public)
.Where(p => p.CanRead && p.CanWrite)
.Where(p => p.PropertyType == typeof(string));
foreach (var p in properties)
{
Console.WriteLine(p.GetValue(run));
}
}
private const string ActiveDirectoryEndpoint = "https://login.windows.net/";
private const string WindowsManagementUri = "https://management.core.windows.net/";
public async Task<string> GetAuthorizationHeader(string tenantId, string applicationId, string applicationKey)
{
var context = new AuthenticationContext(ActiveDirectoryEndpoint + tenantId);
var credential = new ClientCredential(
applicationId,
applicationKey);
AuthenticationResult result = null;
try
{
result = await context.AcquireTokenAsync(
resource: WindowsManagementUri,
clientCredential: credential);
} catch (Exception e)
{
Console.WriteLine(e.ToString());
}
if (result != null)
return result.AccessToken;
throw new InvalidOperationException("Failed to acquire token");
}
}
}
コードの解説
必要なもの
お客様からのリクエストで、Azure Data Factory を実行したときのログをとるのだが、ログのエラーメッセージがわからないので、エラーメッセージを取得したいというリクエスト。
Azure Data Factory でのスケジューリングと実行で出てくる概念は次のようなものだ。Linked Service がたとえば、SQL だったり、Blob だったりする。それに対して、そこからデータ抜いてきて、 DataSet という名前で定義する。DataSet には、どれぐらいの頻度でデータを取得するか?という情報が含まれている。そのDataSet から、データを取得して、「Activity」が実行する。Activityには、Winodws という概念があり、特定の時間(Start - End) に、そのActivityが、実行される。Activity が特定の Windowで実行された結果、取得した結果のデータを、DataSlice という。これが基本的な Azure Data Factory の概念だ。
1. クライアントの作成
Azure にアクセスするための、Token を Service Principal と、Subscription Id から作成する。それを渡すと、クライアントのオブジェクトが生成される。内部では、REST API のコールの、ヘッダにToken を渡してコールしているだけだ。詳しくはこちら Tutorial: Use REST API to create an Azure Data Factory pipeline to copy data
ちなみに、GetAuthorizationHeader
はなんでやねん!というぐらいべたべたのコードになっている。これぐらいAPI にラップしてもらいたいものだw
var tokenCredentails = new TokenCloudCredentials(
subscriptionId,
await GetAuthorizationHeader(tenantId, applicationId, applicationKey)
);
var client = new DataFactoryManagementClient(tokenCredentails);
2. パイプライン情報の取得
アクティビティを実行するのを管理しているのが、アクティビティだが、最終的な、ErrorMessage が含まれる、DataSlice をとろうと思うと、現在のパイプラインに設定されている開始時間と、終了時間をとる必要がある。下記のような感じで、取れるのだが、??
を使っている理由を説明する。??
は、デフォルト値を設定できる書き方だが、Start
や、 End
の型が DateTime?
になっている。型の横に?
がついているのは、null を許容するという型になる。ところが、DateTime
は null を許容しないので、null だったらデフォルト値をセットしてあげる必要がある。
var pipelineResponse = await client.Pipelines.GetAsync(resourceGroup, dataFactory, dataPipeline);
var PipelineActivePeriodStartTime = pipelineResponse.Pipeline.Properties.Start ?? DateTime.Now;
var PipelineActivePeriodEndTime = pipelineResponse.Pipeline.Properties.End ?? new DateTime(2099, 12, 32, 23, 59, 0, 0, DateTimeKind.Utc);
これで開始時間と、終了時間が取れた!
3. データスライスの取得
次のメソッドで、データスライスの一覧を取得する。なぜ取得するかというと、次に出てくる、データスライスの実行結果を取得するためには、開始時間が必要だからだ。
var slices = await client.DataSlices.ListAsync(resourceGroup, dataFactory, dataset, new DataSliceListParameters() {
DataSliceRangeStartTime = PipelineActivePeriodStartTime.ConvertToISO8601DateTimeString(),
DataSliceRangeEndTime = PipelineActivePeriodEndTime.ConvertToISO8601DateTimeString()
}
);
だから、開始時間の Enumerable オブジェクトを、Link で取得する。
var sliceRunStartTimes = slices.DataSlices.Select(s => s.Start);
4. データスライスランの取得
データスライスの実行結果を取得する。本番ではもちろんこんなエラー処理はしないが、非同期メソッドを await で待っているときは、しっかりエラー処理をしないとエラーが発生した時にわからないっぽい。(これは師匠に聞いてみよう)
foreach (var startTime in sliceRunStartTimes)
{
var dataSliceRunListParameters = new DataSliceRunListParameters(startTime.ConvertToISO8601DateTimeString());
DataSliceRunListResponse sliceRuns;
try
{
sliceRuns = await client.DataSliceRuns.ListAsync(resourceGroup, dataFactory, dataset, dataSliceRunListParameters);
foreach (var sliceRun in sliceRuns.DataSliceRuns)
{
GetDataSliceRunSummary(sliceRun);
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
ちなみに、GetDataSliceRunSummary
は、コードを調べて書くのが面倒だったので、リフレクションでstring の値だけダンプするようなメソッドにしている。
実行結果。
無事ゲットできた。
56a56b39-3d11-47d6-98fe-aa43b1a5d7de_636373098000000000_636373107000000000_InputDataset-7nt
Succeeded
InputDataset-7nt
Validation
93ef854e-5bdd-448d-b14b-3706a2cc5a33_636373107000000000_636373116000000000_InputDataset-7nt
Succeeded
InputDataset-7nt
Validation
Table '[dbo].[emp]' does not exist.
bc5931f7-6206-43f4-a156-3d1c5fbdf533_636373116000000000_636373125000000000_InputDataset-7nt
FailedValidation
InputDataset-7nt
Validation
Table '[dbo].[emp]' does not exist.
5c2df8b0-555d-46f8-b0f6-2b14abc0e9e6_636373125000000000_636373134000000000_InputDataset-7nt
FailedValidation
InputDataset-7nt
Validation
苦労した点
苦労した点といえば、デバッガーを使っても、最初何もメッセージが出なくてなんでだろうな?と思った。
デバッガで、どこまで実行できているかを調べて、とまっている、await の非同期メソッドのところに、try/catch を書いてあげると原因が分かった。ちなみに今回はvar client = new DataFactoryManagementClient(tokenCredentails);
のところで、tokenCredential の後に、"https://management.azure.com/";
を、URLにラップして渡していた。ところが、そのURLが間違っているといわれたので、普通こんなのライブラリに書いてあるだろうとおもって、外したらうまく動いた。
あとは、データスライスとかの概念の理解に時間がかかった。明日からこれでがっつりコーディングができるだろう。ちなみに、REST-API のコールは、リソースグループ毎に次の制限がある。今回のやり方だと、DataSliceRun を数だけコールするので、何回も呼ぶと、すぐに15,000 ぐらいいってしまいそう(お客様のところには、大量の DataFactory があるので)うまく工夫してあまりコールしなくても済むようにしよう。
リソース
- Data Factory scheduling and execution
- Tutorial: Create a pipeline with Copy Activity using .NET API
- what-is-the-purpose-of-a-question-mark-after-a-type-for-example-int-myvariabl
- Azure subscription and service limits, quotas, and constraints
- Tutorial: Use REST API to create an Azure Data Factory pipeline to copy data
- Rerunning many slices and activities in Azure Data Factory