Continuous integration and delivery on Databricks using Jenkins | Databricks on AWS [2021/10/14時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
継続的インテグレーション、継続的デリバリー(CI/CD)は、自動化パイプラインの活用による、短期間かつ高頻度なサイクルでソフトウェアを開発、デリバリーするプロセスに関するものです。これは新しいプロセスではなく、数十年もの間従来型のソフトウェアエンジニアリングで幅広く用いられているものであり、データエンジニアリング、データサイエンスチームのプロセスにおける必要性が高まってきています。データプロダクトの価値を生み出すためには、これらはタイムリーな方法でデリバリーされる必要があります。さらには、利用者はこれらのプロダクトの成果の妥当性に自信を持たなくてはなりません。多くのデータエンジニアリング、データサイエンスチームではいまだに手動で多くのプロセスが行われていますが、コードのビルド、テストを自動化することで、開発チームはより高頻度、高信頼にリリースを行うことが可能となります。
継続的インテグレーションは、ソースコードリポジトリのブランチにある程度の頻度でコードをコミットするプラクティスからスタートします。それぞれのコミットは、競合が起きないことが保証されつつも他の開発者とのコミットとマージされます。ビルドを生成し、当該ビルドに対する自動テストを実行することで、さらに変更は検証されます。このプロセスは最終的には、ターゲットの環境、この場合にはDatabricksワークスペースにデプロイされるアーティファクトや開発バンドルにたどり着きます。
典型的なDatabricks CI/CDパイプラインの概要
皆様の要件によって変化しますが、Databricksパイプラインの典型的な設定には、以下のようなステップが含まれます。
継続的インテグレーション
- コード
- Databricksノートブックあるいは外部IDEを用いてコードを開発しユニットテストを実施する。
- 手動でテストを実行する。
- gitブランチにコードとテストをコミットする。
- ビルド
- 新規、更新されたコードとテストを収集する。
- 自動テストを実行する。
- ライブラリ、非ノートブックのApache Sparkコードをビルドする。
- リリース:リリースアーティファクトを生成する。
継続的デリバリー
- デプロイ
- ノートブックをデプロイする。
- ライブラリをデプロイする。
- テスト:自動化テストを実行し結果をレポートする。
- オペレーション:データエンジニアリング、アナリティクス、機械学習ワークフローをプログラムからスケジューリングする。
コードの開発、コミット
CI/CDパイプラインの設計の最初のステップの一つには、現在プロダクション状態にあるコードに対して不可逆的な影響を与えることなしに、新規、更新コードの開発とインテグレーションを管理するためのコードコミット、ブランチ作成の戦略の決定があります。この決定の一部には、お使いのコードを格納し、コードのプロモーションを促進するためのバージョン管理システムの選択が含まれます。DatabricksではGitHub、Bitbucketとのインテグレーションをサポートしており、gitリポジトリにノートブックをコミットすることが可能です。
訳者注:2022/2末時点ではGitLab、Azure DevOps、AWS CodeCommitもサポートしています。
直接的なノートブックとのインテグレーションでお使いのバージョン管理システムがサポートされていない、あるいはセルフサービスのgitインテグレーションよりもさらに柔軟性、コントロールを持ちたいのであれば、ノートブックをエクスポートしローカルマシンからコミットするためにDatabricks CLIを使用することができます。このスクリプトは、適切なリモートリポジトリと同期するようにセットアップされたローカルのgitリポジトリで実行される必要があります。実行すると、このスクリプトは以下の処理を実行します。
- 対象のブランチのチェックアウト
- リモートブランチからの新規の変更のプル
- Databricks CLIを用いてDatabricksワークスペースからノートブックをエクスポート
- ユーザーに対してコミットのメッセージの入力を促し、指定されない場合にはデフォルトのメッセージを使用
- 更新されたノートブックをローカルブランチにコミット
- 変更をリモートブランチにプッシュ
以下のスクリプトがこれらのステップを実行します。
git checkout <branch>
git pull
databricks workspace export_dir --profile <profile> -o <path> ./Workspace
dt=`date '+%Y-%m-%d %H:%M:%S'`
msg_default="DB export on $dt"
read -p "Enter the commit comment [$msg_default]: " msg
msg=${msg:-$msg_default}
echo $msg
git add .
git commit -m "<commit-message>"
git push
DatabricksノートブックではなくIDEでの開発を好むのであれば、モダンなIDEに組み込まれているVCSインテグレーション機能や、コードをコミットするためにgit CLIを使用することができます。
DatabricksはIDEとDatabricksクラスターを接続するSDKであるDatabricks Connectを提供しています。お使いのコードをデプロイすることなしにDatabricksクラスターでユニットコードを実行できるので、これは特にライブラリの開発で有用です。ご自身のユースケースがサポートされているのかを確認するにはDatabricks Connectの制限をご覧ください。
ブランチ戦略とプロモーションプロセスに応じて、CI/CDパイプラインがビルドを開始するタイミングは変化します。しかし、様々なコントリビューターからコミットされるコードは、最終的にはビルド、デプロイされるための専用のブランチにマージされます。ブランチの管理ステップは、バージョン管理システムから提供されるインタフェースを用いてDatabricksの外で実行されます。
お使いのパイプラインを管理、実行するために使うことができる数多くのCI/CDツールが存在しています。本文では、Jenkinsオートメーションサーバーの使い方を説明します。CI/CDはデザインパターンであり、本書で説明されるステップとステージは、それぞれのツールにおけるパイプライン定義言語に応じてある程度の修正を行なって移植する必要があります。さらに、本例のパイプラインにおけるコードの大部分は、他のツールで利用できる標準的なPythonコードを実行します。
エージェントの設定
Jenkinsは、コーディネーションと1対多の実行エージェントに対するマスターサービスを使用します。この例では、Jenkinsサーバーに含まれるデフォルトのパーマネントエージェントノードを使用します。エージェント、この例ではJenkinsサーバーのパイプラインで必要となる以下のツール、パッケージを手動でインストールします。
- Conda: オープンソースPython環境管理システム
- Python 3.7.3: テストの実行、wheelの構築、デプロイメント、デプロイメントスクリプトの実行に用いられます。テストではエージェントで実行されるPythonバージョンとDatabricksクラスターのバージョンと一致する必要があるので、バージョンは重要です。この例では、Python 3.7を含むDatabricksランタイム6.4を使用します。
- Pythonライブラリ:
requests
、databricks-connect
、databricks-cli
、pytest
パイプラインの設計
Jenkinsは、CI/CDパイプラインを作成するためにいくつか異なるプロジェクトタイプを提供します。この例では、Jenkinsパイプラインを実装します。Jenkinsパイプラインは、Jenkinsプラグインの呼び出し、設定をするためにGroovyコードを用いて、パイプラインにおけるステージを定義するためのインタフェースを提供します。
プロジェクトのソースコントロールリポジトリにチェックインされる(Jenkinsfileと呼ばれる)テキストファイルでパイプライン定義を記述します。詳細に関してはJenkins Pipelineを参照ください。こちらがサンプルのパイプラインとなります。
// Jenkinsfile
node {
def GITREPO = "/var/lib/jenkins/workspace/${env.JOB_NAME}"
def GITREPOREMOTE = "https://github.com/<repo>"
def GITHUBCREDID = "<github-token>"
def CURRENTRELEASE = "<release>"
def DBTOKEN = "<databricks-token>"
def DBURL = "https://<databricks-instance>"
def SCRIPTPATH = "${GITREPO}/Automation/Deployments"
def NOTEBOOKPATH = "${GITREPO}/Workspace"
def LIBRARYPATH = "${GITREPO}/Libraries"
def BUILDPATH = "${GITREPO}/Builds/${env.JOB_NAME}-${env.BUILD_NUMBER}"
def OUTFILEPATH = "${BUILDPATH}/Validation/Output"
def TESTRESULTPATH = "${BUILDPATH}/Validation/reports/junit"
def WORKSPACEPATH = "/Shared/<path>"
def DBFSPATH = "dbfs:<dbfs-path>"
def CLUSTERID = "<cluster-id>"
def CONDAPATH = "<conda-path>"
def CONDAENV = "<conda-env>"
stage('Setup') {
withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
sh """#!/bin/bash
# Configure Conda environment for deployment & testing
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Configure Databricks CLI for deployment
echo "${DBURL}
$TOKEN" | databricks configure --token
# Configure Databricks Connect for testing
echo "${DBURL}
$TOKEN
${CLUSTERID}
0
15001" | databricks-connect configure
"""
}
}
stage('Checkout') { // for display purposes
echo "Pulling ${CURRENTRELEASE} Branch from Github"
git branch: CURRENTRELEASE, credentialsId: GITHUBCREDID, url: GITREPOREMOTE
}
stage('Run Unit Tests') {
try {
sh """#!/bin/bash
# Enable Conda environment for tests
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Python tests for libs
python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-libout.xml ${LIBRARYPATH}/python/dbxdemo/test*.py || true
"""
} catch(err) {
step([$class: 'JUnitResultArchiver', testResults: '--junit-xml=${TESTRESULTPATH}/TEST-*.xml'])
if (currentBuild.result == 'UNSTABLE')
currentBuild.result = 'FAILURE'
throw err
}
}
stage('Package') {
sh """#!/bin/bash
# Enable Conda environment for tests
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Package Python library to wheel
cd ${LIBRARYPATH}/python/dbxdemo
python3 setup.py sdist bdist_wheel
"""
}
stage('Build Artifact') {
sh """mkdir -p ${BUILDPATH}/Workspace
mkdir -p ${BUILDPATH}/Libraries/python
mkdir -p ${BUILDPATH}/Validation/Output
#Get modified files
git diff --name-only --diff-filter=AMR HEAD^1 HEAD | xargs -I '{}' cp --parents -r '{}' ${BUILDPATH}
# Get packaged libs
find ${LIBRARYPATH} -name '*.whl' | xargs -I '{}' cp '{}' ${BUILDPATH}/Libraries/python/
# Generate artifact
tar -czvf Builds/latest_build.tar.gz ${BUILDPATH}
"""
archiveArtifacts artifacts: 'Builds/latest_build.tar.gz'
}
stage('Deploy') {
sh """#!/bin/bash
# Enable Conda environment for tests
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Use Databricks CLI to deploy notebooks
databricks workspace import_dir ${BUILDPATH}/Workspace ${WORKSPACEPATH}
dbfs cp -r ${BUILDPATH}/Libraries/python ${DBFSPATH}
"""
withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
sh """#!/bin/bash
#Get space delimited list of libraries
LIBS=\$(find ${BUILDPATH}/Libraries/python/ -name '*.whl' | sed 's#.*/##' | paste -sd " ")
#Script to uninstall, reboot if needed & instsall library
python3 ${SCRIPTPATH}/installWhlLibrary.py --workspace=${DBURL}\
--token=$TOKEN\
--clusterid=${CLUSTERID}\
--libs=\$LIBS\
--dbfspath=${DBFSPATH}
"""
}
}
stage('Run Integration Tests') {
withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
sh """python3 ${SCRIPTPATH}/executenotebook.py --workspace=${DBURL}\
--token=$TOKEN\
--clusterid=${CLUSTERID}\
--localpath=${NOTEBOOKPATH}/VALIDATION\
--workspacepath=${WORKSPACEPATH}/VALIDATION\
--outfilepath=${OUTFILEPATH}
"""
}
sh """sed -i -e 's #ENV# ${OUTFILEPATH} g' ${SCRIPTPATH}/evaluatenotebookruns.py
python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-notebookout.xml ${SCRIPTPATH}/evaluatenotebookruns.py || true
"""
}
stage('Report Test Results') {
sh """find ${OUTFILEPATH} -name '*.json' -exec gzip --verbose {} \\;
touch ${TESTRESULTPATH}/TEST-*.xml
"""
junit "**/reports/junit/*.xml"
}
}
本書の残りでは、パイプラインの各ステップを説明します。
環境変数の定義
異なるパイプラインのパイプラインステージで使用できる環境変数を定義することができます。
-
GITREPO
: gitリポジトリルートのローカルパス -
GITREPOREMOTE
: gitリポジトリのWeb URL -
GITHUBCREDID
: GitHubパーソナルアクセストークンに対するJenkins credential ID -
CURRENTRELEASE
: デプロイメントブランチ -
DBTOKEN
: Databricksパーソナルアクセストークンに対するJenkins credential ID - `DBURL: DatabricksワークスペースのWeb URL
-
SCRIPTPATH
: 自動化スクリプトのgitプロジェクトディレクトリのローカルパス -
NOTEBOOKPATH
: ノートブックのgitプロジェクトディレクトリのローカルパス -
LIBRARYPATH
: ライブラリコード、他のDBFSコードに対するgitプロジェクトディレクトリのローカルパス -
BUILDPATH
: ビルドアーティファクトに対するディレクトリのローカルパス -
OUTFILEPATH
: 自動化テストで生成されたJSON結果ファイルのローカルパス -
TESTRESULTPATH
: Junitテスト結果のサマリーに対するディレクトリのローカルパス -
WORKSPACEPATH
: ノートブックに対するDatabricksワークスペースパス -
DBFSPATH
: ライブラリ、非ノートブックコードに対するDatabricks DBFSパス -
CLUSTERID
: テストを実行するDatabricksクラスターID -
CONDAPATH
: Condaインストレーションへのパス -
CONDAENV
: ビルド依存関係のあるライブラリを含むConda環境名
パイプラインのセットアップ
Setup
ステージにおいては、接続情報を用いてDatabricks CLIとDatabricks Connectを設定します。
def GITREPO = "/var/lib/jenkins/workspace/${env.JOB_NAME}"
def GITREPOREMOTE = "https://github.com/<repo>"
def GITHUBCREDID = "<github-token>"
def CURRENTRELEASE = "<release>"
def DBTOKEN = "<databricks-token>"
def DBURL = "https://<databricks-instance>"
def SCRIPTPATH = "${GITREPO}/Automation/Deployments"
def NOTEBOOKPATH = "${GITREPO}/Workspace"
def LIBRARYPATH = "${GITREPO}/Libraries"
def BUILDPATH = "${GITREPO}/Builds/${env.JOB_NAME}-${env.BUILD_NUMBER}"
def OUTFILEPATH = "${BUILDPATH}/Validation/Output"
def TESTRESULTPATH = "${BUILDPATH}/Validation/reports/junit"
def WORKSPACEPATH = "/Shared/<path>"
def DBFSPATH = "dbfs:<dbfs-path>"
def CLUSTERID = "<cluster-id>"
def CONDAPATH = "<conda-path>"
def CONDAENV = "<conda-env>"
stage('Setup') {
withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
sh """#!/bin/bash
# Configure Conda environment for deployment & testing
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Configure Databricks CLI for deployment
echo "${DBURL}
$TOKEN" | databricks configure --token
# Configure Databricks Connect for testing
echo "${DBURL}
$TOKEN
${CLUSTERID}
0
15001" | databricks-connect configure
"""
}
}
最新の変更の取得
Checkout
ステージは指定されたブランチから、Jenkinsプラグインを用いて実行エージェントにコードをダウンロードします。
stage('Checkout') { // for display purposes
echo "Pulling ${CURRENTRELEASE} Branch from Github"
git branch: CURRENTRELEASE, credentialsId: GITHUBCREDID, url: GITREPOREMOTE
}
ユニットテストの開発
お使いのコードに対してどのように、いつユニットテストを行うのかに関してはいくつかのオプションが存在します。Databricksノートブック外で開発されるライブラリコードに対しては、プロセスは従来型のソフトウェア開発プラクティスに似たものになります。PythonのpytestモジュールやJUnitフォーマットのXMLファイルのようなテストフレームワークを用いてユニットテストを記述し、結果を格納します。
Databricksのプロセスは、テストされるコードがDatabricksのSparkクラスターで実行されることを意図したApache Sparkコードか、頻繁にローカルで実行されるかによって変化します。この要件に対応するためにDatabricks Connectを使用します。SDKは事前に設定されているので、Databricksクラスターでテストを実行するためにテウsとコードを変更する必要はありません。あなたはDatabricks ConnectをCondaの仮想環境にインストールしました。Conda環境がアクティベートされるとPythonツール、pytest
を用いてテストが実行され、テストで指定された場所に出力ファイルが生成されます。
Databricks Connectを用いたライブラリコードのテスト
stage('Run Unit Tests') {
try {
sh """#!/bin/bash
# Enable Conda environment for tests
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Python tests for libs
python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-libout.xml ${LIBRARYPATH}/python/dbxdemo/test*.py || true
"""
} catch(err) {
step([$class: 'JUnitResultArchiver', testResults: '--junit-xml=${TESTRESULTPATH}/TEST-*.xml'])
if (currentBuild.result == 'UNSTABLE')
currentBuild.result = 'FAILURE'
throw err
}
}
以下のスニペットはDatabricksクラスターにインストールされるライブラリ関数となります。これは、Apache Sparkデータフレームに新規カラムを追加し、リテラルで充当するシンプルな関数となります。
# addcol.py
import pyspark.sql.functions as F
def with_status(df):
return df.withColumn("status", F.lit("checked"))
このテストでは、addcol.py
で定義されるwith_status
関数にモックのデータフレームオブジェクトを引き渡します。そして、結果は期待する値を含むデータフレームオブジェクトと比較されます。値が一致すれば(この場合は一致しますが)テストを通過したことになります。
# test-addcol.py
import pytest
from dbxdemo.spark import get_spark
from dbxdemo.appendcol import with_status
class TestAppendCol(object):
def test_with_status(self):
source_data = [
("paula", "white", "paula.white@example.com"),
("john", "baer", "john.baer@example.com")
]
source_df = get_spark().createDataFrame(
source_data,
["first_name", "last_name", "email"]
)
actual_df = with_status(source_df)
expected_data = [
("paula", "white", "paula.white@example.com", "checked"),
("john", "baer", "john.baer@example.com", "checked")
]
expected_df = get_spark().createDataFrame(
expected_data,
["first_name", "last_name", "email", "status"]
)
assert(expected_df.collect() == actual_df.collect())
ライブラリコードのパッケージング
Package
ステージでは、ライブラリコードをPython wheelにパッケージングします。
stage('Package') {
sh """#!/bin/bash
# Enable Conda environment for tests
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Package Python library to wheel
cd ${LIBRARYPATH}/python/dbxdemo
python3 setup.py sdist bdist_wheel
"""
}
開発アーティファクトの生成、格納
Databricksにおける開発アーティファクトのビルドには、適切なDatabricks環境にデプロイされる全ての新規、更新コードの収集が含まれます。Build Artifact
ステージでは、ワークスペースにデプロイされるノートブックコード、ビルドプロセスで生成されたすべてのwhl
ライブラリ、アーカイブの目的でテストの結果のサマリーが追加されます。これを行うためには、最新のgitマージに含まれる全ての新規ファイルにフラグを立てるためにgit diff
を使用します。これはあくまで一例であり、お使いのパイプラインの実装は異なることがありますが、目的は現在のリリースで意図される全てのファイルを追加するというものです。
stage('Build Artifact') {
sh """mkdir -p ${BUILDPATH}/Workspace
mkdir -p ${BUILDPATH}/Libraries/python
mkdir -p ${BUILDPATH}/Validation/Output
#Get Modified Files
git diff --name-only --diff-filter=AMR HEAD^1 HEAD | xargs -I '{}' cp --parents -r '{}' ${BUILDPATH}
# Get packaged libs
find ${LIBRARYPATH} -name '*.whl' | xargs -I '{}' cp '{}' ${BUILDPATH}/Libraries/python/
# Generate artifact
tar -czvf Builds/latest_build.tar.gz ${BUILDPATH}
"""
archiveArtifacts artifacts: 'Builds/latest_build.tar.gz'
}
アーティファクトのデプロイ
Deploy
ステージでは、Databricks Connectモジュールと同様にお使いのConda環境にインストールされたDatabricks CLIを使用しますので、シェルセッションのためにConda環境をアクティベートする必要があります。ノートブック、ライブラリをそれぞれアップロードするためにWorkspace CLIとDBFS CLIを使用します。
databricks workspace import_dir <local build path> <remote workspace path>
dbfs cp -r <local build path> <remote dbfs path>
stage('Deploy') {
sh """#!/bin/bash
# Enable Conda environment for tests
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Use Databricks CLI to deploy notebooks
databricks workspace import_dir ${BUILDPATH}/Workspace ${WORKSPACEPATH}
dbfs cp -r ${BUILDPATH}/Libraries/python ${DBFSPATH}
"""
withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
sh """#!/bin/bash
#Get space delimited list of libraries
LIBS=\$(find ${BUILDPATH}/Libraries/python/ -name '*.whl' | sed 's#.*/##' | paste -sd " ")
#Script to uninstall, reboot if needed & instsall library
python3 ${SCRIPTPATH}/installWhlLibrary.py --workspace=${DBURL}\
--token=$TOKEN\
--clusterid=${CLUSTERID}\
--libs=\$LIBS\
--dbfspath=${DBFSPATH}
"""
}
}
Databricksクラスターに新バージョンのライブラリをインストールするには、既存のライブラリを最初にアンインストールする必要があります。このため、以下のステップを実行するためにPythonスクリプトでDatabricks REST APIを呼び出します。
- ライブラリがインストールされているかどうかを確認
- ライブラリをアンインストール
- アンインストールが行われた場合にはクラスターを再起動
- 次に進む前にクラスターが稼働状態になるまで待機
- ライブラリをインストール
# installWhlLibrary.py
#!/usr/bin/python3
import json
import requests
import sys
import getopt
import time
def main():
workspace = ''
token = ''
clusterid = ''
libs = ''
dbfspath = ''
try:
opts, args = getopt.getopt(sys.argv[1:], 'hstcld',
['workspace=', 'token=', 'clusterid=', 'libs=', 'dbfspath='])
except getopt.GetoptError:
print(
'installWhlLibrary.py -s <workspace> -t <token> -c <clusterid> -l <libs> -d <dbfspath>')
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print(
'installWhlLibrary.py -s <workspace> -t <token> -c <clusterid> -l <libs> -d <dbfspath>')
sys.exit()
elif opt in ('-s', '--workspace'):
workspace = arg
elif opt in ('-t', '--token'):
token = arg
elif opt in ('-c', '--clusterid'):
clusterid = arg
elif opt in ('-l', '--libs'):
libs=arg
elif opt in ('-d', '--dbfspath'):
dbfspath=arg
print('-s is ' + workspace)
print('-t is ' + token)
print('-c is ' + clusterid)
print('-l is ' + libs)
print('-d is ' + dbfspath)
libslist = libs.split()
# Uninstall Library if exists on cluster
i=0
for lib in libslist:
dbfslib = dbfspath + lib
print(dbfslib + ' before:' + getLibStatus(workspace, token, clusterid, dbfslib))
if (getLibStatus(workspace, token, clusterid, dbfslib) != 'not found'):
print(dbfslib + " exists. Uninstalling.")
i = i + 1
values = {'cluster_id': clusterid, 'libraries': [{'whl': dbfslib}]}
resp = requests.post(workspace + '/api/2.0/libraries/uninstall', data=json.dumps(values), auth=("token", token))
runjson = resp.text
d = json.loads(runjson)
print(dbfslib + ' after:' + getLibStatus(workspace, token, clusterid, dbfslib))
# Restart if libraries uninstalled
if i > 0:
values = {'cluster_id': clusterid}
print("Restarting cluster:" + clusterid)
resp = requests.post(workspace + '/api/2.0/clusters/restart', data=json.dumps(values), auth=("token", token))
restartjson = resp.text
print(restartjson)
p = 0
waiting = True
while waiting:
time.sleep(30)
clusterresp = requests.get(workspace + '/api/2.0/clusters/get?cluster_id=' + clusterid,
auth=("token", token))
clusterjson = clusterresp.text
jsonout = json.loads(clusterjson)
current_state = jsonout['state']
print(clusterid + " state:" + current_state)
if current_state in ['RUNNING','INTERNAL_ERROR', 'SKIPPED'] or p >= 10:
break
p = p + 1
# Install Libraries
for lib in libslist:
dbfslib = dbfspath + lib
print("Installing " + dbfslib)
values = {'cluster_id': clusterid, 'libraries': [{'whl': dbfslib}]}
resp = requests.post(workspace + '/api/2.0/libraries/install', data=json.dumps(values), auth=("token", token))
runjson = resp.text
d = json.loads(runjson)
print(dbfslib + ' after:' + getLibStatus(workspace, token, clusterid, dbfslib))
def getLibStatus(workspace, token, clusterid, dbfslib):
resp = requests.get(workspace + '/api/2.0/libraries/cluster-status?cluster_id='+ clusterid, auth=("token", token))
libjson = resp.text
d = json.loads(libjson)
if (d.get('library_statuses')):
statuses = d['library_statuses']
for status in statuses:
if (status['library'].get('whl')):
if (status['library']['whl'] == dbfslib):
return status['status']
else:
return "not found"
else:
# No libraries found
return "not found"
if __name__ == '__main__':
main()
別のノートブックを使ったノートブックコードのテスト
アーティファクトがデプロイされたら、新規の環境で全てのコードが問題なく動いていることを確認するためにインテグレーションテストを実行することが重要です。このためには、デプロイメントをテストするためのアサートを含むノートブックを実行することができます。このケースでは、ユニットテストで用いたのと同じテストを使用しますが、今回は先ほどクラスターにインストールされたwhl
からappendcol
ライブラリをインポートします。
このテストを自動化し、お使いのCI/CDパイプラインに含めるには、Jenkinsサーバーからノートブックを実行するためにDatabricks REST APIを使用します。これによって、pytest
を用いてノートブックがテストに通過したのか、失敗したのかをチェックすることができるようになります。ノートブックのアサートが失敗した場合、これはREST APIから返却されるJSON出力に表示され、結果としてJUnitのテスト結果に表示されます。
stage('Run Integration Tests') {
withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
sh """python3 ${SCRIPTPATH}/executenotebook.py --workspace=${DBURL}\
--token=$TOKEN\
--clusterid=${CLUSTERID}\
--localpath=${NOTEBOOKPATH}/VALIDATION\
--workspacepath=${WORKSPACEPATH}/VALIDATION\
--outfilepath=${OUTFILEPATH}
"""
}
sh """sed -i -e 's #ENV# ${OUTFILEPATH} g' ${SCRIPTPATH}/evaluatenotebookruns.py
python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-notebookout.xml ${SCRIPTPATH}/evaluatenotebookruns.py || true
"""
}
このステージはえは二つのPython自動化スクリプトをコールしています。最初のスクリプトexecutenotebook.py
は非同期ジョブをサブミットするCreate and trigger a one-time run(POST /jobs/runs/submit
)エンドポイントを用いたノートブックを実行します。このエンドポイントは非同期なので、RESTコールで最初に返却されるジョブIDを用いて、ジョブのステータスをポーリングします。ジョブが完了すると、呼び出し時に関数の引数に指定したパスに出力結果が保存されます。
# executenotebook.py
#!/usr/bin/python3
import json
import requests
import os
import sys
import getopt
import time
def main():
workspace = ''
token = ''
clusterid = ''
localpath = ''
workspacepath = ''
outfilepath = ''
try:
opts, args = getopt.getopt(sys.argv[1:], 'hs:t:c:lwo',
['workspace=', 'token=', 'clusterid=', 'localpath=', 'workspacepath=', 'outfilepath='])
except getopt.GetoptError:
print(
'executenotebook.py -s <workspace> -t <token> -c <clusterid> -l <localpath> -w <workspacepath> -o <outfilepath>)')
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print(
'executenotebook.py -s <workspace> -t <token> -c <clusterid> -l <localpath> -w <workspacepath> -o <outfilepath>')
sys.exit()
elif opt in ('-s', '--workspace'):
workspace = arg
elif opt in ('-t', '--token'):
token = arg
elif opt in ('-c', '--clusterid'):
clusterid = arg
elif opt in ('-l', '--localpath'):
localpath = arg
elif opt in ('-w', '--workspacepath'):
workspacepath = arg
elif opt in ('-o', '--outfilepath'):
outfilepath = arg
print('-s is ' + workspace)
print('-t is ' + token)
print('-c is ' + clusterid)
print('-l is ' + localpath)
print('-w is ' + workspacepath)
print('-o is ' + outfilepath)
# Generate array from walking local path
notebooks = []
for path, subdirs, files in os.walk(localpath):
for name in files:
fullpath = path + '/' + name
# removes localpath to repo but keeps workspace path
fullworkspacepath = workspacepath + path.replace(localpath, '')
name, file_extension = os.path.splitext(fullpath)
if file_extension.lower() in ['.scala', '.sql', '.r', '.py']:
row = [fullpath, fullworkspacepath, 1]
notebooks.append(row)
# run each element in list
for notebook in notebooks:
nameonly = os.path.basename(notebook[0])
workspacepath = notebook[1]
name, file_extension = os.path.splitext(nameonly)
# workpath removes extension
fullworkspacepath = workspacepath + '/' + name
print('Running job for:' + fullworkspacepath)
values = {'run_name': name, 'existing_cluster_id': clusterid, 'timeout_seconds': 3600, 'notebook_task': {'notebook_path': fullworkspacepath}}
resp = requests.post(workspace + '/api/2.0/jobs/runs/submit',
data=json.dumps(values), auth=("token", token))
runjson = resp.text
print("runjson:" + runjson)
d = json.loads(runjson)
runid = d['run_id']
i=0
waiting = True
while waiting:
time.sleep(10)
jobresp = requests.get(workspace + '/api/2.0/jobs/runs/get?run_id='+str(runid),
data=json.dumps(values), auth=("token", token))
jobjson = jobresp.text
print("jobjson:" + jobjson)
j = json.loads(jobjson)
current_state = j['state']['life_cycle_state']
runid = j['run_id']
if current_state in ['TERMINATED', 'INTERNAL_ERROR', 'SKIPPED'] or i >= 12:
break
i=i+1
if outfilepath != '':
file = open(outfilepath + '/' + str(runid) + '.json', 'w')
file.write(json.dumps(j))
file.close()
if __name__ == '__main__':
main()
2つ目のスクリプトevaluatenotebookruns.py
は、ノートブックのアサート文が成功したのか、失敗したのかを判断するためにJSONをパースし、評価するtest_job_run
関数を定義します。追加のテストtest_performance
は予想するよりも処理に時間を要するテストを捕捉します。
# evaluatenotebookruns.py
import unittest
import json
import glob
import os
class TestJobOutput(unittest.TestCase):
test_output_path = '#ENV#'
def test_performance(self):
path = self.test_output_path
statuses = []
for filename in glob.glob(os.path.join(path, '*.json')):
print('Evaluating: ' + filename)
data = json.load(open(filename))
duration = data['execution_duration']
if duration > 100000:
status = 'FAILED'
else:
status = 'SUCCESS'
statuses.append(status)
self.assertFalse('FAILED' in statuses)
def test_job_run(self):
path = self.test_output_path
statuses = []
for filename in glob.glob(os.path.join(path, '*.json')):
print('Evaluating: ' + filename)
data = json.load(open(filename))
status = data['state']['result_state']
statuses.append(status)
self.assertFalse('FAILED' in statuses)
if __name__ == '__main__':
unittest.main()
ユニッテストのステージで見たように、テストを実行し、結果のサマリーを生成するためにpytest
を使用します。
テスト結果の公開
Jenkinsプラグインjunit
を用いて、JSONの結果はアーカイブされテスト結果が公開されます。これによって、ビルドプロセスに関連するレポート、ダッシュボードを可視化することができます。
stage('Report Test Results') {
sh """find ${OUTFILEPATH} -name '*.json' -exec gzip --verbose {} \\;
touch ${TESTRESULTPATH}/TEST-*.xml
"""
junit "**/reports/junit/*.xml"
}
この時点で、CI/CDパイプラインはインテグレーション、デプロイメントサイクルを完了しました。このプロセスを自動化することで、効率的で一貫性があり、再現可能なプロセスを用いてお使いのコードがテストされ、デプロイされることを保証することができます。