はじめに
Databricks Asset BundlesとAzure DevOpsを用いて、簡単なジョブのCI/CDパイプラインを構築してみました。CI/CDパイプライン実行VM上でテストを実行するのではなく、Databricks上にテスト実行ジョブを作成し、それをCIのテストとして実施する構成にしています。
(録画をもとにキャプチャをとったため、全体的に画質が悪いですが、ご了承ください。)
環境
- Databricks
- Azure DevOps
- Azure Repos
- Azure Pipelines
- セルフホステッドエージェント
- Databricks CLI
- jq
- セルフホステッドエージェント
フォルダ構成
2つのジョブをasset bundlesで定義しています。(etl: CD対象、unittest: CD前のCI時に実行されるテスト)
data_source
|-- azure-pipelines.yml
|-- test
| |-- test.py
|
|-- etl_resource
| |-- etlのリソースをここに配置
|
|-- asset_bundles
|-- etl
| |-- databricks.yml
|
|-- unittest
|-- databricks.yml
azure-pipelines.yml
variables:
- group: databricks
trigger:
branches:
include:
- dev
- stg
- main
paths:
include:
- data_source/*
pool: {セルフホステッドエージェント}
stages:
- stage: setup
jobs:
- job: setup
steps:
- script: |
if [ "$(Build.SourceBranch)" == "refs/heads/main" ]; then
echo "##vso[task.setvariable variable=environment;isOutput=true]prod"
echo "##vso[task.setvariable variable=DATABRICKS_HOST;isOutput=true;isSecret=true]$(prod_host)"
echo "##vso[task.setvariable variable=DATABRICKS_CLIENT_ID;isOutput=true;isSecret=true]$(prod_client_id)"
echo "##vso[task.setvariable variable=DATABRICKS_CLIENT_SECRET;isOutput=true;isSecret=true]$(prod_client_secret)"
elif [ "$(Build.SourceBranch)" == "refs/heads/stg" ]; then
echo "##vso[task.setvariable variable=environment;isOutput=true]stg"
echo "##vso[task.setvariable variable=DATABRICKS_HOST;isOutput=true;isSecret=true]$(stg_host)"
echo "##vso[task.setvariable variable=DATABRICKS_CLIENT_ID;isOutput=true;isSecret=true]$(stg_client_id)"
echo "##vso[task.setvariable variable=DATABRICKS_CLIENT_SECRET;isOutput=true;isSecret=true]$(stg_client_secret)"
else
echo "##vso[task.setvariable variable=environment;isOutput=true]dev"
echo "##vso[task.setvariable variable=DATABRICKS_HOST;isOutput=true;isSecret=true]$(dev_host)"
echo "##vso[task.setvariable variable=DATABRICKS_CLIENT_ID;isOutput=true;isSecret=true]$(dev_client_id)"
echo "##vso[task.setvariable variable=DATABRICKS_CLIENT_SECRET;isOutput=true;isSecret=true]$(dev_client_secret)"
fi
name: set_variable
- stage: test
dependsOn: setup
condition: eq(dependencies.setup.result, 'Succeeded')
jobs:
- job: test
variables:
- name: environment
value: $[stageDependencies.setup.setup.outputs['set_variable.environment']]
- name: DATABRICKS_HOST
value: $[stageDependencies.setup.setup.outputs['set_variable.DATABRICKS_HOST']]
- name: DATABRICKS_CLIENT_ID
value: $[stageDependencies.setup.setup.outputs['set_variable.DATABRICKS_CLIENT_ID']]
- name: DATABRICKS_CLIENT_SECRET
value: $[stageDependencies.setup.setup.outputs['set_variable.DATABRICKS_CLIENT_SECRET']]
steps:
- script: |
export DATABRICKS_HOST='$(DATABRICKS_HOST)'
export DATABRICKS_CLIENT_ID='$(DATABRICKS_CLIENT_ID)'
export DATABRICKS_CLIENT_SECRET='$(DATABRICKS_CLIENT_SECRET)'
# unittestジョブ定義フォルダへ移動
cd data_source/asset_bundles/unittest/
# unittestジョブのデプロイ
databricks bundle deploy -t $(environment)
# unittestジョブのIDの特定
job_id=$(databricks jobs list | grep 'Demo_UnittestRunner' | awk '{print $1}')
# unittestジョブの実行
response=$(databricks jobs run-now $job_id)
result=$(echo $response | jq -r .status.termination_details.code)
if [ $result == "SUCCESS" ]; then
exit 0
else
exit 1
fi
- stage: deploy
dependsOn:
- setup
- test
condition: and(eq(dependencies.setup.result, 'Succeeded'), in(dependencies.test.result, 'Succeeded', 'Skipped'))
jobs:
- job: deploy
variables:
- name: environment
value: $[stageDependencies.setup.setup.outputs['set_variable.environment']]
- name: DATABRICKS_HOST
value: $[stageDependencies.setup.setup.outputs['set_variable.DATABRICKS_HOST']]
- name: DATABRICKS_CLIENT_ID
value: $[stageDependencies.setup.setup.outputs['set_variable.DATABRICKS_CLIENT_ID']]
- name: DATABRICKS_CLIENT_SECRET
value: $[stageDependencies.setup.setup.outputs['set_variable.DATABRICKS_CLIENT_SECRET']]
steps:
- script: |
export DATABRICKS_HOST='$(DATABRICKS_HOST)'
export DATABRICKS_CLIENT_ID='$(DATABRICKS_CLIENT_ID)'
export DATABRICKS_CLIENT_SECRET='$(DATABRICKS_CLIENT_SECRET)'
# etlジョブ定義フォルダへ移動
cd data_source/asset_bundles/etl/
# etlジョブのデプロイ
databricks bundle deploy -t $(environment)
ステージを3つ定義しています。各ステージの役割は以下の通りです。
- setup
- デプロイ先のワークスペースを特定し、認証情報をセット
- test
- テスト用のジョブをデプロイ・実行
- deploy
- (テスト通過後、)ETL用のジョブをデプロイ
asset_bundles/unittest/databricks.yml
bundle:
name: Demo_UnittestRunner
sync:
paths:
- "../../test"
resources:
jobs:
demo_job:
name: Demo_UnittestRunner
tasks:
- task_key: test
job_cluster_key: UnittestRunner_Cluster
notebook_task:
notebook_path: ../../test/test.py
permissions:
- service_principal_name: {サービスプリンシパルID}
level: IS_OWNER
job_clusters:
- job_cluster_key: UnittestRunner_Cluster
new_cluster:
spark_version: 15.4.x-scala2.12
node_type_id: Standard_D4ads_v5
autoscale:
min_workers: 1
max_workers: 1
azure_attributes:
availability: SPOT_WITH_FALLBACK_AZURE
data_security_mode: SINGLE_USER
targets:
dev:
mode: production
default: true
workspace:
host: {ホスト}
root_path: {ルートパス}
run_as:
service_principal_name: {サービスプリンシパルID}
stg:
mode: production
default: true
workspace:
host: {ホスト}
root_path: {ルートパス}
run_as:
service_principal_name: {サービスプリンシパルID}
prod:
mode: production
default: true
workspace:
host: {ホスト}
root_path: {ルートパス}
run_as:
service_principal_name: {サービスプリンシパルID}
動作確認
テスト失敗例
テスト失敗例として、test/test.pyに例外を出すコードを記載します。
Azure Pipelinesを実行すると、Databricks上にunittest用のジョブがデプロイ・実行されます。test/test.pyが例外を出すため、ジョブが失敗します。
Azure Pipelinesの結果を見ると、想定通りtestステージで失敗しており、後続のdeployステージがスキップされていることが確認できます。
テスト成功例
Azure Pipelinesを実行すると、こちらも想定通りにdeployステージまで実行されていることが確認できます。
unittestジョブが成功していること、etl用ジョブがデプロイされていることを確認できます。
最後に
Databricksについて実際のCI/CD構築の具体例まで記載ある記事はまだあまり見つけられずでしたので、今回CI/CD環境について整理してみました。
最小限の構成ですし、パイプライン定義についても見直す余地はあるので、実運用として利用するにはブラッシュアップする必要があります。
(途中でMicrosoftホステッドエージェントからセルフホステッドエージェント切り替えたこともあり、冗長な記載がある気がします。また、ジョブの実行についてもAsset Bundlesで実行できそうなのでそちらを使えばよかったかもです。)
ただ、Databricks上でテストを動作させる構成について実際の動作含めて確認できたため、自身の今後の業務へ活用していけそうです。