はじめに
ジョブ管理システムとFabricのデータパイプラインを統合するために、Power Shellスクリプトを記述しました。
AI によりコードを生成しています。本番環境での利用は内容を要確認のうえご利用ください
準備
-
サービスプリンシパルの作成とシークレットの取得
https://learn.microsoft.com/ja-jp/entra/identity-platform/quickstart-register-app に従って、アプリを登録します。
シークレットを作成したら保管してください。 -
テナント設定
作成したサービスプリンシパルが Fabric API を実行できるようにテナント設定を構成します。
-
権限割当
-
power shell 実行環境のモジュール
https://learn.microsoft.com/ja-jp/powershell/azure/what-is-azure-powershell?view=azps-14.1.0 をインストールしておく必要があります。
実行
-
対象のパイプラインの情報を採取します。group のあとのワークスペースIDとpipelinesのあとのItem IDが必要となります。
-
以下の.ps1 ファイルを配置します。
powershell<# .SYNOPSIS Start a Fabric Data Pipeline job and poll its status until completion, refreshing tokens as needed. #> param ( [ValidateSet("UserPrincipal","ServicePrincipal")] [string] $PrincipalType = "ServicePrincipal", [Parameter(Mandatory)][string] $WorkspaceId, [Parameter(Mandatory)][string] $ItemId, [string] $ClientId, [string] $TenantId, [string] $ClientSecret ) $JobType = "Pipeline" $BaseUrl = "https://api.fabric.microsoft.com/v1" $ResourceUrl = "https://api.fabric.microsoft.com/.default" function Get-FabricHeaders { param( [ValidateSet("UserPrincipal","ServicePrincipal")] [string] $PrincipalType, [string] $ClientId, [string] $TenantId, [string] $ClientSecret, [string] $Scope # 例: "https://api.fabric.microsoft.com/.default" ) # v2.0 トークンエンドポイント $tokenEndpoint = "https://login.microsoftonline.com/$TenantId/oauth2/v2.0/token" if ($PrincipalType -eq "UserPrincipal") { Connect-AzAccount | Out-Null throw "UserPrincipal フローは未実装です。ServicePrincipal をご利用ください。" } else { # クライアント資格情報フロー $body = @{ client_id = $ClientId client_secret = $ClientSecret scope = $Scope grant_type = "client_credentials" } try { $resp = Invoke-RestMethod -Method Post -Uri $tokenEndpoint -Body $body -ErrorAction Stop $accessToken = $resp.access_token } catch { Write-Error "トークン取得失敗: $($_.Exception.Message)" exit 1 } } return @{ Authorization = "Bearer $accessToken" "Content-Type" = "application/json" } } function Start-PipelineJob { param( [string] $BaseUrl, [string] $WorkspaceId, [string] $ItemId, [string] $JobType, [hashtable] $Headers ) $url = "$BaseUrl/workspaces/$WorkspaceId/items/$ItemId/jobs/instances?jobType=$JobType" Write-Host "▶ Starting Pipeline job at: $url" try { $resp = Invoke-WebRequest -Uri $url -Method Post -Headers $Headers -Body '{}' -ErrorAction Stop Write-Host "✅ Job started (StatusCode: $($resp.StatusCode))" -ForegroundColor Green return $resp.Headers } catch { Write-Host "❌ Failed to start job: $($_.Exception.Message)" -ForegroundColor Red if ($_.Exception.Response) { $ex = $_.Exception.Response Write-Host " HTTP Status: $($ex.StatusCode) $($ex.StatusDescription)" Write-Host " -- Response Headers --" $ex.Headers.GetEnumerator() | ForEach-Object { Write-Host " $($_.Name): $((@($_.Value) -join ','))" } Write-Host " -- Response Body --" $reader = [System.IO.StreamReader]::new($ex.GetResponseStream()) Write-Host $reader.ReadToEnd() } exit 1 } } #--- 実行フロー ---# $headers = Get-FabricHeaders -PrincipalType $PrincipalType -ClientId $ClientId -TenantId $TenantId -ClientSecret $ClientSecret -Scope $ResourceUrl $respHeaders = Start-PipelineJob -BaseUrl $BaseUrl -WorkspaceId $WorkspaceId -ItemId $ItemId -JobType $JobType -Headers $headers # Retry-After $retryAfterValue = $respHeaders['Retry-After'] if ($retryAfterValue -is [array] -and $retryAfterValue.Count -gt 0) { $retryAfter = [int]$retryAfterValue[0] } elseif ($retryAfterValue) { $retryAfter = [int]$retryAfterValue } else { $retryAfter = 30 } Write-Host "⏱ Poll interval: $retryAfter seconds" # Location $locationValue = $respHeaders['Location'] if ($locationValue -is [array] -and $locationValue.Count -gt 0) { $location = [string]$locationValue[0] } elseif ($locationValue) { $location = [string]$locationValue } else { Write-Error 'Location header missing. Cannot poll status.' exit 1 } Write-Host "🔄 Polling at: $location" # Poll loop do { $headers = Get-FabricHeaders -PrincipalType $PrincipalType -ClientId $ClientId -TenantId $TenantId -ClientSecret $ClientSecret -Scope $ResourceUrl Start-Sleep -Seconds $retryAfter $statusResp = Invoke-RestMethod -Uri $location -Method Get -Headers $headers -ErrorAction Stop $state = $statusResp.status Write-Host "$(Get-Date -Format u) → ステータス: $state" } while ($state -in @('NotStarted','InProgress')) # Result if ($state -eq 'Completed') { Write-Host '✅ Job completed successfully.' exit 0 } else { Write-Host "❌ Job ended with status: $state" if ($statusResp.failureReason) { Write-Host "Error detail: $($statusResp.failureReason | ConvertTo-Json -Depth 4)" } exit 1 }
-
実行は以下のように行います。
powershell.\Start-FabricJobAndPoll.ps1 ` -PrincipalType ServicePrincipal ` -WorkspaceId <workspace id> ` -ItemId <pipeline id> ` -ClientId <サービスプリンシパルのクライアントID> ` -TenantId <サービスプリンシパルのテナントID> ` -ClientSecret <サービスプリンシパルのシークレット値>
シークレットは環境変数などで隠ぺいすることをおすすめします。
以上です。パラメータを設定する必要がある場合は、参考の API ドキュメントを使用して body の値を設定してください。
参考