2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

自分自身で処理を中断する Data Factory パイプラインを作ってみる

Last updated at Posted at 2021-12-13

この記事は Microsoft Azure Tech Advent Calendar 2021 の4日目の記事(後追い)です。

はじめに

日々の業務の中で、Data Factory を使っているのですが、パイプライン自身でエラー ハンドリングの結果、自分自身の実行を中断したいケースに出会ったので、試しに作ってみました。ということでその共有です。「イヤイヤ、それではちょっと」とか「もっとこうすればいいよ」などのツッコミ、フィードバックをお待ちしております。

全体像

繰り返し処理を実施して、指定したしきい値を越えた回数エラーとなったらパイプラインを終了する、という処理構造を実装します。今回は処理に特化した内容なので、ビジネス的なシナリオは特に想定しません。

なお、この記事内で断りなく「パイプライン」と記述がある場合は、Azure Data Factory のパイプラインを指しています。

事前準備

実際に試すためには以下のものが必要になります。

  • Azure サブスクリプション
  • Azure Data Factory リソース(V2を使用します。任意の名前でデプロイしてください。作成時の Git 統合は不要です。こちら の手順が参考になります)
  • 今回はデータを移動させたり、取り込んだりしないのでその他のリソースは不要です。

作ってみる

それでは作ってみましょう。手順書を目指しているわけではないので、完全なステップごとのスクリーンショットはありません。ご了承ください。

Data Factory のマネージド ID に RBAC で認可設定

現在、Azure Data Factory は Azure ポータルから作成すると、自動的にシステム割り当てのマネージド ID が自動的に作成されます。ここでは、システム割り当てのマネージド ID を使用します。どの粒度で認可を与えるかは、システム固有の設計事項なので充分に検討してください。ここではサブスクリプションの「共同作成者」ロールを付与してみます。

サブスクリプションの アクセス制御 (IAM) メニューから [+追加] → [ロールの割り当ての追加] を選択します。
image.png

ロール タブで「共同作成者」 を選択し、メンバー タブでアクセスの割り当て先に「マネージドID」を選択し、[+メンバーを選択する]をクリックします。マネージド IDの選択 ブレードが表示されたら、サブスクリプション、マネージド ID の種類を選ぶことで Data Factory のマネージド ID が一覧表示されますので該当するものを選択します。
image.png

選択したら、ロールの割り当てを完了させます。サブスクリプションのアクセス制御からロールの割り当て状況が確認できます。
image.png

パイプラインを作成する

エラーハンドリングとしてのパイプラインの中断としたいので、エラーを起こすための要素、エラーのしきい値チェック、中断の実施のステップを追って見ていきたいと思います。

エラーを起こす要素の実装

今回、エラーの種類はなんでもいいので、「変数に値を入れる際のデータ型の不一致」をエラーとして扱います。実際のパイプラインでは、これがデータのコピーだったり、カスタム アクティビティによる Azure Batch の実行だったりします。

Azure Data Factory Studio で新しいパイプラインを作成し、変数を定義します。loopCount はアレイ型とし、既定値を設定して、繰り返し処理を模擬的に実施させるために使用します。通常は、例えばファイル名のリスト、テーブル名のリストなどが考えられるかと思います。target_val はブール型とし、この変数に文字列データを設定しようとしてエラーを発生させます。failCount はアレイ型とし、処理に失敗した場合に配列に要素を追加していきます。
image.png

パイプラインに ForEach アクティビティを配置し、バッチ カウントを 1 にします。また、項目には動的なコンテンツとしてさっき作成した変数の loopCount を割り当てます。バッチカウントは、ForEach の内部の処理の並列数を決めるパラメーターです。2にすると、項目で指定した配列の1番目、2番目を使用して内部の処理が2つ並行して実行されます。
image.png

配置した ForEach アクティビティの内部処理を作っていきます。実際の処理は1秒、2秒では終わらないと思いますので疑似的に処理時間を取るための待機アクティビティと、エラーを発生させる処理としての変数の設定アクティビティを配置します。待機アクティビティと変数の設定アクティビティの間には依存関係を忘れずに設定します。変数の設定アクティビティでは、最初に作成した変数 target_val に文字列 (ここでは foobar) を設定するように構成します。エラーを起こす要素はこれで完了です。
image.png

エラーのしきい値チェック

次に、変数の設定アクティビティがエラーだった場合に処理が行われるように処理を追加していきます。

変数の設定アクティビティを選択した状態で、右下のアイコンを選択しエラーの場合の依存関係を追加します。
image.png

変数の追加アクティビティを追加し、変数の設定アクティビティと「エラー」の依存関係で接続します。最初に作成した failCount に、その時点の ForEach で扱っている項目を動的なコンテンツとして追加します。(配列なので、最後の要素として追加されます)これで、エラーが発生した際に failCount の要素が1つずつ増えるようになりました。
image.png

それでは、しきい値チェック部分を作っていきましょう。パイプラインに If Condition アクティビティを追加します。If Condition アクティビティに条件式を設定します。length 関数で failCount 変数の長さを取得し、greater 関数で比較して 5 よりも大きければ True となるような式です。しきい値は個別の設計事項になりますので、ご自身の要件に合わせて変更してください。

@greater(length(variables('failCount')),5)

image.png

中断の実装

いよいよ仕上げです。しきい値をチェックして True が成り立った場合は処理に、パイプラインを中断する部分を追加していきます。

さっき配置した If Condition アクティビティの True 側に Web アクティビティを追加します。URL はパイプライン キャンセルの REST API のドキュメント を参考に、動的コンテンツを設定します。API メソッドはドキュメントの記載通り「POST」を選択し、本文(これが HTTP Request の body です)には適当な文字列を埋めておきます。認証ではマネージド ID を選択し、リソースには https://management.azure.com/ と記述します。

下記は、Web アクティビティに設定する URL のサンプルです。{your subscription id}{your resource group name} はご自身の環境に合わせて置き換えてください。いくつかの文字列を concat 関数で結合して URL を組み立てています。

@concat('https://management.azure.com/subscriptions/{your subscription id}/resourceGroups/{your resource group name}/providers/Microsoft.DataFactory/factories/',pipeline().DataFactory,'/pipelineruns/',pipeline().RunId,'/cancel?api-version=2018-06-01')

image.png

動作確認

それでは、意図した通りに動作するか確認してみましょう。今回のパイプラインは残念ながらデバッグ実行で動作を見ることができません。(最後の中断部分がデバッグ実行の場合だとエラーになります)

[すべて発行] でパイプラインをライブ状態にします。
image.png
image.png

発行が完了したら、トリガーの追加から [今すぐトリガー] を選択してパイプラインを実行します。
image.png

パイプラインのモニター画面からパイプラインの状態を見ていきます。パイプライン名を選択するとアクティビティの実行状況が確認できます。この結果は、しきい値チェックのしきい値を2に変更した状態で実行したものをキャプチャしています。
image.png

処理が中断された後、モニター画面で確認すると下図のように記録が残ります。時系列としては下から見ていくのですが、最初に ForEach アクティビティが実行され、次に内部処理としての待機、変数の設定、変数の追加、しきい値チェックが繰り返し行われていることが判ります。そして最後に REST API 呼び出しが行われ「取り消されました」のステータスになります。
image.png

まとめ

今回はパイプラインの処理構造として「所定の回数エラーとなったら中断されるパイプライン」を作成してみました。実務上のデータ処理が組み込まれていないのでちょっとイメージがつかみにくいかもしれませんが、ぜひ、ご自身で試しにデータ処理を追加実装の上で試行錯誤してみてください。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?