概要
Azure Data Factory (ADF)のパイプラインを PowerShell の Azure CLI から実行する方法の検証結果を共有します。Azure CLI による ADFのパイプラインの基本的な実行方法を確認し、処理の共通化を検討しました。
本検証を VS Code の Polyglot Notebooks を利用して実施しました。
事前準備
1. Azure Data Factory にてパイプラインを作成
Azure Data Factory にて、次のパイプラインを作成します。待機アクティビティにて 10 秒待機し、result
パラメータがfailure
の場合にはエラー終了するパイプラインです。result
パラメータへの引数を変更することで、正常終了とエラー終了を切り替えられるようにしてあります。
パイプラインの json 定義
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "Wait1",
"type": "Wait",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"waitTimeInSeconds": 10
}
},
{
"name": "If Condition1",
"type": "IfCondition",
"dependsOn": [
{
"activity": "Wait1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@equals(pipeline().parameters.result,pipeline().parameters._error_result_txt)",
"type": "Expression"
},
"ifTrueActivities": [
{
"name": "Fail1",
"type": "Fail",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"message": "error desu",
"errorCode": "1"
}
}
]
}
}
],
"parameters": {
"result": {
"type": "string",
"defaultValue": "failure"
},
"_error_result_txt": {
"type": "string",
"defaultValue": "failure"
}
},
"variables": {
"values": {
"type": "String"
}
},
"folder": {
"name": "test"
},
"annotations": []
}
}
2. PowerShell にて Azure CLI をインストール後、Azure にログインを実施
PowerShell にて 次のコマンドを実行し、 Azure CLI をインストールします。
winget install -e --id Microsoft.AzureCLI
次のコマンドを実行して、ブラウザーにて Azure への認証を実施します。
$tenant_id ="{tenand_id}"
az login --tenant $tenant_id
基本的な実行
1. 変数の定義
$tgt_tenant_id ="{tenant_id}"
$tgt_subscription="{subscription_id}"
$tgt_resource_group="{resoure_group_name}"
$tgt_factory_name="{adf_name}"
$pip_name="pipeline1"
$pip_paras='{"result": "success"}' # success or failure
2. Azure への認証
# Check if the session is valid
$account = az account show --output json
# If the session is not valid, login to Azure with the specified tenant ID
if ($account -eq $null) {
try {
# Login to Azure with the specified tenant ID
$account = az login --tenant $tgt_tenant_id
}
catch {
Write-Host "Login failed."
exit 1
}
}
ログインできていない場合には、次のような結果となる想定。
3. パイプライン実行
# Create a pipeline run
$current_run_id=az datafactory pipeline create-run `
--subscription $tgt_subscription `
--resource-group $tgt_resource_group `
--factory-name $tgt_factory_name `
--name $pip_name `
--parameters ($pip_paras | ConvertTo-Json) `
| Convertfrom-Json `
| % {$_.runId}
4. 完了確認
# Wait for the pipeline run to complete
$command = "az datafactory pipeline-run show ``
--subscription $tgt_subscription ``
--resource-group $tgt_resource_group ``
--factory-name $tgt_factory_name ``
--run-id $current_run_id "
$expected_outputs = @("Succeeded", "Failed", "Canceled")
while ($true) {
$output = Invoke-Expression $command | ConvertFrom-Json
if ($expected_outputs -contains $output.status) {
break
}
Start-Sleep -Seconds $interval
}
5. 実行結果の確認
# Check the status of the pipeline run
if ($output.status -eq "Failed") {
Write-Host $output | ConvertTo-Json
throw "Pipeline failed"
} else {
Write-Host $output | ConvertTo-Json
exit 0
}
共通化した処理による実行
1. 共通の変数と関数を別のファイルに記述
# variables.ps1
$tgt_tenant_id=tenant_id
$tgt_subscription="{subscription_id}"
$tgt_resource_group="{resoure_group_name}"
$tgt_factory_name="{adf_name}"
# functions.ps1
# functions.ps1
function login_azure {
param (
[string]$tenant_id
)
# Check if the session is valid
$account = az account show --output json
# If the session is not valid, login to Azure with the specified tenant ID
if ($account -eq $null) {
try {
# Login to Azure with the specified tenant ID
$account = az login --tenant $tenant_id
}
catch {
Write-Host "Login failed."
exit 1
}
}
return $account
}
function create_adf_pipe_run {
param(
[string]$subscription,
[string]$resource_group,
[string]$factory_name,
[string]$pip_name,
[string]$pip_paras
)
# Create a pipeline run
$current_run_id = az datafactory pipeline create-run `
--subscription $subscription `
--resource-group $resource_group `
--factory-name $factory_name `
--name $pip_name `
--parameters ($pip_paras | ConvertTo-Json) `
| Convertfrom-Json `
| % { $_.runId }
return $current_run_id
}
function check_adf_pipe_run {
param(
[string]$subscription,
[string]$resource_group,
[string]$factory_name,
[string]$run_id,
[int]$interval = 20 # seconds
)
# Wait for the pipeline run to complete
$command = "az datafactory pipeline-run show ``
--subscription $subscription ``
--resource-group $resource_group ``
--factory-name $factory_name ``
--run-id $run_id "
$expected_outputs = @("Succeeded", "Failed", "Canceled")
while ($true) {
$output = Invoke-Expression $command | ConvertFrom-Json
if ($expected_outputs -contains $output.status) {
break
}
Start-Sleep -Seconds $interval
}
return $output
}
function check_pipeline_status {
param(
[PSCustomObject]$adf_pipe_result
)
$status = $adf_pipe_result.status
if ($status -eq "Failed") {
$message = "Pipeline failed: $($adf_pipe_result.error.message)"
throw $message
}
else {
Write-Output "Pipeline succeeded"
exit 0
}
}
2. 外部ファイルの読み込みと変数の定義
$variablesFile = "./variables.ps1"
. $variablesFile
$functionsFile = "./functions.ps1"
. $functionsFile
$pip_name="pipeline1"
$pip_paras='{"result": "failure"}' # success or failure
3. ADF の実行と結果確認
$account=login_azure `
-tenant_id $tgt_subscription
$current_run_id=create_adf_pipe_run `
-subscription $tgt_subscription `
-resource_group $tgt_resource_group `
-factory_name $tgt_factory_name `
-pip_name $pip_name `
-pip_paras $pip_paras
$pip_output=check_adf_pipe_run `
-subscription $tgt_subscription `
-resource_group $tgt_resource_group `
-factory_name $tgt_factory_name `
-run_id $current_run_id
check_pipeline_status `
-adf_pipe_result $pip_output
その他論点
検討すべき項目
- 認証方法
- 証明書ベースのサービスプリンシパルで問題ないか
- 再認証を定期的に実施する必要があるか *1
- 権原付与
- カスタムロールを作成できる場合には、カスタムロールを付与 *2
- 閲覧者
Microsoft.DataFactory/factories/pipelines/createrun/action
- カスタムロールを作成できない場合には、Data Factory 共同作成者ロールを付与
- カスタムロールを作成できる場合には、カスタムロールを付与 *2
- 運用
- リトライをどのように実行するか
- キャンセルをどうやるか
- パイプラインを個別に実施した場合にパイプラインごとに数秒の待機が発生するが許容できるか
- 許容できない場合には ADF 側にてまとめて実行するパイプラインの作成を検討
- リターン方法
- リターンとして受け取るのか
- ファイルに書き込む必要があるのか
- 主な制約条件
-
Azure Data Factory
- データ ファクトリあたりの同時実行パイプラインの実行数 (ファクトリ内のすべてのパイプライン間で共有)
- API 呼び出しの読み取り
- その他
-
Azure Data Factory
*1 ドキュメントに次のように記載されている通り、90日間は認証は不要でありいずれかの処理を行うことで更新トークンが新しく置き換えられる旨の記載がされている。
更新トークンの既定の有効期間は、シングル ページ アプリの場合は 24 時間、他のすべてのシナリオでは 90 日間です。 更新トークンは、使用するたびに新しいトークンに置き換えられます。
引用元:Microsoft ID プラットフォームの更新トークン - Microsoft Entra | Microsoft Learn
*2 次の記事が参考になりそうです。