C#
AzureDataFactory

Azure Data Factory の実行結果のエラーメッセージを取得する

More than 1 year has passed since last update.

今日は、Azure Data Factory の実行結果のエラーメッセージを取得するためのコードを書いてみたので、共有してみたい。

今回作成したサンプルコード

YOUR_XXX の部分は、Azure にアクセスするための、Service Principle の値になっている。Azure にアクセスするためのアプリケーションに権限を与えてくれるものだ。Create an Azure service principal with Azure CLI 2.0 がおそらくもっとも簡単な方法だ。コードは長いので、個別に解説していこう。

using Microsoft.Azure;
using Microsoft.Azure.Management.DataFactories;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace DataFactory
{
    class Program
    {


        static void Main(string[] args)
        {
            new Program().DummyAllMessagesAsync();
            Console.ReadLine();

        }

        public async Task DummyAllMessagesAsync()

        {
            var subscriptionId = "YOUR_SUBSCRIPTION_ID";
            var tenantId = "YOUR_TENANT_ID";
            var applicationId = "YOUR_CLIENT_ID";
            var applicationKey = "YOUR_CLIENT_SECRET";

            var tokenCredentails = new TokenCloudCredentials(
                subscriptionId,
                await GetAuthorizationHeader(tenantId, applicationId, applicationKey)
                );

            var client = new DataFactoryManagementClient(tokenCredentails);
            var dataFactory = "YOUR_DATA_FACTORY_NAME";
            var resourceGroup = "YOUR_RESOURCE_GROUP_NAME";
            var dataPipeline = "YOUR_PIPELINE_NAME";
            var dataset = "YOUR_DATASET_NAME"; 
            // var dataSliceRunListParameters = new DataSliceRunListParameters("2017-08-02T23:00:00.0000000");



            var pipelineResponse = await client.Pipelines.GetAsync(resourceGroup, dataFactory, dataPipeline);

            var PipelineActivePeriodStartTime = pipelineResponse.Pipeline.Properties.Start ?? DateTime.Now;
            var PipelineActivePeriodEndTime = pipelineResponse.Pipeline.Properties.End ?? new DateTime(2099, 12, 32, 23, 59, 0, 0, DateTimeKind.Utc);

            // DateTime PipelineActivePeriodStartTime = new DateTime(2017, 8, 2, 22, 41, 0, 0, DateTimeKind.Utc);
            // DateTime PipelineActivePeriodEndTime = new DateTime(2017, 8, 4, 22, 57, 0, 0, DateTimeKind.Utc);


            var slices = await client.DataSlices.ListAsync(resourceGroup, dataFactory, dataset, new DataSliceListParameters() {
                DataSliceRangeStartTime = PipelineActivePeriodStartTime.ConvertToISO8601DateTimeString(),
                DataSliceRangeEndTime = PipelineActivePeriodEndTime.ConvertToISO8601DateTimeString()
            }
                );

            var sliceRunStartTimes = slices.DataSlices.Select(s => s.Start);

            foreach (var startTime in sliceRunStartTimes)
            {
                var dataSliceRunListParameters = new DataSliceRunListParameters(startTime.ConvertToISO8601DateTimeString());
                DataSliceRunListResponse sliceRuns;
                try
                {
                    sliceRuns = await client.DataSliceRuns.ListAsync(resourceGroup, dataFactory, dataset, dataSliceRunListParameters);
                    foreach (var sliceRun in sliceRuns.DataSliceRuns)
                    {

                        GetDataSliceRunSummary(sliceRun);
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.ToString());
                }
            }


        }
        private void GetDataSliceRunSummary(DataSliceRun run)
        {

            var properties = typeof(DataSliceRun).GetProperties(BindingFlags.Instance | BindingFlags.Public)
                .Where(p => p.CanRead && p.CanWrite)
                .Where(p => p.PropertyType == typeof(string));
            foreach (var p in properties)
            {
                Console.WriteLine(p.GetValue(run));
            }
        }

        private const string ActiveDirectoryEndpoint = "https://login.windows.net/";
        private const string WindowsManagementUri = "https://management.core.windows.net/";

        public async Task<string> GetAuthorizationHeader(string tenantId, string applicationId, string applicationKey)
        {



            var context = new AuthenticationContext(ActiveDirectoryEndpoint + tenantId);
            var credential = new ClientCredential(
                applicationId,
                applicationKey);
            AuthenticationResult result = null;
            try
            {

                result = await context.AcquireTokenAsync(
                  resource: WindowsManagementUri,
                  clientCredential: credential);
            } catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }


            if (result != null)
                return result.AccessToken;

            throw new InvalidOperationException("Failed to acquire token");
        }
    }


}

コードの解説

必要なもの

お客様からのリクエストで、Azure Data Factory を実行したときのログをとるのだが、ログのエラーメッセージがわからないので、エラーメッセージを取得したいというリクエスト。

Azure Data Factory でのスケジューリングと実行で出てくる概念は次のようなものだ。Linked Service がたとえば、SQL だったり、Blob だったりする。それに対して、そこからデータ抜いてきて、 DataSet という名前で定義する。DataSet には、どれぐらいの頻度でデータを取得するか?という情報が含まれている。そのDataSet から、データを取得して、「Activity」が実行する。Activityには、Winodws という概念があり、特定の時間(Start - End) に、そのActivityが、実行される。Activity が特定の Windowで実行された結果、取得した結果のデータを、DataSlice という。これが基本的な Azure Data Factory の概念だ。

availability-scheduler.png

1. クライアントの作成

Azure にアクセスするための、Token を Service Principal と、Subscription Id から作成する。それを渡すと、クライアントのオブジェクトが生成される。内部では、REST API のコールの、ヘッダにToken を渡してコールしているだけだ。詳しくはこちら Tutorial: Use REST API to create an Azure Data Factory pipeline to copy data

ちなみに、GetAuthorizationHeader はなんでやねん!というぐらいべたべたのコードになっている。これぐらいAPI にラップしてもらいたいものだw

var tokenCredentails = new TokenCloudCredentials(
    subscriptionId,
    await GetAuthorizationHeader(tenantId, applicationId, applicationKey)
);

var client = new DataFactoryManagementClient(tokenCredentails);

2. パイプライン情報の取得

アクティビティを実行するのを管理しているのが、アクティビティだが、最終的な、ErrorMessage が含まれる、DataSlice をとろうと思うと、現在のパイプラインに設定されている開始時間と、終了時間をとる必要がある。下記のような感じで、取れるのだが、?? を使っている理由を説明する。?? は、デフォルト値を設定できる書き方だが、Start や、 End の型が DateTime? になっている。型の横に?がついているのは、null を許容するという型になる。ところが、DateTime は null を許容しないので、null だったらデフォルト値をセットしてあげる必要がある。

var pipelineResponse = await client.Pipelines.GetAsync(resourceGroup, dataFactory, dataPipeline);

var PipelineActivePeriodStartTime = pipelineResponse.Pipeline.Properties.Start ?? DateTime.Now;
var PipelineActivePeriodEndTime = pipelineResponse.Pipeline.Properties.End ?? new DateTime(2099, 12, 32, 23, 59, 0, 0, DateTimeKind.Utc);

これで開始時間と、終了時間が取れた!

3. データスライスの取得

次のメソッドで、データスライスの一覧を取得する。なぜ取得するかというと、次に出てくる、データスライスの実行結果を取得するためには、開始時間が必要だからだ。

var slices = await client.DataSlices.ListAsync(resourceGroup, dataFactory, dataset, new DataSliceListParameters() {
    DataSliceRangeStartTime = PipelineActivePeriodStartTime.ConvertToISO8601DateTimeString(),
    DataSliceRangeEndTime = PipelineActivePeriodEndTime.ConvertToISO8601DateTimeString()
    }
);

だから、開始時間の Enumerable オブジェクトを、Link で取得する。

var sliceRunStartTimes = slices.DataSlices.Select(s => s.Start);

4. データスライスランの取得

データスライスの実行結果を取得する。本番ではもちろんこんなエラー処理はしないが、非同期メソッドを await で待っているときは、しっかりエラー処理をしないとエラーが発生した時にわからないっぽい。(これは師匠に聞いてみよう)

foreach (var startTime in sliceRunStartTimes)
{
     var dataSliceRunListParameters = new DataSliceRunListParameters(startTime.ConvertToISO8601DateTimeString());
     DataSliceRunListResponse sliceRuns;
     try
     {
          sliceRuns = await client.DataSliceRuns.ListAsync(resourceGroup, dataFactory, dataset, dataSliceRunListParameters);
          foreach (var sliceRun in sliceRuns.DataSliceRuns)
          {

               GetDataSliceRunSummary(sliceRun);
           }
      }
      catch (Exception e)
      {
           Console.WriteLine(e.ToString());
      }
}

ちなみに、GetDataSliceRunSummary は、コードを調べて書くのが面倒だったので、リフレクションでstring の値だけダンプするようなメソッドにしている。

実行結果。

無事ゲットできた。

56a56b39-3d11-47d6-98fe-aa43b1a5d7de_636373098000000000_636373107000000000_InputDataset-7nt


Succeeded
InputDataset-7nt
Validation



93ef854e-5bdd-448d-b14b-3706a2cc5a33_636373107000000000_636373116000000000_InputDataset-7nt


Succeeded
InputDataset-7nt
Validation


Table '[dbo].[emp]' does not exist.
bc5931f7-6206-43f4-a156-3d1c5fbdf533_636373116000000000_636373125000000000_InputDataset-7nt


FailedValidation
InputDataset-7nt
Validation


Table '[dbo].[emp]' does not exist.
5c2df8b0-555d-46f8-b0f6-2b14abc0e9e6_636373125000000000_636373134000000000_InputDataset-7nt


FailedValidation
InputDataset-7nt
Validation

苦労した点

苦労した点といえば、デバッガーを使っても、最初何もメッセージが出なくてなんでだろうな?と思った。
デバッガで、どこまで実行できているかを調べて、とまっている、await の非同期メソッドのところに、try/catch を書いてあげると原因が分かった。ちなみに今回はvar client = new DataFactoryManagementClient(tokenCredentails); のところで、tokenCredential の後に、"https://management.azure.com/";を、URLにラップして渡していた。ところが、そのURLが間違っているといわれたので、普通こんなのライブラリに書いてあるだろうとおもって、外したらうまく動いた。

あとは、データスライスとかの概念の理解に時間がかかった。明日からこれでがっつりコーディングができるだろう。ちなみに、REST-API のコールは、リソースグループ毎に次の制限がある。今回のやり方だと、DataSliceRun を数だけコールするので、何回も呼ぶと、すぐに15,000 ぐらいいってしまいそう(お客様のところには、大量の DataFactory があるので)うまく工夫してあまりコールしなくても済むようにしよう。

API.png

リソース