LoginSignup
6
1

More than 1 year has passed since last update.

databricks と pytest の integration を試してみる

Last updated at Posted at 2022-05-07

はじめに

こちらに pytestdatabricksの連携するサンプルコードがあったので、実際に試しながら動きを確認してみました。

(追記)
GitHub Actionと連携させた記事はこちらです。
https://qiita.com/maroon-db/items/48668d5f4c4b65ab6c1d

実行内容

image.png

今回は上図のような動作環境で実行します。

  1. 予めメインコード(run_tests.py)とWheelライブラリをDBFS上にアップロード(コピー)しておきます。
  2. JOB APIを利用して、アップロードしたコードを実行します。
  3. 最後に結果を確認

環境準備

今回のサンプルコードをクローンしておきます。

git clone https://github.com/amesar/databricks-tests.git

Databricks Workspace and Authentication

DBFS上にファイルをアップロード(コピー)するため、databricks-cliをローカルPCにインストールし、対象ワークスペースの構成をセットアップします。databricks-cli のインストール&セットアップ詳細は@ktmrmshkさんの記事をご覧ください。

Conda環境の作成

wheelパッケージ作成のためconda環境を作成しdatabricks-testsをアクティベートします。(conda.yaml)

conda env create --file conda.yaml
conda activate databricks-tests.   ## conda.yaml ファイルに環境名(databricks-tests)が記載

以上でローカルPC側の環境準備は終了です。
それでは早速コードを実行していきましょう。

コード内容の確認

JOB実行内容 (run_submit)

まずは、JOB APIを利用して実行する内容を確認しておきます。 run_submit.json.templateをコピーして、{DBFS_DIR}の箇所を全て自分で指定するDirecotoryに書き換えます。(今回は、dbfs:/tmp/jmaru/jobs/myappに変更)
あと、クラスターの spark_versionも適宜最新版に変更しておきます。

以下を見てみると、libraryとしてdatabricks_tests_example-0.0.1-py3-none-any.whlというwheelファイルと、タスクとしてrun_tests.pyが必要であることがわかります。

これらを予めDBFSの指定したDirectory上にアップロードする必要がありそうです。

run_submit.json
{
  "run_name": "jmaru_DatabricksTestsExample",
  "new_cluster": {
    "spark_version": "10.4.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 1
  },
  "timeout_seconds": 3600,
  "libraries": [
    { "pypi": { "package": "pytest==5.3.5" } },
    { "whl": "dbfs:/tmp/jmaru/jobs/myapp/databricks_tests_example-0.0.1-py3-none-any.whl" }
  ],
  "spark_python_task": {
    "python_file": "dbfs:/tmp/jmaru/jobs/myapp/run_tests.py",
    "parameters": [ "--pyargs", "tests", "-s", "-v", "--junit-xml", "dbfs:/tmp/jmaru/jobs/myapp/junit.xml" ]
  }
}

Wheelライブラリ(databricks_tests_example-0.0.1-py3-none-any.whl)

pytestのテストコードや、チェック対象となるコードは予めWheelパッケージとしてまとめておきます。Wheelを作成するため、example Directoryを見ると以下のような構成で配置してあります。

example/
  +-- setup.py
  +-- myapp/
       |-- __init.py__
       |-- processor.py
  +-- tests/
       |-- __init.py__
       |-- test_simple.py
       |-- test_spark.py

それぞれのコードの内容はこちらです。
まずは、チェック対象となるコードはこちらです。今回はシンプルに'num'列の値をn倍する関数になります。

processor.py
from pyspark.sql.functions import col

def multiply(df, n):
    return df.withColumn("num", col("num") * n)

pytestでテストするコードは2つあります。一つはそのままpassを返しますが、下のtest_spark.pyは計算して結果があっているかチェックしてくれます。

test_simple.py
import pytest

class TestSimple():
    def test_simple(self):
        pass
test_spark.py
from pyspark.sql import SparkSession
import pytest
from myapp import processor

spark = SparkSession.builder.getOrCreate()

class TestSpark():
    def test_multiply(self):
        df = spark.createDataFrame([[1], [2], [3]], [ "num" ] )
        df = processor.multiply(df, 2)
        df2 = spark.createDataFrame([[2], [4], [6]], [ "num" ] )
        assert df.collect() == df2.collect()

Wheelを作成するためのsetup.pyの中身はこちらです。

setup.py
from setuptools import setup

setup(name='databricks_tests_example',
      version='0.0.1',
      description='databricks-tests-example',
      author='Andre',
      packages=[
        'myapp',
        'tests'
        ],
      zip_safe=False)

メインコード(run_tests.py)

メインコードである、run_tests.pyはこちらです。 単純に読み込んだpytestのwheelライブラリを実行して結果を表示します。run_submitの引数で結果の出力先のDBFSに結果(junit.xml)が出力されます。

run_tests.py
import os
import sys

if __name__ == "__main__":
    import pytest
    args = [ arg.replace("dbfs:","/dbfs") for arg in sys.argv[1:] ]
    print("run_tests: args:",args)
    errcode = pytest.main(args)
    print("run_tests: errcode:",errcode)
    if errcode != 0:
        sys.exit(errcode)
    #sys.exit(0) # causes run failure!

セットアップ&実行

それでは、コードの概要がわかったので実際に試してみたいと思います。

1. Wheel作成

それでは example Directoryに移動して、Wheelを作成します。

python setup.py bdist_wheel

dist directoryにwheelファイルが作成されていれば成功です。

ls dist/
databricks_tests_example-0.0.1-py3-none-any.whl

2. DBFSへコードのアップロード

メインコードとWheelファイルをDBFS上にアップロードします。

run_tests.pyのコピー
databricks fs cp ../test-harness/databricks_test_harness/run_tests.py dbfs:/tmp/jmaru/jobs/myapp/run_tests.py --overwrite
wheelファイルのコピー
databricks fs cp dist/databricks_tests_example-0.0.1-py3-none-any.whl dbfs:/tmp/jmaru/jobs/myapp/databricks_tests_example-0.0.1-py3-none-any.whl

3. JOB(run submit)実行

こちらを実行すると、JOB用のクラスターが作成され、コードが実行されます。

databricks runs submit --json-file run_submit.json

実行すると、run_idが返ってきます。

{
  "run_id": 2404336
}

Jobのステータスはコマンドで確認します。

databricks runs get --run-id 2404336

image.png

Databricks workspace側でも実行結果が確認できます。(Jobs --> Job runs)
image.png

出力された結果(junit.xml)をダウンロードして確認します。

databricks fs cp dbfs:/tmp/jmaru/jobs/myapp/junit.xml .
junit.xml
<?xml version="1.0" ?>
<testsuites>
  <testsuite
     errors="0"
     failures="0"
     hostname="0716-190309-boll22-10-0-249-156"
     name="pytest"
     skipped="0"
     tests="2"
     time="8.975"
     timestamp="2020-07-16T19:06:17.155587">
     <testcase classname="test_simple.TestSimple"
       file="../python3/lib/python3.7/site-packages/tests/test_simple.py"
       line="3"
       name="test_simple"
       time="0.001"/>
     <testcase classname="test_spark.TestSpark"
       file="../python3/lib/python3.7/site-packages/tests/test_spark.py"
       line="6"
       name="test_multiply"
       time="8.829"/>
  </testsuite>
</testsuites>

最後に

上記は確認のためマニュアルでセットアップしていきましたが、これらを自動化するworkflowスクリプトも用意されております。
github actionsなどと組み合わせれば、CI/CDも実現可能です。

(追記)
GitHub Actionsと連携させた記事はこちらです。
https://qiita.com/maroon-db/items/48668d5f4c4b65ab6c1d

6
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
1