LoginSignup
2
2

More than 5 years have passed since last update.

Azure Data Factory の実行結果のエラーメッセージを取得する

Last updated at Posted at 2017-08-04

今日は、Azure Data Factory の実行結果のエラーメッセージを取得するためのコードを書いてみたので、共有してみたい。

今回作成したサンプルコード

YOUR_XXX の部分は、Azure にアクセスするための、Service Principle の値になっている。Azure にアクセスするためのアプリケーションに権限を与えてくれるものだ。Create an Azure service principal with Azure CLI 2.0 がおそらくもっとも簡単な方法だ。コードは長いので、個別に解説していこう。

using Microsoft.Azure;
using Microsoft.Azure.Management.DataFactories;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace DataFactory
{
    class Program
    {


        static void Main(string[] args)
        {
            new Program().DummyAllMessagesAsync();
            Console.ReadLine();

        }

        public async Task DummyAllMessagesAsync()

        {
            var subscriptionId = "YOUR_SUBSCRIPTION_ID";
            var tenantId = "YOUR_TENANT_ID";
            var applicationId = "YOUR_CLIENT_ID";
            var applicationKey = "YOUR_CLIENT_SECRET";

            var tokenCredentails = new TokenCloudCredentials(
                subscriptionId,
                await GetAuthorizationHeader(tenantId, applicationId, applicationKey)
                );

            var client = new DataFactoryManagementClient(tokenCredentails);
            var dataFactory = "YOUR_DATA_FACTORY_NAME";
            var resourceGroup = "YOUR_RESOURCE_GROUP_NAME";
            var dataPipeline = "YOUR_PIPELINE_NAME";
            var dataset = "YOUR_DATASET_NAME"; 
            // var dataSliceRunListParameters = new DataSliceRunListParameters("2017-08-02T23:00:00.0000000");



            var pipelineResponse = await client.Pipelines.GetAsync(resourceGroup, dataFactory, dataPipeline);

            var PipelineActivePeriodStartTime = pipelineResponse.Pipeline.Properties.Start ?? DateTime.Now;
            var PipelineActivePeriodEndTime = pipelineResponse.Pipeline.Properties.End ?? new DateTime(2099, 12, 32, 23, 59, 0, 0, DateTimeKind.Utc);

            // DateTime PipelineActivePeriodStartTime = new DateTime(2017, 8, 2, 22, 41, 0, 0, DateTimeKind.Utc);
            // DateTime PipelineActivePeriodEndTime = new DateTime(2017, 8, 4, 22, 57, 0, 0, DateTimeKind.Utc);


            var slices = await client.DataSlices.ListAsync(resourceGroup, dataFactory, dataset, new DataSliceListParameters() {
                DataSliceRangeStartTime = PipelineActivePeriodStartTime.ConvertToISO8601DateTimeString(),
                DataSliceRangeEndTime = PipelineActivePeriodEndTime.ConvertToISO8601DateTimeString()
            }
                );

            var sliceRunStartTimes = slices.DataSlices.Select(s => s.Start);

            foreach (var startTime in sliceRunStartTimes)
            {
                var dataSliceRunListParameters = new DataSliceRunListParameters(startTime.ConvertToISO8601DateTimeString());
                DataSliceRunListResponse sliceRuns;
                try
                {
                    sliceRuns = await client.DataSliceRuns.ListAsync(resourceGroup, dataFactory, dataset, dataSliceRunListParameters);
                    foreach (var sliceRun in sliceRuns.DataSliceRuns)
                    {

                        GetDataSliceRunSummary(sliceRun);
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.ToString());
                }
            }


        }
        private void GetDataSliceRunSummary(DataSliceRun run)
        {

            var properties = typeof(DataSliceRun).GetProperties(BindingFlags.Instance | BindingFlags.Public)
                .Where(p => p.CanRead && p.CanWrite)
                .Where(p => p.PropertyType == typeof(string));
            foreach (var p in properties)
            {
                Console.WriteLine(p.GetValue(run));
            }
        }

        private const string ActiveDirectoryEndpoint = "https://login.windows.net/";
        private const string WindowsManagementUri = "https://management.core.windows.net/";

        public async Task<string> GetAuthorizationHeader(string tenantId, string applicationId, string applicationKey)
        {



            var context = new AuthenticationContext(ActiveDirectoryEndpoint + tenantId);
            var credential = new ClientCredential(
                applicationId,
                applicationKey);
            AuthenticationResult result = null;
            try
            {

                result = await context.AcquireTokenAsync(
                  resource: WindowsManagementUri,
                  clientCredential: credential);
            } catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }


            if (result != null)
                return result.AccessToken;

            throw new InvalidOperationException("Failed to acquire token");
        }
    }


}

コードの解説

必要なもの

お客様からのリクエストで、Azure Data Factory を実行したときのログをとるのだが、ログのエラーメッセージがわからないので、エラーメッセージを取得したいというリクエスト。

Azure Data Factory でのスケジューリングと実行で出てくる概念は次のようなものだ。Linked Service がたとえば、SQL だったり、Blob だったりする。それに対して、そこからデータ抜いてきて、 DataSet という名前で定義する。DataSet には、どれぐらいの頻度でデータを取得するか?という情報が含まれている。そのDataSet から、データを取得して、「Activity」が実行する。Activityには、Winodws という概念があり、特定の時間(Start - End) に、そのActivityが、実行される。Activity が特定の Windowで実行された結果、取得した結果のデータを、DataSlice という。これが基本的な Azure Data Factory の概念だ。

availability-scheduler.png

1. クライアントの作成

Azure にアクセスするための、Token を Service Principal と、Subscription Id から作成する。それを渡すと、クライアントのオブジェクトが生成される。内部では、REST API のコールの、ヘッダにToken を渡してコールしているだけだ。詳しくはこちら Tutorial: Use REST API to create an Azure Data Factory pipeline to copy data

ちなみに、GetAuthorizationHeader はなんでやねん!というぐらいべたべたのコードになっている。これぐらいAPI にラップしてもらいたいものだw

var tokenCredentails = new TokenCloudCredentials(
    subscriptionId,
    await GetAuthorizationHeader(tenantId, applicationId, applicationKey)
);

var client = new DataFactoryManagementClient(tokenCredentails);

2. パイプライン情報の取得

アクティビティを実行するのを管理しているのが、アクティビティだが、最終的な、ErrorMessage が含まれる、DataSlice をとろうと思うと、現在のパイプラインに設定されている開始時間と、終了時間をとる必要がある。下記のような感じで、取れるのだが、?? を使っている理由を説明する。?? は、デフォルト値を設定できる書き方だが、Start や、 End の型が DateTime? になっている。型の横に?がついているのは、null を許容するという型になる。ところが、DateTime は null を許容しないので、null だったらデフォルト値をセットしてあげる必要がある。

var pipelineResponse = await client.Pipelines.GetAsync(resourceGroup, dataFactory, dataPipeline);

var PipelineActivePeriodStartTime = pipelineResponse.Pipeline.Properties.Start ?? DateTime.Now;
var PipelineActivePeriodEndTime = pipelineResponse.Pipeline.Properties.End ?? new DateTime(2099, 12, 32, 23, 59, 0, 0, DateTimeKind.Utc);

これで開始時間と、終了時間が取れた!

3. データスライスの取得

次のメソッドで、データスライスの一覧を取得する。なぜ取得するかというと、次に出てくる、データスライスの実行結果を取得するためには、開始時間が必要だからだ。

var slices = await client.DataSlices.ListAsync(resourceGroup, dataFactory, dataset, new DataSliceListParameters() {
    DataSliceRangeStartTime = PipelineActivePeriodStartTime.ConvertToISO8601DateTimeString(),
    DataSliceRangeEndTime = PipelineActivePeriodEndTime.ConvertToISO8601DateTimeString()
    }
);

だから、開始時間の Enumerable オブジェクトを、Link で取得する。


var sliceRunStartTimes = slices.DataSlices.Select(s => s.Start);

4. データスライスランの取得

データスライスの実行結果を取得する。本番ではもちろんこんなエラー処理はしないが、非同期メソッドを await で待っているときは、しっかりエラー処理をしないとエラーが発生した時にわからないっぽい。(これは師匠に聞いてみよう)


foreach (var startTime in sliceRunStartTimes)
{
     var dataSliceRunListParameters = new DataSliceRunListParameters(startTime.ConvertToISO8601DateTimeString());
     DataSliceRunListResponse sliceRuns;
     try
     {
          sliceRuns = await client.DataSliceRuns.ListAsync(resourceGroup, dataFactory, dataset, dataSliceRunListParameters);
          foreach (var sliceRun in sliceRuns.DataSliceRuns)
          {

               GetDataSliceRunSummary(sliceRun);
           }
      }
      catch (Exception e)
      {
           Console.WriteLine(e.ToString());
      }
}

ちなみに、GetDataSliceRunSummary は、コードを調べて書くのが面倒だったので、リフレクションでstring の値だけダンプするようなメソッドにしている。

実行結果。

無事ゲットできた。

56a56b39-3d11-47d6-98fe-aa43b1a5d7de_636373098000000000_636373107000000000_InputDataset-7nt


Succeeded
InputDataset-7nt
Validation



93ef854e-5bdd-448d-b14b-3706a2cc5a33_636373107000000000_636373116000000000_InputDataset-7nt


Succeeded
InputDataset-7nt
Validation


Table '[dbo].[emp]' does not exist.
bc5931f7-6206-43f4-a156-3d1c5fbdf533_636373116000000000_636373125000000000_InputDataset-7nt


FailedValidation
InputDataset-7nt
Validation


Table '[dbo].[emp]' does not exist.
5c2df8b0-555d-46f8-b0f6-2b14abc0e9e6_636373125000000000_636373134000000000_InputDataset-7nt


FailedValidation
InputDataset-7nt
Validation

苦労した点

苦労した点といえば、デバッガーを使っても、最初何もメッセージが出なくてなんでだろうな?と思った。
デバッガで、どこまで実行できているかを調べて、とまっている、await の非同期メソッドのところに、try/catch を書いてあげると原因が分かった。ちなみに今回はvar client = new DataFactoryManagementClient(tokenCredentails); のところで、tokenCredential の後に、"https://management.azure.com/";を、URLにラップして渡していた。ところが、そのURLが間違っているといわれたので、普通こんなのライブラリに書いてあるだろうとおもって、外したらうまく動いた。

あとは、データスライスとかの概念の理解に時間がかかった。明日からこれでがっつりコーディングができるだろう。ちなみに、REST-API のコールは、リソースグループ毎に次の制限がある。今回のやり方だと、DataSliceRun を数だけコールするので、何回も呼ぶと、すぐに15,000 ぐらいいってしまいそう(お客様のところには、大量の DataFactory があるので)うまく工夫してあまりコールしなくても済むようにしよう。

API.png

リソース

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2