1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Jenkinsを用いたDatabricksにおけるCI/CD

Posted at

Continuous integration and delivery on Databricks using Jenkins | Databricks on AWS [2021/10/14時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

継続的インテグレーション、継続的デリバリー(CI/CD)は、自動化パイプラインの活用による、短期間かつ高頻度なサイクルでソフトウェアを開発、デリバリーするプロセスに関するものです。これは新しいプロセスではなく、数十年もの間従来型のソフトウェアエンジニアリングで幅広く用いられているものであり、データエンジニアリング、データサイエンスチームのプロセスにおける必要性が高まってきています。データプロダクトの価値を生み出すためには、これらはタイムリーな方法でデリバリーされる必要があります。さらには、利用者はこれらのプロダクトの成果の妥当性に自信を持たなくてはなりません。多くのデータエンジニアリング、データサイエンスチームではいまだに手動で多くのプロセスが行われていますが、コードのビルド、テストを自動化することで、開発チームはより高頻度、高信頼にリリースを行うことが可能となります。

継続的インテグレーションは、ソースコードリポジトリのブランチにある程度の頻度でコードをコミットするプラクティスからスタートします。それぞれのコミットは、競合が起きないことが保証されつつも他の開発者とのコミットとマージされます。ビルドを生成し、当該ビルドに対する自動テストを実行することで、さらに変更は検証されます。このプロセスは最終的には、ターゲットの環境、この場合にはDatabricksワークスペースにデプロイされるアーティファクトや開発バンドルにたどり着きます。

典型的なDatabricks CI/CDパイプラインの概要

皆様の要件によって変化しますが、Databricksパイプラインの典型的な設定には、以下のようなステップが含まれます。

継続的インテグレーション

  1. コード
    1. Databricksノートブックあるいは外部IDEを用いてコードを開発しユニットテストを実施する。
    2. 手動でテストを実行する。
    3. gitブランチにコードとテストをコミットする。
  2. ビルド
    1. 新規、更新されたコードとテストを収集する。
    2. 自動テストを実行する。
    3. ライブラリ、非ノートブックのApache Sparkコードをビルドする。
  3. リリース:リリースアーティファクトを生成する。

継続的デリバリー

  1. デプロイ
    1. ノートブックをデプロイする。
    2. ライブラリをデプロイする。
  2. テスト:自動化テストを実行し結果をレポートする。
  3. オペレーション:データエンジニアリング、アナリティクス、機械学習ワークフローをプログラムからスケジューリングする。

コードの開発、コミット

CI/CDパイプラインの設計の最初のステップの一つには、現在プロダクション状態にあるコードに対して不可逆的な影響を与えることなしに、新規、更新コードの開発とインテグレーションを管理するためのコードコミット、ブランチ作成の戦略の決定があります。この決定の一部には、お使いのコードを格納し、コードのプロモーションを促進するためのバージョン管理システムの選択が含まれます。DatabricksではGitHub、Bitbucketとのインテグレーションをサポートしており、gitリポジトリにノートブックをコミットすることが可能です。

訳者注:2022/2末時点ではGitLab、Azure DevOps、AWS CodeCommitもサポートしています。

直接的なノートブックとのインテグレーションでお使いのバージョン管理システムがサポートされていない、あるいはセルフサービスのgitインテグレーションよりもさらに柔軟性、コントロールを持ちたいのであれば、ノートブックをエクスポートしローカルマシンからコミットするためにDatabricks CLIを使用することができます。このスクリプトは、適切なリモートリポジトリと同期するようにセットアップされたローカルのgitリポジトリで実行される必要があります。実行すると、このスクリプトは以下の処理を実行します。

  1. 対象のブランチのチェックアウト
  2. リモートブランチからの新規の変更のプル
  3. Databricks CLIを用いてDatabricksワークスペースからノートブックをエクスポート
  4. ユーザーに対してコミットのメッセージの入力を促し、指定されない場合にはデフォルトのメッセージを使用
  5. 更新されたノートブックをローカルブランチにコミット
  6. 変更をリモートブランチにプッシュ

以下のスクリプトがこれらのステップを実行します。

Bash
git checkout <branch>
git pull
databricks workspace export_dir --profile <profile> -o <path> ./Workspace

dt=`date '+%Y-%m-%d %H:%M:%S'`
msg_default="DB export on $dt"
read -p "Enter the commit comment [$msg_default]: " msg
msg=${msg:-$msg_default}
echo $msg

git add .
git commit -m "<commit-message>"
git push

DatabricksノートブックではなくIDEでの開発を好むのであれば、モダンなIDEに組み込まれているVCSインテグレーション機能や、コードをコミットするためにgit CLIを使用することができます。

DatabricksはIDEとDatabricksクラスターを接続するSDKであるDatabricks Connectを提供しています。お使いのコードをデプロイすることなしにDatabricksクラスターでユニットコードを実行できるので、これは特にライブラリの開発で有用です。ご自身のユースケースがサポートされているのかを確認するにはDatabricks Connectの制限をご覧ください。

ブランチ戦略とプロモーションプロセスに応じて、CI/CDパイプラインがビルドを開始するタイミングは変化します。しかし、様々なコントリビューターからコミットされるコードは、最終的にはビルド、デプロイされるための専用のブランチにマージされます。ブランチの管理ステップは、バージョン管理システムから提供されるインタフェースを用いてDatabricksの外で実行されます。

お使いのパイプラインを管理、実行するために使うことができる数多くのCI/CDツールが存在しています。本文では、Jenkinsオートメーションサーバーの使い方を説明します。CI/CDはデザインパターンであり、本書で説明されるステップとステージは、それぞれのツールにおけるパイプライン定義言語に応じてある程度の修正を行なって移植する必要があります。さらに、本例のパイプラインにおけるコードの大部分は、他のツールで利用できる標準的なPythonコードを実行します。

エージェントの設定

Jenkinsは、コーディネーションと1対多の実行エージェントに対するマスターサービスを使用します。この例では、Jenkinsサーバーに含まれるデフォルトのパーマネントエージェントノードを使用します。エージェント、この例ではJenkinsサーバーのパイプラインで必要となる以下のツール、パッケージを手動でインストールします。

  • Conda: オープンソースPython環境管理システム
  • Python 3.7.3: テストの実行、wheelの構築、デプロイメント、デプロイメントスクリプトの実行に用いられます。テストではエージェントで実行されるPythonバージョンとDatabricksクラスターのバージョンと一致する必要があるので、バージョンは重要です。この例では、Python 3.7を含むDatabricksランタイム6.4を使用します。
  • Pythonライブラリ:requestsdatabricks-connectdatabricks-clipytest

パイプラインの設計

Jenkinsは、CI/CDパイプラインを作成するためにいくつか異なるプロジェクトタイプを提供します。この例では、Jenkinsパイプラインを実装します。Jenkinsパイプラインは、Jenkinsプラグインの呼び出し、設定をするためにGroovyコードを用いて、パイプラインにおけるステージを定義するためのインタフェースを提供します。

プロジェクトのソースコントロールリポジトリにチェックインされる(Jenkinsfileと呼ばれる)テキストファイルでパイプライン定義を記述します。詳細に関してはJenkins Pipelineを参照ください。こちらがサンプルのパイプラインとなります。

Gloovy
// Jenkinsfile
node {
  def GITREPO         = "/var/lib/jenkins/workspace/${env.JOB_NAME}"
  def GITREPOREMOTE   = "https://github.com/<repo>"
  def GITHUBCREDID    = "<github-token>"
  def CURRENTRELEASE  = "<release>"
  def DBTOKEN         = "<databricks-token>"
  def DBURL           = "https://<databricks-instance>"
  def SCRIPTPATH      = "${GITREPO}/Automation/Deployments"
  def NOTEBOOKPATH    = "${GITREPO}/Workspace"
  def LIBRARYPATH     = "${GITREPO}/Libraries"
  def BUILDPATH       = "${GITREPO}/Builds/${env.JOB_NAME}-${env.BUILD_NUMBER}"
  def OUTFILEPATH     = "${BUILDPATH}/Validation/Output"
  def TESTRESULTPATH  = "${BUILDPATH}/Validation/reports/junit"
  def WORKSPACEPATH   = "/Shared/<path>"
  def DBFSPATH        = "dbfs:<dbfs-path>"
  def CLUSTERID       = "<cluster-id>"
  def CONDAPATH       = "<conda-path>"
  def CONDAENV        = "<conda-env>"

  stage('Setup') {
      withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
        sh """#!/bin/bash
            # Configure Conda environment for deployment & testing
            source ${CONDAPATH}/bin/activate ${CONDAENV}

            # Configure Databricks CLI for deployment
            echo "${DBURL}
            $TOKEN" | databricks configure --token

            # Configure Databricks Connect for testing
            echo "${DBURL}
            $TOKEN
            ${CLUSTERID}
            0
            15001" | databricks-connect configure
           """
      }
  }
  stage('Checkout') { // for display purposes
    echo "Pulling ${CURRENTRELEASE} Branch from Github"
    git branch: CURRENTRELEASE, credentialsId: GITHUBCREDID, url: GITREPOREMOTE
  }
  stage('Run Unit Tests') {
    try {
        sh """#!/bin/bash

              # Enable Conda environment for tests
              source ${CONDAPATH}/bin/activate ${CONDAENV}

              # Python tests for libs
              python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-libout.xml ${LIBRARYPATH}/python/dbxdemo/test*.py || true
           """
    } catch(err) {
      step([$class: 'JUnitResultArchiver', testResults: '--junit-xml=${TESTRESULTPATH}/TEST-*.xml'])
      if (currentBuild.result == 'UNSTABLE')
        currentBuild.result = 'FAILURE'
      throw err
    }
  }
  stage('Package') {
    sh """#!/bin/bash

          # Enable Conda environment for tests
          source ${CONDAPATH}/bin/activate ${CONDAENV}

          # Package Python library to wheel
          cd ${LIBRARYPATH}/python/dbxdemo
          python3 setup.py sdist bdist_wheel
       """
  }
  stage('Build Artifact') {
    sh """mkdir -p ${BUILDPATH}/Workspace
          mkdir -p ${BUILDPATH}/Libraries/python
          mkdir -p ${BUILDPATH}/Validation/Output
          #Get modified files
          git diff --name-only --diff-filter=AMR HEAD^1 HEAD | xargs -I '{}' cp --parents -r '{}' ${BUILDPATH}

          # Get packaged libs
          find ${LIBRARYPATH} -name '*.whl' | xargs -I '{}' cp '{}' ${BUILDPATH}/Libraries/python/

          # Generate artifact
          tar -czvf Builds/latest_build.tar.gz ${BUILDPATH}
       """
    archiveArtifacts artifacts: 'Builds/latest_build.tar.gz'
  }
  stage('Deploy') {
    sh """#!/bin/bash
          # Enable Conda environment for tests
          source ${CONDAPATH}/bin/activate ${CONDAENV}

          # Use Databricks CLI to deploy notebooks
          databricks workspace import_dir ${BUILDPATH}/Workspace ${WORKSPACEPATH}

          dbfs cp -r ${BUILDPATH}/Libraries/python ${DBFSPATH}
       """
    withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
        sh """#!/bin/bash

              #Get space delimited list of libraries
              LIBS=\$(find ${BUILDPATH}/Libraries/python/ -name '*.whl' | sed 's#.*/##' | paste -sd " ")

              #Script to uninstall, reboot if needed & instsall library
              python3 ${SCRIPTPATH}/installWhlLibrary.py --workspace=${DBURL}\
                        --token=$TOKEN\
                        --clusterid=${CLUSTERID}\
                        --libs=\$LIBS\
                        --dbfspath=${DBFSPATH}
           """
    }
  }
  stage('Run Integration Tests') {
    withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
        sh """python3 ${SCRIPTPATH}/executenotebook.py --workspace=${DBURL}\
                        --token=$TOKEN\
                        --clusterid=${CLUSTERID}\
                        --localpath=${NOTEBOOKPATH}/VALIDATION\
                        --workspacepath=${WORKSPACEPATH}/VALIDATION\
                        --outfilepath=${OUTFILEPATH}
           """
    }
    sh """sed -i -e 's #ENV# ${OUTFILEPATH} g' ${SCRIPTPATH}/evaluatenotebookruns.py
          python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-notebookout.xml ${SCRIPTPATH}/evaluatenotebookruns.py || true
       """
  }
  stage('Report Test Results') {
    sh """find ${OUTFILEPATH} -name '*.json' -exec gzip --verbose {} \\;
          touch ${TESTRESULTPATH}/TEST-*.xml
       """
    junit "**/reports/junit/*.xml"
  }
}

本書の残りでは、パイプラインの各ステップを説明します。

環境変数の定義

異なるパイプラインのパイプラインステージで使用できる環境変数を定義することができます。

  • GITREPO: gitリポジトリルートのローカルパス
  • GITREPOREMOTE: gitリポジトリのWeb URL
  • GITHUBCREDID: GitHubパーソナルアクセストークンに対するJenkins credential ID
  • CURRENTRELEASE: デプロイメントブランチ
  • DBTOKEN: Databricksパーソナルアクセストークンに対するJenkins credential ID
  • `DBURL: DatabricksワークスペースのWeb URL
  • SCRIPTPATH: 自動化スクリプトのgitプロジェクトディレクトリのローカルパス
  • NOTEBOOKPATH: ノートブックのgitプロジェクトディレクトリのローカルパス
  • LIBRARYPATH: ライブラリコード、他のDBFSコードに対するgitプロジェクトディレクトリのローカルパス
  • BUILDPATH: ビルドアーティファクトに対するディレクトリのローカルパス
  • OUTFILEPATH: 自動化テストで生成されたJSON結果ファイルのローカルパス
  • TESTRESULTPATH: Junitテスト結果のサマリーに対するディレクトリのローカルパス
  • WORKSPACEPATH: ノートブックに対するDatabricksワークスペースパス
  • DBFSPATH: ライブラリ、非ノートブックコードに対するDatabricks DBFSパス
  • CLUSTERID: テストを実行するDatabricksクラスターID
  • CONDAPATH: Condaインストレーションへのパス
  • CONDAENV: ビルド依存関係のあるライブラリを含むConda環境名

パイプラインのセットアップ

Setupステージにおいては、接続情報を用いてDatabricks CLIとDatabricks Connectを設定します。

Groovy
def GITREPO         = "/var/lib/jenkins/workspace/${env.JOB_NAME}"
def GITREPOREMOTE   = "https://github.com/<repo>"
def GITHUBCREDID    = "<github-token>"
def CURRENTRELEASE  = "<release>"
def DBTOKEN         = "<databricks-token>"
def DBURL           = "https://<databricks-instance>"
def SCRIPTPATH      = "${GITREPO}/Automation/Deployments"
def NOTEBOOKPATH    = "${GITREPO}/Workspace"
def LIBRARYPATH     = "${GITREPO}/Libraries"
def BUILDPATH       = "${GITREPO}/Builds/${env.JOB_NAME}-${env.BUILD_NUMBER}"
def OUTFILEPATH     = "${BUILDPATH}/Validation/Output"
def TESTRESULTPATH  = "${BUILDPATH}/Validation/reports/junit"
def WORKSPACEPATH   = "/Shared/<path>"
def DBFSPATH        = "dbfs:<dbfs-path>"
def CLUSTERID       = "<cluster-id>"
def CONDAPATH       = "<conda-path>"
def CONDAENV        = "<conda-env>"


stage('Setup') {
  withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
  sh """#!/bin/bash
      # Configure Conda environment for deployment & testing
      source ${CONDAPATH}/bin/activate ${CONDAENV}

      # Configure Databricks CLI for deployment
      echo "${DBURL}
      $TOKEN" | databricks configure --token

      # Configure Databricks Connect for testing
      echo "${DBURL}
      $TOKEN
      ${CLUSTERID}
      0
      15001" | databricks-connect configure
     """
  }
}

最新の変更の取得

Checkoutステージは指定されたブランチから、Jenkinsプラグインを用いて実行エージェントにコードをダウンロードします。

Groovy
stage('Checkout') { // for display purposes
  echo "Pulling ${CURRENTRELEASE} Branch from Github"
  git branch: CURRENTRELEASE, credentialsId: GITHUBCREDID, url: GITREPOREMOTE
}

ユニットテストの開発

お使いのコードに対してどのように、いつユニットテストを行うのかに関してはいくつかのオプションが存在します。Databricksノートブック外で開発されるライブラリコードに対しては、プロセスは従来型のソフトウェア開発プラクティスに似たものになります。PythonのpytestモジュールやJUnitフォーマットのXMLファイルのようなテストフレームワークを用いてユニットテストを記述し、結果を格納します。

Databricksのプロセスは、テストされるコードがDatabricksのSparkクラスターで実行されることを意図したApache Sparkコードか、頻繁にローカルで実行されるかによって変化します。この要件に対応するためにDatabricks Connectを使用します。SDKは事前に設定されているので、Databricksクラスターでテストを実行するためにテウsとコードを変更する必要はありません。あなたはDatabricks ConnectをCondaの仮想環境にインストールしました。Conda環境がアクティベートされるとPythonツール、pytestを用いてテストが実行され、テストで指定された場所に出力ファイルが生成されます。

Databricks Connectを用いたライブラリコードのテスト

Python
stage('Run Unit Tests') {
  try {
      sh """#!/bin/bash
         # Enable Conda environment for tests
         source ${CONDAPATH}/bin/activate ${CONDAENV}

         # Python tests for libs
         python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-libout.xml ${LIBRARYPATH}/python/dbxdemo/test*.py || true
         """
  } catch(err) {
    step([$class: 'JUnitResultArchiver', testResults: '--junit-xml=${TESTRESULTPATH}/TEST-*.xml'])
    if (currentBuild.result == 'UNSTABLE')
      currentBuild.result = 'FAILURE'
    throw err
  }
}

以下のスニペットはDatabricksクラスターにインストールされるライブラリ関数となります。これは、Apache Sparkデータフレームに新規カラムを追加し、リテラルで充当するシンプルな関数となります。

Python
# addcol.py
import pyspark.sql.functions as F

def with_status(df):
    return df.withColumn("status", F.lit("checked"))

このテストでは、addcol.pyで定義されるwith_status関数にモックのデータフレームオブジェクトを引き渡します。そして、結果は期待する値を含むデータフレームオブジェクトと比較されます。値が一致すれば(この場合は一致しますが)テストを通過したことになります。

Python
# test-addcol.py
import pytest


from dbxdemo.spark import get_spark
from dbxdemo.appendcol import with_status


class TestAppendCol(object):

  def test_with_status(self):
    source_data = [
        ("paula", "white", "paula.white@example.com"),
        ("john", "baer", "john.baer@example.com")
    ]
    source_df = get_spark().createDataFrame(
        source_data,
        ["first_name", "last_name", "email"]
    )

    actual_df = with_status(source_df)

    expected_data = [
        ("paula", "white", "paula.white@example.com", "checked"),
        ("john", "baer", "john.baer@example.com", "checked")
    ]
    expected_df = get_spark().createDataFrame(
        expected_data,
        ["first_name", "last_name", "email", "status"]
    )

    assert(expected_df.collect() == actual_df.collect())

ライブラリコードのパッケージング

Packageステージでは、ライブラリコードをPython wheelにパッケージングします。

Groovy
stage('Package') {
  sh """#!/bin/bash
      # Enable Conda environment for tests
      source ${CONDAPATH}/bin/activate ${CONDAENV}

      # Package Python library to wheel
      cd ${LIBRARYPATH}/python/dbxdemo
      python3 setup.py sdist bdist_wheel
     """
}

開発アーティファクトの生成、格納

Databricksにおける開発アーティファクトのビルドには、適切なDatabricks環境にデプロイされる全ての新規、更新コードの収集が含まれます。Build Artifactステージでは、ワークスペースにデプロイされるノートブックコード、ビルドプロセスで生成されたすべてのwhlライブラリ、アーカイブの目的でテストの結果のサマリーが追加されます。これを行うためには、最新のgitマージに含まれる全ての新規ファイルにフラグを立てるためにgit diffを使用します。これはあくまで一例であり、お使いのパイプラインの実装は異なることがありますが、目的は現在のリリースで意図される全てのファイルを追加するというものです。

Groovy
stage('Build Artifact') {
  sh """mkdir -p ${BUILDPATH}/Workspace
        mkdir -p ${BUILDPATH}/Libraries/python
        mkdir -p ${BUILDPATH}/Validation/Output
        #Get Modified Files
        git diff --name-only --diff-filter=AMR HEAD^1 HEAD | xargs -I '{}' cp --parents -r '{}' ${BUILDPATH}

        # Get packaged libs
        find ${LIBRARYPATH} -name '*.whl' | xargs -I '{}' cp '{}' ${BUILDPATH}/Libraries/python/

        # Generate artifact
        tar -czvf Builds/latest_build.tar.gz ${BUILDPATH}
     """
  archiveArtifacts artifacts: 'Builds/latest_build.tar.gz'
}

アーティファクトのデプロイ

Deployステージでは、Databricks Connectモジュールと同様にお使いのConda環境にインストールされたDatabricks CLIを使用しますので、シェルセッションのためにConda環境をアクティベートする必要があります。ノートブック、ライブラリをそれぞれアップロードするためにWorkspace CLIとDBFS CLIを使用します。

Bash
databricks workspace import_dir <local build path> <remote workspace path>
dbfs cp -r <local build path> <remote dbfs path>
Groovy
stage('Deploy') {
  sh """#!/bin/bash
        # Enable Conda environment for tests
        source ${CONDAPATH}/bin/activate ${CONDAENV}

        # Use Databricks CLI to deploy notebooks
        databricks workspace import_dir ${BUILDPATH}/Workspace ${WORKSPACEPATH}

        dbfs cp -r ${BUILDPATH}/Libraries/python ${DBFSPATH}
     """
  withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
    sh """#!/bin/bash

        #Get space delimited list of libraries
        LIBS=\$(find ${BUILDPATH}/Libraries/python/ -name '*.whl' | sed 's#.*/##' | paste -sd " ")

        #Script to uninstall, reboot if needed & instsall library
        python3 ${SCRIPTPATH}/installWhlLibrary.py --workspace=${DBURL}\
                  --token=$TOKEN\
                  --clusterid=${CLUSTERID}\
                  --libs=\$LIBS\
                  --dbfspath=${DBFSPATH}
       """
  }
}

Databricksクラスターに新バージョンのライブラリをインストールするには、既存のライブラリを最初にアンインストールする必要があります。このため、以下のステップを実行するためにPythonスクリプトでDatabricks REST APIを呼び出します。

  1. ライブラリがインストールされているかどうかを確認
  2. ライブラリをアンインストール
  3. アンインストールが行われた場合にはクラスターを再起動
    1. 次に進む前にクラスターが稼働状態になるまで待機
  4. ライブラリをインストール
Python
# installWhlLibrary.py
#!/usr/bin/python3
import json
import requests
import sys
import getopt
import time

def main():
  workspace = ''
  token = ''
  clusterid = ''
  libs = ''
  dbfspath = ''

  try:
      opts, args = getopt.getopt(sys.argv[1:], 'hstcld',
                                 ['workspace=', 'token=', 'clusterid=', 'libs=', 'dbfspath='])
  except getopt.GetoptError:
      print(
          'installWhlLibrary.py -s <workspace> -t <token> -c <clusterid> -l <libs> -d <dbfspath>')
      sys.exit(2)

  for opt, arg in opts:
      if opt == '-h':
          print(
              'installWhlLibrary.py -s <workspace> -t <token> -c <clusterid> -l <libs> -d <dbfspath>')
          sys.exit()
      elif opt in ('-s', '--workspace'):
          workspace = arg
      elif opt in ('-t', '--token'):
          token = arg
      elif opt in ('-c', '--clusterid'):
          clusterid = arg
      elif opt in ('-l', '--libs'):
          libs=arg
      elif opt in ('-d', '--dbfspath'):
          dbfspath=arg

  print('-s is ' + workspace)
  print('-t is ' + token)
  print('-c is ' + clusterid)
  print('-l is ' + libs)
  print('-d is ' + dbfspath)

  libslist = libs.split()

  # Uninstall Library if exists on cluster
  i=0
  for lib in libslist:
      dbfslib = dbfspath + lib
      print(dbfslib + ' before:' + getLibStatus(workspace, token, clusterid, dbfslib))

      if (getLibStatus(workspace, token, clusterid, dbfslib) != 'not found'):
          print(dbfslib + " exists. Uninstalling.")
          i = i + 1
          values = {'cluster_id': clusterid, 'libraries': [{'whl': dbfslib}]}

          resp = requests.post(workspace + '/api/2.0/libraries/uninstall', data=json.dumps(values), auth=("token", token))
          runjson = resp.text
          d = json.loads(runjson)
          print(dbfslib + ' after:' + getLibStatus(workspace, token, clusterid, dbfslib))

  # Restart if libraries uninstalled
  if i > 0:
      values = {'cluster_id': clusterid}
      print("Restarting cluster:" + clusterid)
      resp = requests.post(workspace + '/api/2.0/clusters/restart', data=json.dumps(values), auth=("token", token))
      restartjson = resp.text
      print(restartjson)

      p = 0
      waiting = True
      while waiting:
          time.sleep(30)
          clusterresp = requests.get(workspace + '/api/2.0/clusters/get?cluster_id=' + clusterid,
                                 auth=("token", token))
          clusterjson = clusterresp.text
          jsonout = json.loads(clusterjson)
          current_state = jsonout['state']
          print(clusterid + " state:" + current_state)
          if current_state in ['RUNNING','INTERNAL_ERROR', 'SKIPPED'] or p >= 10:
              break
          p = p + 1

  # Install Libraries
  for lib in libslist:
      dbfslib = dbfspath + lib
      print("Installing " + dbfslib)
      values = {'cluster_id': clusterid, 'libraries': [{'whl': dbfslib}]}

      resp = requests.post(workspace + '/api/2.0/libraries/install', data=json.dumps(values), auth=("token", token))
      runjson = resp.text
      d = json.loads(runjson)
      print(dbfslib + ' after:' + getLibStatus(workspace, token, clusterid, dbfslib))


def getLibStatus(workspace, token, clusterid, dbfslib):
  resp = requests.get(workspace + '/api/2.0/libraries/cluster-status?cluster_id='+ clusterid, auth=("token", token))
  libjson = resp.text
  d = json.loads(libjson)
  if (d.get('library_statuses')):
      statuses = d['library_statuses']

      for status in statuses:
          if (status['library'].get('whl')):
              if (status['library']['whl'] == dbfslib):
                  return status['status']
              else:
                  return "not found"
  else:
      # No libraries found
      return "not found"

if __name__ == '__main__':
  main()

別のノートブックを使ったノートブックコードのテスト

アーティファクトがデプロイされたら、新規の環境で全てのコードが問題なく動いていることを確認するためにインテグレーションテストを実行することが重要です。このためには、デプロイメントをテストするためのアサートを含むノートブックを実行することができます。このケースでは、ユニットテストで用いたのと同じテストを使用しますが、今回は先ほどクラスターにインストールされたwhlからappendcolライブラリをインポートします。

このテストを自動化し、お使いのCI/CDパイプラインに含めるには、Jenkinsサーバーからノートブックを実行するためにDatabricks REST APIを使用します。これによって、pytestを用いてノートブックがテストに通過したのか、失敗したのかをチェックすることができるようになります。ノートブックのアサートが失敗した場合、これはREST APIから返却されるJSON出力に表示され、結果としてJUnitのテスト結果に表示されます。

Groovy
stage('Run Integration Tests') {
  withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
      sh """python3 ${SCRIPTPATH}/executenotebook.py --workspace=${DBURL}\
                      --token=$TOKEN\
                      --clusterid=${CLUSTERID}\
                      --localpath=${NOTEBOOKPATH}/VALIDATION\
                      --workspacepath=${WORKSPACEPATH}/VALIDATION\
                      --outfilepath=${OUTFILEPATH}
         """
  }
  sh """sed -i -e 's #ENV# ${OUTFILEPATH} g' ${SCRIPTPATH}/evaluatenotebookruns.py
        python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-notebookout.xml ${SCRIPTPATH}/evaluatenotebookruns.py || true
     """
}

このステージはえは二つのPython自動化スクリプトをコールしています。最初のスクリプトexecutenotebook.pyは非同期ジョブをサブミットするCreate and trigger a one-time run(POST /jobs/runs/submit)エンドポイントを用いたノートブックを実行します。このエンドポイントは非同期なので、RESTコールで最初に返却されるジョブIDを用いて、ジョブのステータスをポーリングします。ジョブが完了すると、呼び出し時に関数の引数に指定したパスに出力結果が保存されます。

Python
# executenotebook.py
#!/usr/bin/python3
import json
import requests
import os
import sys
import getopt
import time


def main():
  workspace = ''
  token = ''
  clusterid = ''
  localpath = ''
  workspacepath = ''
  outfilepath = ''

  try:
      opts, args = getopt.getopt(sys.argv[1:], 'hs:t:c:lwo',
                                 ['workspace=', 'token=', 'clusterid=', 'localpath=', 'workspacepath=', 'outfilepath='])
  except getopt.GetoptError:
      print(
          'executenotebook.py -s <workspace> -t <token>  -c <clusterid> -l <localpath> -w <workspacepath> -o <outfilepath>)')
      sys.exit(2)

  for opt, arg in opts:
      if opt == '-h':
          print(
              'executenotebook.py -s <workspace> -t <token> -c <clusterid> -l <localpath> -w <workspacepath> -o <outfilepath>')
          sys.exit()
      elif opt in ('-s', '--workspace'):
          workspace = arg
      elif opt in ('-t', '--token'):
          token = arg
      elif opt in ('-c', '--clusterid'):
          clusterid = arg
      elif opt in ('-l', '--localpath'):
          localpath = arg
      elif opt in ('-w', '--workspacepath'):
          workspacepath = arg
      elif opt in ('-o', '--outfilepath'):
          outfilepath = arg

  print('-s is ' + workspace)
  print('-t is ' + token)
  print('-c is ' + clusterid)
  print('-l is ' + localpath)
  print('-w is ' + workspacepath)
  print('-o is ' + outfilepath)
  # Generate array from walking local path

  notebooks = []
  for path, subdirs, files in os.walk(localpath):
      for name in files:
          fullpath = path + '/' + name
          # removes localpath to repo but keeps workspace path
          fullworkspacepath = workspacepath + path.replace(localpath, '')

          name, file_extension = os.path.splitext(fullpath)
          if file_extension.lower() in ['.scala', '.sql', '.r', '.py']:
              row = [fullpath, fullworkspacepath, 1]
              notebooks.append(row)

  # run each element in list
  for notebook in notebooks:
      nameonly = os.path.basename(notebook[0])
      workspacepath = notebook[1]

      name, file_extension = os.path.splitext(nameonly)

      # workpath removes extension
      fullworkspacepath = workspacepath + '/' + name

      print('Running job for:' + fullworkspacepath)
      values = {'run_name': name, 'existing_cluster_id': clusterid, 'timeout_seconds': 3600, 'notebook_task': {'notebook_path': fullworkspacepath}}

      resp = requests.post(workspace + '/api/2.0/jobs/runs/submit',
                           data=json.dumps(values), auth=("token", token))
      runjson = resp.text
      print("runjson:" + runjson)
      d = json.loads(runjson)
      runid = d['run_id']

      i=0
      waiting = True
      while waiting:
          time.sleep(10)
          jobresp = requests.get(workspace + '/api/2.0/jobs/runs/get?run_id='+str(runid),
                           data=json.dumps(values), auth=("token", token))
          jobjson = jobresp.text
          print("jobjson:" + jobjson)
          j = json.loads(jobjson)
          current_state = j['state']['life_cycle_state']
          runid = j['run_id']
          if current_state in ['TERMINATED', 'INTERNAL_ERROR', 'SKIPPED'] or i >= 12:
              break
          i=i+1

      if outfilepath != '':
          file = open(outfilepath + '/' +  str(runid) + '.json', 'w')
          file.write(json.dumps(j))
          file.close()

if __name__ == '__main__':
  main()

2つ目のスクリプトevaluatenotebookruns.pyは、ノートブックのアサート文が成功したのか、失敗したのかを判断するためにJSONをパースし、評価するtest_job_run関数を定義します。追加のテストtest_performanceは予想するよりも処理に時間を要するテストを捕捉します。

Python
# evaluatenotebookruns.py
import unittest
import json
import glob
import os

class TestJobOutput(unittest.TestCase):

  test_output_path = '#ENV#'

  def test_performance(self):
      path = self.test_output_path
      statuses = []

      for filename in glob.glob(os.path.join(path, '*.json')):
          print('Evaluating: ' + filename)
          data = json.load(open(filename))
          duration = data['execution_duration']
          if duration > 100000:
              status = 'FAILED'
          else:
              status = 'SUCCESS'

          statuses.append(status)

      self.assertFalse('FAILED' in statuses)


  def test_job_run(self):
      path = self.test_output_path
      statuses = []


      for filename in glob.glob(os.path.join(path, '*.json')):
          print('Evaluating: ' + filename)
          data = json.load(open(filename))
          status = data['state']['result_state']
          statuses.append(status)

      self.assertFalse('FAILED' in statuses)

if __name__ == '__main__':
  unittest.main()

ユニッテストのステージで見たように、テストを実行し、結果のサマリーを生成するためにpytestを使用します。

テスト結果の公開

Jenkinsプラグインjunitを用いて、JSONの結果はアーカイブされテスト結果が公開されます。これによって、ビルドプロセスに関連するレポート、ダッシュボードを可視化することができます。

Groovy
stage('Report Test Results') {
  sh """find ${OUTFILEPATH} -name '*.json' -exec gzip --verbose {} \\;
        touch ${TESTRESULTPATH}/TEST-*.xml
     """
  junit "**/reports/junit/*.xml"
}

この時点で、CI/CDパイプラインはインテグレーション、デプロイメントサイクルを完了しました。このプロセスを自動化することで、効率的で一貫性があり、再現可能なプロセスを用いてお使いのコードがテストされ、デプロイされることを保証することができます。

Databricks 無料トライアル

Databricks 無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?